Skip to content

Commit 4a48aa8

Browse files
implement "sample" combinator
1 parent 8860da6 commit 4a48aa8

File tree

8 files changed

+258
-62
lines changed

8 files changed

+258
-62
lines changed

src/Streamly/Internal/Data/Fold.hs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,8 +1261,14 @@ toParallelSVar svar winfo = Fold step initial extract
12611261
step () x = liftIO $ do
12621262
-- XXX we can have a separate fold for unlimited buffer case to avoid a
12631263
-- branch in the step here.
1264-
decrementBufferLimit svar
1265-
void $ send svar (ChildYield x)
1264+
case maxBufferLimit svar of
1265+
BufferUnlimited ->
1266+
void $ send svar (ChildYield x)
1267+
BufferLast ->
1268+
void $ sendReplace svar (ChildYield x)
1269+
BufferLimited _ policy -> do
1270+
decrementBufferLimit svar policy
1271+
void $ send svar (ChildYield x)
12661272

12671273
extract () = liftIO $ do
12681274
sendStop svar winfo
@@ -1279,8 +1285,14 @@ toParallelSVarLimited svar winfo = Fold step initial extract
12791285
yieldLimitOk <- decrementYieldLimit svar
12801286
if yieldLimitOk
12811287
then do
1282-
decrementBufferLimit svar
1283-
void $ send svar (ChildYield x)
1288+
case maxBufferLimit svar of
1289+
BufferUnlimited ->
1290+
void $ send svar (ChildYield x)
1291+
BufferLast ->
1292+
void $ sendReplace svar (ChildYield x)
1293+
BufferLimited _ policy -> do
1294+
decrementBufferLimit svar policy
1295+
void $ send svar (ChildYield x)
12841296
return True
12851297
else do
12861298
cleanupSVarFromWorker svar

src/Streamly/Internal/Data/SVar.hs

Lines changed: 89 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,15 @@ module Streamly.Internal.Data.SVar
3131

3232
-- State threaded around the stream
3333
, Limit (..)
34+
, BufferStyle (..)
3435
, State (streamVar)
3536
, defState
3637
, adaptState
3738
, getMaxThreads
3839
, setMaxThreads
3940
, getMaxBuffer
41+
, getBufferStyle
42+
, setBufferStyle
4043
, setMaxBuffer
4144
, getStreamRate
4245
, setStreamRate
@@ -62,6 +65,7 @@ module Streamly.Internal.Data.SVar
6265
, ChildEvent (..)
6366
, AheadHeapEntry (..)
6467
, send
68+
, sendReplace
6569
, sendToProducer
6670
, sendYield
6771
, sendStop
@@ -387,11 +391,27 @@ data SVarStopStyle =
387391
-- XXX Maybe we can separate the implementation in two different types instead
388392
-- of using a common SVar type.
389393
--
390-
data PushBufferPolicy =
394+
data BufferOverflowPolicy =
391395
PushBufferDropNew -- drop the latest element and continue
392396
| PushBufferDropOld -- drop the oldest element and continue
393397
| PushBufferBlock -- block the thread until space
394398
-- becomes available
399+
| PushBufferToFile String -- Append the buffer to a file on disk
400+
-- The String is the filename prefix, two files
401+
-- are used, <filename>1 and <filename>2. While
402+
-- the consumer is consuming from one file the
403+
-- producers are writing to the other file. The
404+
-- current Index ownership is maintained in the
405+
-- SVar.
406+
deriving (Show)
407+
408+
-- XXX in general, instead of just the last event we can store last N events in
409+
-- the SVar, we can have a BufferLastN case.
410+
data BufferStyle
411+
= BufferUnlimited
412+
| BufferLast -- Buffer only the latest element
413+
| BufferLimited Word BufferOverflowPolicy
414+
deriving (Show)
395415

