Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions docs/content.zh/docs/core-concept/transform.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ under the License.
| primary-keys | Sink table primary keys, separated by commas | optional |
| partition-keys | Sink table partition keys, separated by commas | optional |
| table-options | used to configure the table creation statement when automatically creating tables | optional |
| table-options.delimiter | delimiter separating the key-value pairs in table-options; defaults to `,` | optional |
| converter-after-transform | used to add a converter to change DataChangeEvent after transform | optional |
| description | Transform rule description | optional |

Expand Down Expand Up @@ -311,7 +312,13 @@ transform:
table-options: comment=web order
description: auto creating table options example
```
小技巧:table-options 的格式是 `key1=value1,key2=value2`。
小技巧:table-options 的格式是 `key1=value1,key2=value2`;如果 value 中包含逗号或其他特殊字符,可以使用 `table-options.delimiter` 指定自定义分隔符(如 `;`、`|`、`$` 等):
```yaml
transform:
- source-table: mydb.web_order
table-options: sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
table-options.delimiter: ";"
```

## Classification mapping
多个转换规则可以定义为分类映射。
Expand Down Expand Up @@ -466,4 +473,4 @@ pipeline:
|---------------|--------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| openai.model | STRING | required | Name of model to be called, for example: "text-embedding-3-small", Available options are "text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002". |
| openai.host | STRING | required | Host of the Model server to be connected, for example: `http://langchain4j.dev/demo/openai/v1`. |
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
10 changes: 9 additions & 1 deletion docs/content/docs/core-concept/transform.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ To describe a transform rule, the following parameters can be used:
| primary-keys | Sink table primary keys, separated by commas | optional |
| partition-keys | Sink table partition keys, separated by commas | optional |
| table-options | used to configure the table creation statement when automatically creating tables | optional |
| table-options.delimiter | delimiter separating the key-value pairs in table-options; defaults to `,` | optional |
| converter-after-transform | used to add a converter to change DataChangeEvent after transform | optional |
| description | Transform rule description | optional |

Expand Down Expand Up @@ -315,6 +316,13 @@ transform:
description: auto creating table options example
```
Tips: The format of table-options is `key1=value1,key2=value2`.
If option values contain commas or other special characters, you can specify a custom delimiter using `table-options.delimiter` (such as `;`, `|`, `$`, etc.):
```yaml
transform:
- source-table: mydb.web_order
table-options: sequence.field=gxsj,jjsj;file-index.bloom-filter.columns=jjdbh
table-options.delimiter: ";"
```

## Classification mapping
Multiple transform rules can be defined to classify input data rows and apply different processing.
Expand Down Expand Up @@ -471,4 +479,4 @@ The following built-in models are provided:
|---------------|--------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| openai.model | STRING | required | Name of model to be called, for example: "text-embedding-3-small", Available options are "text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002". |
| openai.host | STRING | required | Host of the Model server to be connected, for example: `http://langchain4j.dev/demo/openai/v1`. |
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {

/** YAML key under a transform rule whose text value is read as the table options. */
public static final String TRANSFORM_TABLE_OPTION_KEY = "table-options";

/** YAML key under a transform rule whose text value is read as the table-options delimiter. */
public static final String TRANSFORM_TABLE_OPTION_DELIMITER_KEY = "table-options.delimiter";

/** Jackson mapper backed by a {@link YAMLFactory}. */
private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

/** Parse the specified pipeline definition file. */
Expand Down Expand Up @@ -324,6 +326,7 @@ private TransformDef toTransformDef(JsonNode transformNode) {
TRANSFORM_PRIMARY_KEY_KEY,
TRANSFORM_PARTITION_KEY_KEY,
TRANSFORM_TABLE_OPTION_KEY,
TRANSFORM_TABLE_OPTION_DELIMITER_KEY,
TRANSFORM_DESCRIPTION_KEY,
TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY));

Expand Down Expand Up @@ -357,6 +360,10 @@ private TransformDef toTransformDef(JsonNode transformNode) {
Optional.ofNullable(transformNode.get(TRANSFORM_TABLE_OPTION_KEY))
.map(JsonNode::asText)
.orElse(null);
String tableOptionsDelimiter =
Optional.ofNullable(transformNode.get(TRANSFORM_TABLE_OPTION_DELIMITER_KEY))
.map(JsonNode::asText)
.orElse(null);
String description =
Optional.ofNullable(transformNode.get(TRANSFORM_DESCRIPTION_KEY))
.map(JsonNode::asText)
Expand All @@ -373,6 +380,7 @@ private TransformDef toTransformDef(JsonNode transformNode) {
primaryKeys,
partitionKeys,
tableOptions,
tableOptionsDelimiter,
description,
postTransformConverter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
* by `,`. Optional for the definition.
* <li>tableOptions: a string for table options for matching input table IDs, options are
* separated by `,`, key and value are separated by `=`. Optional for the definition.
* <li>tableOptionsDelimiter: a string for delimiter of table options, default is `,`. Optional
* for the definition.
* <li>description: description for the transformation. Optional for the definition.
* </ul>
*/
Expand All @@ -47,6 +49,7 @@ public class TransformDef {
private final String primaryKeys;
private final String partitionKeys;
private final String tableOptions;
private final String tableOptionsDelimiter;
private final String postTransformConverter;

public TransformDef(
Expand All @@ -56,6 +59,7 @@ public TransformDef(
String primaryKeys,
String partitionKeys,
String tableOptions,
String tableOptionsDelimiter,
String description,
String postTransformConverter) {
this.sourceTable = sourceTable;
Expand All @@ -64,10 +68,30 @@ public TransformDef(
this.primaryKeys = primaryKeys;
this.partitionKeys = partitionKeys;
this.tableOptions = tableOptions;
this.tableOptionsDelimiter = tableOptionsDelimiter;
this.description = description;
this.postTransformConverter = postTransformConverter;
}

/**
 * Creates a transform definition without an explicit table-options delimiter.
 *
 * <p>This overload keeps the pre-delimiter signature for existing callers. The delimiter is
 * passed on as {@code null}, meaning "not configured", which is the same value the YAML parser
 * produces when {@code table-options.delimiter} is absent. Using the same sentinel keeps
 * {@link #equals(Object)} and {@link #hashCode()} consistent between definitions built here
 * and definitions parsed from YAML; the effective default of {@code ,} is applied where the
 * table options are actually split.
 *
 * @param sourceTable source table selector of the rule
 * @param projection projection expression of the rule
 * @param filter filter expression of the rule
 * @param primaryKeys sink table primary keys
 * @param partitionKeys sink table partition keys
 * @param tableOptions table options string
 * @param description description of the rule
 * @param postTransformConverter converter applied after the transform
 */
public TransformDef(
        String sourceTable,
        String projection,
        String filter,
        String primaryKeys,
        String partitionKeys,
        String tableOptions,
        String description,
        String postTransformConverter) {
    this(
            sourceTable,
            projection,
            filter,
            primaryKeys,
            partitionKeys,
            tableOptions,
            // No delimiter configured; consumers fall back to the default comma.
            null,
            description,
            postTransformConverter);
}
/** Returns the source table value this definition was constructed with. */
public String getSourceTable() {
    return this.sourceTable;
}
Expand Down Expand Up @@ -96,6 +120,10 @@ public String getTableOptions() {
return tableOptions;
}

/**
 * Returns the table-options delimiter exactly as it was supplied to the constructor.
 *
 * <p>No default is applied here, so the result may be {@code null} (for example when the YAML
 * definition does not set {@code table-options.delimiter}).
 */
public String getTableOptionsDelimiter() {
    return this.tableOptionsDelimiter;
}

/** Returns the post-transform converter value this definition was constructed with. */
public String getPostTransformConverter() {
    return this.postTransformConverter;
}
Expand Down Expand Up @@ -137,6 +165,7 @@ public boolean equals(Object o) {
&& Objects.equals(primaryKeys, that.primaryKeys)
&& Objects.equals(partitionKeys, that.partitionKeys)
&& Objects.equals(tableOptions, that.tableOptions)
&& Objects.equals(tableOptionsDelimiter, that.tableOptionsDelimiter)
&& Objects.equals(postTransformConverter, that.postTransformConverter);
}

Expand All @@ -150,6 +179,7 @@ public int hashCode() {
primaryKeys,
partitionKeys,
tableOptions,
tableOptionsDelimiter,
postTransformConverter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ private PreTransformOperator generatePreTransform(
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions(),
transform.getTableOptionsDelimiter(),
transform.getPostTransformConverter(),
supportedMetadataColumns);
}
Expand Down Expand Up @@ -112,6 +113,7 @@ public DataStream<Event> translatePostTransform(
transform.getPrimaryKeys(),
transform.getPartitionKeys(),
transform.getTableOptions(),
transform.getTableOptionsDelimiter(),
transform.getPostTransformConverter(),
supportedMetadataColumns);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ public PostTransformOperatorBuilder addTransform(
String tableOptions,
String postTransformConverter,
SupportedMetadataColumn[] supportedMetadataColumns) {
return addTransform(
tableInclusions,
projection,
filter,
primaryKey,
partitionKey,
tableOptions,
null,
postTransformConverter,
supportedMetadataColumns);
}

public PostTransformOperatorBuilder addTransform(
String tableInclusions,
@Nullable String projection,
@Nullable String filter,
String primaryKey,
String partitionKey,
String tableOptions,
String tableOptionsDelimiter,
String postTransformConverter,
SupportedMetadataColumn[] supportedMetadataColumns) {
transformRules.add(
new TransformRule(
tableInclusions,
Expand All @@ -52,6 +74,7 @@ public PostTransformOperatorBuilder addTransform(
primaryKey,
partitionKey,
tableOptions,
tableOptionsDelimiter,
postTransformConverter,
supportedMetadataColumns));
return this;
Expand All @@ -67,6 +90,7 @@ public PostTransformOperatorBuilder addTransform(
"",
"",
"",
"",
null,
new SupportedMetadataColumn[0]));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public void setup(
String primaryKeys = transformRule.getPrimaryKey();
String partitionKeys = transformRule.getPartitionKey();
String tableOptions = transformRule.getTableOption();
String tableOptionsDelimiter = transformRule.getTableOptionsDelimiter();
Selectors selectors =
new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
transforms.add(
Expand All @@ -120,7 +121,11 @@ public void setup(
schemaMetadataTransformers.add(
new Tuple2<>(
selectors,
new SchemaMetadataTransform(primaryKeys, partitionKeys, tableOptions)));
new SchemaMetadataTransform(
primaryKeys,
partitionKeys,
tableOptions,
tableOptionsDelimiter)));
}
this.preTransformProcessorMap = new ConcurrentHashMap<>();
this.hasAsteriskMap = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public PreTransformOperatorBuilder addTransform(
"",
"",
"",
"",
null,
new SupportedMetadataColumn[0]));
return this;
Expand All @@ -57,6 +58,28 @@ public PreTransformOperatorBuilder addTransform(
String tableOption,
@Nullable String postTransformConverter,
SupportedMetadataColumn[] supportedMetadataColumns) {
return addTransform(
tableInclusions,
projection,
filter,
primaryKey,
partitionKey,
tableOption,
null,
postTransformConverter,
supportedMetadataColumns);
}

public PreTransformOperatorBuilder addTransform(
String tableInclusions,
@Nullable String projection,
@Nullable String filter,
String primaryKey,
String partitionKey,
String tableOption,
String tableOptionsDelimiter,
@Nullable String postTransformConverter,
SupportedMetadataColumn[] supportedMetadataColumns) {
transformRules.add(
new TransformRule(
tableInclusions,
Expand All @@ -65,6 +88,7 @@ public PreTransformOperatorBuilder addTransform(
primaryKey,
partitionKey,
tableOption,
tableOptionsDelimiter,
postTransformConverter,
supportedMetadataColumns));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ public class SchemaMetadataTransform implements Serializable {
private Map<String, String> options = new HashMap<>();

public SchemaMetadataTransform(
String primaryKeyString, String partitionKeyString, String tableOptionString) {
String primaryKeyString,
String partitionKeyString,
String tableOptionString,
String tableOptionsDelimiter) {
if (!StringUtils.isNullOrWhitespaceOnly(primaryKeyString)) {
String[] primaryKeyArr = primaryKeyString.split(",");
for (int i = 0; i < primaryKeyArr.length; i++) {
Expand All @@ -58,13 +61,19 @@ public SchemaMetadataTransform(
partitionKeys = Arrays.asList(partitionKeyArr);
}
if (!StringUtils.isNullOrWhitespaceOnly(tableOptionString)) {
for (String tableOption : tableOptionString.split(",")) {
String[] kv = tableOption.split("=");
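// Use custom delimiter if provided, otherwise default to comma for backward
// compatibility.
// NOTE(review): String#split treats the delimiter as a regular expression, so
// delimiters such as `|` or `$` are not matched literally; wrap it with
// Pattern.quote(delimiter) before splitting.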
String delimiter =
StringUtils.isNullOrWhitespaceOnly(tableOptionsDelimiter)
? ","
: tableOptionsDelimiter;
for (String tableOption : tableOptionString.split(delimiter)) {
String[] kv = tableOption.split("=", 2);
if (kv.length != 2) {
throw new IllegalArgumentException(
"table option format error: "
+ tableOptionString
+ ", it should be like `key1=value1,key2=value2`.");
String.format(
"table option format error: %s, it should be like `key1=value1%skey2=value2`.",
tableOptionString, delimiter));
}
options.put(kv[0].trim(), kv[1].trim());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class TransformRule implements Serializable {
private final String primaryKey;
private final String partitionKey;
private final String tableOption;
private final String tableOptionsDelimiter;
private final @Nullable String postTransformConverter;
private final SupportedMetadataColumn[] supportedMetadataColumns;

Expand All @@ -44,6 +45,7 @@ public TransformRule(
String primaryKey,
String partitionKey,
String tableOption,
String tableOptionsDelimiter,
@Nullable String postTransformConverter,
SupportedMetadataColumn[] supportedMetadataColumns) {
this.tableInclusions = tableInclusions;
Expand All @@ -52,6 +54,7 @@ public TransformRule(
this.primaryKey = primaryKey;
this.partitionKey = partitionKey;
this.tableOption = tableOption;
this.tableOptionsDelimiter = tableOptionsDelimiter;
this.postTransformConverter = postTransformConverter;
this.supportedMetadataColumns = supportedMetadataColumns;
}
Expand Down Expand Up @@ -82,6 +85,10 @@ public String getTableOption() {
return tableOption;
}

/**
 * Returns the table-options delimiter exactly as it was given to the constructor.
 *
 * <p>The value is stored without validation or defaulting, so it may be {@code null} or empty.
 */
public String getTableOptionsDelimiter() {
    return this.tableOptionsDelimiter;
}

@Nullable
public String getPostTransformConverter() {
return postTransformConverter;
Expand Down
Loading