Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
Expand All @@ -31,7 +35,9 @@

import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.NoSuchFileException;
import java.util.HashMap;
import java.util.List;
Expand All @@ -42,6 +48,9 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DeleteRecordsCommandTest {

Expand Down Expand Up @@ -162,6 +171,83 @@ public void testParse() throws Exception {
assertEquals(List.of(1L), res.get(new TopicPartition("t", 1)));
}

@Test
public void testExecuteSuccessOutputWithMockAdmin() throws Exception {
TopicPartition tp = new TopicPartition("t", 0);

KafkaFuture<DeletedRecords> future = KafkaFuture.completedFuture(new DeletedRecords(5L));

DeleteRecordsResult deleteResult = mock(DeleteRecordsResult.class);
when(deleteResult.lowWatermarks()).thenReturn(Map.of(tp, future));

Admin mockAdmin = mock(Admin.class);
when(mockAdmin.deleteRecords(any())).thenReturn(deleteResult);

ByteArrayOutputStream bout = new ByteArrayOutputStream();
DeleteRecordsCommand.execute(
mockAdmin,
"{\"partitions\":[{\"topic\":\"t\",\"partition\":0,\"offset\":5}]}",
new PrintStream(bout)
);

String output = bout.toString();
assertTrue(output.contains("Executing records delete operation"));
assertTrue(output.contains("Records delete operation completed:"));
assertTrue(output.contains("partition: t-0\tlow_watermark: 5"));
}

@Test
public void testExecutePartitionErrorWithMockAdmin() throws Exception {
TopicPartition tp = new TopicPartition("t", 0);

KafkaFutureImpl<DeletedRecords> future = new KafkaFutureImpl<>();
future.completeExceptionally(new RuntimeException("Partition not found"));

DeleteRecordsResult deleteResult = mock(DeleteRecordsResult.class);
when(deleteResult.lowWatermarks()).thenReturn(Map.of(tp, future));

Admin mockAdmin = mock(Admin.class);
when(mockAdmin.deleteRecords(any())).thenReturn(deleteResult);

ByteArrayOutputStream bout = new ByteArrayOutputStream();
DeleteRecordsCommand.execute(
mockAdmin,
"{\"partitions\":[{\"topic\":\"t\",\"partition\":0,\"offset\":1}]}",
new PrintStream(bout)
);

String output = bout.toString();
assertTrue(output.contains("partition: t-0\terror:"));
assertTrue(output.contains("Partition not found"));
}

@Test
public void testExecuteDuplicatePartitionsWithoutCluster() {
Admin mockAdmin = mock(Admin.class);

AdminCommandFailedException exception = assertThrows(
AdminCommandFailedException.class,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind checking the error message as well?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — the duplicate-partition test now asserts the full AdminCommandFailedException message, and I also tightened the per-partition error test to check the printed error text (Partition not found). Thanks for the review, @chia7712!

() -> DeleteRecordsCommand.execute(
mockAdmin,
"{\"partitions\":[" +
"{\"topic\":\"t\",\"partition\":0,\"offset\":1}," +
"{\"topic\":\"t\",\"partition\":0,\"offset\":2}]}",
System.out
)
);

assertEquals(
"Offset json file contains duplicate topic partitions: t-0",
exception.getMessage()
);
}

@Test
public void testParseInvalidJsonThrows() {
assertCommandThrows(AdminOperationException.class, "not-valid-json");
assertCommandThrows(AdminOperationException.class, "");
}

/**
* Asserts that {@link DeleteRecordsCommand#parseOffsetJsonStringWithoutDedup(String)} throws {@link AdminOperationException}.
* @param jsonData Data to check.
Expand Down