1717package io .aiven .kafka .connect .azure .sink ;
1818
1919import java .time .Duration ;
20- import java .time .ZoneId ;
21- import java .time .ZoneOffset ;
2220import java .util .ArrayList ;
2321import java .util .HashMap ;
2422import java .util .List ;
2523import java .util .Map ;
2624import java .util .regex .Pattern ;
27- import java .util .stream .Collectors ;
2825
2926import org .apache .kafka .common .config .ConfigDef ;
3027import org .apache .kafka .common .config .ConfigException ;
3128
3229import io .aiven .kafka .connect .common .config .AivenCommonConfig ;
3330import io .aiven .kafka .connect .common .config .CompressionType ;
34- import io .aiven .kafka .connect .common .config .FixedSetRecommender ;
3531import io .aiven .kafka .connect .common .config .OutputField ;
3632import io .aiven .kafka .connect .common .config .OutputFieldEncodingType ;
3733import io .aiven .kafka .connect .common .config .OutputFieldType ;
38- import io .aiven .kafka .connect .common .config .TimestampSource ;
39- import io .aiven .kafka .connect .common .config .validators .FilenameTemplateValidator ;
4034
4135import org .slf4j .Logger ;
4236import org .slf4j .LoggerFactory ;
4337
38+ @ SuppressWarnings ("PMD.UnusedPrivateMethod" )
4439public final class AzureBlobSinkConfig extends AivenCommonConfig {
4540 private static final Logger LOG = LoggerFactory .getLogger (AzureBlobSinkConfig .class );
4641 private static final String USER_AGENT_HEADER_FORMAT = "Azure Blob Sink/%s (GPN: Aiven;)" ;
@@ -72,12 +67,10 @@ public final class AzureBlobSinkConfig extends AivenCommonConfig {
7267
7368 public static final String NAME_CONFIG = "name" ;
7469
75- public static ConfigDef configDef () {
76- final ConfigDef configDef = new ConfigDef ( );
70+ public static SinkCommonConfigDef configDef () {
71+ final SinkCommonConfigDef configDef = new SinkCommonConfigDef ( OutputFieldType . VALUE , CompressionType . NONE );
7772 addAzureConfigGroup (configDef );
7873 addFileConfigGroup (configDef );
79- addOutputFieldsFormatConfigGroup (configDef , OutputFieldType .VALUE );
80- addKafkaBackoffPolicy (configDef );
8174 addAzureRetryPolicies (configDef );
8275 addUserAgentConfig (configDef );
8376 return configDef ;
@@ -125,7 +118,6 @@ private static void addAzureRetryPolicies(final ConfigDef configDef) {
125118 }
126119
127120 private static void addFileConfigGroup (final ConfigDef configDef ) {
128- int fileGroupCounter = 0 ;
129121 configDef .define (FILE_NAME_PREFIX_CONFIG , ConfigDef .Type .STRING , "" , new ConfigDef .Validator () {
130122 @ Override
131123 public void ensureValid (final String name , final Object value ) {
@@ -137,79 +129,7 @@ public void ensureValid(final String name, final Object value) {
137129 }
138130 }
139131 }, ConfigDef .Importance .MEDIUM , "The prefix to be added to the name of each file put on Azure Blob." ,
140- GROUP_FILE , fileGroupCounter ++, ConfigDef .Width .NONE , FILE_NAME_PREFIX_CONFIG );
141-
142- configDef .define (FILE_NAME_TEMPLATE_CONFIG , ConfigDef .Type .STRING , null ,
143- new FilenameTemplateValidator (FILE_NAME_TEMPLATE_CONFIG ), ConfigDef .Importance .MEDIUM ,
144- "The template for file names on Azure Blob. "
145- + "Supports `{{ variable }}` placeholders for substituting variables. "
146- + "Currently supported variables are `topic`, `partition`, and `start_offset` "
147- + "(the offset of the first record in the file). "
148- + "Only some combinations of variables are valid, which currently are:\n "
149- + "- `topic`, `partition`, `start_offset`." ,
150- GROUP_FILE , fileGroupCounter ++, ConfigDef .Width .LONG , FILE_NAME_TEMPLATE_CONFIG );
151-
152- final String supportedCompressionTypes = CompressionType .names ()
153- .stream ()
154- .map (f -> "'" + f + "'" )
155- .collect (Collectors .joining (", " ));
156- configDef .define (FILE_COMPRESSION_TYPE_CONFIG , ConfigDef .Type .STRING , CompressionType .NONE .name ,
157- new ConfigDef .Validator () {
158- @ Override
159- public void ensureValid (final String name , final Object value ) {
160- assert value instanceof String ;
161- final String valueStr = (String ) value ;
162- if (!CompressionType .names ().contains (valueStr )) {
163- throw new ConfigException (FILE_COMPRESSION_TYPE_CONFIG , valueStr ,
164- "supported values are: " + supportedCompressionTypes );
165- }
166- }
167- }, ConfigDef .Importance .MEDIUM ,
168- "The compression type used for files put on Azure Blob. " + "The supported values are: "
169- + supportedCompressionTypes + "." ,
170- GROUP_FILE , fileGroupCounter ++, ConfigDef .Width .NONE , FILE_COMPRESSION_TYPE_CONFIG ,
171- FixedSetRecommender .ofSupportedValues (CompressionType .names ()));
172-
173- configDef .define (FILE_MAX_RECORDS , ConfigDef .Type .INT , 0 , new ConfigDef .Validator () {
174- @ Override
175- public void ensureValid (final String name , final Object value ) {
176- assert value instanceof Integer ;
177- if ((Integer ) value < 0 ) {
178- throw new ConfigException (FILE_MAX_RECORDS , value , "must be a non-negative integer number" );
179- }
180- }
181- }, ConfigDef .Importance .MEDIUM ,
182- "The maximum number of records to put in a single file. " + "Must be a non-negative integer number. "
183- + "0 is interpreted as \" unlimited\" , which is the default." ,
184- GROUP_FILE , fileGroupCounter ++, ConfigDef .Width .SHORT , FILE_MAX_RECORDS );
185-
186- configDef .define (FILE_NAME_TIMESTAMP_TIMEZONE , ConfigDef .Type .STRING , ZoneOffset .UTC .toString (),
187- new ConfigDef .Validator () {
188- @ Override
189- public void ensureValid (final String name , final Object value ) {
190- try {
191- ZoneId .of (value .toString ());
192- } catch (final Exception e ) { // NOPMD broad exception catched
193- throw new ConfigException (FILE_NAME_TIMESTAMP_TIMEZONE , value , e .getMessage ());
194- }
195- }
196- }, ConfigDef .Importance .LOW ,
197- "Specifies the timezone in which the dates and time for the timestamp variable will be treated. "
198- + "Use standard shot and long names. Default is UTC" ,
199- GROUP_FILE , fileGroupCounter ++, ConfigDef .Width .SHORT , FILE_NAME_TIMESTAMP_TIMEZONE );
200-
201- configDef .define (FILE_NAME_TIMESTAMP_SOURCE , ConfigDef .Type .STRING , TimestampSource .Type .WALLCLOCK .name (),
202- new ConfigDef .Validator () {
203- @ Override
204- public void ensureValid (final String name , final Object value ) {
205- try {
206- TimestampSource .Type .of (value .toString ());
207- } catch (final Exception e ) { // NOPMD broad exception catched
208- throw new ConfigException (FILE_NAME_TIMESTAMP_SOURCE , value , e .getMessage ());
209- }
210- }
211- }, ConfigDef .Importance .LOW , "Specifies the timestamp variable source. Default is wall-clock." ,
212- GROUP_FILE , fileGroupCounter , ConfigDef .Width .SHORT , FILE_NAME_TIMESTAMP_SOURCE );
132+ GROUP_FILE , 50 , ConfigDef .Width .NONE , FILE_NAME_PREFIX_CONFIG );
213133 }
214134
215135 public AzureBlobSinkConfig (final Map <String , String > properties ) {
0 commit comments