Skip to content

Commit 3dba6b1

Browse files
committed
changefeed implementation: fix a memory leak
WOW, that was surprisingly easy. There was a massive memory leak caused entirely by this "tracker?.once 'error', (err) ->" event listener not being removed!
1 parent f4b20bd commit 3dba6b1

File tree

2 files changed

+44
-20
lines changed

2 files changed

+44
-20
lines changed

src/packages/database/nats/leak-search.ts

+30-13
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,43 @@
11
/*
2-
Code for testing for memory leaks.
3-
4-
- just doing queries doesn't leak
5-
- as of this writing, creating the changefeed does leak.
6-
2+
Code for testing for memory leaks. As of this commit, nothing tested for here
3+
leaks memory in my dev setup.
74
85
USAGE:
96
107
Run with an account_id from your dev server and pass the expose-gc flag so the
118
gc command is defined:
129
1310
14-
ACCOUNT_ID="6aae57c6-08f1-4bb5-848b-3ceb53e61ede" node --expose-gc
11+
ACCOUNT_ID="6aae57c6-08f1-4bb5-848b-3ceb53e61ede" DEBUG=cocalc:* DEBUG_CONSOLE=yes node --expose-gc
1512
1613
Then do this
1714
1815
a = require('@cocalc/database/nats/leak-search')
1916
await a.testQueryOnly(50)
2017
await a.testChangefeed(50)
2118
19+
Do a test multiple times to see if there is a real leak, e.g., the following is GOOD:
20+
21+
> a.testChangefeed(50)
22+
Promise {
23+
<pending>,
24+
[Symbol(async_id_symbol)]: 47,
25+
[Symbol(trigger_async_id_symbol)]: 6
26+
}
27+
> leaked 5.209536 MB
2228
29+
> a.testChangefeed(50)
30+
Promise {
31+
<pending>,
32+
[Symbol(async_id_symbol)]: 3167,
33+
[Symbol(trigger_async_id_symbol)]: 6
34+
}
35+
> leaked -0.029184 MB <--- GOOD!
2336
*/
2437

2538
import { db } from "@cocalc/database";
2639
import { uuid } from "@cocalc/util/misc";
27-
//import { delay } from "awaiting";
40+
import { delay } from "awaiting";
2841
import { callback2 } from "@cocalc/util/async-utils";
2942

3043
// set env variable to an account_id on your dev server with lots of projects.
@@ -52,12 +65,16 @@ export function cancel(id) {
5265
}
5366

5467
let pre: any = { heapUsed: 0 };
55-
function before() {
68+
async function before() {
69+
gc?.();
70+
await delay(500);
5671
gc?.();
5772
pre = process.memoryUsage();
5873
}
5974

60-
function after() {
75+
async function after() {
76+
gc?.();
77+
await delay(500);
6178
gc?.();
6279
const post = process.memoryUsage();
6380
const leak = (post.heapUsed - pre.heapUsed) / 10 ** 6;
@@ -67,18 +84,18 @@ function after() {
6784

6885
// This leaks horribly
6986
export async function testChangefeed(n) {
70-
before();
87+
await before();
7188
for (let i = 0; i < n; i++) {
7289
const id = uuid();
7390
await callback2(create, { id });
7491
cancel(id);
7592
}
76-
return after();
93+
return await after();
7794
}
7895

7996
// query only does NOT leak
8097
export async function testQueryOnly(n) {
81-
before();
98+
await before();
8299
for (let i = 0; i < n; i++) {
83100
const d = db();
84101
await callback2(d.user_query, {
@@ -90,5 +107,5 @@ export async function testQueryOnly(n) {
90107
account_id: "6aae57c6-08f1-4bb5-848b-3ceb53e61ede",
91108
});
92109
}
93-
return after();
110+
return await after();
94111
}

src/packages/database/postgres-user-queries.coffee

+14-7
Original file line numberDiff line numberDiff line change
@@ -1364,6 +1364,11 @@ exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
13641364
if pg_changefeed == 'projects'
13651365
tracker_add = (project_id) => feed.insert({project_id:project_id})
13661366
tracker_remove = (project_id) => feed.delete({project_id:project_id})
1367+
1368+
# Any tracker error means this changefeed is now broken and
1369+
# has to be recreated.
1370+
tracker_error = () => changes.cb("tracker error - ${err}")
1371+
13671372
pg_changefeed = (db, account_id) =>
13681373
where : (obj) =>
13691374
# Check that this is a project we have read access to
@@ -1381,11 +1386,14 @@ exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
13811386
init_tracker : (tracker) =>
13821387
tracker.on "add_user_to_project-#{account_id}", tracker_add
13831388
tracker.on "remove_user_from_project-#{account_id}", tracker_remove
1389+
tracker.once 'error', tracker_error
1390+
13841391

13851392
free_tracker : (tracker) =>
13861393
dbg("freeing project tracker events")
13871394
tracker.removeListener("add_user_to_project-#{account_id}", tracker_add)
13881395
tracker.removeListener("remove_user_from_project-#{account_id}", tracker_remove)
1396+
tracker.removeListener("error", tracker_error)
13891397

13901398

13911399
else if pg_changefeed == 'news'
@@ -1423,6 +1431,7 @@ exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
14231431
return
14241432
tracker_add = (collab_id) => feed.insert({account_id:collab_id})
14251433
tracker_remove = (collab_id) => feed.delete({account_id:collab_id})
1434+
tracker_error = () => changes.cb("tracker error - ${err}")
14261435
pg_changefeed = (db, account_id) ->
14271436
shared_tracker = undefined
14281437
where : (obj) -> # test of "is a collab with me"
@@ -1431,10 +1440,12 @@ exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
14311440
shared_tracker = tracker
14321441
tracker.on "add_collaborator-#{account_id}", tracker_add
14331442
tracker.on "remove_collaborator-#{account_id}", tracker_remove
1443+
tracker.once 'error', tracker_error
14341444
free_tracker : (tracker) =>
14351445
dbg("freeing collab tracker events")
14361446
tracker.removeListener("add_collaborator-#{account_id}", tracker_add)
14371447
tracker.removeListener("remove_collaborator-#{account_id}", tracker_remove)
1448+
tracker.removeListener("error", tracker_error)
14381449

14391450

14401451
x = pg_changefeed(@, account_id)
@@ -1470,11 +1481,10 @@ exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
14701481
select : select
14711482
where : where
14721483
watch : watch
1473-
cb : (err, _feed) =>
1484+
cb : (err, feed) =>
14741485
if err
14751486
cb(err)
14761487
return
1477-
feed = _feed
14781488
feed.on 'change', (x) ->
14791489
process(x)
14801490
changes.cb(undefined, x)
@@ -1484,16 +1494,13 @@ exports.extend_PostgreSQL = (ext) -> class PostgreSQL extends ext
14841494
if tracker? and free_tracker?
14851495
dbg("free_tracker")
14861496
free_tracker(tracker)
1487-
dbg("do NOT free_tracker")
1497+
else
1498+
dbg("do NOT free_tracker")
14881499
feed.on 'error', (err) ->
14891500
changes.cb("feed error - #{err}")
14901501
@_changefeeds ?= {}
14911502
@_changefeeds[changes.id] = feed
14921503
init_tracker?(tracker)
1493-
# Any tracker error means this changefeed is now broken and
1494-
# has to be recreated.
1495-
tracker?.once 'error', (err) ->
1496-
changes.cb("tracker error - #{err}")
14971504
cb()
14981505
], cb)
14991506

0 commit comments

Comments
 (0)