396416
-- IMPORTANT NOTE: we cannot update the SVar after generating it as we have
397417
-- references to the original SVar stored in several functions which will keep
@@ -441,10 +461,9 @@ data SVar t m a = SVar
441461
-- potentially each worker may yield one value to the buffer in the worst
442462
-- case exceeding the requested buffer size.
443463
, maxWorkerLimit :: Limit
444-
, maxBufferLimit :: Limit
445-
-- These two are valid and used only when maxBufferLimit is Limited.
464+
, maxBufferLimit :: BufferStyle
465+
-- This is valid and used only when maxBufferLimit is BufferLimited.
446466
, pushBufferSpace :: IORef Count
447-
, pushBufferPolicy :: PushBufferPolicy
448467
-- [LOCKING] The consumer puts this MVar after emptying the buffer, workers
449468
-- block on it when the buffer becomes full. No overhead unless the buffer
450469
-- becomes full.
@@ -505,7 +524,7 @@ data State t m a = State
505524
-- persistent configuration, state that remains valid until changed by
506525
-- an explicit setting via a combinator.
507526
, _threadsHigh :: Limit
508-
, _bufferHigh :: Limit
527+
, _bufferHigh :: BufferStyle
509528
-- XXX these two can be collapsed into a single type
510529
, _streamLatency :: Maybe NanoSecond64 -- bootstrap latency
511530
, _maxStreamRate :: Maybe Rate
@@ -523,9 +542,11 @@ data State t m a = State
523542
magicMaxBuffer :: Word
524543
magicMaxBuffer = 1500
525544

526-
defaultMaxThreads, defaultMaxBuffer :: Limit
545+
defaultMaxThreads :: Limit
527546
defaultMaxThreads = Limited magicMaxBuffer
528-
defaultMaxBuffer = Limited magicMaxBuffer
547+
548+
defaultMaxBuffer :: BufferStyle
549+
defaultMaxBuffer = BufferLimited magicMaxBuffer PushBufferBlock
529550

530551
-- The fields prefixed by an _ are not to be accessed or updated directly but
531552
-- via smart accessor APIs.
@@ -592,18 +613,27 @@ setMaxThreads n st =
592613
getMaxThreads :: State t m a -> Limit
593614
getMaxThreads = _threadsHigh
594615

616+
setBufferStyle :: BufferStyle -> State t m a -> State t m a
617+
setBufferStyle style st = st { _bufferHigh = style }
618+
595619
setMaxBuffer :: Int -> State t m a -> State t m a
596-
setMaxBuffer n st =
597-
st { _bufferHigh =
598-
if n < 0
599-
then Unlimited
600-
else if n == 0
601-
then defaultMaxBuffer
602-
else Limited (fromIntegral n)
603-
}
620+
setMaxBuffer n = setBufferStyle style
621+
where
622+
style =
623+
if n < 0
624+
then BufferUnlimited
625+
else if n == 0
626+
then defaultMaxBuffer
627+
else BufferLimited (fromIntegral n) PushBufferBlock
628+
629+
getBufferStyle :: State t m a -> BufferStyle
630+
getBufferStyle = _bufferHigh
604631

605632
getMaxBuffer :: State t m a -> Limit
606-
getMaxBuffer = _bufferHigh
633+
getMaxBuffer st =
634+
case getBufferStyle st of
635+
BufferLimited n _ -> Limited n
636+
_ -> Unlimited
607637

608638
setStreamRate :: Maybe Rate -> State t m a -> State t m a
609639
setStreamRate r st = st { _maxStreamRate = r }
@@ -1000,18 +1030,18 @@ incrementYieldLimit sv =
10001030

10011031
-- XXX Only yields should be counted in the buffer limit and not the Stop
10021032
-- events.
1033+
--
1034+
-- XXX we can parameterize the SVar with a buffer type to reduce the runtime
1035+
-- overhead of determining the buffer type before queuing the elements.
10031036

