Skip to content

Commit c9ae4ab

Browse files
yaooqinndongjoon-hyun
authored andcommitted
[SPARK-53999][CORE] Native KQueue Transport support on BSD/MacOS
### What changes were proposed in this pull request? This PR adds Native KQUEUE via JNI support for transport, such as shuffle, file, and rpc procedures ### Why are the changes needed? Feature parity between Linux and MacOS/BSD platforms ### Does this PR introduce _any_ user-facing change? Yes, a new option for io.mode ### How was this patch tested? new unit tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #52703 from yaooqinn/SPARK-53999. Authored-by: Kent Yao <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent f72435a commit c9ae4ab

File tree

3 files changed

+48
-5
lines changed

3 files changed

+48
-5
lines changed

common/network-common/src/main/java/org/apache/spark/network/util/IOMode.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,18 @@
1919

2020
/**
2121
* Selector for which form of low-level IO we should use.
22-
* NIO is always available, while EPOLL is only available on Linux.
23-
* AUTO is used to select EPOLL if it's available, or NIO otherwise.
2422
*/
2523
public enum IOMode {
26-
NIO, EPOLL
24+
/**
25+
* Java NIO (Selector), cross-platform portable
26+
*/
27+
NIO,
28+
/**
29+
* Native EPOLL via JNI, Linux only
30+
*/
31+
EPOLL,
32+
/**
33+
* Native KQUEUE via JNI, MacOS/BSD only
34+
*/
35+
KQUEUE
2736
}

common/network-common/src/main/java/org/apache/spark/network/util/NettyUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import io.netty.channel.epoll.EpollEventLoopGroup;
2727
import io.netty.channel.epoll.EpollServerSocketChannel;
2828
import io.netty.channel.epoll.EpollSocketChannel;
29+
import io.netty.channel.kqueue.KQueueEventLoopGroup;
30+
import io.netty.channel.kqueue.KQueueServerSocketChannel;
31+
import io.netty.channel.kqueue.KQueueSocketChannel;
2932
import io.netty.channel.nio.NioEventLoopGroup;
3033
import io.netty.channel.socket.nio.NioServerSocketChannel;
3134
import io.netty.channel.socket.nio.NioSocketChannel;
@@ -68,6 +71,7 @@ public static EventLoopGroup createEventLoop(IOMode mode, int numThreads, String
6871
return switch (mode) {
6972
case NIO -> new NioEventLoopGroup(numThreads, threadFactory);
7073
case EPOLL -> new EpollEventLoopGroup(numThreads, threadFactory);
74+
case KQUEUE -> new KQueueEventLoopGroup(numThreads, threadFactory);
7175
};
7276
}
7377

@@ -76,6 +80,7 @@ public static Class<? extends Channel> getClientChannelClass(IOMode mode) {
7680
return switch (mode) {
7781
case NIO -> NioSocketChannel.class;
7882
case EPOLL -> EpollSocketChannel.class;
83+
case KQUEUE -> KQueueSocketChannel.class;
7984
};
8085
}
8186

@@ -84,6 +89,7 @@ public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode)
8489
return switch (mode) {
8590
case NIO -> NioServerSocketChannel.class;
8691
case EPOLL -> EpollServerSocketChannel.class;
92+
case KQUEUE -> KQueueServerSocketChannel.class;
8793
};
8894
}
8995

core/src/test/scala/org/apache/spark/ShuffleNettySuite.scala

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,42 @@
1717

1818
package org.apache.spark
1919

20-
import org.scalatest.BeforeAndAfterAll
20+
import org.scalactic.source.Position
21+
import org.scalatest.{BeforeAndAfterAll, Tag}
2122

22-
class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
23+
import org.apache.spark.network.util.IOMode
24+
import org.apache.spark.util.Utils
25+
26+
abstract class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
2327

2428
// This test suite should run all tests in ShuffleSuite with Netty shuffle mode.
2529

30+
def ioMode: IOMode = IOMode.NIO
31+
def shouldRunTests: Boolean = true
2632
override def beforeAll(): Unit = {
2733
super.beforeAll()
2834
conf.set("spark.shuffle.blockTransferService", "netty")
35+
conf.set("spark.shuffle.io.mode", ioMode.toString)
36+
}
37+
38+
override protected def test(testName: String, testTags: Tag*)(testBody: => Any)(
39+
implicit pos: Position): Unit = {
40+
if (!shouldRunTests) {
41+
ignore(s"$testName [disabled on ${Utils.osName} with $ioMode]")(testBody)
42+
} else {
43+
super.test(testName, testTags: _*) {testBody}
44+
}
2945
}
3046
}
47+
48+
class ShuffleNettyNioSuite extends ShuffleNettySuite
49+
50+
class ShuffleNettyEpollSuite extends ShuffleNettySuite {
51+
override def shouldRunTests: Boolean = Utils.isLinux
52+
override def ioMode: IOMode = IOMode.EPOLL
53+
}
54+
55+
class ShuffleNettyKQueueSuite extends ShuffleNettySuite {
56+
override def shouldRunTests: Boolean = Utils.isMac
57+
override def ioMode: IOMode = IOMode.KQUEUE
58+
}

0 commit comments

Comments
 (0)