Skip to content

Commit 5c09142

Browse files
committed
remove mysql dep + add a configuration option for cassandra consistency
- use cassandra.consistency in properties file - this fixes the problem where we have only one cassandra machine (hence quorum is not achievable and raises an error)
1 parent 4511d90 commit 5c09142

File tree

6 files changed

+51
-18
lines changed

6 files changed

+51
-18
lines changed

pom.xml

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,12 @@
1212

1313
<properties>
1414
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15-
<flink.version>1.10.0</flink.version>
15+
<flink.version>1.10.2</flink.version>
1616
<java.version>1.8</java.version>
1717
<scala.binary.version>2.11</scala.binary.version>
1818
<maven.compiler.source>${java.version}</maven.compiler.source>
1919
<maven.compiler.target>${java.version}</maven.compiler.target>
2020

21-
<mysql.version>6.0.2</mysql.version>
2221
<slf4j.version>1.7.21</slf4j.version>
2322
<cassandra.version>3.9.0</cassandra.version>
2423
</properties>
@@ -102,13 +101,6 @@
102101

103102
<!-- Other, regular dependencies -->
104103

105-
<!--mysql-->
106-
<dependency>
107-
<groupId>mysql</groupId>
108-
<artifactId>mysql-connector-java</artifactId>
109-
<version>${mysql.version}</version>
110-
</dependency>
111-
112104
<!--gson-->
113105
<dependency>
114106
<groupId>com.google.code.gson</groupId>

src/main/java/ch/derlin/bbdata/flink/Configs.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package ch.derlin.bbdata.flink;
22

3+
import com.datastax.driver.core.ConsistencyLevel;
34
import org.apache.flink.api.common.time.Time;
45
import org.apache.flink.configuration.ConfigOption;
56
import org.apache.flink.configuration.ConfigOptions;
@@ -70,6 +71,12 @@ public class Configs {
7071
.noDefaultValue()
7172
.withDescription("List of entrypoints (IP, host) to connect to Cassandra, separated by semi-colons (;).");
7273

74+
public static final ConfigOption<String> configConsistency = ConfigOptions
75+
.key("cassandra.consistency")
76+
.stringType()
77+
.defaultValue("QUORUM")
78+
.withDescription("Cassandra consistency level. If you have a test cluster of one host, set it to ONE.");
79+
7380
// =============== getters
7481

7582
public static long readGranularity(Configuration config) {
@@ -123,10 +130,21 @@ public static List<String> readCassandraEntrypoints(Configuration config) {
123130
List<String> entrypoints = config.get(configEntrypoints);
124131
if (entrypoints.size() == 0)
125132
throw new RuntimeException(String.format(
126-
"Missing Cassandra entrypoing '%s' in config.", configEntrypoints.key()));
133+
"Missing Cassandra entrypoint '%s' in config.", configEntrypoints.key()));
127134
return entrypoints;
128135
}
129136

137+
public static ConsistencyLevel readCassandraConsistency(Configuration config) {
138+
String consistency = config.get(configConsistency);
139+
try {
140+
return ConsistencyLevel.valueOf(consistency);
141+
} catch (Exception e) {
142+
throw new RuntimeException((String.format(
143+
"Wrong consistency value provided: '%s'. " +
144+
"See com.datastax.driver.core.ConsistencyLevel for possible values.", consistency)));
145+
}
146+
}
147+
130148

