-
Notifications
You must be signed in to change notification settings - Fork 326
Full support for multithreaded applications #641
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
.../microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackendHandler.scala
Show resolved
Hide resolved
.../microsoft-spark-2.3.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackendHandler.scala
Show resolved
Hide resolved
.../microsoft-spark-3.0.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackendHandler.scala
Outdated
Show resolved
Hide resolved
.../microsoft-spark-2.3.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackendHandler.scala
Outdated
Show resolved
Hide resolved
.../microsoft-spark-2.3.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackendHandler.scala
Outdated
Show resolved
Hide resolved
src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spark/api/dotnet/ThreadPool.scala
Outdated
Show resolved
Hide resolved
…k/api/dotnet/ThreadPool.scala Co-authored-by: Steve Suh <[email protected]>
… into anfog/thread_local
/// (first created) context. | ||
/// </summary> | ||
/// <param name="session">SparkSession object</param> | ||
public static void SetActiveSession(SparkSession session) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add the version attribute [Since(Versions.V3_0_0)]
for this API?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @AFFogarty ! |
src/csharp/Microsoft.Spark.E2ETest/IpcTests/JvmThreadPoolGarbageCollectorTests.cs
Outdated
Show resolved
Hide resolved
src/csharp/Microsoft.Spark.E2ETest/IpcTests/JvmThreadPoolGarbageCollectorTests.cs
Outdated
Show resolved
Hide resolved
src/csharp/Microsoft.Spark.E2ETest/IpcTests/JvmThreadPoolGarbageCollectorTests.cs
Outdated
Show resolved
Hide resolved
src/csharp/Microsoft.Spark.E2ETest/IpcTests/JvmThreadPoolGarbageCollectorTests.cs
Outdated
Show resolved
Hide resolved
src/csharp/Microsoft.Spark.E2ETest/IpcTests/JvmThreadPoolGarbageCollectorTests.cs
Outdated
Show resolved
Hide resolved
src/csharp/Microsoft.Spark.E2ETest/IpcTests/JvmThreadPoolGarbageCollectorTests.cs
Outdated
Show resolved
Hide resolved
src/csharp/Microsoft.Spark/Interop/Ipc/JvmThreadPoolGarbageCollector.cs
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few nit/minor comments, but looking good to me!
src/csharp/Microsoft.Spark.E2ETest/IpcTests/JvmThreadPoolGCTests.cs
Outdated
Show resolved
Hide resolved
src/csharp/Microsoft.Spark.E2ETest/IpcTests/JvmThreadPoolGCTests.cs
Outdated
Show resolved
Hide resolved
def run(threadId: Int, task: () => Unit): Unit = { | ||
val executor = getOrCreateExecutor(threadId) | ||
val future = executor.submit(new Runnable { | ||
override def run(): Unit = task() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for verifying this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One nit comment, but LGTM, thanks @AFFogarty!
} | ||
} | ||
|
||
_loggerService.LogDebug("JVM thread GC complete."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove this log and line 130? (We don't have a good logger service, so this will always be printed). I think line 111 should be good enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed. Also fixed a bug where that function was always returning false.
Design
This PR enables multi-threaded user applications. The PR accomplishes this by creating a 1-to-1 mapping between .NET
Thread
objects and JVM single-threadExecutorService
objects.ExecutorService
is a Java interface that wraps a worker thread with its own task queue.When a .NET thread calls a Spark function, the
JvmBridge
includes the thread'sManagedThreadId
in the payload body. When the request is received on the JVM side,DotnetBackendHandler
uses theManagedThreadId
to map the request onto its correspondingExecutorService
. TheExecutorService
then executes the function on its worker thread.This pattern guarantees that Spark functions called by a particular .NET thread will always be executed on the same JVM thread. In addition, two different .NET threads will always execute Spark functions on two different JVM threads.
An object called
ThreadPool
manages the lifecycle forExecutorService
objects on the JVM side. A corresponding class calledJvmThreadPool
manages the threads on the .NET side, and does periodic garbage collection.Additional features
This PR also enables
GetActiveSession
,SetActiveSession
, andClearActiveSession
APIs onSparkSession
. These APIs were previously unavailable because thread-local sessions were unsupported.Testing
This PR adds a new E2E test class
JvmThreadPoolTests
that tests the basic functionality of the thread pool.Since these changes are on the code path for all Spark API calls, these changes are exercised by all existing E2E tests.
Limitations
The only limitation is that in Spark,
ActiveSession
gives a guarantee that a child thread will inherit its parent's active session. My design does not support that. However, I think that this is not a problem since .NET's thread model doesn't support parent/child relationships between threads.This PR will address bug #333