Skip to content
This repository was archived by the owner on Oct 11, 2022. It is now read-only.

Commit f206bbf

Browse files
authored
Merge pull request #2533 from withspectrum/last-merge
Last merge
2 parents f527d1e + e0ab03e commit f206bbf

File tree

6 files changed

+32
-61
lines changed

6 files changed

+32
-61
lines changed

iris/models/directMessageThread.js

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//@flow
22
const { db } = require('./db');
3-
import { NEW_DOCUMENTS } from './utils';
3+
import { NEW_DOCUMENTS, eachAsyncNewValue } from './utils';
44

55
export type DBDirectMessageThread = {
66
createdAt: Date,
@@ -95,14 +95,7 @@ const listenToUpdatedDirectMessageThreads = (userId: string) => (
9595
right: ['id', 'createdAt', 'threadId', 'lastActive', 'lastSeen'],
9696
})
9797
.zip()
98-
.run({ cursor: true }, (err, cursor) => {
99-
if (err) throw err;
100-
cursor.each((err, data) => {
101-
if (err) throw err;
102-
// Call the passed callback with the notification
103-
cb(data);
104-
});
105-
});
98+
.run(eachAsyncNewValue(cb));
10699
};
107100

108101
// prettier-ignore

iris/models/message.js

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
processReputationEventQueue,
77
_adminProcessToxicMessageQueue,
88
} from 'shared/bull/queues';
9-
import { NEW_DOCUMENTS } from './utils';
9+
import { NEW_DOCUMENTS, eachAsyncNewValue } from './utils';
1010
import { setThreadLastActive } from './thread';
1111

1212
export type MessageTypes = 'text' | 'media';
@@ -159,16 +159,8 @@ export const listenToNewMessagesInThread = (threadId: string) => (
159159
.changes({
160160
includeInitial: false,
161161
})
162-
.filter(NEW_DOCUMENTS)
163-
.run({ cursor: true }, (err, cursor) => {
164-
if (err) throw err;
165-
cursor.each((err, data) => {
166-
// TODO(@mxstbr): Maybe we need to cursor.close here?
167-
if (err) throw err;
168-
// Call the passed callback with the message directly
169-
cb(data.new_val);
170-
});
171-
});
162+
.filter(NEW_DOCUMENTS)('new_val')
163+
.run(eachAsyncNewValue(cb));
172164
};
173165

174166
export const getMessageCount = (threadId: string): Promise<number> => {

iris/models/notification.js

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// @flow
22
const { db } = require('./db');
3-
import { NEW_DOCUMENTS } from './utils';
3+
import { NEW_DOCUMENTS, eachAsyncNewValue } from './utils';
44

55
export const getNotificationsByUser = (
66
userId: string,
@@ -78,17 +78,7 @@ export const listenToNewNotifications = (userId: string) => (
7878
})
7979
.zip()
8080
.filter(row => row('context')('type').ne('DIRECT_MESSAGE_THREAD'))
81-
.run({ cursor: true }, (err, cursor) => {
82-
if (err) throw err;
83-
cursor.each((err, data) => {
84-
if (err) throw err;
85-
// For some reason this can be called without data, in which case
86-
// we don't want to call the callback with it obviously
87-
if (!data) return;
88-
// Call the passed callback with the notification
89-
cb(data);
90-
});
91-
});
81+
.run(eachAsyncNewValue(cb));
9282
};
9383

9484
export const listenToNewDirectMessageNotifications = (userId: string) => (
@@ -107,12 +97,5 @@ export const listenToNewDirectMessageNotifications = (userId: string) => (
10797
})
10898
.zip()
10999
.filter(row => row('context')('type').eq('DIRECT_MESSAGE_THREAD'))
110-
.run({ cursor: true }, (err, cursor) => {
111-
if (err) throw err;
112-
cursor.each((err, data) => {
113-
if (err) throw err;
114-
// Call the passed callback with the notification
115-
cb(data);
116-
});
117-
});
100+
.run(eachAsyncNewValue(cb));
118101
};

iris/models/thread.js

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
sendThreadNotificationQueue,
77
_adminProcessToxicThreadQueue,
88
} from 'shared/bull/queues';
9-
const { NEW_DOCUMENTS, parseRange } = require('./utils');
9+
const { NEW_DOCUMENTS, parseRange, eachAsyncNewValue } = require('./utils');
1010
import { deleteMessagesInThread } from '../models/message';
1111
import { turnOffAllThreadNotifications } from '../models/usersThreads';
1212
import type { PaginationOptions } from '../utils/paginate-arrays';
@@ -517,12 +517,5 @@ export const listenToUpdatedThreads = (channelIds: Array<string>) => (
517517
.filter(thread =>
518518
db.expr(channelIds).contains(thread('new_val')('channelId'))
519519
)('new_val')
520-
.run({ cursor: true }, (err, cursor) => {
521-
if (err) throw err;
522-
cursor.each((err, data) => {
523-
if (err) throw err;
524-
// Call the passed callback with the notification
525-
cb(data);
526-
});
527-
});
520+
.run(eachAsyncNewValue(cb));
528521
};

iris/models/utils.js

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,24 @@ export const NEW_DOCUMENTS = db
55
.eq(null)
66
.and(db.not(db.row('new_val').eq(null)));
77

8+
export const eachAsyncNewValue = (cb: Function) => (
9+
err?: Error,
10+
cursor: Cursor
11+
) => {
12+
if (err) throw err;
13+
cursor
14+
.eachAsync(data => {
15+
// Call the passed callback with the message directly
16+
cb(data);
17+
})
18+
.catch(err => {
19+
console.error(err);
20+
try {
21+
cursor.close();
22+
} catch (err) {}
23+
});
24+
};
25+
826
export const listenToNewDocumentsIn = (table, cb) => {
927
return (
1028
db
@@ -14,14 +32,7 @@ export const listenToNewDocumentsIn = (table, cb) => {
1432
})
1533
// Filter to only include newly inserted messages in the changefeed
1634
.filter(NEW_DOCUMENTS)
17-
.run({ cursor: true }, (err, cursor) => {
18-
if (err) throw err;
19-
cursor.each((err, data) => {
20-
if (err) throw err;
21-
// Call the passed callback with the message directly
22-
cb(data.new_val);
23-
});
24-
})
35+
.run(eachAsyncNewValue(cb))
2536
);
2637
};
2738

iris/mutations/message/addMessage.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,12 @@ export default async (
4848
}".`
4949
);
5050

51-
const thread = await loaders.thread.load(message.threadId);
52-
53-
if (thread.isLocked) throw new UserError("Can't reply in a locked thread.");
54-
5551
let contextPermissions;
52+
let thread;
5653
// Make sure that we have permission to send a message in the community
5754
if (message.threadType === 'story') {
55+
thread = await loaders.thread.load(message.threadId);
56+
if (thread.isLocked) throw new UserError("Can't reply in a locked thread.");
5857
const permissions = await loaders.userPermissionsInCommunity.load([
5958
currentUser.id,
6059
thread.communityId,

0 commit comments

Comments
 (0)