Skip to content

Commit a5da4ce

Browse files
committed
spark: core相关
1 parent 370fb77 commit a5da4ce

File tree

5 files changed

+267
-0
lines changed

5 files changed

+267
-0
lines changed

middleware/spark/SparkCore.md

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Job/Stage/Task
2+
- Job: 作业,完整的执行流程。即count(Job)=count(行动算子)。
3+
- Stage: 阶段,每次读写算做一个阶段。即count(Stage)=count(Shuffle)+1。各阶段串行执行。
4+
- Task: 任务,每个阶段最后RDD分区数量总和。
5+
6+
Job 1=(1..n) Stage 1=(1..n) Task
7+
8+
# Partition
9+
分区。将数据分成多个区,各区数据不重复。
10+
k-v类型相同key放在同一个区。
11+
Shuffle操作会重新分区。
12+
13+
# Shuffle
14+
将分区内数据重新打乱分发操作称为Shuffle。Shuffle操作是主要的资源消耗原因。
15+
- 可改变分区数目。
16+
- Shuffle一定会落盘。
17+
- 需等待所有数据Shuffle操作执行完毕后才继续执行后续操作。
18+
19+
# RDD
20+
Resilient Distributed Datasets弹性分布式数据集。
21+
22+
- 封装单个简单的数据处理,复杂处理需多个RDD组合
23+
- 不保存具体数据值
24+
- 可处理多个分区
25+
26+
## 数据处理分类
27+
- value: 每条数据只有值,处理也是按值处理。
28+
- key-value: 每条数据包含键值对,可按键值处理。
29+
30+
## 算子(方法处理分类)
31+
指RDD内的方法。对数据进行操作。
32+
- 转换算子: 对数据进行转化操作,返回结果还是RDD。 一般不会创建执行任务。
33+
- 行动算子: 对数据进行收集操作,返回结果不是RDD。 会创建并执行任务。
34+
35+
# 依赖
36+
相邻RDD间数据分区的关系。下游RDD依赖直接上游RDD。
37+
38+
## 窄依赖
39+
- 上游RDD数据被一个RDD独享。
40+
- 上游 (1..n)=1 下游。
41+
- 不会执行shuffle操作。
42+
43+
## 宽依赖
44+
- 上游RDD数据被多个RDD共享。
45+
- 上游 (1..n)=n 下游。
46+
- 会执行shuffle操作。
47+
48+
# 持久化
49+
避免重复计算,将计算中间结果保存。
50+
- cache: 缓存。不切断血缘关系,存储在内存。
51+
- checkPoint: 检测点。切断血缘关系,通常存储在HDFS。会重新计算。
52+
53+
# Broadcast
54+
广播变量,当task使用到共享变量时,避免多次传输时使用。
55+
该方法可将共享变量按工作节点分发。(默认采用task分发)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package base;
2+
3+
import org.apache.spark.api.java.JavaRDD;
4+
import org.apache.spark.api.java.JavaSparkContext;
5+
import org.apache.spark.sql.SparkSession;
6+
7+
import java.util.Collections;
8+
9+
public class CacheRdd {
10+
11+
/**
12+
* map + plus + zip
13+
* | multi |
14+
*/
15+
public static void main(String[] args) {
16+
SparkSession spark = SparkSession
17+
.builder()
18+
.appName("test")
19+
.master("local[2]")
20+
.getOrCreate();
21+
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
22+
23+
// JavaRDD<Integer> map = javaSparkContext.parallelize(Collections.singletonList(1))
24+
// .map(i -> {
25+
// // 执行2次
26+
// System.out.println("1. [" + Thread.currentThread() + "]: " + i);
27+
// return i;
28+
// });
29+
JavaRDD<Integer> map = javaSparkContext.parallelize(Collections.singletonList(1))
30+
.map(i -> {
31+
// 执行1次
32+
System.out.println("1. [" + Thread.currentThread() + "]: " + i);
33+
return i;
34+
})
35+
.cache();
36+
37+
JavaRDD<Integer> plus = map.map(i -> {
38+
// 执行1次
39+
System.out.println("2.1 [" + Thread.currentThread() + "]: " + i);
40+
return i + 1;
41+
});
42+
JavaRDD<Integer> multi = map.map(i -> {
43+
// 执行1次
44+
System.out.println("2.2 [" + Thread.currentThread() + "]: " + i);
45+
return i * 2;
46+
});
47+
JavaRDD<Integer> zip = plus.zip(multi).map(tuple -> {
48+
// 执行1次
49+
System.out.println("3. [" + Thread.currentThread() + "]: " + (tuple._1() + tuple._2()));
50+
return tuple._1() + tuple._2();
51+
});
52+
zip.collect().forEach(System.out::println);
53+
spark.close();
54+
}
55+
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package base;
2+
3+
import org.apache.spark.api.java.JavaRDD;
4+
import org.apache.spark.api.java.JavaSparkContext;
5+
import org.apache.spark.api.java.Optional;
6+
import org.apache.spark.sql.SparkSession;
7+
8+
import java.util.List;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.stream.Collectors;
11+
import java.util.stream.Stream;
12+
13+
public class CheckPointRdd {
14+
15+
public static final List<Integer> DATA = Stream
16+
.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
17+
.collect(Collectors.toList());
18+
19+
public static void main(String[] args) throws Exception {
20+
SparkSession spark = SparkSession
21+
.builder()
22+
.appName("test")
23+
.master("local")
24+
.getOrCreate();
25+
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
26+
// // 需提前创建目录
27+
javaSparkContext.setCheckpointDir("checkpoint");
28+
JavaRDD<Integer> cache = javaSparkContext
29+
.parallelize(DATA)
30+
.map(i -> {
31+
System.out.println("map: " + i);
32+
return i;
33+
})
34+
.cache();
35+
cache.checkpoint();
36+
cache.collect().forEach(System.out::println);
37+
38+
// 获取checkpoint输出文件
39+
Optional<String> checkpointFile = cache.getCheckpointFile();
40+
System.out.println("file:" + checkpointFile.orElse(null));
41+
42+
javaSparkContext = new JavaSparkContext(spark.sparkContext());
43+
// 读取checkpoint
44+
javaSparkContext.checkpointFile(checkpointFile.get())
45+
.collect()
46+
.forEach(System.out::println);
47+
48+
// job: 3
49+
System.out.println("open: http://localhost:4040");
50+
TimeUnit.HOURS.sleep(1);
51+
spark.close();
52+
}
53+
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package base;
2+
3+
import org.apache.spark.api.java.JavaSparkContext;
4+
import org.apache.spark.sql.SparkSession;
5+
import scala.Tuple2;
6+
7+
import java.util.List;
8+
import java.util.stream.Collectors;
9+
import java.util.stream.Stream;
10+
11+
public class Partition {
12+
13+
public static final List<Integer> DATA = Stream
14+
.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
15+
.collect(Collectors.toList());
16+
17+
public static void main(String[] args) {
18+
SparkSession spark = SparkSession
19+
.builder()
20+
.appName("test")
21+
.master("local")
22+
.getOrCreate();
23+
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
24+
javaSparkContext
25+
.parallelize(DATA)
26+
.groupBy(i -> i)
27+
.groupByKey(new org.apache.spark.Partitioner() {
28+
@Override
29+
public int numPartitions() {
30+
return 3;
31+
}
32+
33+
@Override
34+
public int getPartition(Object key) {
35+
return key.hashCode() % 3;
36+
}
37+
})
38+
.map(Tuple2::_1)
39+
.saveAsTextFile("output");
40+
spark.close();
41+
}
42+
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package base;
2+
3+
import org.apache.spark.api.java.JavaSparkContext;
4+
import org.apache.spark.sql.SparkSession;
5+
6+
import java.util.List;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.stream.Collectors;
9+
import java.util.stream.Stream;
10+
11+
public class StagePartition {
12+
13+
public static final List<Integer> DATA = Stream
14+
.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
15+
.collect(Collectors.toList());
16+
17+
public static void main(String[] args) throws Exception {
18+
SparkSession spark = SparkSession
19+
.builder()
20+
.appName("test")
21+
.master("local[12]")
22+
.getOrCreate();
23+
JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
24+
javaSparkContext
25+
// stage1: 2
26+
.parallelize(DATA, 2)
27+
.map(i -> {
28+
// 2 thread
29+
System.out.println("parallelize[" + Thread.currentThread().hashCode() + "]: " + i);
30+
return i;
31+
})
32+
// stage2: 6
33+
.repartition(12)
34+
.map(i1 -> {
35+
// 6 thread
36+
System.out.println("repartition[" + Thread.currentThread().hashCode() + "]: " + i1);
37+
return i1;
38+
})
39+
.coalesce(6)
40+
.map(i11 -> {
41+
// 6 thread
42+
System.out.println("coalesce1[" + Thread.currentThread().hashCode() + "]: " + i11);
43+
return i11;
44+
})
45+
// stage3: 3
46+
.coalesce(3, true)
47+
.map(i11 -> {
48+
// 3 thread
49+
System.out.println("coalesce2[" + Thread.currentThread().hashCode() + "]: " + i11);
50+
return i11;
51+
})
52+
.collect()
53+
.forEach(System.out::println);
54+
System.out.println("open: http://localhost:4040");
55+
TimeUnit.HOURS.sleep(1);
56+
spark.close();
57+
}
58+
59+
}

0 commit comments

Comments
 (0)