Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unable to create Kafka Connect with following configs #39

Open
mjain1994 opened this issue Nov 10, 2021 · 0 comments
Open

Unable to create Kafka Connect with following configs #39

mjain1994 opened this issue Nov 10, 2021 · 0 comments

Comments

@mjain1994
Copy link

I am trying to create a Kafka Redis sink that deletes a particular key in Redis. One of the ways is to create a Record or Message in Kafka with a specific key and Value as null. But as per the use case, generating the keys is not possible. As a workaround, I wrote a Single message transformer that takes the message from Kafka, sets a particular Key, and sets Value equals null.

Here are my Kafka Connect Confgurations

{ "connector.class": "com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector", "transforms.invalidaterediskeys.type": "com.github.cjmatta.kafka.connect.smt.InvalidateRedisKeys", "redis.database": "0", "redis.client.mode": "Standalone", "topics": "test_redis_deletion2", "tasks.max": "1", "redis.hosts": "REDIS-HOST", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "transforms": "invalidaterediskeys" }

Here is the code for the transformations :

`
public class InvalidateRedisKeys<R extends ConnectRecord> implements Transformation {
private static final Logger LOG = LoggerFactory.getLogger(InvalidateRedisKeys.class);

private static final ObjectMapper mapper = new ObjectMapper()
        .configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);

@Override
public ConfigDef config() {
    return new ConfigDef();
}

@Override
public void configure(Map<String, ?> settings) {
}

@Override
public void close() {
}

@Override
public R apply(R r) {
    try {
        return r.newRecord(
                r.topic(),
                r.kafkaPartition(),
                Schema.STRING_SCHEMA,
                getKey(r.value()),
                null,
                null,
                r.timestamp()
        );
    } catch (IOException e) {
        LOG.error("a.jsonhandling.{}", e.getMessage());
        return null;
    } catch (Exception e) {
        LOG.error("a.exception.{}", e.getMessage());
        return null;
    }
}

private String getKey(Object value) throws IOException {
    A a = mapper.readValue(value.toString(), A.class);
    long userId = a.getUser_id();
    int roundId = a.getRound_id();
    return KeyGeneratorUtil.getKey(userId, roundId);
}

}
`

where A is
`
public class A {

private long user_id;
private int round_id;

}
`

And KeyGeneratorUtil contains a static function that generates a relevant string and sends the results. When I try to initialize Kafka Connect, it says invalid Configurations. Is there something that I am missing?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant