Full support for multithreaded applications #641

Merged 64 commits on Oct 6, 2020

Commits
fdc1b20
Embed thread ID in payload
AFFogarty Aug 25, 2020
88befdb
Basic execution threadpool
AFFogarty Aug 25, 2020
d9a403c
Thread lifecycle management
AFFogarty Aug 25, 2020
33c0202
Port to spark 2.3
AFFogarty Aug 25, 2020
a074028
Concurrent dictionary for active threads
AFFogarty Aug 25, 2020
6756f13
Logic to clean up expired threads
AFFogarty Sep 8, 2020
c875e63
Copy changes to Spark 2.3
AFFogarty Sep 8, 2020
3e37ca3
Update ThreadPool in 2.3
AFFogarty Sep 8, 2020
3c81a14
Merge branch 'master' of https://github.com/dotnet/spark into anfog/t…
AFFogarty Sep 8, 2020
e8aa36b
Tests should pass now
AFFogarty Sep 9, 2020
9cd83df
Merge branch 'master' of https://github.com/dotnet/spark into anfog/t…
AFFogarty Sep 9, 2020
3942429
Added Spark 3 and fixed thread waiting
Sep 21, 2020
1fff263
Merge branch 'master' of https://github.com/dotnet/spark into anfog/t…
Sep 21, 2020
b86eb00
Fixed imports
Sep 21, 2020
4eceb5d
Clean up ThreadPool
Sep 21, 2020
e0f8d41
Add ActiveSession APIs
Sep 21, 2020
e31fb00
Refactor JvmThreadPool into separate class
Sep 22, 2020
1784d92
Clean-up
Sep 22, 2020
5c45f2e
Update src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spar…
AFFogarty Sep 24, 2020
757bae9
Merge branch 'master' of https://github.com/dotnet/spark into anfog/t…
Sep 24, 2020
6202f57
Make executors private
Sep 24, 2020
ac380c5
Update src/csharp/Microsoft.Spark/Sql/SparkSession.cs
AFFogarty Sep 25, 2020
8f81a4f
Update src/csharp/Microsoft.Spark/Interop/Ipc/JvmThreadPool.cs
AFFogarty Sep 25, 2020
5b38890
Update src/csharp/Microsoft.Spark/Interop/Ipc/JvmThreadPool.cs
AFFogarty Sep 25, 2020
7ec524b
Update src/csharp/Microsoft.Spark/Interop/Ipc/JvmThreadPool.cs
AFFogarty Sep 25, 2020
b1d3da2
Update src/csharp/Microsoft.Spark/Sql/SparkSession.cs
AFFogarty Sep 25, 2020
a45a79d
Param documentation
Sep 25, 2020
f265640
Refactor JvmThreadPool
Sep 25, 2020
d4610a1
Dispose of the threadpool
Sep 25, 2020
d606aba
Merge branch 'master' of https://github.com/dotnet/spark into anfog/t…
Sep 25, 2020
4922617
Add mising doc
Sep 25, 2020
4d21362
Change import order
Sep 25, 2020
f71a1fb
Don't need threadpool for removing object from tracker
Sep 25, 2020
5fbf181
Merge branch 'master' into anfog/thread_local
suhsteve Sep 27, 2020
fcf0ccf
Don't run callback client in thread pool
Sep 27, 2020
e43eb62
Merge branch 'anfog/thread_local' of https://github.com/AFFogarty/spa…
Sep 27, 2020
ac95655
Formatting
Sep 28, 2020
7e6b7bb
Just join the thread
Sep 28, 2020
5abc17a
Update src/csharp/Microsoft.Spark/Interop/Ipc/JvmThreadPool.cs
AFFogarty Sep 28, 2020
89cc8e1
Merge branch 'master' of https://github.com/dotnet/spark into anfog/t…
Sep 28, 2020
77b9ddc
Merge branch 'anfog/thread_local' of https://github.com/AFFogarty/spa…
Sep 28, 2020
ec06612
Fixed: start the thread
Sep 28, 2020
70ab43a
Renamed the JvmThreadPool
Sep 29, 2020
9c34fc0
Merge branch 'master' of https://github.com/dotnet/spark into anfog/t…
Sep 29, 2020
dadf60f
Comment explaining all methods are thread-safe
Sep 29, 2020
5bedcec
Merge branch 'master' into anfog/thread_local
imback82 Sep 29, 2020
cf641b7
Make GC interval configurable
Sep 29, 2020
c582f6c
Merge branch 'master' of https://github.com/dotnet/spark into anfog/t…
Sep 29, 2020
39a6e27
Merge branch 'anfog/thread_local' of https://github.com/AFFogarty/spa…
Sep 29, 2020
5ebabb1
Catch throwable
Sep 29, 2020
27f7d72
Merge branch 'master' into anfog/thread_local
suhsteve Oct 2, 2020
53de2c0
Merge branch 'master' into anfog/thread_local
AFFogarty Oct 2, 2020
8c55804
Merge branch 'master' of https://github.com/dotnet/spark into anfog/t…
Oct 3, 2020
15840af
Merge branch 'anfog/thread_local' of https://github.com/AFFogarty/spa…
Oct 3, 2020
606d3f6
Merge branch 'master' into anfog/thread_local
imback82 Oct 3, 2020
4c695b5
Merge branch 'master' into anfog/thread_local
suhsteve Oct 3, 2020
70404d4
Address some of Terry's comments.
Oct 3, 2020
dcd05b9
Merge branch 'master' of https://github.com/dotnet/spark into anfog/t…
Oct 3, 2020
7c585c8
Fix test name
Oct 3, 2020
59fbf1e
Merge branch 'master' into anfog/thread_local
imback82 Oct 4, 2020
80fa44b
Deleting thread returns bool
Oct 4, 2020
1529a27
Address comments
Oct 5, 2020
e217942
Merge branch 'master' of https://github.com/dotnet/spark into anfog/t…
Oct 5, 2020
789a555
Fix return value and clean up logs
Oct 5, 2020
121 changes: 121 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/IpcTests/JvmThreadPoolGCTests.cs
@@ -0,0 +1,121 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Threading;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Services;
using Microsoft.Spark.Sql;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
    [Collection("Spark E2E Tests")]
    public class JvmThreadPoolGCTests
    {
        private readonly ILoggerService _loggerService;
        private readonly SparkSession _spark;
        private readonly IJvmBridge _jvmBridge;

        public JvmThreadPoolGCTests(SparkFixture fixture)
        {
            _loggerService = LoggerServiceFactory.GetLogger(typeof(JvmThreadPoolGCTests));
            _spark = fixture.Spark;
            _jvmBridge = ((IJvmObjectReferenceProvider)_spark).Reference.Jvm;
        }

        /// <summary>
        /// Test that the active SparkSession is thread-specific.
        /// </summary>
        [Fact]
        public void TestThreadLocalSessions()
        {
            SparkSession.ClearActiveSession();

            void testChildThread(string appName)
            {
                var thread = new Thread(() =>
                {
                    Assert.Null(SparkSession.GetActiveSession());

                    SparkSession.SetActiveSession(
                        SparkSession.Builder().AppName(appName).GetOrCreate());

                    // Since we are in the child thread, GetActiveSession() should return the child
                    // SparkSession.
                    SparkSession activeSession = SparkSession.GetActiveSession();
                    Assert.NotNull(activeSession);
                    Assert.Equal(appName, activeSession.Conf().Get("spark.app.name", null));
                });

                thread.Start();
                thread.Join();
            }

            for (int i = 0; i < 5; ++i)
            {
                testChildThread(i.ToString());
            }

            Assert.Null(SparkSession.GetActiveSession());
        }

        /// <summary>
        /// Monitor a thread via the JvmThreadPoolGC.
        /// </summary>
        [Fact]
        public void TestTryAddThread()
        {
            using var threadPool = new JvmThreadPoolGC(
                _loggerService, _jvmBridge, TimeSpan.FromMinutes(30));

            var thread = new Thread(() => _spark.Sql("SELECT TRUE"));
            thread.Start();

            Assert.True(threadPool.TryAddThread(thread));
            // Subsequent call should return false, because the thread has already been added.
            Assert.False(threadPool.TryAddThread(thread));

            thread.Join();
        }

        /// <summary>
        /// Create a Spark worker thread in the JVM ThreadPool then remove it directly through
        /// the JvmBridge.
        /// </summary>
        [Fact]
        public void TestRmThread()
        {
            // Create a thread and ensure that it is initialized in the JVM ThreadPool.
            var thread = new Thread(() => _spark.Sql("SELECT TRUE"));
            thread.Start();
            thread.Join();

            // First call should return true. Second call should return false.
            Assert.True((bool)_jvmBridge.CallStaticJavaMethod("DotnetHandler", "rmThread", thread.ManagedThreadId));
            Assert.False((bool)_jvmBridge.CallStaticJavaMethod("DotnetHandler", "rmThread", thread.ManagedThreadId));
        }

        /// <summary>
        /// Test that the GC interval configuration defaults to 5 minutes, and can be updated
        /// correctly by setting the environment variable.
        /// </summary>
        [Fact]
        public void TestIntervalConfiguration()
        {
            // Default value is 5 minutes.
            Assert.Null(Environment.GetEnvironmentVariable("DOTNET_JVM_THREAD_GC_INTERVAL"));
            Assert.Equal(
                TimeSpan.FromMinutes(5),
                SparkEnvironment.ConfigurationService.JvmThreadGCInterval);

            // Test a custom value.
            Environment.SetEnvironmentVariable("DOTNET_JVM_THREAD_GC_INTERVAL", "1:30:00");
            Assert.Equal(
                TimeSpan.FromMinutes(90),
                SparkEnvironment.ConfigurationService.JvmThreadGCInterval);
        }
    }
}
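
The interval test in this file expects the string "1:30:00" to be read back as 90 minutes. A minimal sketch of why that would hold, assuming the configuration service passes the raw environment value to `TimeSpan.Parse` (the diff does not show that code, so this is an assumption). `GcIntervalSketch` and its `Parse` helper are hypothetical names and are not part of Microsoft.Spark.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, not part of Microsoft.Spark. It only illustrates how an
// "h:mm:ss" string such as the one used in TestIntervalConfiguration maps to a TimeSpan.
public static class GcIntervalSketch
{
    // Returns the fallback when the variable is unset or empty; otherwise parses
    // the value with the invariant culture ("1:30:00" is 1 hour and 30 minutes).
    public static TimeSpan Parse(string raw, TimeSpan fallback)
    {
        return string.IsNullOrEmpty(raw)
            ? fallback
            : TimeSpan.Parse(raw, CultureInfo.InvariantCulture);
    }
}
```

Calling `GcIntervalSketch.Parse(Environment.GetEnvironmentVariable("DOTNET_JVM_THREAD_GC_INTERVAL"), TimeSpan.FromMinutes(5))` would mirror the default and custom cases the test checks.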
@@ -35,6 +35,10 @@ public void TestSignaturesV2_3_X()

             Assert.IsType<Builder>(SparkSession.Builder());

+            SparkSession.ClearActiveSession();
+            SparkSession.SetActiveSession(_spark);
+            Assert.IsType<SparkSession>(SparkSession.GetActiveSession());
+
             SparkSession.ClearDefaultSession();
             SparkSession.SetDefaultSession(_spark);
             Assert.IsType<SparkSession>(SparkSession.GetDefaultSession());
@@ -76,7 +80,7 @@ public void TestSignaturesV2_4_X()
         /// </summary>
         [Fact]
         public void TestCreateDataFrame()
-        {
+        {
             // Calling CreateDataFrame with schema
             {
                 var data = new List<GenericRow>
10 changes: 10 additions & 0 deletions src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
@@ -8,6 +8,7 @@
 using System.IO;
 using System.Net;
 using System.Text;
+using System.Threading;
 using Microsoft.Spark.Network;
 using Microsoft.Spark.Services;

@@ -35,6 +36,7 @@ internal sealed class JvmBridge : IJvmBridge
         private readonly ILoggerService _logger =
             LoggerServiceFactory.GetLogger(typeof(JvmBridge));
         private readonly int _portNumber;
+        private readonly JvmThreadPoolGC _jvmThreadPoolGC;

         internal JvmBridge(int portNumber)
         {
@@ -45,6 +47,9 @@ internal JvmBridge(int portNumber)

             _portNumber = portNumber;
             _logger.LogInfo($"JvMBridge port is {portNumber}");
+
+            _jvmThreadPoolGC = new JvmThreadPoolGC(
+                _logger, this, SparkEnvironment.ConfigurationService.JvmThreadGCInterval);
         }

         private ISocketWrapper GetConnection()
@@ -158,11 +163,13 @@ private object CallJavaMethod(
             ISocketWrapper socket = null;
             try
             {
+                Thread thread = Thread.CurrentThread;
                 MemoryStream payloadMemoryStream = s_payloadMemoryStream ??= new MemoryStream();
                 payloadMemoryStream.Position = 0;
                 PayloadHelper.BuildPayload(
                     payloadMemoryStream,
                     isStatic,
+                    thread.ManagedThreadId,
                     classNameOrJvmObjectReference,
                     methodName,
                     args);
@@ -176,6 +183,8 @@ private object CallJavaMethod(
                     (int)payloadMemoryStream.Position);
                 outputStream.Flush();

+                _jvmThreadPoolGC.TryAddThread(thread);
+
                 Stream inputStream = socket.InputStream;
                 int isMethodCallFailed = SerDe.ReadInt32(inputStream);
                 if (isMethodCallFailed != 0)
@@ -410,6 +419,7 @@ private object ReadCollection(Stream s)

         public void Dispose()
         {
+            _jvmThreadPoolGC.Dispose();
             while (_sockets.TryDequeue(out ISocketWrapper socket))
             {
                 if (socket != null)
149 changes: 149 additions & 0 deletions src/csharp/Microsoft.Spark/Interop/Ipc/JvmThreadPoolGC.cs
@@ -0,0 +1,149 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Microsoft.Spark.Services;

namespace Microsoft.Spark.Interop.Ipc
{
    /// <summary>
    /// In .NET for Apache Spark, we maintain a 1-to-1 mapping between .NET application threads
    /// and corresponding JVM threads. When a .NET thread calls a Spark API, that call is executed
    /// by its corresponding JVM thread. This functionality allows for multithreaded applications
    /// with thread-local variables.
    ///
    /// This class keeps track of the .NET application thread lifecycle. When a .NET application
    /// thread is no longer alive, this class submits an rmThread command to the JVM backend to
    /// dispose of its corresponding JVM thread. All methods are thread-safe.
    /// </summary>
    internal class JvmThreadPoolGC : IDisposable
    {
        private readonly ILoggerService _loggerService;
        private readonly IJvmBridge _jvmBridge;
        private readonly TimeSpan _threadGCInterval;
        private readonly ConcurrentDictionary<int, Thread> _activeThreads;

        private readonly object _activeThreadGCTimerLock;
        private Timer _activeThreadGCTimer;

        /// <summary>
        /// Construct the JvmThreadPoolGC.
        /// </summary>
        /// <param name="loggerService">Logger service.</param>
        /// <param name="jvmBridge">The JvmBridge used to call JVM methods.</param>
        /// <param name="threadGCInterval">The interval to GC finished threads.</param>
        public JvmThreadPoolGC(ILoggerService loggerService, IJvmBridge jvmBridge, TimeSpan threadGCInterval)
        {
            _loggerService = loggerService;
            _jvmBridge = jvmBridge;
            _threadGCInterval = threadGCInterval;
            _activeThreads = new ConcurrentDictionary<int, Thread>();

            _activeThreadGCTimerLock = new object();
            _activeThreadGCTimer = null;
        }

        /// <summary>
        /// Dispose of the GC timer and run a final round of thread GC.
        /// </summary>
        public void Dispose()
        {
            lock (_activeThreadGCTimerLock)
            {
                if (_activeThreadGCTimer != null)
                {
                    _activeThreadGCTimer.Dispose();
                    _activeThreadGCTimer = null;
                }
            }

            GCThreads();
        }

        /// <summary>
        /// Try to start monitoring a thread.
        /// </summary>
        /// <param name="thread">The thread to add.</param>
        /// <returns>True if success, false if already added.</returns>
        public bool TryAddThread(Thread thread)
        {
            bool returnValue = _activeThreads.TryAdd(thread.ManagedThreadId, thread);

            // Initialize the GC timer if necessary.
            if (_activeThreadGCTimer == null)
            {
                lock (_activeThreadGCTimerLock)
                {
                    if (_activeThreadGCTimer == null && _activeThreads.Count > 0)
                    {
                        _activeThreadGCTimer = new Timer(
                            (state) => GCThreads(),
                            null,
                            _threadGCInterval,
                            _threadGCInterval);
                    }
                }
            }

            return returnValue;
        }

        /// <summary>
        /// Try to remove a thread from the pool. If the removal is successful, then the
        /// corresponding JVM thread will also be disposed.
        /// </summary>
        /// <param name="threadId">The ID of the thread to remove.</param>
        /// <returns>True if success, false if the thread cannot be found.</returns>
        private bool TryDisposeJvmThread(int threadId)
        {
            if (_activeThreads.TryRemove(threadId, out _))
            {
                // _activeThreads does not have ownership of the threads on the .NET side. This
                // class does not need to call Join() on the .NET Thread. However, this class is
                // responsible for sending the rmThread command to the JVM to trigger disposal
                // of the corresponding JVM thread.
                if ((bool)_jvmBridge.CallStaticJavaMethod("DotnetHandler", "rmThread", threadId))
                {
                    _loggerService.LogDebug($"GC'd JVM thread {threadId}.");
                    return true;
                }
                else
                {
                    _loggerService.LogWarn(
                        $"rmThread returned false for JVM thread {threadId}. " +
                        $"Either thread does not exist or has already been GC'd.");
                }
            }

            return false;
        }

        /// <summary>
        /// Remove any threads that are no longer active.
        /// </summary>
        private void GCThreads()
        {
            foreach (KeyValuePair<int, Thread> kvp in _activeThreads)
            {
                if (!kvp.Value.IsAlive)
                {
                    TryDisposeJvmThread(kvp.Key);
                }
            }

            lock (_activeThreadGCTimerLock)
            {
                // Dispose of the timer if there are no threads to monitor.
                if (_activeThreadGCTimer != null && _activeThreads.IsEmpty)
                {
                    _activeThreadGCTimer.Dispose();
                    _activeThreadGCTimer = null;
                }
            }
        }
    }
}
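
The sweep performed by `GCThreads` can be tried without a JVM. The following is a rough, self-contained sketch of the same idea: track threads by managed ID in a `ConcurrentDictionary`, then drop the ones whose `IsAlive` is false. `ThreadSweepSketch` and its `onThreadGone` callback are hypothetical names; the callback stands in for the `rmThread` call to the JVM backend. The timer handling in the real class is left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for JvmThreadPoolGC with the JVM call replaced by a callback.
public sealed class ThreadSweepSketch
{
    private readonly ConcurrentDictionary<int, Thread> _tracked =
        new ConcurrentDictionary<int, Thread>();
    private readonly Action<int> _onThreadGone;

    public ThreadSweepSketch(Action<int> onThreadGone)
    {
        _onThreadGone = onThreadGone;
    }

    // Returns true the first time a thread is seen, false if it is already tracked.
    public bool TryAdd(Thread thread)
    {
        return _tracked.TryAdd(thread.ManagedThreadId, thread);
    }

    // Removes every tracked thread that is no longer alive and reports its ID.
    // Enumerating a ConcurrentDictionary while removing entries is safe.
    public int Sweep()
    {
        int removed = 0;
        foreach (KeyValuePair<int, Thread> kvp in _tracked)
        {
            if (!kvp.Value.IsAlive && _tracked.TryRemove(kvp.Key, out _))
            {
                _onThreadGone(kvp.Key);
                removed++;
            }
        }

        return removed;
    }
}
```

In this sketch a caller would register a thread once it has been started, and invoke `Sweep()` periodically (the PR uses a `System.Threading.Timer` for that) or once at shutdown.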
6 changes: 4 additions & 2 deletions src/csharp/Microsoft.Spark/Interop/Ipc/PayloadHelper.cs
@@ -27,7 +27,7 @@ internal class PayloadHelper
         private static readonly byte[] s_timestampTypeId = new[] { (byte)'t' };
         private static readonly byte[] s_jvmObjectTypeId = new[] { (byte)'j' };
         private static readonly byte[] s_byteArrayTypeId = new[] { (byte)'r' };
-        private static readonly byte[] s_doubleArrayArrayTypeId = new[] { ( byte)'A' };
+        private static readonly byte[] s_doubleArrayArrayTypeId = new[] { (byte)'A' };
         private static readonly byte[] s_arrayTypeId = new[] { (byte)'l' };
         private static readonly byte[] s_dictionaryTypeId = new[] { (byte)'e' };
         private static readonly byte[] s_rowArrTypeId = new[] { (byte)'R' };
@@ -39,6 +39,7 @@ internal class PayloadHelper
         internal static void BuildPayload(
             MemoryStream destination,
             bool isStaticMethod,
+            int threadId,
             object classNameOrJvmObjectReference,
             string methodName,
             object[] args)
@@ -48,6 +49,7 @@ internal static void BuildPayload(
             destination.Position += sizeof(int);

             SerDe.Write(destination, isStaticMethod);
+            SerDe.Write(destination, threadId);
             SerDe.Write(destination, classNameOrJvmObjectReference.ToString());
             SerDe.Write(destination, methodName);
             SerDe.Write(destination, args.Length);
@@ -140,7 +142,7 @@ internal static void ConvertArgsToBytes(
                                     SerDe.Write(destination, d);
                                 }
                                 break;
-
+
                             case double[][] argDoubleArrayArray:
                                 SerDe.Write(destination, s_doubleArrayArrayTypeId);
                                 SerDe.Write(destination, argDoubleArrayArray.Length);