Skip to content

Commit e8d58db

Browse files
authored
Bouncer: Acquire child tickets before parent tickets. (#18543)
Follow up to #18520. The logic introduced in that patch for parent/child relationships was not ideal, since it would lead to parent tickets being acquired even if the child was at its max count. In production, this would cause a query with maxThreads = 1 to acquire more than 1 thread ticket from the global Bouncer.
1 parent 8346b50 commit e8d58db

File tree

2 files changed

+165
-15
lines changed

2 files changed

+165
-15
lines changed

processing/src/main/java/org/apache/druid/frame/processor/Bouncer.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -89,24 +89,16 @@ public int getMaxCount()
8989

9090
public ListenableFuture<Ticket> ticket()
9191
{
92-
// Acquire parent ticket first, if there's a parent.
93-
if (parentBouncer != null) {
94-
return FutureUtils.transformAsync(parentBouncer.ticket(), this::ticketInternal);
95-
} else {
96-
return ticketInternal(null);
97-
}
98-
}
99-
100-
/**
101-
* Acquire a ticket from this Bouncer. Precondition: if there is a parentBouncer, only call this method when
102-
* holding a parent ticket.
103-
*/
104-
private ListenableFuture<Ticket> ticketInternal(@Nullable final Ticket parentTicket)
105-
{
92+
// Acquire our ticket first, then acquire a parent ticket. Only return our ticket once the parent ticket
93+
// is also acquired.
10694
synchronized (lock) {
10795
if (currentCount < maxCount) {
10896
currentCount++;
109-
return Futures.immediateFuture(new Ticket(parentTicket));
97+
if (parentBouncer != null) {
98+
return FutureUtils.transform(parentBouncer.ticket(), Ticket::new);
99+
} else {
100+
return Futures.immediateFuture(new Ticket(null));
101+
}
110102
} else {
111103
final SettableFuture<Ticket> future = SettableFuture.create();
112104
waiters.add(future);
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.frame.processor;
21+
22+
import com.google.common.util.concurrent.ListenableFuture;
23+
import org.junit.Assert;
24+
import org.junit.Test;
25+
26+
import java.util.concurrent.ExecutionException;
27+
28+
/**
29+
* Simple single-threaded tests for {@link Bouncer}.
30+
*
31+
* For multithreaded tests, see {@link RunAllFullyWidgetTest}.
32+
*/
33+
public class BouncerTest
34+
{
35+
@Test
36+
public void testBouncerWithoutParent() throws ExecutionException, InterruptedException
37+
{
38+
final Bouncer bouncer = new Bouncer(2);
39+
40+
Assert.assertEquals(2, bouncer.getMaxCount());
41+
Assert.assertEquals(0, bouncer.getCurrentCount());
42+
43+
// First ticket should be immediately available since count is 0 < maxCount
44+
final ListenableFuture<Bouncer.Ticket> future1 = bouncer.ticket();
45+
Assert.assertTrue(future1.isDone());
46+
final Bouncer.Ticket ticket1 = future1.get();
47+
Assert.assertEquals(1, bouncer.getCurrentCount());
48+
49+
// Second ticket should be immediately available since count is 1 < maxCount
50+
final ListenableFuture<Bouncer.Ticket> future2 = bouncer.ticket();
51+
Assert.assertTrue(future2.isDone());
52+
final Bouncer.Ticket ticket2 = future2.get();
53+
Assert.assertEquals(2, bouncer.getCurrentCount());
54+
55+
// Third ticket should not be ready yet because maxCount is reached
56+
final ListenableFuture<Bouncer.Ticket> future3 = bouncer.ticket();
57+
Assert.assertFalse(future3.isDone());
58+
Assert.assertEquals(2, bouncer.getCurrentCount());
59+
60+
// Giving back ticket1 should make future3 ready and transfer the slot
61+
ticket1.giveBack();
62+
Assert.assertEquals(2, bouncer.getCurrentCount());
63+
Assert.assertTrue(future3.isDone());
64+
final Bouncer.Ticket ticket3 = future3.get();
65+
66+
// Giving back ticket2 should decrease count since no waiters remain
67+
ticket2.giveBack();
68+
Assert.assertEquals(1, bouncer.getCurrentCount());
69+
70+
// Giving back ticket3 should decrease count to 0 since no waiters remain
71+
ticket3.giveBack();
72+
Assert.assertEquals(0, bouncer.getCurrentCount());
73+
}
74+
75+
@Test
76+
public void testBouncerWithParentMaxCountOne() throws ExecutionException, InterruptedException
77+
{
78+
final Bouncer parentBouncer = new Bouncer(1);
79+
final Bouncer bouncer = new Bouncer(2, parentBouncer);
80+
81+
Assert.assertEquals(1, bouncer.getMaxCount());
82+
Assert.assertEquals(0, bouncer.getCurrentCount());
83+
Assert.assertEquals(0, parentBouncer.getCurrentCount());
84+
85+
// First ticket should be immediately available, requiring both child and parent slots
86+
final ListenableFuture<Bouncer.Ticket> future1 = bouncer.ticket();
87+
Assert.assertTrue(future1.isDone());
88+
final Bouncer.Ticket ticket1 = future1.get();
89+
Assert.assertEquals(1, bouncer.getCurrentCount());
90+
Assert.assertEquals(1, parentBouncer.getCurrentCount());
91+
92+
// Second ticket should not be ready yet because parent maxCount is reached
93+
final ListenableFuture<Bouncer.Ticket> future2 = bouncer.ticket();
94+
Assert.assertFalse(future2.isDone());
95+
Assert.assertEquals(2, bouncer.getCurrentCount());
96+
Assert.assertEquals(1, parentBouncer.getCurrentCount());
97+
98+
// Giving back ticket1 should make future2 ready and reuse the parent slot
99+
ticket1.giveBack();
100+
Assert.assertEquals(1, bouncer.getCurrentCount());
101+
Assert.assertEquals(1, parentBouncer.getCurrentCount());
102+
Assert.assertTrue(future2.isDone());
103+
final Bouncer.Ticket ticket2 = future2.get();
104+
105+
// Giving back ticket2 should return both child and parent slots to 0
106+
ticket2.giveBack();
107+
Assert.assertEquals(0, bouncer.getCurrentCount());
108+
Assert.assertEquals(0, parentBouncer.getCurrentCount());
109+
}
110+
111+
@Test
112+
public void testBouncerWithParentMaxCountThree() throws ExecutionException, InterruptedException
113+
{
114+
final Bouncer parentBouncer = new Bouncer(3);
115+
final Bouncer bouncer = new Bouncer(2, parentBouncer);
116+
117+
Assert.assertEquals(2, bouncer.getMaxCount());
118+
Assert.assertEquals(0, bouncer.getCurrentCount());
119+
Assert.assertEquals(0, parentBouncer.getCurrentCount());
120+
121+
// First ticket should be immediately available, child is limiting factor
122+
final ListenableFuture<Bouncer.Ticket> future1 = bouncer.ticket();
123+
Assert.assertTrue(future1.isDone());
124+
final Bouncer.Ticket ticket1 = future1.get();
125+
Assert.assertEquals(1, bouncer.getCurrentCount());
126+
Assert.assertEquals(1, parentBouncer.getCurrentCount());
127+
128+
// Second ticket should be immediately available, child is limiting factor
129+
final ListenableFuture<Bouncer.Ticket> future2 = bouncer.ticket();
130+
Assert.assertTrue(future2.isDone());
131+
final Bouncer.Ticket ticket2 = future2.get();
132+
Assert.assertEquals(2, bouncer.getCurrentCount());
133+
Assert.assertEquals(2, parentBouncer.getCurrentCount());
134+
135+
// Third ticket should not be ready yet because child maxCount is reached
136+
final ListenableFuture<Bouncer.Ticket> future3 = bouncer.ticket();
137+
Assert.assertFalse(future3.isDone());
138+
Assert.assertEquals(2, bouncer.getCurrentCount());
139+
Assert.assertEquals(2, parentBouncer.getCurrentCount());
140+
141+
// Giving back ticket1 should make future3 ready and transfer the slot
142+
ticket1.giveBack();
143+
Assert.assertEquals(2, bouncer.getCurrentCount());
144+
Assert.assertEquals(2, parentBouncer.getCurrentCount());
145+
Assert.assertTrue(future3.isDone());
146+
final Bouncer.Ticket ticket3 = future3.get();
147+
148+
// Giving back ticket2 should decrease counts since no waiters remain
149+
ticket2.giveBack();
150+
Assert.assertEquals(1, bouncer.getCurrentCount());
151+
Assert.assertEquals(1, parentBouncer.getCurrentCount());
152+
153+
// Giving back ticket3 should return both counts to 0 since no waiters remain
154+
ticket3.giveBack();
155+
Assert.assertEquals(0, bouncer.getCurrentCount());
156+
Assert.assertEquals(0, parentBouncer.getCurrentCount());
157+
}
158+
}

0 commit comments

Comments
 (0)