Skip to content

Commit ced2614

Browse files
authored
Fixed streaming bug in stream.ts (#285)
* fixed streaming bug in stream.ts * wrapped whole function
1 parent 3f1efd7 commit ced2614

File tree

2 files changed

+51
-42
lines changed

2 files changed

+51
-42
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/apps/src/plugins/http/stream.ts

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export class HttpStream implements IStreamer {
3131
private _result?: SentActivity;
3232
private _timeout?: NodeJS.Timeout;
3333
private _logger: ILogger;
34+
private _flushing: boolean = false;
3435

3536
constructor(client: Client, ref: ConversationReference, logger?: ILogger) {
3637
this.client = client;
@@ -95,60 +96,68 @@ export class HttpStream implements IStreamer {
9596
}
9697

9798
protected async flush() {
98-
if (!this.queue.length) return;
99-
if (this._timeout) {
100-
clearTimeout(this._timeout);
101-
this._timeout = undefined;
102-
}
99+
// if locked or no queue, return early
100+
if (!this.queue.length || this._flushing) return;
103101

104-
let i = 0;
102+
this._flushing = true;
105103

106-
while (this.queue.length && i < 10) {
107-
const activity = this.queue.shift();
104+
try {
105+
if (this._timeout) {
106+
clearTimeout(this._timeout);
107+
this._timeout = undefined;
108+
}
108109

109-
if (!activity) continue;
110+
let i = 0;
111+
112+
while (this.queue.length && i < 10) {
113+
const activity = this.queue.shift();
110114

111-
if (activity.text) {
112-
this.text += activity.text;
113-
}
115+
if (!activity) continue;
114116

115-
if (activity.attachments) {
116-
this.attachments = [...(this.attachments || []), ...activity.attachments];
117-
}
117+
if (activity.text) {
118+
this.text += activity.text;
119+
}
118120

119-
if (activity.channelData) {
120-
this.channelData = {
121-
...this.channelData,
122-
...activity.channelData,
123-
};
124-
}
121+
if (activity.attachments) {
122+
this.attachments = [...(this.attachments || []), ...activity.attachments];
123+
}
125124

126-
if (activity.entities) {
127-
this.entities = [...(this.entities || []), ...activity.entities];
128-
}
125+
if (activity.channelData) {
126+
this.channelData = {
127+
...this.channelData,
128+
...activity.channelData,
129+
};
130+
}
129131

130-
i++;
131-
}
132+
if (activity.entities) {
133+
this.entities = [...(this.entities || []), ...activity.entities];
134+
}
132135

133-
if (i === 0) return;
136+
i++;
137+
}
134138

135-
const activity = new TypingActivity({ id: this.id })
136-
.withText(this.text)
137-
.addStreamUpdate(this.index + 1);
139+
if (i === 0) return;
138140

139-
const res = await promises.retry(() => this.send(activity), {
140-
logger: this._logger
141-
});
141+
const activity = new TypingActivity({ id: this.id })
142+
.withText(this.text)
143+
.addStreamUpdate(this.index + 1);
142144

143-
this.events.emit('chunk', res);
144-
this.index++;
145+
const res = await promises.retry(() => this.send(activity), {
146+
logger: this._logger
147+
});
145148

146-
if (!this.id) {
147-
this.id = res.id;
148-
}
149+
this.events.emit('chunk', res);
150+
this.index++;
151+
152+
if (!this.id) {
153+
this.id = res.id;
154+
}
149155

150-
if (this.queue.length) {
151-
this._timeout = setTimeout(this.flush.bind(this), 500);
156+
if (this.queue.length) {
157+
this._timeout = setTimeout(this.flush.bind(this), 500);
158+
}
159+
} finally {
160+
this._flushing = false;
152161
}
153162
}
154163

0 commit comments

Comments
 (0)