Commit b8ff427

Scalafmt: Format source code.

Signed-off-by: Pascal Spörri <[email protected]>

1 parent 0ef22d4

16 files changed: +365 −312 lines

src/main/scala/org/apache/spark/shuffle/ConcurrentObjectMap.scala

+11 −7

@@ -20,13 +20,17 @@ class ConcurrentObjectMap[K, V] {
   }
 
   def getOrElsePut(key: K, op: K => V): V = {
-    val l = valueLocks.get(key).getOrElse({
-      lock.synchronized {
-        valueLocks.getOrElseUpdate(key, {
-          new Object()
-        })
-      }
-    })
+    val l = valueLocks
+      .get(key)
+      .getOrElse({
+        lock.synchronized {
+          valueLocks.getOrElseUpdate(
+            key, {
+              new Object()
+            }
+          )
+        }
+      })
     l.synchronized {
       return map.getOrElseUpdate(key, op(key))
     }
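
The reformatted call is the same double-checked, per-key locking pattern as before: a global lock guards the lazy creation of one monitor object per key, and op(key) then runs under that key's monitor, so it is evaluated at most once per key while distinct keys proceed concurrently. A minimal self-contained sketch of the pattern (field names mirror the diff; the surrounding class is otherwise assumed):

    import scala.collection.mutable

    class ConcurrentObjectMapSketch[K, V] {
      private val lock = new Object()                   // guards monitor creation only
      private val valueLocks = mutable.Map[K, Object]() // one monitor per key
      private val map = mutable.Map[K, V]()

      def getOrElsePut(key: K, op: K => V): V = {
        // Fast path: reuse an existing per-key monitor; slow path: create it
        // under the global lock so each key gets exactly one monitor.
        val l = valueLocks
          .get(key)
          .getOrElse(lock.synchronized {
            valueLocks.getOrElseUpdate(key, new Object())
          })
        // Only callers of the same key contend here; op(key) runs at most once.
        l.synchronized {
          map.getOrElseUpdate(key, op(key))
        }
      }
    }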

src/main/scala/org/apache/spark/shuffle/S3MeasureOutputStream.scala

+4 −3

@@ -11,7 +11,6 @@ class S3MeasureOutputStream(var out: OutputStream, label: String = "") extends O
   private var timings: Long = 0
   private var bytes: Long = 0
 
-
   private def checkOpen(): Unit = {
     if (!isOpen) {
       throw new IOException("The stream is already closed!")
@@ -58,7 +57,9 @@ class S3MeasureOutputStream(var out: OutputStream, label: String = "") extends O
     val sAt = tc.stageAttemptNumber()
     val t = timings / 1000000
     val bw = bytes.toDouble / (t.toDouble / 1000) / (1024 * 1024)
-    logInfo(s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
-      s"Writing ${label} ${bytes} took ${t} ms (${bw} MiB/s)")
+    logInfo(
+      s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
+        s"Writing ${label} ${bytes} took ${t} ms (${bw} MiB/s)"
+    )
   }
 }
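
For reference, the statistics line converts nanosecond timings to milliseconds and reports throughput in MiB/s. A worked example with made-up numbers:

    // Throughput formula from the log line above; the values are illustrative.
    val timings: Long = 2000000000L       // 2 s measured in nanoseconds
    val bytes: Long = 512L * 1024 * 1024  // 512 MiB written
    val t = timings / 1000000             // 2000 ms
    val bw = bytes.toDouble / (t.toDouble / 1000) / (1024 * 1024)
    println(f"Writing $bytes took $t ms ($bw%.1f MiB/s)") // prints 256.0 MiB/s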

src/main/scala/org/apache/spark/shuffle/S3ShuffleDataIO.scala

+3 −4

@@ -36,9 +36,9 @@ class S3ShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO {
   }
 
   override def createSingleFileMapOutputWriter(
-    shuffleId: Int,
-    mapId: Long
-  ): Optional[SingleSpillShuffleMapOutputWriter] = {
+      shuffleId: Int,
+      mapId: Long
+  ): Optional[SingleSpillShuffleMapOutputWriter] = {
     Optional.of(new S3SingleSpillShuffleMapOutputWriter(shuffleId, mapId))
   }
 }
@@ -67,4 +67,3 @@ class S3ShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO {
     }
   }
 }
-
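
S3ShuffleDataIO implements Spark's ShuffleDataIO plugin interface, so it is selected through the standard sort-shuffle IO plugin config. A hedged sketch of the wiring (the config key is Spark's standard one; the app name is a placeholder):

    import org.apache.spark.SparkConf

    // Point the sort shuffle at the S3 ShuffleDataIO implementation.
    val conf = new SparkConf()
      .setAppName("s3-shuffle-example")
      .set("spark.shuffle.sort.io.plugin.class", "org.apache.spark.shuffle.S3ShuffleDataIO")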

src/main/scala/org/apache/spark/shuffle/S3ShuffleMapOutputWriter.scala

+29 −29

@@ -19,19 +19,18 @@ import java.nio.ByteBuffer
 import java.nio.channels.{Channels, WritableByteChannel}
 import java.util.Optional
 
-/**
- * Implements the ShuffleMapOutputWriter interface. It stores the shuffle output in one
- * shuffle block.
- *
- * This file is based on Spark "LocalDiskShuffleMapOutputWriter.java".
- */
+/** Implements the ShuffleMapOutputWriter interface. It stores the shuffle output in one shuffle block.
+  *
+  * This file is based on Spark "LocalDiskShuffleMapOutputWriter.java".
+  */
 
 class S3ShuffleMapOutputWriter(
-    conf: SparkConf,
-    shuffleId: Int,
-    mapId: Long,
-    numPartitions: Int,
-) extends ShuffleMapOutputWriter with Logging {
+    conf: SparkConf,
+    shuffleId: Int,
+    mapId: Long,
+    numPartitions: Int
+) extends ShuffleMapOutputWriter
+    with Logging {
   val dispatcher = S3ShuffleDispatcher.get
 
   /* Target block for writing */
@@ -44,7 +43,8 @@ class S3ShuffleMapOutputWriter(
   def initStream(): Unit = {
     if (stream == null) {
       stream = dispatcher.createBlock(shuffleBlock)
-      bufferedStream = new S3MeasureOutputStream(new BufferedOutputStream(stream, dispatcher.bufferSize), shuffleBlock.name)
+      bufferedStream =
+        new S3MeasureOutputStream(new BufferedOutputStream(stream, dispatcher.bufferSize), shuffleBlock.name)
     }
   }
 
@@ -59,10 +59,11 @@ class S3ShuffleMapOutputWriter(
   private var totalBytesWritten: Long = 0
   private var lastPartitionWriterId: Int = -1
 
-  /**
-   * @param reducePartitionId Monotonically increasing, as per contract in ShuffleMapOutputWriter.
-   * @return An instance of the ShufflePartitionWriter exposing the single output stream.
-   */
+  /** @param reducePartitionId
+    *   Monotonically increasing, as per contract in ShuffleMapOutputWriter.
+    * @return
+    *   An instance of the ShufflePartitionWriter exposing the single output stream.
+    */
   override def getPartitionWriter(reducePartitionId: Int): ShufflePartitionWriter = {
     if (reducePartitionId <= lastPartitionWriterId) {
       throw new RuntimeException("Precondition: Expect a monotonically increasing reducePartitionId.")
@@ -81,19 +82,21 @@ class S3ShuffleMapOutputWriter(
     new S3ShufflePartitionWriter(reducePartitionId)
   }
 
-  /**
-   * Close all writers and the shuffle block.
-   *
-   * @param checksums Ignored.
-   * @return
-   */
+  /** Close all writers and the shuffle block.
+    *
+    * @param checksums
+    *   Ignored.
+    * @return
+    */
   override def commitAllPartitions(checksums: Array[Long]): MapOutputCommitMessage = {
     if (bufferedStream != null) {
       bufferedStream.flush()
     }
     if (stream != null) {
       if (stream.getPos != totalBytesWritten) {
-        throw new RuntimeException(f"S3ShuffleMapOutputWriter: Unexpected output length ${stream.getPos}, expected: ${totalBytesWritten}.")
+        throw new RuntimeException(
+          f"S3ShuffleMapOutputWriter: Unexpected output length ${stream.getPos}, expected: ${totalBytesWritten}."
+        )
       }
     }
     if (bufferedStreamAsChannel != null) {
@@ -198,8 +201,7 @@ class S3ShuffleMapOutputWriter(
     }
   }
 
-  private class S3ShufflePartitionWriterChannel(reduceId: Int)
-    extends WritableByteChannelWrapper {
+  private class S3ShufflePartitionWriterChannel(reduceId: Int) extends WritableByteChannelWrapper {
     private val partChannel = new S3PartitionWritableByteChannel(bufferedStreamAsChannel)
 
     override def channel(): WritableByteChannel = {
@@ -216,8 +218,7 @@ class S3ShuffleMapOutputWriter(
     }
   }
 
-  private class S3PartitionWritableByteChannel(channel: WritableByteChannel)
-    extends WritableByteChannel {
+  private class S3PartitionWritableByteChannel(channel: WritableByteChannel) extends WritableByteChannel {
 
     private var count: Long = 0
 
@@ -229,8 +230,7 @@ class S3ShuffleMapOutputWriter(
       channel.isOpen()
     }
 
-    override def close(): Unit = {
-    }
+    override def close(): Unit = {}
 
     override def write(x: ByteBuffer): Int = {
       var c = 0
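
The getPartitionWriter precondition above encodes the ShuffleMapOutputWriter contract: partition writers must be requested in strictly increasing reduce-partition order, and the block is finalized with a single commitAllPartitions call. A hypothetical caller, for illustration only:

    import org.apache.spark.shuffle.api.ShuffleMapOutputWriter

    // Drive a ShuffleMapOutputWriter the way the contract requires: ascending
    // reduce partition ids, one stream at a time, then a single commit.
    def writeAllPartitions(writer: ShuffleMapOutputWriter, numPartitions: Int): Unit = {
      for (reduceId <- 0 until numPartitions) { // strictly increasing ids
        val out = writer.getPartitionWriter(reduceId).openStream()
        try {
          // ... write this partition's serialized records to `out` ...
        } finally {
          out.close()
        }
      }
      writer.commitAllPartitions(Array.fill(numPartitions)(0L)) // checksums are ignored here
    }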

src/main/scala/org/apache/spark/shuffle/S3ShuffleWriter.scala

−1

@@ -19,4 +19,3 @@ class S3ShuffleWriter[K, V](writer: ShuffleWriter[K, V]) extends ShuffleWriter[K
 
   override def getPartitionLengths(): Array[Long] = writer.getPartitionLengths()
 }
-
22-

src/main/scala/org/apache/spark/shuffle/S3SingleSpillShuffleMapOutputWriter.scala

+11 −7

@@ -15,15 +15,17 @@ import org.apache.spark.util.Utils
 import java.io.{File, FileInputStream}
 import java.nio.file.{Files, Path}
 
-class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long) extends SingleSpillShuffleMapOutputWriter with Logging {
+class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long)
+    extends SingleSpillShuffleMapOutputWriter
+    with Logging {
 
   private lazy val dispatcher = S3ShuffleDispatcher.get
 
   override def transferMapSpillFile(
-    mapSpillFile: File,
-    partitionLengths: Array[Long],
-    checksums: Array[Long]
-  ): Unit = {
+      mapSpillFile: File,
+      partitionLengths: Array[Long],
+      checksums: Array[Long]
+  ): Unit = {
     val block = ShuffleDataBlockId(shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
 
     if (dispatcher.rootIsLocal) {
@@ -44,8 +46,10 @@ class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long) extends S
       val sAt = tc.stageAttemptNumber()
       val t = timings / 1000000
       val bw = bytes.toDouble / (t.toDouble / 1000) / (1024 * 1024)
-      logInfo(s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
-        s"Writing ${block.name} ${bytes} took ${t} ms (${bw} MiB/s)")
+      logInfo(
+        s"Statistics: Stage ${sId}.${sAt} TID ${tc.taskAttemptId()} -- " +
+          s"Writing ${block.name} ${bytes} took ${t} ms (${bw} MiB/s)"
+      )
     } else {
      // Copy using a stream.
       val in = new FileInputStream(mapSpillFile)
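
The else-branch visible here copies the local spill file into the remote shuffle block through a stream. A hedged sketch of that fallback, assuming a hypothetical factory standing in for dispatcher.createBlock:

    import java.io.{File, FileInputStream, OutputStream}
    import org.apache.spark.util.Utils

    // Copy a local spill file into a freshly created block output stream.
    // `createBlockStream` is a hypothetical stand-in for dispatcher.createBlock(block).
    def copySpill(mapSpillFile: File, createBlockStream: () => OutputStream): Unit = {
      val in = new FileInputStream(mapSpillFile)
      val out = createBlockStream()
      Utils.copyStream(in, out, closeStreams = true) // closes both streams when done
    }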