131149
public static void checkConfig(Configuration config) {
132150
// windows

src/main/java/ch/derlin/bbdata/flink/pojo/AggregationRecord.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,7 @@
1919
*
2020
* @author Lucy Linder <[email protected]>
2121
*/
22-
@Table(keyspace = "bbdata2", name = "aggregations",
23-
readConsistency = "QUORUM",
24-
writeConsistency = "QUORUM",
25-
caseSensitiveKeyspace = false,
26-
caseSensitiveTable = false)
22+
@Table(keyspace = "bbdata2", name = "aggregations")
2723
public class AggregationRecord {
2824

2925
@PartitionKey(0)

src/main/java/ch/derlin/bbdata/flink/sinks/CassandraSink.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import ch.derlin.bbdata.flink.pojo.AggregationRecord;
77
import ch.derlin.bbdata.flink.utils.DateUtil;
88
import com.datastax.driver.core.Cluster;
9+
import com.datastax.driver.core.QueryOptions;
910
import com.datastax.driver.core.Session;
1011
import com.datastax.driver.core.policies.DefaultRetryPolicy;
1112
import com.datastax.driver.mapping.Mapper;
@@ -16,6 +17,8 @@
1617
import org.slf4j.Logger;
1718
import org.slf4j.LoggerFactory;
1819

20+
import static com.datastax.driver.mapping.Mapper.Option.consistencyLevel;
21+
1922
/**
2023
* date: 10.03.17
2124
*
@@ -48,18 +51,26 @@ public void open(Configuration parameters) throws Exception {
4851
// TODO does it improve performances ?
4952
;
5053
for (String address : Configs.readCassandraEntrypoints(config)) {
54+
LOG.info("adding cassandra contact point: " + address.trim());
5155
builder.addContactPoint(address.trim());
5256
}
5357

58+
// set consistency
59+
QueryOptions queryOptions = new QueryOptions();
60+
queryOptions.setConsistencyLevel(Configs.readCassandraConsistency(config));
61+
5462
// the keyspace and tables are declared in the AggregationRecord class
5563
// so here, just create a connection
56-
cluster = builder.build();
64+
cluster = builder
65+
.withQueryOptions(queryOptions)
66+
.build();
5767
session = cluster.connect();
5868

5969
// the mapper will do the mapping between cassandra records and AggregationRecord objects
6070
MappingManager manager = new MappingManager(session);
6171
mapper = manager.mapper(AggregationRecord.class);
6272

73+
6374
} catch (Exception e) {
6475
LOG.error("setup/open failed", e);
6576
throw e;

src/main/resources/config.properties-template

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
# =====
44
# configuration for communicating with Kafka
55
#
6-
# the brokers address:port, comma-separated
6+
# the brokers address:port, separated by ;
77
kafka.brokers=localhost:6667
88
# the source topic, with the augmented values
99
kafka.augmentation=bbdata2-augmented
@@ -17,8 +17,11 @@ kafka.consumer.group=flink-aggregation-01
1717
# configuration for saving aggregations to cassandra.
1818
# the cassandra instance should have a table bbdata2.aggregations table !
1919
#
20-
# the cassandra entrypoints, a list of ip addresses separated by comma
20+
# the cassandra entrypoints, a list of ip addresses separated by ;
2121
cassandra.entrypoints=127.0.0.1
22+
# the cassandra consistency level to use. See enum com.datastax.driver.core.ConsistencyLevel
23+
# NOTE: if you have only one cassandra instance, ensure you set it to ONE !
24+
cassandra.consistency=QUORUM
2225

2326
# Flink checkpoints
2427
# =================

src/test/java/ch/derlin/bbdata/flink/TestConfigs.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,17 @@ public void testCassandraEntrypoints(){
119119
assertEquals(2, Configs.readCassandraEntrypoints(config).size());
120120

121121
}
122+
123+
124+
@Test
125+
public void testCassandraConsistency(){
126+
Configuration config = new Configuration();
127+
assertDoesNotThrow(() -> Configs.readCassandraConsistency(config));
128+
129+
config.setString(Configs.configConsistency.key(), "ONE");
130+
assertDoesNotThrow(() -> Configs.readCassandraConsistency(config));
131+
132+
config.setString(Configs.configConsistency.key(), "LALA");
133+
assertThrows(Exception.class, () -> Configs.readCassandraConsistency(config));
134+
}
122135
}

0 commit comments

Comments
 (0)