-
Notifications
You must be signed in to change notification settings - Fork 474
/
Copy pathpyspark-session-2021-04-21-mapPartitions.txt
executable file
·79 lines (76 loc) · 2.1 KB
/
pyspark-session-2021-04-21-mapPartitions.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
./bin/pyspark
Python 3.7.2 (v3.7.2:9a3ffc0492, Dec 24 2018, 02:44:43)
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/
Using Python version 3.7.2 (v3.7.2:9a3ffc0492, Dec 24 2018 02:44:43)
Spark context Web UI available at http://10.0.0.93:4040
Spark context available as 'sc' (master = local[*], app id = local-1619061713234).
SparkSession available as 'spark'.
>>>
>>>
>>>
>>> nums = [1, 2, 3, 4, -1, 4, 5, 6, 7, -3, -1, 2, 3, 9, -1, -2]
>>> nums
[1, 2, 3, 4, -1, 4, 5, 6, 7, -3, -1, 2, 3, 9, -1, -2]
>>>
>>>
>>> # create an RDD from the Python list
>>> rdd = sc.parallelize(nums)
>>> rdd.collect()
[1, 2, 3, 4, -1, 4, 5, 6, 7, -3, -1, 2, 3, 9, -1, -2]
>>> # find the default number of partitions
>>> rdd.getNumPartitions()
8
>>>
>>> # set number of partitions explicitly to 3
>>> rdd = sc.parallelize(nums, 3)
>>> rdd.getNumPartitions()
3
>>> def debug(partition):
... elements = []
... for x in partition:
... elements.append(x)
... print("elements=", elements)
...
>>> # print the contents of each partition; the order of the printed lines
>>> # is not fixed (the same call later in this session lists them differently)
>>> rdd.foreachPartition(debug)
elements= [4, 5, 6, 7, -3]
elements= [1, 2, 3, 4, -1]
elements= [-1, 2, 3, 9, -1, -2]
>>>
>>>#define a function which handles a single partition
>>> def min_max_count(partition):
... first_time = False
... local_count = 0
... for n in partition:
... local_count += 1
... if (first_time == False):
... local_min = n
... local_max = n
... first_time = True
... else:
... local_min = min(n, local_min)
... local_max = max(n, local_max)
... return [(local_min, local_max, local_count)]
...
>>> # Test your custom function without Spark
>>> x = [1, 2, 3, -3, -6, 9, 10, 4, 5, 6]
>>> result = min_max_count(x)
>>> result
[(-6, 10, 10)]
>>>
>>> rdd.foreachPartition(debug)
elements= [1, 2, 3, 4, -1]
elements= [-1, 2, 3, 9, -1, -2]
elements= [4, 5, 6, 7, -3]
>>>
>>> # apply min_max_count to every partition: one (min, max, count) tuple per partition
>>> rdd2 = rdd.mapPartitions(min_max_count)
>>> rdd2.collect()
[(-1, 4, 5), (-3, 7, 5), (-2, 9, 6)]
>>> # merge the per-partition tuples into a single (min, max, count) for the whole RDD
>>> final_answer = rdd2.reduce(lambda x, y: ( min(x[0], y[0]), max(x[1], y[1]), x[2]+y[2]) )
>>> final_answer
(-3, 9, 16)
>>>