Skip to content

Commit bb339fd

Browse files
committed
changefeeds: fix issue with multi response maxWait, which was causing changefeeds to get recreated too frequently
1 parent d17f12b commit bb339fd

File tree

4 files changed

+39
-15
lines changed

4 files changed

+39
-15
lines changed

src/packages/nats/changefeed/client.ts

+5-6
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ export async function* changefeed({
2323
options,
2424
heartbeat,
2525
lifetime,
26-
maxWait,
26+
maxActualLifetime = 1000 * 60 * 60 * 2,
2727
}: {
2828
account_id: string;
2929
query: any;
3030
options?: any[];
31-
maxWait?: number;
31+
// maximum amount of time the changefeed can possibly stay alive, even with
32+
// many calls to extend it.
33+
maxActualLifetime?: number;
3234
// server will send resp='' to ensure there is at least one message every this many ms.
3335
heartbeat?: number;
3436
// changefeed will live at most this long, then definitely die.
@@ -38,9 +40,6 @@ export async function* changefeed({
3840
throw Error("account_id must be a valid uuid");
3941
}
4042
const subject = changefeedSubject({ account_id });
41-
if (maxWait == null && heartbeat) {
42-
maxWait = heartbeat * 2.1;
43-
}
4443

4544
let lastSeq = -1;
4645
const { nc, jc } = await getEnv();
@@ -49,7 +48,7 @@ export async function* changefeed({
4948
for await (const mesg of await nc.requestMany(
5049
subject,
5150
jc.encode({ query, options, heartbeat, lifetime }),
52-
{ maxWait },
51+
{ maxWait: maxActualLifetime },
5352
)) {
5453
if (mesg.data.length == 0) {
5554
// done

src/packages/nats/changefeed/server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ async function handleMessage(mesg, db) {
260260
while (!done) {
261261
const timeSinceLast = Date.now() - lastSend;
262262
if (timeSinceLast < hb) {
263-
// no neeed to send hearbeat yet
263+
// no need to send heartbeat yet
264264
await delay(hb - timeSinceLast);
265265
continue;
266266
}

src/packages/sync/table/changefeed-nats2.ts

+32-7
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,23 @@ import { changefeed, renew } from "@cocalc/nats/changefeed/client";
88
import { delay } from "awaiting";
99

1010
const HEARTBEAT = 15000;
11+
const HEARTBEAT_MISS_THRESH = 5000;
1112

1213
// this should be significantly shorter than HEARTBEAT.
1314
// if user closes browser and comes back, then this is the time they may have to wait
1415
// for their changefeeds to reconnect, since clock jumps forward...
1516
const HEARTBEAT_CHECK_DELAY = 3000;
1617

18+
const MAX_CHANGEFEED_LIFETIME = 1000 * 60 * 60 * 8;
19+
20+
// low level debugging of changefeeds
21+
const LOW_LEVEL_DEBUG = false;
22+
const log = LOW_LEVEL_DEBUG
23+
? (...args) => {
24+
console.log("changefeed: ", ...args);
25+
}
26+
: (..._args) => {};
27+
1728
export class NatsChangefeed extends EventEmitter {
1829
private account_id: string;
1930
private query;
@@ -40,33 +51,34 @@ export class NatsChangefeed extends EventEmitter {
4051
}
4152

4253
connect = async () => {
54+
log("creating new changefeed", this.query);
4355
if (this.state == "closed") return;
4456
this.natsSynctable = await changefeed({
4557
account_id: this.account_id,
4658
query: this.query,
4759
options: this.options,
4860
heartbeat: HEARTBEAT,
61+
maxActualLifetime: MAX_CHANGEFEED_LIFETIME,
4962
});
50-
this.last_hb = Date.now();
5163
// @ts-ignore
5264
if (this.state == "closed") return;
65+
this.last_hb = Date.now();
66+
this.startHeartbeatMonitor();
5367
this.state = "connected";
5468
const {
5569
value: { id, lifetime },
5670
} = await this.natsSynctable.next();
5771
this.id = id;
5872
this.lifetime = lifetime;
59-
// console.log("got changefeed", { id, lifetime, query: this.query });
73+
log("got changefeed", { id, lifetime, query: this.query });
6074
this.startRenewLoop();
6175

6276
// @ts-ignore
6377
while (this.state != "closed") {
6478
const { value } = await this.natsSynctable.next();
6579
this.last_hb = Date.now();
6680
if (value) {
67-
// got first non-heartbeat value (the first query might take LONGER than heartbeats)
6881
this.startWatch();
69-
this.startHeartbeatMonitor();
7082
return value[Object.keys(value)[0]];
7183
}
7284
}
@@ -97,7 +109,10 @@ export class NatsChangefeed extends EventEmitter {
97109
}
98110
this.last_hb = Date.now();
99111
if (x) {
112+
log("got message ", this.query, x);
100113
this.emit("update", x);
114+
} else {
115+
log("got heartbeat", this.query);
101116
}
102117
}
103118
} catch {
@@ -107,11 +122,19 @@ export class NatsChangefeed extends EventEmitter {
107122

108123
private startHeartbeatMonitor = async () => {
109124
while (this.state != "closed") {
110-
if (this.last_hb && Date.now() - this.last_hb > 2 * HEARTBEAT) {
125+
await delay(HEARTBEAT_CHECK_DELAY);
126+
if (
127+
this.last_hb &&
128+
Date.now() - this.last_hb > HEARTBEAT + HEARTBEAT_MISS_THRESH
129+
) {
130+
log("heartbeat failed", this.query, {
131+
last_hb: this.last_hb,
132+
diff: Date.now() - this.last_hb,
133+
thresh: HEARTBEAT + HEARTBEAT_MISS_THRESH,
134+
});
111135
this.close();
112136
return;
113137
}
114-
await delay(HEARTBEAT_CHECK_DELAY);
115138
}
116139
};
117140

@@ -130,7 +153,9 @@ export class NatsChangefeed extends EventEmitter {
130153

131154
private startRenewLoop = async () => {
132155
while (this.state != "closed" && this.lifetime && this.id) {
133-
await delay(this.lifetime / 3);
156+
// Math.max enforces a minimum delay, so a tiny lifetime can't bombard the server or cause a tight loop
157+
await delay(Math.max(7500, this.lifetime / 3));
158+
log("renewing with lifetime ", this.lifetime, this.query);
134159
try {
135160
await renew({
136161
account_id: this.account_id,

src/packages/util/smc-version.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
/* autogenerated by the update_version script */
2-
exports.version=1745552938;
2+
exports.version=1745651779;

0 commit comments

Comments
 (0)