Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d56c961
complete build success
Claudenw Nov 19, 2025
c8ba92b
cleaned up PR
Claudenw Nov 19, 2025
1a02a8d
clean up spotless
Claudenw Nov 19, 2025
d55b9cf
updated S3SinkConfig and tests
Claudenw Nov 10, 2025
a3f5f34
cleaned up s3 configuration
Claudenw Nov 20, 2025
8a702a3
spotless changes
Claudenw Nov 20, 2025
eac2c44
spotless changes
Claudenw Nov 20, 2025
747c62f
updated timescales
Claudenw Nov 21, 2025
2027a37
updated validators
Claudenw Nov 21, 2025
9e40cd6
complete build success
Claudenw Nov 19, 2025
5de7d1e
cleaned up PR
Claudenw Nov 19, 2025
c12c9ab
updated to new commons
Claudenw Nov 21, 2025
a512dc4
updated as per review
Claudenw Nov 27, 2025
88b321a
changed http -> https
Claudenw Nov 27, 2025
cf08c28
updated S3SinkConfig and tests
Claudenw Nov 10, 2025
8969d88
cleaned up s3 configuration
Claudenw Nov 20, 2025
a7b960e
updated timescales
Claudenw Nov 21, 2025
deeab7b
updated validators
Claudenw Nov 21, 2025
296e08e
working gcs sink
Claudenw Nov 21, 2025
4920186
updated site processing
Claudenw Nov 21, 2025
61b094e
update configData template
Claudenw Nov 25, 2025
cbda6d6
cleaned up implementation
Claudenw Nov 28, 2025
d1970ef
updatd javadoc
Claudenw Nov 28, 2025
0371108
fixed site generation issues
Claudenw Nov 28, 2025
afe7e8a
updated documentation links
Claudenw Nov 28, 2025
f45ef58
fixed AzureBlobSinkConfigDef
Claudenw Dec 1, 2025
b63d835
cleaned up code
Claudenw Dec 1, 2025
971b8be
fixed PMD issue
Claudenw Dec 1, 2025
38a2db0
fixed spotless issues
Claudenw Dec 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 1 addition & 28 deletions .github/workflows/build_site.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,39 +29,12 @@ jobs:
- name: Checkout
uses: actions/checkout@v4

- name: Check for S3SourceConfig
run: find . -name S3SourceConfig.\*

- name: Build site tools
run: ./gradlew :site:build

- name: Check for S3SourceConfig
run: find . -name S3SourceConfig.\*

- name: copy site assets
run: ./gradlew :copySiteAssets

- name: Check for S3SourceConfig
run: find . -name S3SourceConfig.\*

- name: create site
run: ./gradlew createSite

- name: Check for S3SourceConfig
run: find . -name S3SourceConfig.\*

- name: generate javadoc
- name: generate site framework
run: ./gradlew :site:build :copySiteAssets createSite javadoc

- name: Check for S3SourceConfig
run: find . -name S3SourceConfig.\*

- name: Populate the site
run: ./gradlew populateSite

- name: Check for S3SourceConfig
run: find . -name S3SourceConfig.\*

- name: Upload artifact
uses: actions/upload-pages-artifact@v3
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,73 +17,19 @@
package io.aiven.kafka.connect.azure.sink;

import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;

import io.aiven.kafka.connect.common.config.CompressionType;
import io.aiven.kafka.connect.common.config.FileNameFragment;
import io.aiven.kafka.connect.common.config.FragmentDataAccess;
import io.aiven.kafka.connect.common.config.OutputFieldType;
import io.aiven.kafka.connect.common.config.SinkCommonConfig;

