Skip to content

Commit 1529a27

Browse files
author
Andrew Fogarty
committed
Address comments
1 parent 80fa44b commit 1529a27

File tree

7 files changed

+46
-20
lines changed

7 files changed

+46
-20
lines changed

src/csharp/Microsoft.Spark.E2ETest/IpcTests/JvmThreadPoolGCTests.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Threading;
77
using Microsoft.Spark.Interop;
88
using Microsoft.Spark.Interop.Ipc;
9+
using Microsoft.Spark.Services;
910
using Microsoft.Spark.Sql;
1011
using Xunit;
1112

@@ -14,11 +15,13 @@ namespace Microsoft.Spark.E2ETest.IpcTests
1415
[Collection("Spark E2E Tests")]
1516
public class JvmThreadPoolGCTests
1617
{
18+
private readonly ILoggerService _loggerService;
1719
private readonly SparkSession _spark;
1820
private readonly IJvmBridge _jvmBridge;
1921

2022
/// <summary>
/// Obtains a logger for this test class, then stores the Spark session exposed by the
/// fixture together with the JVM bridge reachable through that session's reference.
/// </summary>
/// <param name="fixture">Fixture that supplies the Spark session.</param>
public JvmThreadPoolGCTests(SparkFixture fixture)
{
    _loggerService = LoggerServiceFactory.GetLogger(typeof(JvmThreadPoolGCTests));

    SparkSession session = fixture.Spark;
    _spark = session;

    // The session is viewed as a reference provider to reach the bridge it is bound to.
    var referenceProvider = (IJvmObjectReferenceProvider)session;
    _jvmBridge = referenceProvider.Reference.Jvm;
}
@@ -51,7 +54,7 @@ void testChildThread(string appName)
5154
thread.Join();
5255
}
5356

54-
for (var i = 0; i < 5; ++i)
57+
for (int i = 0; i < 5; ++i)
5558
{
5659
testChildThread(i.ToString());
5760
}
@@ -65,7 +68,8 @@ void testChildThread(string appName)
6568
[Fact]
6669
public void TestTryAddThread()
6770
{
68-
using var threadPool = new JvmThreadPoolGC(_jvmBridge, TimeSpan.FromMinutes(30));
71+
using var threadPool = new JvmThreadPoolGC(
72+
_loggerService, _jvmBridge, TimeSpan.FromMinutes(30));
6973

7074
var thread = new Thread(() => _spark.Sql("SELECT TRUE"));
7175
thread.Start();
@@ -88,7 +92,10 @@ public void TestRmThread()
8892
var thread = new Thread(() => _spark.Sql("SELECT TRUE"));
8993
thread.Start();
9094
thread.Join();
91-
_jvmBridge.CallStaticJavaMethod("DotnetHandler", "rmThread", thread.ManagedThreadId);
95+
96+
// First call should return true. Second call should return false.
97+
Assert.True((bool)_jvmBridge.CallStaticJavaMethod("DotnetHandler", "rmThread", thread.ManagedThreadId));
98+
Assert.False((bool)_jvmBridge.CallStaticJavaMethod("DotnetHandler", "rmThread", thread.ManagedThreadId));
9299
}
93100

94101
/// <summary>
@@ -99,13 +106,13 @@ public void TestRmThread()
99106
public void TestIntervalConfiguration()
100107
{
101108
// Default value is 5 minutes.
102-
Assert.Null(Environment.GetEnvironmentVariable("DOTNET_THREAD_GC_INTERVAL"));
109+
Assert.Null(Environment.GetEnvironmentVariable("DOTNET_JVM_THREAD_GC_INTERVAL"));
103110
Assert.Equal(
104111
TimeSpan.FromMinutes(5),
105112
SparkEnvironment.ConfigurationService.JvmThreadGCInterval);
106113

107114
// Test a custom value.
108-
Environment.SetEnvironmentVariable("DOTNET_THREAD_GC_INTERVAL", "1:30:00");
115+
Environment.SetEnvironmentVariable("DOTNET_JVM_THREAD_GC_INTERVAL", "1:30:00");
109116
Assert.Equal(
110117
TimeSpan.FromMinutes(90),
111118
SparkEnvironment.ConfigurationService.JvmThreadGCInterval);

src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ internal JvmBridge(int portNumber)
4949
_logger.LogInfo($"JvMBridge port is {portNumber}");
5050

5151
_jvmThreadPoolGC = new JvmThreadPoolGC(
52-
this, SparkEnvironment.ConfigurationService.JvmThreadGCInterval);
52+
_logger, this, SparkEnvironment.ConfigurationService.JvmThreadGCInterval);
5353
}
5454

5555
private ISocketWrapper GetConnection()

src/csharp/Microsoft.Spark/Interop/Ipc/JvmThreadPoolGC.cs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Collections.Concurrent;
77
using System.Collections.Generic;
88
using System.Threading;
9+
using Microsoft.Spark.Services;
910

