Skip to content

Commit a77fce1

Browse files
authored
Migrate RingBuffer to aiven-commons directory and code (#532)
closes #530 Move RingBuffer to aiven-commons directory and replaced with aiven-commons code.
1 parent 2a7928c commit a77fce1

File tree

4 files changed

+211
-117
lines changed

4 files changed

+211
-117
lines changed
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Copyright 2025 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.commons.collections;
18+
19+
import java.util.Objects;
20+
21+
import org.apache.commons.collections4.queue.CircularFifoQueue;
22+
import org.apache.commons.collections4.queue.SynchronizedQueue;
23+
24+
/**
25+
* Implements a ring buffer of items. Items are inserted until maximum size is reached and then the earliest items are
26+
* removed when newer items are added.
27+
*
28+
* @param <K>
29+
* the type of item in the queue. Must support equality check.
30+
*/
31+
public final class RingBuffer<K> {
32+
/** How to handle the duplicates in the buffer. */
33+
public enum DuplicateHandling {
34+
/** Allow duplicates in the buffer. */
35+
ALLOW,
36+
/** Reject (do not add) duplicates to the buffer. */
37+
REJECT,
38+
/** Move the duplicate entry to the tail of the buffer. */
39+
DELETE
40+
}
41+
42+
/** The wrapped queue. */
43+
private final SynchronizedQueue<K> queue;
44+
45+
private final CircularFifoQueue<K> wrappedQueue;
46+
47+
/** Flag to indicate ring buffer should always be empty. */
48+
private final boolean alwaysEmpty;
49+
50+
/** Flag to allow duplicates in the buffer. */
51+
private final DuplicateHandling duplicateHandling;
52+
53+
/**
54+
* Create a Ring Buffer of a maximum size that rejects duplicates. If the size is less than or equal to 0 then the
55+
* buffer is always empty.
56+
*
57+
* @param size
58+
* The maximum size of the ring buffer
59+
* @see DuplicateHandling#REJECT
60+
*/
61+
public RingBuffer(final int size) {
62+
this(size, DuplicateHandling.REJECT);
63+
}
64+
65+
/**
66+
* Create a Ring Buffer of specified maximum size and potentially allowing duplicates. If the size is less than or
67+
* equal to 0 then the buffer is always empty.
68+
*
69+
* @param size
70+
* The maximum size of the ring buffer
71+
* @param duplicateHandling
72+
* defines how to handle duplicate values in the buffer.
73+
*/
74+
public RingBuffer(final int size, final DuplicateHandling duplicateHandling) {
75+
wrappedQueue = new CircularFifoQueue<>(size > 0 ? size : 1);
76+
queue = SynchronizedQueue.synchronizedQueue(wrappedQueue);
77+
alwaysEmpty = size <= 0;
78+
this.duplicateHandling = duplicateHandling;
79+
}
80+
81+
@Override
82+
public String toString() {
83+
return String.format("RingBuffer[%s, load %s/%s]", duplicateHandling, queue.size(), wrappedQueue.maxSize());
84+
}
85+
86+
/**
87+
* Adds a new item if it is not already present.
88+
*
89+
* <ul>
90+
* <li>If the buffer is always empty the item is ignored and not enqueued.
91+
* <li>If the buffer already contains the item it is ignored and not enqueued.
92+
* <li>If the buffer is full the oldest entry in the buffer is ejected.
93+
* </ul>
94+
*
95+
* @param item
96+
* Item T which is to be added to the Queue
97+
* @return The item that was ejected. May be {@code null}.
98+
*/
99+
public K add(final K item) {
100+
Objects.requireNonNull(item, "item");
101+
if (!alwaysEmpty && checkDuplicates(item)) {
102+
final K result = isFull() ? queue.poll() : null;
103+
queue.add(item);
104+
return result;
105+
}
106+
return null;
107+
}
108+
109+
/**
110+
* Removes a single instance of the item from the buffer.
111+
*
112+
* @param item
113+
* the item to remove.
114+
*/
115+
public void remove(final K item) {
116+
queue.remove(item);
117+
}
118+
119+
/**
120+
* Determines if the item is in the buffer.
121+
*
122+
* @param item
123+
* the item to look for.
124+
* @return {@code true} if the item is in the buffer, {@code false} othersie.
125+
*/
126+
public boolean contains(final K item) {
127+
return queue.contains(item);
128+
}
129+
130+
/**
131+
* Returns but does not remove the head of the buffer.
132+
*
133+
* @return the item at the head of the buffer. May be {@code null}.
134+
*/
135+
public K head() {
136+
return queue.peek();
137+
}
138+
139+
/**
140+
* Returns but does not remove the teal of the buffer.
141+
*
142+
* @return the item at the tail of the buffer. May be {@code null}.
143+
*/
144+
public K tail() {
145+
final int size = wrappedQueue.size();
146+
return size == 0 ? null : wrappedQueue.get(size - 1);
147+
}
148+
149+
private boolean checkDuplicates(final K item) {
150+
switch (duplicateHandling) {
151+
case ALLOW :
152+
return true;
153+
case REJECT :
154+
return !queue.contains(item);
155+
case DELETE :
156+
queue.remove(item);
157+
return true;
158+
default :
159+
throw new IllegalStateException("Unsupported duplicate handling: " + duplicateHandling);
160+
}
161+
}
162+
163+
/**
164+
* Returns {@code true} if the buffer is full.
165+
*
166+
* @return {@code true} if the buffer is full.
167+
*/
168+
public boolean isFull() {
169+
return wrappedQueue.isAtFullCapacity();
170+
}
171+
172+
/**
173+
* Gets the next item to be ejected. If the buffer is full this will return the oldest value in the buffer. If the
174+
* buffer is not full this method will return {@code null}.
175+
*
176+
* @return A value T from the last place in the buffer, returns null if buffer is not full.
177+
*/
178+
public K getNextEjected() {
179+
return isFull() ? queue.peek() : null;
180+
}
181+
182+
@Override
183+
public boolean equals(final Object object) {
184+
if (object == this) {
185+
return true;
186+
}
187+
return super.equals(object);
188+
}
189+
190+
@SuppressWarnings("PMD.UselessOverridingMethod")
191+
@Override
192+
public int hashCode() {
193+
return super.hashCode();
194+
}
195+
}

commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceRecordIterator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.apache.kafka.connect.data.SchemaAndValue;
2828

29+
import io.aiven.commons.collections.RingBuffer;
2930
import io.aiven.kafka.connect.common.config.SourceCommonConfig;
3031
import io.aiven.kafka.connect.common.source.input.Transformer;
3132
import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
@@ -186,12 +187,12 @@ public AbstractSourceRecordIterator(final SourceCommonConfig sourceConfig, final
186187
final public boolean hasNext() {
187188
if (!outer.hasNext() && lastSeenNativeKey != null) {
188189
// update the buffer to contain this new objectKey
189-
ringBuffer.enqueue(lastSeenNativeKey);
190+
ringBuffer.add(lastSeenNativeKey);
190191
// Remove the last seen from the offsetmanager as the file has been completely processed.
191192
offsetManager.removeEntry(getOffsetManagerKey(lastSeenNativeKey));
192193
}
193194
if (!inner.hasNext() && !outer.hasNext()) {
194-
inner = getNativeItemStream(ringBuffer.getOldest()).map(fileMatching)
195+
inner = getNativeItemStream(ringBuffer.getNextEjected()).map(fileMatching)
195196
.filter(taskAssignment)
196197
.filter(Optional::isPresent)
197198
.map(Optional::get)

commons/src/main/java/io/aiven/kafka/connect/common/source/RingBuffer.java

Lines changed: 0 additions & 104 deletions
This file was deleted.

commons/src/test/java/io/aiven/kafka/connect/common/source/RingBufferTest.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import io.aiven.commons.collections.RingBuffer;
22+
2123
import org.junit.jupiter.api.Test;
2224
import org.junit.jupiter.params.ParameterizedTest;
2325
import org.junit.jupiter.params.provider.CsvSource;
@@ -32,12 +34,12 @@ void testRingBufferReturnsOldestEntryAndRemovesOldestEntry(final int size) {
3234

3335
final RingBuffer<String> buffer = new RingBuffer<>(size);
3436
for (int i = 0; i < size; i++) {
35-
buffer.enqueue(OBJECT_KEY + i);
37+
buffer.add(OBJECT_KEY + i);
3638
}
37-
assertThat(buffer.getOldest()).isEqualTo("S3ObjectKey" + 0);
39+
assertThat(buffer.getNextEjected()).isEqualTo("S3ObjectKey" + 0);
3840
// Add one more unique ObjectKey
39-
buffer.enqueue(OBJECT_KEY);
40-
assertThat(buffer.getOldest()).isEqualTo("S3ObjectKey" + 1);
41+
buffer.add(OBJECT_KEY);
42+
assertThat(buffer.getNextEjected()).isEqualTo("S3ObjectKey" + 1);
4143
}
4244

4345
@ParameterizedTest
@@ -47,21 +49,21 @@ void testRingBufferOnlyAddsEachItemOnce(final int size) {
4749
final RingBuffer<String> buffer = new RingBuffer<>(size);
4850
for (int i = 0; i < size; i++) {
4951
// add the same objectKey every time, it should onl have one entry.
50-
buffer.enqueue(OBJECT_KEY);
52+
buffer.add(OBJECT_KEY);
5153
}
5254
// Buffer not filled so should return null
53-
assertThat(buffer.getOldest()).isEqualTo(null);
54-
assertThat(buffer.peek()).isEqualTo(OBJECT_KEY);
55+
assertThat(buffer.getNextEjected()).isEqualTo(null);
56+
assertThat(buffer.head()).isEqualTo(OBJECT_KEY);
5557
assertThat(buffer.contains(OBJECT_KEY)).isTrue();
5658
}
5759

5860
@Test
5961
void testRingBufferOfSizeOneOnlyRetainsOneEntry() {
6062

6163
final RingBuffer<String> buffer = new RingBuffer<>(1);
62-
buffer.enqueue(OBJECT_KEY + 0);
63-
assertThat(buffer.getOldest()).isEqualTo(OBJECT_KEY + 0);
64-
buffer.enqueue(OBJECT_KEY + 1);
65-
assertThat(buffer.getOldest()).isEqualTo(OBJECT_KEY + 1);
64+
buffer.add(OBJECT_KEY + 0);
65+
assertThat(buffer.getNextEjected()).isEqualTo(OBJECT_KEY + 0);
66+
buffer.add(OBJECT_KEY + 1);
67+
assertThat(buffer.getNextEjected()).isEqualTo(OBJECT_KEY + 1);
6668
}
6769
}

0 commit comments

Comments
 (0)