-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathS3ShuffleDataIO.scala
69 lines (55 loc) · 2.21 KB
/
S3ShuffleDataIO.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//
// Copyright 2022- IBM Inc. All rights reserved
// SPDX-License-Identifier: Apache 2.0
//
package org.apache.spark.shuffle
import org.apache.spark.shuffle.api.{
ShuffleDataIO,
ShuffleDriverComponents,
ShuffleExecutorComponents,
ShuffleMapOutputWriter,
SingleSpillShuffleMapOutputWriter
}
import org.apache.spark.shuffle.helper.S3ShuffleDispatcher
import org.apache.spark.storage.BlockManagerMaster
import org.apache.spark.{SparkConf, SparkEnv}
import java.util
import java.util.{Collections, Optional}
/**
 * [[ShuffleDataIO]] plugin that supplies the S3 shuffle components.
 *
 * Executor-side components re-initialize the [[S3ShuffleDispatcher]] for the application and create
 * S3 map-output writers. Driver-side components optionally remove the dispatcher's root when the
 * application is cleaned up.
 *
 * @param sparkConf Spark configuration, forwarded to every [[S3ShuffleMapOutputWriter]] created here.
 */
class S3ShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO {

  /** Returns a new instance of the executor-side components on every call. */
  override def executor(): ShuffleExecutorComponents = new S3ShuffleExecutorComponents()

  /** Returns a new instance of the driver-side components on every call. */
  override def driver(): ShuffleDriverComponents = new S3ShuffleDriverComponents()

  /** Executor-side components: dispatcher setup plus map-output writer factories. */
  private class S3ShuffleExecutorComponents extends ShuffleExecutorComponents {

    /**
     * Re-initializes the dispatcher returned by `S3ShuffleDispatcher.get` for `appId`.
     *
     * @param appId        application id passed to `reinitialize`.
     * @param execId       unused by this implementation.
     * @param extraConfigs unused; the driver side of this plugin publishes no extra configuration.
     */
    override def initializeExecutor(
        appId: String,
        execId: String,
        extraConfigs: util.Map[String, String]
    ): Unit = {
      S3ShuffleDispatcher.get.reinitialize(appId)
    }

    /**
     * Creates an [[S3ShuffleMapOutputWriter]] for map task `mapTaskId` of shuffle `shuffleId`.
     *
     * @param shuffleId     shuffle the output belongs to.
     * @param mapTaskId     map task producing the output.
     * @param numPartitions number of partitions the writer is created for.
     */
    override def createMapOutputWriter(
        shuffleId: Int,
        mapTaskId: Long,
        numPartitions: Int
    ): ShuffleMapOutputWriter =
      new S3ShuffleMapOutputWriter(sparkConf, shuffleId, mapTaskId, numPartitions)

    /**
     * Always returns a present `Optional` holding an [[S3SingleSpillShuffleMapOutputWriter]].
     * The single-spill writer is therefore offered for every map task.
     */
    override def createSingleFileMapOutputWriter(
        shuffleId: Int,
        mapId: Long
    ): Optional[SingleSpillShuffleMapOutputWriter] =
      Optional.of(new S3SingleSpillShuffleMapOutputWriter(shuffleId, mapId))
  }

  /**
   * Driver-side components.
   *
   * `registerShuffle` and `removeShuffle` are deliberately not overridden. The previous overrides
   * only delegated to `super`, so the inherited [[ShuffleDriverComponents]] behaviour is unchanged.
   */
  private class S3ShuffleDriverComponents extends ShuffleDriverComponents {

    /** Returns an empty map: this plugin passes no extra configuration to executors. */
    override def initializeApplication(): util.Map[String, String] =
      Collections.emptyMap[String, String]()

    /** Removes the dispatcher's root when `cleanupShuffleFiles` is enabled; otherwise does nothing. */
    override def cleanupApplication(): Unit = {
      val dispatcher = S3ShuffleDispatcher.get
      if (dispatcher.cleanupShuffleFiles) {
        dispatcher.removeRoot()
      }
    }
  }
}