Skip to content

Commit 3669bfe

Browse files
committed
updated validators
1 parent 47399af commit 3669bfe

File tree

8 files changed

+37
-28
lines changed

8 files changed

+37
-28
lines changed

commons/src/main/java/io/aiven/commons/collections/Scale.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.text.DecimalFormat;
2020
import java.util.ArrayList;
2121
import java.util.Arrays;
22+
import java.util.Collections;
2223
import java.util.List;
2324

2425
@SuppressWarnings("PMD.FieldNamingConventions")
@@ -37,13 +38,13 @@ public String format(final long byteCount) {
3738
/**
3839
* The IEC scale
3940
*/
40-
public static final List<Scale> IEC = Arrays.asList(KiB, MiB, GiB, TiB, PiB);
41+
public static final List<Scale> IEC = Collections.unmodifiableList(Arrays.asList(KiB, MiB, GiB, TiB, PiB));
4142

4243
/**
4344
* The SI scale
4445
*/
4546
@SuppressWarnings("PMD.ShortVariable")
46-
public static final List<Scale> SI = Arrays.asList(KB, MB, GB, TB, PB);
47+
public static final List<Scale> SI = Collections.unmodifiableList(Arrays.asList(KB, MB, GB, TB, PB));
4748

4849
final DecimalFormat dec = new DecimalFormat("0.0 ");
4950

commons/src/main/java/io/aiven/commons/collections/TimeScale.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,39 +26,35 @@
2626
public enum TimeScale {
2727
MILLISECONDS(1) {
2828
@Override
29-
public String format(final long milliseconds ) {
30-
return String.format("%s %s", milliseconds, this.name());
29+
public String format(final long milliseconds) {
30+
return String.format("%s %s", milliseconds, unitName());
3131
}
3232
},
33-
SECONDS(MILLISECONDS.milliseconds * 1000),
34-
MINUTES( SECONDS.milliseconds * 60),
35-
HOURS(MINUTES.milliseconds * 60 ),
36-
DAYS(HOURS.milliseconds * 24);
37-
33+
SECONDS(MILLISECONDS.milliseconds * 1000), MINUTES(SECONDS.milliseconds * 60), HOURS(
34+
MINUTES.milliseconds * 60), DAYS(HOURS.milliseconds * 24);
3835

3936
final DecimalFormat dec = new DecimalFormat("0.0 ");
4037

4138
public final long milliseconds;
4239

4340
TimeScale(final long milliseconds) {
44-
this.milliseconds = milliseconds;
41+
this.milliseconds = milliseconds;
4542
}
4643

4744
public String displayValue(final long value) {
4845
return format(value) + (this == MILLISECONDS ? "" : " (" + MILLISECONDS.format(value) + ")");
4946
}
5047

51-
private String unitName() {
48+
public String unitName() {
5249
return this.name().charAt(0) + this.name().substring(1).toLowerCase(Locale.ROOT);
5350
}
5451

5552
public String format(final long milliseconds) {
56-
double unitCount = milliseconds * 1.0 / this.milliseconds;
57-
return dec.format(unitCount).concat(unitName());
53+
return dec.format(milliseconds * 1.0 / this.milliseconds).concat(unitName());
5854
}
5955

6056
public String units(final int unitCount) {
61-
return dec.format(unitCount * milliseconds).concat(this.name());
57+
return dec.format(unitCount * milliseconds).concat(unitName());
6258
}
6359

6460
public long asMilliseconds(final double unitCount) {

commons/src/main/java/io/aiven/kafka/connect/common/config/BackoffPolicyFragment.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616

1717
package io.aiven.kafka.connect.common.config;
1818

19+
import java.util.Objects;
20+
21+
import org.apache.kafka.common.config.ConfigDef;
22+
1923
import io.aiven.commons.collections.TimeScale;
24+
import io.aiven.kafka.connect.common.config.validators.PredicateGatedValidator;
2025
import io.aiven.kafka.connect.common.config.validators.TimeScaleValidator;
21-
import org.apache.kafka.common.config.ConfigDef;
2226

2327
/**
2428
* Defines the backoff policy for connectors.
@@ -40,8 +44,9 @@ public BackoffPolicyFragment(final FragmentDataAccess dataAccess) {
4044
* @return the number of items in the backoff policy group.
4145
*/
4246
public static int update(final ConfigDef configDef) {
43-
configDef.define(KAFKA_RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, TimeScale.SECONDS.asMilliseconds(30),
44-
TimeScaleValidator.between(0, TimeScale.DAYS.asMilliseconds(1)),
47+
configDef.define(KAFKA_RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, null,
48+
new PredicateGatedValidator(
49+
Objects::nonNull, TimeScaleValidator.between(0, TimeScale.DAYS.asMilliseconds(1))),
4550
ConfigDef.Importance.MEDIUM,
4651
"The retry backoff in milliseconds. "
4752
+ "This config is used to notify Kafka Connect to retry delivering a message batch or "
@@ -52,7 +57,7 @@ public static int update(final ConfigDef configDef) {
5257
}
5358

5459
/**
55-
* Gets the kafka retry backoff time..
60+
* Gets the kafka retry backoff time.
5661
*
5762
* @return the Kafka retry backoff time in MS.
5863
*/

commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import org.apache.kafka.common.config.ConfigDef;
2525
import org.apache.kafka.common.config.ConfigException;
2626

27+
import io.aiven.commons.collections.Scale;
28+
import io.aiven.kafka.connect.common.config.validators.ScaleValidator;
2729
import io.aiven.kafka.connect.common.source.input.InputFormat;
2830

2931
/**
@@ -75,10 +77,12 @@ public static ConfigDef update(final ConfigDef configDef) {
7577
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP,
7678
++transformerCounter, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL);
7779
configDef.define(INPUT_FORMAT_KEY, ConfigDef.Type.STRING, InputFormat.BYTES.getValue(),
78-
new InputFormatValidator(), ConfigDef.Importance.MEDIUM, "Input format of messages read from source",
79-
TRANSFORMER_GROUP, transformerCounter++, ConfigDef.Width.NONE, INPUT_FORMAT_KEY);
80+
ConfigDef.CaseInsensitiveValidString
81+
.in(Arrays.stream(InputFormat.values()).map(Object::toString).toArray(String[]::new)),
82+
ConfigDef.Importance.MEDIUM, "Input format of messages read from source", TRANSFORMER_GROUP,
83+
transformerCounter++, ConfigDef.Width.NONE, INPUT_FORMAT_KEY);
8084
configDef.define(TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, DEFAULT_MAX_BUFFER_SIZE,
81-
ConfigDef.Range.between(1, Integer.MAX_VALUE), ConfigDef.Importance.MEDIUM,
85+
ScaleValidator.between(1, Integer.MAX_VALUE, Scale.IEC), ConfigDef.Importance.MEDIUM,
8286
"Max Size of the byte buffer when using the BYTE Transformer", TRANSFORMER_GROUP, ++transformerCounter,
8387
ConfigDef.Width.NONE, TRANSFORMER_MAX_BUFFER_SIZE);
8488

commons/src/main/java/io/aiven/kafka/connect/common/config/validators/TimeScaleValidator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616

1717
package io.aiven.kafka.connect.common.config.validators;
1818

19-
import io.aiven.commons.collections.TimeScale;
2019
import org.apache.kafka.common.config.ConfigDef;
2120
import org.apache.kafka.common.config.ConfigException;
2221

22+
import io.aiven.commons.collections.TimeScale;
23+
2324
public class TimeScaleValidator implements ConfigDef.Validator {
2425
private final Number min;
2526
private final TimeScale minScale;

commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ void validateCorrectBufferSizeIsAccepted() {
4747
@ParameterizedTest
4848
@CsvSource({
4949
"21474836471,Invalid value 21474836471 for configuration transformer.max.buffer.size: Not a number of type INT",
50-
"-1,Invalid value -1 for configuration transformer.max.buffer.size: Value must be at least 1",
50+
"-1,Invalid value -1 for configuration transformer.max.buffer.size: Value must be at least 1 B",
5151
"MAX,Invalid value MAX for configuration transformer.max.buffer.size: Not a number of type INT",
52-
"0,Invalid value 0 for configuration transformer.max.buffer.size: Value must be at least 1",
53-
"-9000,Invalid value -9000 for configuration transformer.max.buffer.size: Value must be at least 1",
52+
"0,Invalid value 0 for configuration transformer.max.buffer.size: Value must be at least 1 B",
53+
"-9000,Invalid value -9000 for configuration transformer.max.buffer.size: Value must be at least 1 B",
5454
"MTA=,Invalid value MTA= for configuration transformer.max.buffer.size: Not a number of type INT" })
5555
void validateInvalidBufferSizeThrowsConfigException(final String value, final String expectedMessage) {
5656
final ConfigDef configDef = TransformerFragment.update(new ConfigDef());

s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.Objects;
2525
import java.util.stream.Collectors;
2626

27-
import io.aiven.kafka.connect.common.config.validators.TimeScaleValidator;
2827
import org.apache.kafka.common.config.ConfigDef;
2928
import org.apache.kafka.common.config.ConfigException;
3029
import org.apache.kafka.common.config.ConfigValue;
@@ -40,6 +39,7 @@
4039
import io.aiven.kafka.connect.common.config.TimestampSource;
4140
import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword;
4241
import io.aiven.kafka.connect.common.config.validators.ScaleValidator;
42+
import io.aiven.kafka.connect.common.config.validators.TimeScaleValidator;
4343
import io.aiven.kafka.connect.common.config.validators.TimeZoneValidator;
4444
import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator;
4545
import io.aiven.kafka.connect.common.config.validators.UrlValidator;

s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,14 +435,16 @@ void wrongAwsS3BackoffPolicy() {
435435
S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "blah-blah-blah",
436436
S3ConfigFragment.AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG, "0");
437437
assertThatThrownBy(() -> new S3SinkConfig(wrongDelayProps)).isInstanceOf(ConfigException.class)
438-
.hasMessage("Invalid value 0 for configuration aws.s3.backoff.delay.ms: Value must be at least 1");
438+
.hasMessage(
439+
"Invalid value 0 for configuration aws.s3.backoff.delay.ms: Value must be at least 1 Milliseconds");
439440

440441
final var wrongMaxDelayProps = Map.of(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah",
441442
S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah",
442443
S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "blah-blah-blah",
443444
S3ConfigFragment.AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG, "0");
444445
assertThatThrownBy(() -> new S3SinkConfig(wrongMaxDelayProps)).isInstanceOf(ConfigException.class)
445-
.hasMessage("Invalid value 0 for configuration aws.s3.backoff.max.delay.ms: Value must be at least 1");
446+
.hasMessage(
447+
"Invalid value 0 for configuration aws.s3.backoff.max.delay.ms: Value must be at least 1 Milliseconds");
446448

447449
final var wrongMaxRetriesProps = Map.of(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah",
448450
S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah",

0 commit comments

Comments
 (0)