From a88a0f600539f4fc5cfe4abc3b9f5f3c292171aa Mon Sep 17 00:00:00 2001
From: Isaac Cheng
Date: Thu, 22 May 2025 11:28:02 -0500
Subject: [PATCH] feat(filters): process the same headers but different order
 of columns for CSV files

Support CSV files that share the same headers but list the columns in a
different order. When the extracted header row has the same length as the
previously cached one but a different ordering, the schema is re-inferred
from the record so that values are mapped to the correct column names.
---
 .../filter/AbstractDelimitedRowFilter.java | 18 ++++
 .../filepulse/filter/CSVFilterTest.java    | 94 +++++++++++++++++++
 2 files changed, 112 insertions(+)

diff --git a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java
index 86725b805..963e07ee4 100644
--- a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java
+++ b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java
@@ -42,6 +42,8 @@ public abstract class AbstractDelimitedRowFilter
 
     private Schema schema;
 
+    private String cachedHeaders;
+
     private final Map<Integer, Type> columnsTypesByIndex = new HashMap<>();
 
     /**
@@ -103,6 +105,11 @@ public RecordsIterable<TypedStruct> apply(final FilterContext context,
         if (schema == null || isSchemaDynamic()) {
             inferSchemaFromRecord(record, columnValues.length);
         }
+
+        if (schema != null && configs.extractColumnName() != null && shouldInferSchema(record)) {
+            inferSchemaFromRecord(record, columnValues.length);
+        }
+
         final TypedStruct struct = buildStructForFields(columnValues);
         return RecordsIterable.of(struct);
     }
@@ -115,12 +122,23 @@ public boolean isSchemaDynamic() {
                configs.isAutoGenerateColumnNames();
     }
 
+    private boolean shouldInferSchema(TypedStruct record) {
+        if (cachedHeaders == null) {
+            return false;
+        }
+        final String fieldName = configs.extractColumnName();
+        String field = record.first(fieldName).getString();
+        return cachedHeaders.length() == field.length() && !cachedHeaders.equals(field);
+    }
+
     private void inferSchemaFromRecord(final TypedStruct record, int numColumns) {
         schema = Schema.struct();
 
         if (configs.extractColumnName() != null) {
             final String fieldName = configs.extractColumnName();
             String field = record.first(fieldName).getString();
+            cachedHeaders = field;
+
             if (field == null) {
                 throw new FilterException(
                     "Cannot find field for name '" + fieldName + "' to determine columns names"
diff --git a/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java b/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java
index 36ceb836c..0792709bf 100644
--- a/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java
+++ b/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java
@@ -39,6 +39,100 @@ public void setUp() {
         configs.put(CSVFilter.PARSER_SEPARATOR_CONFIG, ";");
     }
 
+    @Test
+    public void should_extract_column_names_from_diff_order_headers() {
+        configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
+        filter.configure(configs, alias -> null);
+
+        RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
+        Assert.assertNotNull(output);
+        Assert.assertEquals(1, output.size());
+
+        final TypedStruct record = output.iterator().next();
+        Assert.assertEquals("value1", record.getString("col1"));
+        Assert.assertEquals("2", record.getString("col2"));
+        Assert.assertEquals("true", record.getString("col3"));
+
+        final TypedStruct input1 = TypedStruct.create()
+                .put("message", "false;3;value2")
+                .put("headers", Arrays.asList("col3;col2;col1"));
+        RecordsIterable<TypedStruct> output1 = filter.apply(null, input1, false);
+        Assert.assertNotNull(output1);
+        Assert.assertEquals(1, output1.size());
+
+        final TypedStruct record1 = output1.iterator().next();
+        Assert.assertEquals("value2", record1.getString("col1"));
+        Assert.assertEquals("3", record1.getString("col2"));
+        Assert.assertEquals("false", record1.getString("col3"));
+
+        final TypedStruct input2 = TypedStruct.create()
+                .put("message", "4;false;value3")
+                .put("headers", Arrays.asList("col2;col3;col1"));
+
+        RecordsIterable<TypedStruct> output2 = filter.apply(null, input2, false);
+        Assert.assertNotNull(output2);
+        Assert.assertEquals(1, output2.size());
+
+        final TypedStruct record2 = output2.iterator().next();
+        Assert.assertEquals("value3", record2.getString("col1"));
+        Assert.assertEquals("4", record2.getString("col2"));
+        Assert.assertEquals("false", record2.getString("col3"));
+    }
+
+    @Test
+    public void should_extract_column_names_from_diff_order_headers_and_null_value() {
+        configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
+        filter.configure(configs, alias -> null);
+
+        RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
+        Assert.assertNotNull(output);
+        Assert.assertEquals(1, output.size());
+
+        final TypedStruct record = output.iterator().next();
+        Assert.assertEquals("value1", record.getString("col1"));
+        Assert.assertEquals("2", record.getString("col2"));
+        Assert.assertEquals("true", record.getString("col3"));
+
+        final TypedStruct input1 = TypedStruct.create()
+                .put("message", "false;;")
+                .put("headers", Arrays.asList("col3;col2;col1"));
+        RecordsIterable<TypedStruct> output1 = filter.apply(null, input1, false);
+        Assert.assertNotNull(output1);
+        Assert.assertEquals(1, output1.size());
+
+        final TypedStruct record1 = output1.iterator().next();
+        Assert.assertNull(record1.getString("col1"));
+        Assert.assertNull(record1.getString("col2"));
+        Assert.assertEquals("false", record1.getString("col3"));
+    }
+
+    @Test
+    public void should_extract_column_names_from_diff_order_headers_and_diff_size() {
+        configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers");
+        filter.configure(configs, alias -> null);
+
+        RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
+        Assert.assertNotNull(output);
+        Assert.assertEquals(1, output.size());
+
+        final TypedStruct record = output.iterator().next();
+        Assert.assertEquals("value1", record.getString("col1"));
+        Assert.assertEquals("2", record.getString("col2"));
+        Assert.assertEquals("true", record.getString("col3"));
+
+        final TypedStruct input1 = TypedStruct.create()
+                .put("message", "false;4;")
+                .put("headers", Arrays.asList("col3;col2"));
+        RecordsIterable<TypedStruct> output1 = filter.apply(null, input1, false);
+        Assert.assertNotNull(output1);
+        Assert.assertEquals(1, output1.size());
+
+        final TypedStruct record1 = output1.iterator().next();
+        Assert.assertEquals("false", record1.getString("col1"));
+        Assert.assertEquals("4", record1.getString("col2"));
+        Assert.assertNull(record1.getString("col3"));
+    }
+
     @Test
     public void should_auto_generate_schema_given_no_schema_field() {
         filter.configure(configs, alias -> null);