10041037
{-# INLINE decrementBufferLimit #-}
1005-
decrementBufferLimit :: SVar t m a -> IO ()
1006-
decrementBufferLimit sv =
1007-
case maxBufferLimit sv of
1008-
Unlimited -> return ()
1009-
Limited _ -> do
1038+
decrementBufferLimit :: SVar t m a -> BufferOverflowPolicy -> IO ()
1039+
decrementBufferLimit sv policy = do
10101040
let ref = pushBufferSpace sv
10111041
old <- atomicModifyIORefCAS ref $ \x ->
10121042
(if x >= 1 then x - 1 else x, x)
10131043
when (old <= 0) $
1014-
case pushBufferPolicy sv of
1044+
case policy of
10151045
PushBufferBlock -> blockAndRetry
10161046
PushBufferDropNew -> do
10171047
-- We just drop one item and proceed. It is possible
@@ -1031,6 +1061,7 @@ decrementBufferLimit sv =
10311061
when block blockAndRetry
10321062
-- XXX need a dequeue or ring buffer for this
10331063
PushBufferDropOld -> undefined
1064+
PushBufferToFile _ -> undefined
10341065

10351066
where
10361067

@@ -1053,19 +1084,19 @@ decrementBufferLimit sv =
10531084
incrementBufferLimit :: SVar t m a -> IO ()
10541085
incrementBufferLimit sv =
10551086
case maxBufferLimit sv of
1056-
Unlimited -> return ()
1057-
Limited _ -> do
1087+
BufferLimited _ _ -> do
10581088
atomicModifyIORefCAS_ (pushBufferSpace sv) (+ 1)
10591089
writeBarrier
10601090
void $ liftIO $ tryPutMVar (pushBufferMVar sv) ()
1091+
_ -> return ()
10611092

10621093
{-# INLINE resetBufferLimit #-}
10631094
resetBufferLimit :: SVar t m a -> IO ()
10641095
resetBufferLimit sv =
10651096
case maxBufferLimit sv of
1066-
Unlimited -> return ()
1067-
Limited n -> atomicModifyIORefCAS_ (pushBufferSpace sv)
1068-
(const (fromIntegral n))
1097+
BufferLimited n _ -> atomicModifyIORefCAS_ (pushBufferSpace sv)
1098+
(const (fromIntegral n))
1099+
_ -> return ()
10691100

10701101
{-# INLINE sendWithDoorBell #-}
10711102
sendWithDoorBell ::
@@ -1092,6 +1123,27 @@ sendWithDoorBell q bell msg = do
10921123
send :: SVar t m a -> ChildEvent a -> IO Int
10931124
send sv msg = sendWithDoorBell (outputQueue sv) (outputDoorBell sv) msg
10941125

1126+
-- | Just replace the previous value in the buffer.
1127+
sendReplace :: SVar t m a -> ChildEvent a -> IO ()
1128+
sendReplace sv msg = do
1129+
-- XXX we can use a nonlist buffer to make it faster, we do not need a
1130+
-- tuple here, for Prim/Storable streams we can also avoid using an IORef
1131+
-- we can just use an unboxed reference.
1132+
let q = outputQueue sv
1133+
oldlen <- atomicModifyIORefCAS q $ \(_, n) -> (([msg], 1), n)
1134+
when (oldlen <= 0) $ do
1135+
-- The wake up must happen only after the store has finished otherwise
1136+
-- we can have lost wakeup problems.
1137+
writeBarrier
1138+
-- Since multiple workers can try this at the same time, it is possible
1139+
-- that we may put a spurious MVar after the consumer has already seen
1140+
-- the output. But that's harmless, at worst it may cause the consumer
1141+
-- to read the queue again and find it empty.
1142+
-- The important point is that the consumer is guaranteed to receive a
1143+
-- doorbell if something was added to the queue after it empties it.
1144+
let bell = outputDoorBell sv
1145+
void $ tryPutMVar bell ()
1146+
10951147
-- There is no bound implemented on the buffer, this is assumed to be low
10961148
-- traffic.
10971149
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
@@ -1206,10 +1258,10 @@ sendYield sv mwinfo msg = do
12061258
oldlen <- send sv msg
12071259
let limit = maxBufferLimit sv
12081260
bufferSpaceOk <- case limit of
1209-
Unlimited -> return True
1210-
Limited lim -> do
1261+
BufferLimited lim _ -> do
12111262
active <- readIORef (workerCount sv)
12121263
return $ (oldlen + 1) < (fromIntegral lim - active)
1264+
_ -> return True
12131265
rateLimitOk <-
12141266
case mwinfo of
12151267
Just winfo ->
@@ -2256,9 +2308,8 @@ getAheadSVar st f mrun = do
22562308
{ outputQueue = outQ
22572309
, outputQueueFromConsumer = undefined
22582310
, remainingWork = yl
2259-
, maxBufferLimit = getMaxBuffer st
2311+
, maxBufferLimit = getBufferStyle st
22602312
, pushBufferSpace = undefined
2261-
, pushBufferPolicy = undefined
22622313
, pushBufferMVar = undefined
22632314
, maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st)
22642315
, yieldRateInfo = rateInfo
@@ -2334,11 +2385,12 @@ getParallelSVar ss st mrun = do
23342385
Nothing -> return Nothing
23352386
Just x -> Just <$> newIORef x
23362387
rateInfo <- getYieldRateInfo st
2337-
let bufLim =
2338-
case getMaxBuffer st of
2339-
Unlimited -> undefined
2340-
Limited x -> (fromIntegral x)
2341-
remBuf <- newIORef bufLim
2388+
let bufSpace =
2389+
case getBufferStyle st of
2390+
BufferUnlimited -> undefined
2391+
BufferLast -> undefined
2392+
BufferLimited x _ -> fromIntegral x
2393+
remBuf <- newIORef bufSpace
23422394
pbMVar <- newMVar ()
23432395

23442396
stats <- newSVarStats
@@ -2353,9 +2405,8 @@ getParallelSVar ss st mrun = do
23532405
SVar { outputQueue = outQ
23542406
, outputQueueFromConsumer = outQRev
23552407
, remainingWork = yl
2356-
, maxBufferLimit = getMaxBuffer st
2408+
, maxBufferLimit = getBufferStyle st
23572409
, pushBufferSpace = remBuf
2358-
, pushBufferPolicy = PushBufferBlock
23592410
, pushBufferMVar = pbMVar
23602411
, maxWorkerLimit = Unlimited
23612412
-- Used only for diagnostics

src/Streamly/Internal/Data/Stream/Ahead.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,9 @@ underMaxHeap sv hp = do
143143

144144
-- XXX simplify this
145145
let maxHeap = case maxBufferLimit sv of
146-
Limited lim -> Limited $
146+
BufferLimited lim _ -> Limited $
147147
max 0 (lim - fromIntegral len)
148-
Unlimited -> Unlimited
148+
_ -> Unlimited
149149

150150
case maxHeap of
151151
Limited lim -> do

src/Streamly/Internal/Data/Stream/Async.hs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,9 +296,8 @@ getLifoSVar st mrun = do
296296
{ outputQueue = outQ
297297
, outputQueueFromConsumer = undefined
298298
, remainingWork = yl
299-
, maxBufferLimit = getMaxBuffer st
299+
, maxBufferLimit = getBufferStyle st
300300
, pushBufferSpace = undefined
301-
, pushBufferPolicy = undefined
302301
, pushBufferMVar = undefined
303302
, maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st)
304303
, yieldRateInfo = rateInfo
@@ -393,9 +392,8 @@ getFifoSVar st mrun = do
393392
{ outputQueue = outQ
394393
, outputQueueFromConsumer = undefined
395394
, remainingWork = yl
396-
, maxBufferLimit = getMaxBuffer st
395+
, maxBufferLimit = getBufferStyle st
397396
, pushBufferSpace = undefined
398-
, pushBufferPolicy = undefined
399397
, pushBufferMVar = undefined
400398
, maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st)
401399
, yieldRateInfo = rateInfo

src/Streamly/Internal/Data/Stream/Combinators.hs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,15 @@ maxThreadsSerial :: Int -> SerialT m a -> SerialT m a
6262
maxThreadsSerial _ = id
6363
-}
6464

65+
-- XXX The actual buffer size can be double of the specified value because the
66+
-- consumer thread takes the whole buffer in one go and decrements the used
67+
-- buffer space to 0. Since the full buffer space is now available to the
68+
-- producers they can again fill it even though the consumer may not yet have
69+
-- actually consumed any of the previous items. So the actual buffer is in the
70+
-- range n and 2n where n is the buffer size specified by the user. We can make
71+
-- this precise by having the consumer also modify the buffer count, but then
72+
-- there will be more lock contention.
73+
--
6574
-- | Specify the maximum size of the buffer for storing the results from
6675
-- concurrent computations. If the buffer becomes full we stop spawning more
6776
-- concurrent tasks until there is space in the buffer.

0 commit comments

Comments
 (0)