public final class AzureBlobSinkConfigDef extends SinkCommonConfig.SinkCommonConfigDef {

private final static Pattern CONTAINER_NAME_PATTERN = Pattern.compile("[0-9a-z][0-9a-z\\-]+[0-9a-z]");

/**
* From Azure documentation:
* <ul>
* <li>Container names must start or end with a letter or number, and can contain only letters, numbers, and the
* hyphen/minus (-) character.</li>
* <li>Every hyphen/minus (-) character must be immediately preceded and followed by a letter or number; consecutive
* hyphens aren't permitted in container names.</li>
* <li>All letters in a container name must be lowercase.</li>
* <li>Container names must be from 3 through 63 characters long.</li>
* </ul>
*/
public static final ConfigDef.Validator CONTAINER_NAME_VALIDATOR = ConfigDef.CompositeValidator
.of(ConfigDef.LambdaValidator.with((name, value) -> {
final int len = value == null ? 0 : value.toString().length();
if (len < 3 || len > 63) {
throw new ConfigException(name, value, "names must be from 3 through 63 characters long.");
}
}, () -> "must be from 3 through 63 characters long"), ConfigDef.LambdaValidator.with((name, value) -> {
if (value.toString().contains("--")) {
throw new ConfigException(name, value,
"Every hyphen/minus (-) character must be immediately preceded and followed by a letter or number; consecutive hyphens aren't permitted in container names.");
}
}, () -> "consecutive hyphens aren't permitted in container names"),
// regex last for speed
ConfigDef.LambdaValidator.with((name, value) -> {
if (!CONTAINER_NAME_PATTERN.matcher(value.toString()).matches()) {
throw new ConfigException(name, value,
"must start or end with a letter or number, and can contain only lower case letters, numbers, and the hyphen/minus (-) character.");
}
}, () -> "start or end with a letter or number, and can contain only lower case letters, numbers, and the hyphen/minus (-) character"));

AzureBlobSinkConfigDef() {
super(OutputFieldType.VALUE, CompressionType.NONE);
AzureBlobConfigFragment.update(this, true);
addFileConfigGroup(this);
}

static void addFileConfigGroup(final ConfigDef configDef) {

configDef.define(AzureBlobSinkConfig.FILE_NAME_PREFIX_CONFIG, ConfigDef.Type.STRING, "",
ConfigDef.LambdaValidator.with((name, value) -> {
assert value instanceof String;
final String valueStr = (String) value;
if (valueStr.length() > 1024) { // NOPMD avoid literal
throw new ConfigException(AzureBlobSinkConfig.AZURE_STORAGE_CONTAINER_NAME_CONFIG, value,
"cannot be longer than 1024 characters");
}
}, () -> ""), ConfigDef.Importance.MEDIUM,
"The prefix to be added to the name of each file put on Azure Blob.", FileNameFragment.GROUP_NAME, 10,
ConfigDef.Width.NONE, AzureBlobSinkConfig.FILE_NAME_PREFIX_CONFIG);

}

@Override
Expand Down
6 changes: 2 additions & 4 deletions azure-source-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,7 @@ tasks.register<JavaExec>("buildConfigMd") {
.plus(sourceSets.main.get().runtimeClasspath)
args =
listOf(
"io.aiven.kafka.connect.azure.source.config.AzureBlobSourceConfig",
"configDef",
"io.aiven.kafka.connect.azure.source.config.AzureBlobSourceConfigDef",
"src/templates/configData.md.vm",
"build/site/markdown/azure-source-connector/AzureBlobSourceConfig.md")
}
Expand All @@ -271,8 +270,7 @@ tasks.register<JavaExec>("buildConfigYml") {
.plus(sourceSets.main.get().runtimeClasspath)
args =
listOf(
"io.aiven.kafka.connect.azure.source.config.AzureBlobSourceConfig",
"configDef",
"io.aiven.kafka.connect.azure.source.config.AzureBlobSourceConfigDef",
"src/templates/configData.yml.vm",
"build/site/azure-source-connector/AzureBlobSourceConfig.yml")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public static TimeScale scaleOf(final long milliseconds) {
* @return the String representation.
* @see #scaleOf(long)
*/
public static String size(final int milliseconds) {
public static String size(final long milliseconds) {
return scaleOf(milliseconds).format(milliseconds);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected static String validationMessage(final String name, final Object value,
* @param message
* the message for the error.
*/
protected void registerIssue(final Map<String, ConfigValue> configMap, final String name, final Object value,
public static void registerIssue(final Map<String, ConfigValue> configMap, final String name, final Object value,
final String message) {
configMap.get(name).addErrorMessage(validationMessage(name, value, message));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
* Fragment to handle all file name extraction operations.
*/
public final class FileNameFragment extends ConfigFragment {
/**
* Flag to support Prefix Template as opposed to a prefix string. TODO To be removed when all implementations
* support the prefix template.
*/
public enum PrefixTemplateSupport {
TRUE, FALSE
}
/**
* The name of the group that this fragment places items in.
*/
Expand All @@ -67,6 +74,8 @@ public final class FileNameFragment extends ConfigFragment {
static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}";
@VisibleForTesting
public static final String FILE_PATH_PREFIX_TEMPLATE_CONFIG = "file.prefix.template";
@VisibleForTesting
public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix";

public static final ConfigDef.Validator COMPRESSION_TYPE_VALIDATOR = new PredicateGatedValidator(Objects::nonNull,
ConfigDef.CaseInsensitiveValidString.in(CompressionType.names().toArray(new String[0])));
Expand All @@ -77,7 +86,7 @@ public final class FileNameFragment extends ConfigFragment {
.collect(Collectors.toList())
.toArray(new String[0]));

public static final ConfigDef.Validator TEMPLATE_VALIDATOR = ConfigDef.LambdaValidator.with((name, value) -> {
public static final ConfigDef.Validator PREFIX_VALIDATOR = ConfigDef.LambdaValidator.with((name, value) -> {
if (value == null) {
return;
}
Expand All @@ -90,18 +99,24 @@ public final class FileNameFragment extends ConfigFragment {
} catch (IllegalArgumentException e) {
throw new ConfigException(name, value, e.getMessage());
}
}, () -> "See documentation for proper template construction.");
}, () -> "may not start with '.well-known/acme-challenge'");

/**
* The flag that indicates this fragment is being used as for a sink connector.
*/
private final boolean isSink;

/** Map of template variable name to the template variable definition */
private static final Map<String, FilenameTemplateVariable> FILENAME_VARIABLES = new TreeMap<>();
private static final String TEMPLATE_GROUPINGS;

static {
Arrays.stream(FilenameTemplateVariable.values())
.forEach(variable -> FILENAME_VARIABLES.put(variable.name, variable));
TEMPLATE_GROUPINGS = "[" + RecordGrouperFactory.getSupportedVariableGroups()
.stream()
.map(strings -> FilePatternUtils.asPatterns(strings, ", "))
.collect(Collectors.joining("] \n [")) + "]";
}

/**
Expand Down Expand Up @@ -253,51 +268,44 @@ private void validateSink(final Map<String, ConfigValue> configMap, final Templa
}
}

/**
* Adds the FileName properties to the configuration definition.
*
* @param configDef
* the configuration definition to update.
* @return the updated configuration definition.
*/
public static int update(final ConfigDef configDef) {
return update(configDef, CompressionType.NONE);
}

/**
* Adds the FileName properties to the configuration definition.
*
* @param configDef
* the configuration definition to update.
* @param defaultCompressionType
* The default compression type. May be {@code null}.
* @return number of items in the file group..
* @return number of items in the file group.
*/
public static int update(final ConfigDef configDef, final CompressionType defaultCompressionType) {
public static int update(final ConfigDef configDef, final CompressionType defaultCompressionType,
final PrefixTemplateSupport prefixTemplateSupport) {
int fileGroupCounter = 0;

configDef.define(FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, TEMPLATE_VALIDATOR,
configDef.define(FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, PREFIX_VALIDATOR,
ConfigDef.Importance.MEDIUM,
"The template for file names on storage system. "
+ "Supports `{{ variable }}` placeholders for substituting variables. "
+ "Currently supported variables are `topic`, `partition`, and `start_offset` "
+ "(the offset of the first record in the file). "
+ "Only some combinations of variables are valid, which currently are:\n"
+ "- `topic`, `partition`, `start_offset`."
+ "There is also `key` only variable {{key}} for grouping by keys",
+ "Currently supported variables are "
+ String.join(", ", FilePatternUtils.asPatterns(FILENAME_VARIABLES.keySet(), ", "))
+ ". Only some combinations of variables are valid, which currently are: " + TEMPLATE_GROUPINGS,
GROUP_NAME, ++fileGroupCounter, ConfigDef.Width.LONG, FILE_NAME_TEMPLATE_CONFIG);

configDef.define(FILE_PATH_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, TEMPLATE_VALIDATOR,
ConfigDef.Importance.MEDIUM,
"The template for file names prefixes on storage system. "
+ "Supports `{{ variable }}` placeholders for substituting variables. "
+ "Currently supported variables are `topic`, `partition`, and `start_offset` "
+ "(the offset of the first record in the file). "
+ "Only some combinations of variables are valid, which currently are:\n"
+ "- `topic`, `partition`, `start_offset`."
+ "There is also `key` only variable {{key}} for grouping by keys",
GROUP_NAME, ++fileGroupCounter, ConfigDef.Width.LONG, FILE_PATH_PREFIX_TEMPLATE_CONFIG);

if (prefixTemplateSupport.equals(PrefixTemplateSupport.TRUE)) {
configDef.define(FILE_PATH_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, PREFIX_VALIDATOR,
ConfigDef.Importance.MEDIUM,
"The template for file names prefixes on storage system. "
+ "Supports `{{ variable }}` placeholders for substituting variables. "
+ "Currently supported variables are `topic`, `partition`, and `start_offset` "
+ "(the offset of the first record in the file). "
+ "Only some combinations of variables are valid, which currently are:\n"
+ "- `topic`, `partition`, `start_offset`."
+ "There is also `key` only variable {{key}} for grouping by keys",
GROUP_NAME, ++fileGroupCounter, ConfigDef.Width.LONG, FILE_PATH_PREFIX_TEMPLATE_CONFIG);
} else {
configDef.define(FILE_NAME_PREFIX_CONFIG, ConfigDef.Type.STRING, "", PREFIX_VALIDATOR,
ConfigDef.Importance.MEDIUM, "The prefix to be added to the name of each file.",
FileNameFragment.GROUP_NAME, ++fileGroupCounter, ConfigDef.Width.LONG, FILE_NAME_PREFIX_CONFIG);
}
configDef.define(FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING, defaultCompressionType.name(),
COMPRESSION_TYPE_VALIDATOR, ConfigDef.Importance.MEDIUM, "The compression type used for files.",
GROUP_NAME, ++fileGroupCounter, ConfigDef.Width.NONE, FILE_COMPRESSION_TYPE_CONFIG,
Expand Down Expand Up @@ -394,6 +402,9 @@ public String getPrefixTemplate() {
return getString(FILE_PATH_PREFIX_TEMPLATE_CONFIG);
}

public String getPrefix() {
return getString(FILE_NAME_PREFIX_CONFIG);
}
public static void replaceYyyyUppercase(final String name, final Map<String, String> properties) {
String template = properties.get(name);
if (template != null) {
Expand Down Expand Up @@ -522,5 +533,16 @@ public Setter template(final String template) {
public Setter prefixTemplate(final String prefixTemplate) {
return setValue(FILE_PATH_PREFIX_TEMPLATE_CONFIG, prefixTemplate);
}

/**
* Sets the file name prefix template.
*
* @param prefix
* the prefix to use.
* @return this
*/
public Setter prefix(final String prefix) {
return setValue(FILE_NAME_PREFIX_CONFIG, prefix);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,7 @@ public static class SinkCommonConfigDef extends CommonConfigDef {
public SinkCommonConfigDef(final OutputFieldType defaultFieldType, final CompressionType compressionType) {
super();
OutputFormatFragment.update(this, defaultFieldType);
FileNameFragment.update(this, compressionType);
// not supported at this time.
configKeys().remove(FileNameFragment.FILE_PATH_PREFIX_TEMPLATE_CONFIG);
FileNameFragment.update(this, compressionType, FileNameFragment.PrefixTemplateSupport.FALSE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public SourceCommonConfigDef() {
super();
TransformerFragment.update(this);
SourceConfigFragment.update(this);
FileNameFragment.update(this);
FileNameFragment.update(this, CompressionType.NONE, FileNameFragment.PrefixTemplateSupport.TRUE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,19 @@ public final class RecordGrouperFactory {
.map(v -> v.stream().map(Pair::getLeft).collect(Collectors.joining(",")))
.collect(Collectors.joining("; "));

/**
* Gets a list of the supported variable groupings. Each list within the list is a grouping of variables that is
* supported.
*
* @return a list of supported variable groupings.
*/
public static List<List<String>> getSupportedVariableGroups() {
return SUPPORTED_VARIABLES.values()
.stream()
.map(lstPair -> lstPair.stream().map(Pair::getLeft).collect(Collectors.toList()))
.collect(Collectors.toList());
}

private RecordGrouperFactory() {
}

Expand Down
Loading
Loading