diff --git a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java index 86725b805..963e07ee4 100644 --- a/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java +++ b/connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java @@ -42,6 +42,8 @@ public abstract class AbstractDelimitedRowFilter columnsTypesByIndex = new HashMap<>(); /** @@ -103,6 +105,11 @@ public RecordsIterable apply(final FilterContext context, if (schema == null || isSchemaDynamic()) { inferSchemaFromRecord(record, columnValues.length); } + + if (schema != null && configs.extractColumnName() != null && shouldInferSchema(record)) { + inferSchemaFromRecord(record, columnValues.length); + } + final TypedStruct struct = buildStructForFields(columnValues); return RecordsIterable.of(struct); } @@ -115,12 +122,23 @@ public boolean isSchemaDynamic() { configs.isAutoGenerateColumnNames(); } + private boolean shouldInferSchema(TypedStruct record) { + if (cachedHeaders == null) { + return false; + } + final String fieldName = configs.extractColumnName(); + String field = record.first(fieldName).getString(); + return cachedHeaders.length() == field.length() && !cachedHeaders.equals(field); + } + private void inferSchemaFromRecord(final TypedStruct record, int numColumns) { schema = Schema.struct(); if (configs.extractColumnName() != null) { final String fieldName = configs.extractColumnName(); String field = record.first(fieldName).getString(); + cachedHeaders = field; + if (field == null) { throw new FilterException( "Cannot find field for name '" + fieldName + "' to determine columns names" diff --git a/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java b/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java index 36ceb836c..0792709bf 100644 --- a/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java +++ b/connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/CSVFilterTest.java @@ -39,6 +39,100 @@ public void setUp() { configs.put(CSVFilter.PARSER_SEPARATOR_CONFIG, ";"); } + @Test + public void should_extract_column_names_from_diff_order_headers() { + configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers"); + filter.configure(configs, alias -> null); + + RecordsIterable output = filter.apply(null, DEFAULT_STRUCT, false); + Assert.assertNotNull(output); + Assert.assertEquals(1, output.size()); + + final TypedStruct record = output.iterator().next(); + Assert.assertEquals("value1", record.getString("col1")); + Assert.assertEquals("2", record.getString("col2")); + Assert.assertEquals("true", record.getString("col3")); + + final TypedStruct input1 = TypedStruct.create() + .put("message", "false;3;value2") + .put("headers", Arrays.asList("col3;col2;col1")); + RecordsIterable output1 = filter.apply(null, input1, false); + Assert.assertNotNull(output1); + Assert.assertEquals(1, output1.size()); + + final TypedStruct record1 = output1.iterator().next(); + Assert.assertEquals("value2", record1.getString("col1")); + Assert.assertEquals("3", record1.getString("col2")); + Assert.assertEquals("false", record1.getString("col3")); + + final TypedStruct input2 = TypedStruct.create() + .put("message", "4;false;value3") + .put("headers", Arrays.asList("col2;col3;col1")); + + RecordsIterable output2 = filter.apply(null, input2, false); + Assert.assertNotNull(output2); + Assert.assertEquals(1, output2.size()); + + final TypedStruct record2 = output2.iterator().next(); + Assert.assertEquals("value3", record2.getString("col1")); + Assert.assertEquals("4", record2.getString("col2")); + Assert.assertEquals("false", record2.getString("col3")); + } + + @Test + public void should_extract_column_names_from_diff_order_headers_and_null_value() { + configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers"); + filter.configure(configs, alias -> null); + + RecordsIterable output = filter.apply(null, DEFAULT_STRUCT, false); + Assert.assertNotNull(output); + Assert.assertEquals(1, output.size()); + + final TypedStruct record = output.iterator().next(); + Assert.assertEquals("value1", record.getString("col1")); + Assert.assertEquals("2", record.getString("col2")); + Assert.assertEquals("true", record.getString("col3")); + + final TypedStruct input1 = TypedStruct.create() + .put("message", "false;;") + .put("headers", Arrays.asList("col3;col2;col1")); + RecordsIterable output1 = filter.apply(null, input1, false); + Assert.assertNotNull(output1); + Assert.assertEquals(1, output1.size()); + + final TypedStruct record1 = output1.iterator().next(); + Assert.assertNull(record1.getString("col1")); + Assert.assertNull(record1.getString("col2")); + Assert.assertEquals("false", record1.getString("col3")); + } + + @Test + public void should_extract_column_names_from_diff_order_headers_and_diff_size() { + configs.put(READER_EXTRACT_COLUMN_NAME_CONFIG, "headers"); + filter.configure(configs, alias -> null); + + RecordsIterable output = filter.apply(null, DEFAULT_STRUCT, false); + Assert.assertNotNull(output); + Assert.assertEquals(1, output.size()); + + final TypedStruct record = output.iterator().next(); + Assert.assertEquals("value1", record.getString("col1")); + Assert.assertEquals("2", record.getString("col2")); + Assert.assertEquals("true", record.getString("col3")); + + final TypedStruct input1 = TypedStruct.create() + .put("message", "false;4;") + .put("headers", Arrays.asList("col3;col2")); + RecordsIterable output1 = filter.apply(null, input1, false); + Assert.assertNotNull(output1); + Assert.assertEquals(1, output1.size()); + + final TypedStruct record1 = output1.iterator().next(); + Assert.assertEquals("false", record1.getString("col1")); + Assert.assertEquals("4", record1.getString("col2")); + Assert.assertNull(record1.getString("col3")); + } + @Test public void should_auto_generate_schema_given_no_schema_field() { filter.configure(configs, alias -> null);