@@ -104,7 +104,7 @@ public void tryNextNotAllowed() {
104
104
new AsyncResultSetImpl (
105
105
mockedProvider , mock (ResultSet .class ), AsyncResultSetImpl .DEFAULT_BUFFER_SIZE )) {
106
106
rs .setCallback (mock (Executor .class ), mock (ReadyCallback .class ));
107
- IllegalStateException e = assertThrows (IllegalStateException .class , () -> rs . tryNext () );
107
+ IllegalStateException e = assertThrows (IllegalStateException .class , rs :: tryNext );
108
108
assertThat (e .getMessage ()).contains ("tryNext may only be called from a DataReady callback." );
109
109
}
110
110
}
@@ -152,7 +152,7 @@ public void toListAsync() throws InterruptedException, ExecutionException {
152
152
}
153
153
154
154
@ Test
155
- public void toListAsyncPropagatesError () throws InterruptedException {
155
+ public void toListAsyncPropagatesError () {
156
156
ExecutorService executor = Executors .newFixedThreadPool (1 );
157
157
ResultSet delegate = mock (ResultSet .class );
158
158
when (delegate .next ())
@@ -326,10 +326,7 @@ public void testCallbackIsNotCalledWhilePaused() throws InterruptedException, Ex
326
326
@ Override
327
327
public Boolean answer (InvocationOnMock invocation ) throws Throwable {
328
328
row ++;
329
- if (row > simulatedRows ) {
330
- return false ;
331
- }
332
- return true ;
329
+ return row <= simulatedRows ;
333
330
}
334
331
});
335
332
when (delegate .getCurrentRowAsStruct ()).thenReturn (mock (Struct .class ));
@@ -345,17 +342,17 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
345
342
assertFalse (paused .get ());
346
343
callbackCounter .incrementAndGet ();
347
344
try {
348
- while ( true ) {
349
- switch ( resultSet . tryNext ()) {
350
- case OK :
351
- paused . set ( true );
352
- queue . put ( new Object ()) ;
353
- return CallbackResponse . PAUSE ;
354
- case DONE :
355
- return CallbackResponse . DONE ;
356
- case NOT_READY :
357
- return CallbackResponse . CONTINUE ;
358
- }
345
+ switch ( resultSet . tryNext () ) {
346
+ case OK :
347
+ paused . set ( true );
348
+ queue . put ( new Object () );
349
+ return CallbackResponse . PAUSE ;
350
+ case DONE :
351
+ return CallbackResponse . DONE ;
352
+ case NOT_READY :
353
+ return CallbackResponse . CONTINUE ;
354
+ default :
355
+ throw new IllegalStateException ();
359
356
}
360
357
} catch (InterruptedException e ) {
361
358
throw SpannerExceptionFactory .propagateInterrupt (e );
@@ -384,9 +381,8 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
384
381
}
385
382
386
383
@ Test
387
- public void testCallbackIsNotCalledWhilePausedAndCanceled ()
388
- throws InterruptedException , ExecutionException {
389
- Executor executor = Executors .newSingleThreadExecutor ();
384
+ public void testCallbackIsNotCalledWhilePausedAndCanceled () {
385
+ ExecutorService executor = Executors .newSingleThreadExecutor ();
390
386
StreamingResultSet delegate = mock (StreamingResultSet .class );
391
387
392
388
final AtomicInteger callbackCounter = new AtomicInteger ();
@@ -414,6 +410,8 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled()
414
410
SpannerException exception = assertThrows (SpannerException .class , () -> get (callbackResult ));
415
411
assertEquals (ErrorCode .CANCELLED , exception .getErrorCode ());
416
412
assertEquals (1 , callbackCounter .get ());
413
+ } finally {
414
+ executor .shutdown ();
417
415
}
418
416
}
419
417
0 commit comments