Skip to content

Commit d867c33

Browse files
committed
Added support for timeouts on consuming from the ring buffer
1 parent 8e128d6 commit d867c33

File tree

8 files changed

+171
-3
lines changed

8 files changed

+171
-3
lines changed

README.md

+7
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ Maintainer
1111
Changelog
1212
==========
1313

14+
## 3.0.0.beta3 Released (20-Feb-2013)
15+
16+
- Significant Javadoc updates (thanks Jason Koch)
17+
- DSL support for WorkerPool
18+
- Small performance tweaks
19+
- Add TimeoutHandler and TimeoutBlockingWaitStrategy and support timeouts in BatchEventProcessor
20+
1421
## 3.0.0.beta2 Released (7-Jan-2013)
1522

1623
- Remove millisecond wakeup from BlockingWaitStrategy

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ apply plugin: 'eclipse' // Only used so the Eclipse STS Gradle plugin can see
2222
defaultTasks 'build'
2323

2424
group = 'com.lmax'
25-
version = new Version(major: 3, stage: 'beta2')
25+
version = new Version(major: 3, stage: 'beta3')
2626

2727
ext {
2828
fullName = 'Disruptor Framework'

src/main/java/com/lmax/disruptor/BatchEventProcessor.java

+23
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.concurrent.atomic.AtomicBoolean;
1919

20+
2021
/**
2122
* Convenience class for handling the batching semantics of consuming entries from a {@link RingBuffer}
2223
* and delegating the available events to an {@link EventHandler}.
@@ -35,6 +36,7 @@ public final class BatchEventProcessor<T>
3536
private final SequenceBarrier sequenceBarrier;
3637
private final EventHandler<T> eventHandler;
3738
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
39+
private final TimeoutHandler timeoutHandler;
3840

3941
/**
4042
* Construct a {@link EventProcessor} that will automatically track the progress by updating its sequence when
@@ -56,6 +58,8 @@ public BatchEventProcessor(final RingBuffer<T> ringBuffer,
5658
{
5759
((SequenceReportingEventHandler<?>)eventHandler).setSequenceCallback(sequence);
5860
}
61+
62+
timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
5963
}
6064

6165
@Override
@@ -109,6 +113,13 @@ public void run()
109113
try
110114
{
111115
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
116+
117+
if (availableSequence < nextSequence && timeoutHandler != null)
118+
{
119+
notifyTimeout(availableSequence);
120+
continue;
121+
}
122+
112123
while (nextSequence <= availableSequence)
113124
{
114125
event = ringBuffer.getPublished(nextSequence);
@@ -138,6 +149,18 @@ public void run()
138149
running.set(false);
139150
}
140151

152+
private void notifyTimeout(final long availableSequence) throws Exception
153+
{
154+
try
155+
{
156+
timeoutHandler.onTimeout(availableSequence);
157+
}
158+
catch (Throwable e)
159+
{
160+
exceptionHandler.handleEventException(e, availableSequence, null);
161+
}
162+
}
163+
141164
/**
142165
* Notifies the EventHandler when this processor is starting up
143166
*/

src/main/java/com/lmax/disruptor/ExceptionHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public interface ExceptionHandler
2828
*
2929
* @param ex the exception that propagated from the {@link EventHandler}.
3030
* @param sequence of the event which cause the exception.
31-
* @param event being processed when the exception occurred.
31+
* @param event being processed when the exception occurred. This can be null.
3232
*/
3333
void handleEventException(Throwable ex, long sequence, Object event);
3434

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package com.lmax.disruptor;
2+
3+
import java.util.concurrent.TimeUnit;
4+
import java.util.concurrent.locks.Condition;
5+
import java.util.concurrent.locks.Lock;
6+
import java.util.concurrent.locks.ReentrantLock;
7+
8+
import com.lmax.disruptor.AlertException;
9+
import com.lmax.disruptor.Sequence;
10+
import com.lmax.disruptor.SequenceBarrier;
11+
import com.lmax.disruptor.WaitStrategy;
12+
13+
public class TimeoutBlockingWaitStrategy implements WaitStrategy
14+
{
15+
private final Lock lock = new ReentrantLock();
16+
private final Condition processorNotifyCondition = lock.newCondition();
17+
private final long timeoutInNanos;
18+
19+
public TimeoutBlockingWaitStrategy(final long timeout, final TimeUnit units)
20+
{
21+
timeoutInNanos = units.toNanos(timeout);
22+
}
23+
24+
@Override
25+
public long waitFor(final long sequence,
26+
final Sequence cursorSequence,
27+
final Sequence dependentSequence,
28+
final SequenceBarrier barrier)
29+
throws AlertException, InterruptedException
30+
{
31+
long nanos = timeoutInNanos;
32+
33+
long availableSequence;
34+
if ((availableSequence = cursorSequence.get()) < sequence)
35+
{
36+
lock.lock();
37+
try
38+
{
39+
while ((availableSequence = cursorSequence.get()) < sequence)
40+
{
41+
barrier.checkAlert();
42+
nanos = processorNotifyCondition.awaitNanos(nanos);
43+
if (nanos <= 0)
44+
{
45+
return availableSequence;
46+
}
47+
}
48+
}
49+
finally
50+
{
51+
lock.unlock();
52+
}
53+
}
54+
55+
while ((availableSequence = dependentSequence.get()) < sequence)
56+
{
57+
barrier.checkAlert();
58+
}
59+
60+
return availableSequence;
61+
}
62+
63+
@Override
64+
public void signalAllWhenBlocking()
65+
{
66+
lock.lock();
67+
try
68+
{
69+
processorNotifyCondition.signalAll();
70+
}
71+
finally
72+
{
73+
lock.unlock();
74+
}
75+
}
76+
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.lmax.disruptor;
2+
3+
public interface TimeoutHandler
4+
{
5+
void onTimeout(long sequence) throws Exception;
6+
}

src/main/java/com/lmax/disruptor/WaitStrategy.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
public interface WaitStrategy
2323
{
2424
/**
25-
* Wait for the given sequence to be available
25+
* Wait for the given sequence to be available. It is possible for this method to return a value
26+
* less than the sequence number supplied depending on the implementation of the WaitStrategy. A common
27+
* use for this is to signal a timeout. Any EventProcessor that is using a WaitStragegy to get notifications
28+
* about message becoming available should remember to handle this case. The {@link BatchEventProcessor} explicitly
29+
* handles this case and will signal a timeout if required.
2630
*
2731
* @param sequence to be waited on.
2832
* @param cursor the main sequence from ringbuffer. Wait/notify strategies will
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.lmax.disruptor;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
import com.lmax.disruptor.Sequence;
6+
import com.lmax.disruptor.SequenceBarrier;
7+
8+
import org.jmock.Expectations;
9+
import org.jmock.Mockery;
10+
import org.jmock.integration.junit4.JMock;
11+
import org.junit.Test;
12+
import org.junit.runner.RunWith;
13+
14+
import static org.hamcrest.CoreMatchers.is;
15+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
16+
import static org.junit.Assert.assertThat;
17+
18+
@RunWith(JMock.class)
19+
public class TimeoutBlockingWaitStrategyTest
20+
{
21+
private final Mockery mockery = new Mockery();
22+
23+
@Test
24+
public void shouldTimeoutWaitFor() throws Exception
25+
{
26+
final SequenceBarrier sequenceBarrier = mockery.mock(SequenceBarrier.class);
27+
28+
long theTimeout = 500;
29+
TimeoutBlockingWaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(theTimeout, TimeUnit.MILLISECONDS);
30+
Sequence cursor = new Sequence(5);
31+
Sequence dependent = cursor;
32+
33+
mockery.checking(new Expectations()
34+
{
35+
{
36+
allowing(sequenceBarrier).checkAlert();
37+
}
38+
});
39+
40+
long t0 = System.currentTimeMillis();
41+
42+
long sequence = waitStrategy.waitFor(6, cursor, dependent, sequenceBarrier);
43+
44+
long t1 = System.currentTimeMillis();
45+
46+
long timeWaiting = t1 - t0;
47+
48+
assertThat(sequence, is(5L));
49+
assertThat(timeWaiting, greaterThanOrEqualTo(theTimeout));
50+
}
51+
}

0 commit comments

Comments
 (0)