
Commit 0a4b8dc

spark: add streaming-related notes and supplement other Spark content
1 parent fa414ba commit 0a4b8dc

6 files changed: +203 -0 lines changed


middleware/kafka/src/main/java/com/example/kafka/KafkaApplication.java

+29
```diff
@@ -10,6 +10,8 @@
 import org.springframework.kafka.core.KafkaTemplate;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 @SpringBootApplication
 public class KafkaApplication {
@@ -27,13 +29,40 @@ public NewTopic testTopic() {
                 .build();
     }
 
+    @Bean
+    public NewTopic randomTopic() {
+        return TopicBuilder
+                .name("random")
+                .partitions(10)
+                .replicas(1)
+                .build();
+    }
+
     @Bean
     public CommandLineRunner test(KafkaTemplate<String, byte[]> kafkaTemplate) {
         return args -> {
             kafkaTemplate.send("test", "test1".getBytes(StandardCharsets.UTF_8));
         };
     }
 
+    @Bean
+    public CommandLineRunner random(KafkaTemplate<String, byte[]> kafkaTemplate) {
+        return args -> {
+            Random random = new Random();
+            while (true) {
+                int i = Math.abs(random.nextInt() % 100);
+                String message = Integer.toString(i);
+                System.out.println(message);
+                kafkaTemplate.send("random", message.getBytes(StandardCharsets.UTF_8));
+                try {
+                    TimeUnit.MILLISECONDS.sleep(i * 10 + 500);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+    }
+
     @KafkaListener(id = "test", topics = "test")
     public void listener(String data) {
         System.out.println("listener: " + data);
```

middleware/spark/SparkStreaming.md

+15
# DStream
Discretized stream: the data collected during each time interval, bundled as a dataset. It is Spark's abstraction for processing streaming data.

# Receiver
Receives the incoming data and wraps it as a DStream.

# window
Window operations merge the data collected over several consecutive intervals into one larger interval and process it as a whole (a minimal sketch follows this list).
- Window duration: the time range covered after merging
- Slide interval: how far the window moves forward each time

**Both parameters must be integer multiples of the batch (collection) interval.**
- Window duration = slide interval: every record is processed exactly once
- Window duration < slide interval: some data may be lost (it never falls into any window)
- Window duration > slide interval: some data may be processed repeatedly (windows overlap)
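
As a minimal illustration (not from this commit's sources), the sketch below assumes a local socket source on port 9999 and a 5-second batch interval; the class name, port, and durations are arbitrary:

```java
package streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class WindowSketch {

    public static void main(String[] args) throws InterruptedException {
        // local[2]: one core for the socket receiver, one for processing
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("window-sketch");
        // batch (collection) interval: 5 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // assumed source, e.g. started with `nc -lk 9999` on the local machine
        JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // window duration 15 s, slide interval 10 s -- both integer multiples of the 5 s batch interval
        lines.window(Durations.seconds(15), Durations.seconds(10))
                .count()
                .print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```

With a 15-second window sliding every 10 seconds, consecutive windows overlap by 5 seconds, so the batches in the overlap are counted twice: the "window duration > slide interval" case above.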

middleware/spark/pom.xml

+10
```diff
@@ -23,6 +23,16 @@
             <artifactId>spark-sql_2.12</artifactId>
             <version>3.5.1</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.12</artifactId>
+            <version>3.5.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
+            <version>3.5.1</version>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
```
base/Accumulators.java (new file, +42)
```java
package base;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Accumulators {

    public static final List<Integer> DATA = Stream
            .of(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .collect(Collectors.toList());

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("test")
                .master("local")
                .getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
        LongAccumulator longAccumulator = spark.sparkContext().longAccumulator("longAccumulator");
        javaSparkContext
                .parallelize(DATA)
                .map(i -> {
                    longAccumulator.add(i);
                    return i;
                })
                .collect()
                .forEach(System.out::println);
        System.out.println("sum: " + longAccumulator.sum());

        // job: 3
        System.out.println("open: http://localhost:4040");
        TimeUnit.HOURS.sleep(1);
        spark.close();
    }

}
```
sql/Schema.java (new file, +60)
```java
package sql;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Schema {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("test")
                .master("local")
                .getOrCreate();

        // define the schema
        StructType schema = DataTypes.createStructType(Stream.of(
                DataTypes.createStructField("id", DataTypes.StringType, false),
                DataTypes.createStructField("name", DataTypes.StringType, false),
                DataTypes.createStructField("age", DataTypes.IntegerType, false)
        ).collect(Collectors.toList()));

        // build the source data
        List<String> data = Stream.of(
                "001 user1 10",
                "002 user2 20",
                "003 user3 15",
                "004 user4 35"
        ).collect(Collectors.toList());
        JavaRDD<Row> dataRdd = new JavaSparkContext(spark.sparkContext())
                .parallelize(data)
                .map(str -> {
                    String[] s = str.split(" ");
                    return RowFactory.create(s[0], s[1], Integer.parseInt(s[2]));
                });

        /*
         * +---+-----+---+
         * | id| name|age|
         * +---+-----+---+
         * |001|user1| 10|
         * |002|user2| 20|
         * |003|user3| 15|
         * |004|user4| 35|
         * +---+-----+---+
         */
        spark.createDataFrame(dataRdd, schema).show();

        spark.close();
    }

}
```
streaming/ByKafka.java (new file, +47)
```java
package streaming;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ByKafka {

    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("test");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));

        // Kafka configuration
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.201:9092,192.168.31.202:9092,192.168.31.203:9092");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "spark");
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // processing
        // the data is produced by: middleware/kafka/src/main/java/com/example/kafka/KafkaApplication.java
        ConsumerStrategy<Object, Object> consumerStrategy = ConsumerStrategies.Subscribe(Collections.singleton("random"), kafkaParams);
        JavaInputDStream<ConsumerRecord<Object, Object>> dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(), consumerStrategy);
        dStream
                .map(ConsumerRecord::value)
                .print();

        // start the streaming job
        jssc.start();
        jssc.awaitTermination();
    }

}
```
