
Commit 370fb77

spark: basic RDDs

1 parent c419ae8

3 files changed: +133 −0
base/BaseRdd.java
@@ -0,0 +1,47 @@
package base;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BaseRdd {

    public static final List<String> DATA = Stream
            .of("Abc", "Abcdef", "bC", "Dd", "eC", "dD")
            .collect(Collectors.toList());

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("test")
                .master("local")
                .getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
        List<String> res = javaSparkContext
                .parallelize(DATA)
                // filter: drop "Abcdef" (length >= 5)
                .filter(s -> s.length() < 5)
                // map: convert to upper case
                .map(String::toUpperCase)
                // distinct: "Dd" and "dD" have both become "DD", so one copy survives
                .distinct()
                // flatMap: replicate each string as many times as it has characters
                .flatMap(s -> {
                    List<String> list = new ArrayList<>(s.length());
                    for (int i = 0; i < s.length(); i++) {
                        list.add(s);
                    }
                    return list.iterator();
                })
                // sortBy: lexicographic order, ascending, 2 partitions
                .sortBy(s -> s, true, 2)
                .collect();
        res.forEach(System.out::println);
        spark.close();
    }
}
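A note on the flatMap step above: the hand-rolled replication loop can collapse to a single expression with java.util.Collections.nCopies, which returns an immutable list of n references to the same element. A minimal sketch of that variant, reusing the DATA list from this commit (the class name BaseRddNCopies is hypothetical, not part of the commit):

package base;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;

public class BaseRddNCopies {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("test").master("local").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        jsc.parallelize(BaseRdd.DATA)
                // Collections.nCopies(n, s) builds an immutable List holding s exactly
                // n times, so the replication loop becomes a one-liner.
                .flatMap(s -> Collections.nCopies(s.length(), s).iterator())
                .collect()
                .forEach(System.out::println);
        spark.close();
    }
}

The behavior is the same; the iterator just walks an immutable view instead of a freshly filled ArrayList.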
base/PairRdd.java
@@ -0,0 +1,47 @@
package base;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PairRdd {

    public static final List<Tuple2<String, Integer>> DATA = Stream
            .of(
                    new Tuple2<>("ABC", 1),
                    new Tuple2<>("abc", 2),
                    new Tuple2<>("Abc", 3),
                    new Tuple2<>("Abc", 3),
                    new Tuple2<>("Abc", 4),
                    new Tuple2<>("ABc", 3),
                    new Tuple2<>("aBc", 9)
            )
            .collect(Collectors.toList());

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("test")
                .master("local")
                .getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
        List<Tuple2<String, Integer>> res = javaSparkContext
                .parallelizePairs(DATA)
                // filter: drop ("aBc", 9), keeping only values < 5
                .filter(t -> t._2() < 5)
                // distinct: removes a pair only when both key and value match,
                // so the duplicate ("Abc", 3) collapses to one entry
                .distinct()
                // mapValues: double every value
                .mapValues(i -> i * 2)
                // sortByKey: sort by key, ascending
                .sortByKey()
                .collect();
        res.forEach(System.out::println);
        spark.close();
    }
}
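The chain above only transforms pairs one at a time; the other staple of the pair-RDD API is aggregation by key. A minimal sketch against the same DATA list (the class name PairRddReduce is hypothetical, not part of the commit):

package base;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class PairRddReduce {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("test").master("local").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        jsc.parallelizePairs(PairRdd.DATA)
                // reduceByKey merges the values of identical keys with the given
                // function; here ("Abc", 3), ("Abc", 3), ("Abc", 4) become ("Abc", 10).
                .reduceByKey(Integer::sum)
                .collect()
                .forEach(System.out::println);
        spark.close();
    }
}

Unlike distinct, reduceByKey keys only on the tuple's first component, and it combines values map-side before the shuffle.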
base/WordCount.java
@@ -0,0 +1,39 @@
package base;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordCount {

    public static final List<String> DATA = Stream
            .of("Abc", "Abcdef", "bC", "Dd", "eC", "dD", "Abcdef", "bC", "bC", "Dd", "eC", "dD", "Abcdef", "bC")
            .collect(Collectors.toList());

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("test")
                .master("local")
                .getOrCreate();
        JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
        Map<String, Integer> res = javaSparkContext
                .parallelize(DATA)
                // group identical words under one key
                .groupBy(s -> s)
                // count the members of each group
                .mapValues(strings -> {
                    int count = 0;
                    for (String ignored : strings) {
                        count++;
                    }
                    return count;
                })
                .collectAsMap();
        res.forEach((k, v) -> System.out.println(k + "\t" + v));
        spark.close();
    }
}
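groupBy followed by a counting loop works, but it ships every occurrence of a word across the shuffle and materializes each group as an iterable. The usual word-count idiom maps each word to (word, 1) and sums with reduceByKey instead. A minimal sketch reusing the same DATA list (the class name WordCountReduce is hypothetical, not part of the commit):

package base;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Map;

public class WordCountReduce {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("test").master("local").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        Map<String, Integer> res = jsc
                .parallelize(WordCount.DATA)
                // emit (word, 1) for every element ...
                .mapToPair(s -> new Tuple2<>(s, 1))
                // ... then sum the 1s per key; partial sums are combined
                // on each partition before the shuffle, unlike groupBy
                .reduceByKey(Integer::sum)
                .collectAsMap();
        res.forEach((k, v) -> System.out.println(k + "\t" + v));
        spark.close();
    }
}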
