@@ -120,22 +120,85 @@ size_t WebTransportStreamImpl::Read(uint8_t* data, size_t length) {
120
120
}
121
121
122
122
size_t WebTransportStreamImpl::ReadableBytes () const {
123
- return stream_->ReadableBytes ();
123
+ if (io_runner_->BelongsToCurrentThread ()) {
124
+ return stream_->ReadableBytes ();
125
+ }
126
+ size_t result;
127
+ base::WaitableEvent done (base::WaitableEvent::ResetPolicy::AUTOMATIC,
128
+ base::WaitableEvent::InitialState::NOT_SIGNALED);
129
+ io_runner_->PostTask (
130
+ FROM_HERE,
131
+ base::BindOnce (
132
+ [](WebTransportStreamImpl const * stream, size_t & result,
133
+ base::WaitableEvent* event) {
134
+ result = stream->stream_ ->ReadableBytes ();
135
+ event->Signal ();
136
+ },
137
+ base::Unretained (this ), std::ref (result), base::Unretained (&done)));
138
+ done.Wait ();
139
+ return result;
124
140
}
125
141
126
142
void WebTransportStreamImpl::Close () {
127
- // TODO: Post to IO runner.
128
- if (!stream_->SendFin ()) {
129
- LOG (ERROR) << " Failed to send FIN." ;
143
+ if (io_runner_->BelongsToCurrentThread ()) {
144
+ if (!stream_->SendFin ()) {
145
+ LOG (ERROR) << " Failed to send FIN." ;
146
+ }
147
+ return ;
130
148
}
149
+ base::WaitableEvent done (base::WaitableEvent::ResetPolicy::AUTOMATIC,
150
+ base::WaitableEvent::InitialState::NOT_SIGNALED);
151
+ io_runner_->PostTask (
152
+ FROM_HERE,
153
+ base::BindOnce (
154
+ [](WebTransportStreamImpl* stream, base::WaitableEvent* event) {
155
+ if (!stream->stream_ ->SendFin ()) {
156
+ LOG (ERROR) << " Failed to send FIN." ;
157
+ }
158
+ event->Signal ();
159
+ },
160
+ base::Unretained (this ), base::Unretained (&done)));
161
+ done.Wait ();
131
162
}
132
163
133
164
uint64_t WebTransportStreamImpl::BufferedDataBytes () const {
134
- return quic_stream_->BufferedDataBytes ();
165
+ if (io_runner_->BelongsToCurrentThread ()) {
166
+ return quic_stream_->BufferedDataBytes ();
167
+ }
168
+ uint64_t result;
169
+ base::WaitableEvent done (base::WaitableEvent::ResetPolicy::AUTOMATIC,
170
+ base::WaitableEvent::InitialState::NOT_SIGNALED);
171
+ io_runner_->PostTask (
172
+ FROM_HERE,
173
+ base::BindOnce (
174
+ [](WebTransportStreamImpl const * stream, uint64_t & result,
175
+ base::WaitableEvent* event) {
176
+ result = stream->quic_stream_ ->BufferedDataBytes ();
177
+ event->Signal ();
178
+ },
179
+ base::Unretained (this ), std::ref (result), base::Unretained (&done)));
180
+ done.Wait ();
181
+ return result;
135
182
}
136
183
137
184
bool WebTransportStreamImpl::CanWrite () const {
138
- return stream_->CanWrite ();
185
+ if (io_runner_->BelongsToCurrentThread ()) {
186
+ return stream_->CanWrite ();
187
+ }
188
+ bool result;
189
+ base::WaitableEvent done (base::WaitableEvent::ResetPolicy::AUTOMATIC,
190
+ base::WaitableEvent::InitialState::NOT_SIGNALED);
191
+ io_runner_->PostTask (
192
+ FROM_HERE,
193
+ base::BindOnce (
194
+ [](WebTransportStreamImpl const * stream, bool & result,
195
+ base::WaitableEvent* event) {
196
+ result = stream->stream_ ->CanWrite ();
197
+ event->Signal ();
198
+ },
199
+ base::Unretained (this ), std::ref (result), base::Unretained (&done)));
200
+ done.Wait ();
201
+ return result;
139
202
}
140
203
141
204
void WebTransportStreamImpl::OnCanRead () {
0 commit comments