1011
namespace Microsoft.Spark.Interop.Ipc
1112
{
@@ -21,6 +22,7 @@ namespace Microsoft.Spark.Interop.Ipc
2122
/// </summary>
2223
internal class JvmThreadPoolGC : IDisposable
2324
{
25+
private readonly ILoggerService _loggerService;
2426
private readonly IJvmBridge _jvmBridge;
2527
private readonly TimeSpan _threadGCInterval;
2628
private readonly ConcurrentDictionary<int, Thread> _activeThreads;
@@ -31,10 +33,12 @@ internal class JvmThreadPoolGC : IDisposable
3133
/// <summary>
3234
/// Construct the JvmThreadPoolGC.
3335
/// </summary>
36+
/// <param name="loggerService">Logger service.</param>
3437
/// <param name="jvmBridge">The JvmBridge used to call JVM methods.</param>
3538
/// <param name="threadGCInterval">The interval to GC finished threads.</param>
36-
public JvmThreadPoolGC(IJvmBridge jvmBridge, TimeSpan threadGCInterval)
39+
public JvmThreadPoolGC(ILoggerService loggerService, IJvmBridge jvmBridge, TimeSpan threadGCInterval)
3740
{
41+
_loggerService = loggerService;
3842
_jvmBridge = jvmBridge;
3943
_threadGCInterval = threadGCInterval;
4044
_activeThreads = new ConcurrentDictionary<int, Thread>();
@@ -94,16 +98,25 @@ public bool TryAddThread(Thread thread)
9498
/// </summary>
9599
/// <param name="threadId">The ID of the thread to remove.</param>
96100
/// <returns>True if success, false if the thread cannot be found.</returns>
97-
private bool TryRemoveAndDisposeThread(int threadId)
101+
private bool TryDisposeJvmThread(int threadId)
{
    // Nothing to dispose when this instance is not tracking the thread.
    if (!_activeThreads.TryRemove(threadId, out _))
    {
        return false;
    }

    // _activeThreads does not have ownership of the threads on the .NET side. This
    // class does not need to call Join() on the .NET Thread. However, this class is
    // responsible for sending the rmThread command to the JVM to trigger disposal
    // of the corresponding JVM thread.
    if ((bool)_jvmBridge.CallStaticJavaMethod("DotnetHandler", "rmThread", threadId))
    {
        _loggerService.LogDebug($"GC'd JVM thread {threadId}.");

        // Report success explicitly; without this the method fell through to the
        // trailing "return false" and could never return true.
        return true;
    }

    // The thread was tracked here but the JVM reported that it had nothing to remove.
    _loggerService.LogWarn(
        $"rmThread returned false for JVM thread {threadId}. " +
        $"Either thread does not exist or has already been GC'd.");
    return false;
@@ -114,11 +127,12 @@ private bool TryRemoveAndDisposeThread(int threadId)
114127
/// </summary>
115128
private void GCThreads()
116129
{
130+
_loggerService.LogDebug("Starting JVM thread GC.");
117131
foreach (KeyValuePair<int, Thread> kvp in _activeThreads)
118132
{
119133
if (!kvp.Value.IsAlive)
120134
{
121-
TryRemoveAndDisposeThread(kvp.Key);
135+
TryDisposeJvmThread(kvp.Key);
122136
}
123137
}
124138

@@ -131,6 +145,8 @@ private void GCThreads()
131145
_activeThreadGCTimer = null;
132146
}
133147
}
148+
149+
_loggerService.LogDebug("JVM thread GC complete.");
134150
}
135151
}
136152
}

src/csharp/Microsoft.Spark/Services/ConfigurationService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public TimeSpan JvmThreadGCInterval
4040
{
4141
get
4242
{
43-
string envVar = Environment.GetEnvironmentVariable("DOTNET_THREAD_GC_INTERVAL");
43+
string envVar = Environment.GetEnvironmentVariable("DOTNET_JVM_THREAD_GC_INTERVAL");
4444
return string.IsNullOrEmpty(envVar) ? TimeSpan.FromMinutes(5) : TimeSpan.Parse(envVar);
4545
}
4646
}

src/scala/microsoft-spark-2.3.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackendHandler.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ class DotnetBackendHandler(server: DotnetBackend)
7171
assert(readObjectType(dis) == 'i')
7272
val threadToDelete = readInt(dis)
7373
val result = ThreadPool.tryDeleteThread(threadToDelete)
74-
writeBoolean(dos, result)
75-
writeObject(dos, null)
74+
writeInt(dos, 0)
75+
writeObject(dos, result.asInstanceOf[AnyRef])
7676
} catch {
7777
case e: Exception =>
7878
logError(s"Removing thread $threadId failed", e)

src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackendHandler.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ class DotnetBackendHandler(server: DotnetBackend)
7171
assert(readObjectType(dis) == 'i')
7272
val threadToDelete = readInt(dis)
7373
val result = ThreadPool.tryDeleteThread(threadToDelete)
74-
writeBoolean(dos, result)
75-
writeObject(dos, null)
74+
writeInt(dos, 0)
75+
writeObject(dos, result.asInstanceOf[AnyRef])
7676
} catch {
7777
case e: Exception =>
7878
logError(s"Removing thread $threadId failed", e)

src/scala/microsoft-spark-3.0.x/src/main/scala/org/apache/spark/api/dotnet/DotnetBackendHandler.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@ package org.apache.spark.api.dotnet
88

99
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
1010

11+
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
12+
import org.apache.spark.api.dotnet.SerDe._
13+
import org.apache.spark.internal.Logging
14+
import org.apache.spark.util.Utils
15+
1116
import scala.collection.mutable.HashMap
1217
import scala.language.existentials
1318

14-
import org.apache.spark.api.dotnet.SerDe._
15-
1619
/**
1720
* Handler for DotnetBackend.
1821
* This implementation is similar to RBackendHandler.
@@ -68,8 +71,8 @@ class DotnetBackendHandler(server: DotnetBackend)
6871
assert(readObjectType(dis) == 'i')
6972
val threadToDelete = readInt(dis)
7073
val result = ThreadPool.tryDeleteThread(threadToDelete)
71-
writeBoolean(dos, result)
72-
writeObject(dos, null)
74+
writeInt(dos, 0)
75+
writeObject(dos, result.asInstanceOf[AnyRef])
7376
} catch {
7477
case e: Exception =>
7578
logError(s"Removing thread $threadId failed", e)

0 commit comments

Comments
 (0)