@@ -21,6 +21,7 @@ type Channel{T} <: AbstractChannel
21
21
cond_take:: Condition # waiting for data to become available
22
22
cond_put:: Condition # waiting for a writeable slot
23
23
state:: Symbol
24
+ excp:: Nullable{Exception} # Exception to be thrown when state != :open
24
25
25
26
data:: Array{T,1}
26
27
sz_max:: Int # maximum size of channel
@@ -39,7 +40,7 @@ type Channel{T} <: AbstractChannel
39
40
if sz < 0
40
41
throw (ArgumentError (" Channel size must be either 0, a positive integer or Inf" ))
41
42
end
42
- new (Condition (), Condition (), :open , Array {T} (0 ), sz, Array {Condition} (0 ))
43
+ new (Condition (), Condition (), :open , Nullable {Exception} (), Array {T} (0 ), sz, Array {Condition} (0 ))
43
44
end
44
45
45
46
# deprecated empty constructor
53
54
54
55
Channel (sz) = Channel {Any} (sz)
55
56
57
+ # special constructors
58
+ """
59
+ Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
60
+
61
+ Creates a new task from `func`, binds it to a new channel of type
62
+ `ctype` and size `csize`, schedules the task, all in a single call.
63
+
64
+ `func` must accept the bound channel as its only argument.
65
+
66
+ If you need a reference to the created task, pass a `Ref{Task}` object via
67
+ keyword argument `taskref`.
68
+
69
+ Returns a Channel.
70
+
71
+ ```jldoctest
72
+ julia> chnl = Channel(c->foreach(i->put!(c,i), 1:4));
73
+
74
+ julia> @show typeof(chnl);
75
+ typeof(chnl) = Channel{Any}
76
+
77
+ julia> for i in chnl
78
+ @show i
79
+ end;
80
+ i = 1
81
+ i = 2
82
+ i = 3
83
+ i = 4
84
+
85
+ ```
86
+
87
+ An example of referencing the created task:
88
+
89
+ ```jldoctest
90
+ julia> taskref = Ref{Task}();
91
+
92
+ julia> chnl = Channel(c->(@show take!(c)); taskref=taskref);
93
+
94
+ julia> task = taskref[];
95
+
96
+ julia> @show istaskdone(task);
97
+ istaskdone(task) = false
98
+
99
+ julia> put!(chnl, "Hello");
100
+ take!(c) = "Hello"
101
+
102
+ julia> @show istaskdone(task);
103
+ istaskdone(task) = true
104
+
105
+ ```
106
+ """
107
+ function Channel (func:: Function ; ctype= Any, csize= 0 , taskref= nothing )
108
+ chnl = Channel {ctype} (csize)
109
+ task = Task (()-> func (chnl))
110
+ bind (chnl,task)
111
+ schedule (task)
112
+ yield ()
113
+
114
+ isa (taskref, Ref{Task}) && (taskref. x = task)
115
+ return chnl
116
+ end
117
+
118
+
119
+
56
120
# deprecated empty constructor
57
121
Channel () = Channel {Any} ()
58
122
59
123
closed_exception () = InvalidStateException (" Channel is closed." , :closed )
60
124
61
125
isbuffered (c:: Channel ) = c. sz_max== 0 ? false : true
62
126
127
+ function check_channel_state (c:: Channel )
128
+ if ! isopen (c)
129
+ ! isnull (c. excp) && throw (get (c. excp))
130
+ throw (closed_exception ())
131
+ end
132
+ end
63
133
"""
64
134
close(c::Channel)
65
135
@@ -70,11 +140,110 @@ Closes a channel. An exception is thrown by:
70
140
"""
71
141
function close (c:: Channel )
72
142
c. state = :closed
73
- notify_error (c:: Channel , closed_exception ())
143
+ c. excp = Nullable {} (closed_exception ())
144
+ notify_error (c)
74
145
nothing
75
146
end
76
147
isopen (c:: Channel ) = (c. state == :open )
77
148
149
+ """
150
+ bind(chnl::Channel, task::Task)
151
+
152
+ Associates the lifetime of `chnl` with a task.
153
+ Channel `chnl` is automatically closed when the task terminates.
154
+ Any uncaught exception in the task is propagated to all waiters on `chnl`.
155
+
156
+ The `chnl` object can be explicitly closed independent of task termination.
157
+ Terminating tasks have no effect on already closed Channel objects.
158
+
159
+ When a channel is bound to multiple tasks, the first task to terminate will
160
+ close the channel. When multiple channels are bound to the same task,
161
+ termination of the task will close all channels.
162
+
163
+ ```jldoctest
164
+ julia> c = Channel(0);
165
+
166
+ julia> task = @schedule foreach(i->put!(c, i), 1:4);
167
+
168
+ julia> bind(c,task);
169
+
170
+ julia> for i in c
171
+ @show i
172
+ end;
173
+ i = 1
174
+ i = 2
175
+ i = 3
176
+ i = 4
177
+
178
+ julia> @show isopen(c);
179
+ isopen(c) = false
180
+
181
+ ```
182
+
183
+ ```jldoctest
184
+ julia> c = Channel(0);
185
+
186
+ julia> task = @schedule (put!(c,1);error("foo"));
187
+
188
+ julia> bind(c,task);
189
+
190
+ julia> take!(c);
191
+
192
+ julia> put!(c,1);
193
+ ERROR: foo
194
+ Stacktrace:
195
+ [1] check_channel_state(::Channel{Any}) at ./channels.jl:129
196
+ [2] put!(::Channel{Any}, ::Int64) at ./channels.jl:247
197
+
198
+ ```
199
+ """
200
+ function bind (c:: Channel , task:: Task )
201
+ ref = WeakRef (c)
202
+ register_taskdone_hook (task, tsk-> close_chnl_on_taskdone (tsk, ref))
203
+ c
204
+ end
205
+
206
+ """
207
+ channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
208
+
209
+ A convenience method to create `n` channels and bind them to tasks started
210
+ from the provided functions in a single call. Each `func` must accept `n` arguments
211
+ which are the created channels. Channel types and sizes may be specified via
212
+ keyword arguments `ctypes` and `csizes` respectively. If unspecified, all channels are
213
+ of type `Channel{Any}(0)`.
214
+
215
+ Returns a tuple, `(Array{Channel}, Array{Task})`, of the created channels and tasks.
216
+ """
217
+ function channeled_tasks (n:: Int , funcs... ; ctypes= fill (Any,n), csizes= fill (0 ,n))
218
+ @assert length (csizes) == n
219
+ @assert length (ctypes) == n
220
+
221
+ chnls = map (i-> Channel {ctypes[i]} (csizes[i]), 1 : n)
222
+ tasks= Task[Task (()-> f (chnls... )) for f in funcs]
223
+
224
+ # bind all tasks to all channels and schedule them
225
+ foreach (t -> foreach (c -> bind (c,t), chnls), tasks)
226
+ foreach (t-> schedule (t), tasks)
227
+
228
+ yield () # Allow scheduled tasks to run
229
+
230
+ return (chnls, tasks)
231
+ end
232
+
233
+ function close_chnl_on_taskdone (t:: Task , ref:: WeakRef )
234
+ if ref. value != = nothing
235
+ c = ref. value
236
+ ! isopen (c) && return
237
+ if istaskfailed (t)
238
+ c. state = :closed
239
+ c. excp = Nullable {Exception} (task_result (t))
240
+ notify_error (c)
241
+ else
242
+ close (c)
243
+ end
244
+ end
245
+ end
246
+
78
247
type InvalidStateException <: Exception
79
248
msg:: AbstractString
80
249
state:: Symbol
@@ -89,7 +258,7 @@ For unbuffered channels, blocks until a [`take!`](@ref) is performed by a differ
89
258
task.
90
259
"""
91
260
function put! (c:: Channel , v)
92
- ! isopen (c) && throw ( closed_exception () )
261
+ check_channel_state (c )
93
262
isbuffered (c) ? put_buffered (c,v) : put_unbuffered (c,v)
94
263
end
95
264
@@ -98,7 +267,9 @@ function put_buffered(c::Channel, v)
98
267
wait (c. cond_put)
99
268
end
100
269
push! (c. data, v)
101
- notify (c. cond_take, nothing , true , false ) # notify all, since some of the waiters may be on a "fetch" call.
270
+
271
+ # notify all, since some of the waiters may be on a "fetch" call.
272
+ notify (c. cond_take, nothing , true , false )
102
273
v
103
274
end
104
275
@@ -108,7 +279,7 @@ function put_unbuffered(c::Channel, v)
108
279
wait (c. cond_put)
109
280
end
110
281
cond_taker = shift! (c. takers)
111
- notify (cond_taker, v, false , false )
282
+ notify (cond_taker, v, false , false ) > 0 && yield ()
112
283
v
113
284
end
114
285
@@ -148,7 +319,7 @@ shift!(c::Channel) = take!(c)
148
319
149
320
# 0-size channel
150
321
function take_unbuffered (c:: Channel )
151
- ! isopen (c) && throw ( closed_exception () )
322
+ check_channel_state (c )
152
323
cond_taker = Condition ()
153
324
push! (c. takers, cond_taker)
154
325
notify (c. cond_put, nothing , false , false )
@@ -178,7 +349,7 @@ n_avail(c::Channel) = isbuffered(c) ? length(c.data) : n_waiters(c.cond_put)
178
349
179
350
function wait (c:: Channel )
180
351
while ! isready (c)
181
- ! isopen (c) && throw ( closed_exception () )
352
+ check_channel_state (c )
182
353
wait (c. cond_take)
183
354
end
184
355
nothing
@@ -189,19 +360,20 @@ function notify_error(c::Channel, err)
189
360
notify_error (c. cond_put, err)
190
361
foreach (x-> notify_error (x, err), c. takers)
191
362
end
363
+ notify_error (c:: Channel ) = notify_error (c, get (c. excp))
192
364
193
365
eltype {T} (:: Type{Channel{T}} ) = T
194
366
195
367
show (io:: IO , c:: Channel ) = print (io, " $(typeof (c)) (sz_max:$(c. sz_max) ,sz_curr:$(n_avail (c)) )" )
196
368
197
- type ChannelState {T}
369
+ type ChannelIterState {T}
198
370
hasval:: Bool
199
371
val:: T
200
- ChannelState (x) = new (x)
372
+ ChannelIterState (x) = new (x)
201
373
end
202
374
203
- start {T} (c:: Channel{T} ) = ChannelState {T} (false )
204
- function done (c:: Channel , state:: ChannelState )
375
+ start {T} (c:: Channel{T} ) = ChannelIterState {T} (false )
376
+ function done (c:: Channel , state:: ChannelIterState )
205
377
try
206
378
# we are waiting either for more data or channel to be closed
207
379
state. hasval && return false
0 commit comments