Skip to content

Commit 1d862a1

Browse files
authored
Expose StreamingQueryManager (#690)
1 parent 0e8bc18 commit 1d862a1

File tree

11 files changed

+402
-1
lines changed

11 files changed

+402
-1
lines changed

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/SparkSessionTests.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for more information.
44

5-
using System;
65
using System.Collections.Generic;
76
using System.Linq;
87
using Microsoft.Spark.E2ETest.Utils;
@@ -42,6 +41,8 @@ public void TestSignaturesV2_3_X()
4241

4342
Assert.IsType<RuntimeConfig>(_spark.Conf());
4443

44+
Assert.IsType<StreamingQueryManager>(_spark.Streams());
45+
4546
Assert.IsType<SparkSession>(_spark.NewSession());
4647

4748
Assert.IsType<DataFrameReader>(_spark.Read());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Linq;
6+
using Microsoft.Spark.E2ETest.Utils;
7+
using Microsoft.Spark.Sql;
8+
using Microsoft.Spark.Sql.Streaming;
9+
using Xunit;
10+
11+
namespace Microsoft.Spark.E2ETest.IpcTests
12+
{
13+
[Collection("Spark E2E Tests")]
14+
public class StreamingQueryManagerTests
15+
{
16+
private readonly SparkSession _spark;
17+
18+
public StreamingQueryManagerTests(SparkFixture fixture)
19+
{
20+
_spark = fixture.Spark;
21+
}
22+
23+
/// <summary>
24+
/// Test signatures for APIs up to Spark 2.3.*.
25+
/// The purpose of this test is to ensure that JVM calls can be successfully made.
26+
/// Note that this is not testing functionality of each function.
27+
/// </summary>
28+
[Fact]
29+
public void TestSignaturesV2_3_X()
30+
{
31+
var intMemoryStream = new MemoryStream<int>(_spark);
32+
StreamingQuery sq1 = intMemoryStream
33+
.ToDF().WriteStream().QueryName("intQuery").Format("console").Start();
34+
string id1 = sq1.Id;
35+
36+
var stringMemoryStream = new MemoryStream<string>(_spark);
37+
StreamingQuery sq2 = stringMemoryStream
38+
.ToDF().WriteStream().QueryName("stringQuery").Format("console").Start();
39+
string id2 = sq2.Id;
40+
41+
StreamingQueryManager sqm = _spark.Streams();
42+
43+
StreamingQuery[] streamingQueries = sqm.Active().ToArray();
44+
Assert.Equal(2, streamingQueries.Length);
45+
46+
Assert.IsType<StreamingQuery>(sqm.Get(id1));
47+
Assert.IsType<StreamingQuery>(sqm.Get(id2));
48+
49+
sqm.ResetTerminated();
50+
51+
sqm.AwaitAnyTermination(10);
52+
53+
sq1.Stop();
54+
sq2.Stop();
55+
}
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Linq;
6+
using Microsoft.Spark.E2ETest.Utils;
7+
using Microsoft.Spark.Sql;
8+
using Microsoft.Spark.Sql.Streaming;
9+
using Xunit;
10+
11+
namespace Microsoft.Spark.E2ETest.IpcTests
12+
{
13+
[Collection("Spark E2E Tests")]
14+
public class StreamingQueryTests
15+
{
16+
private readonly SparkSession _spark;
17+
18+
public StreamingQueryTests(SparkFixture fixture)
19+
{
20+
_spark = fixture.Spark;
21+
}
22+
23+
/// <summary>
24+
/// Test signatures for APIs up to Spark 2.3.*.
25+
/// The purpose of this test is to ensure that JVM calls can be successfully made.
26+
/// Note that this is not testing functionality of each function.
27+
/// </summary>
28+
[Fact]
29+
public void TestSignaturesV2_3_X()
30+
{
31+
var intMemoryStream = new MemoryStream<int>(_spark);
32+
StreamingQuery sq = intMemoryStream
33+
.ToDF().WriteStream().QueryName("testQuery").Format("console").Start();
34+
35+
Assert.IsType<string>(sq.Name);
36+
37+
Assert.IsType<string>(sq.Id);
38+
39+
Assert.IsType<string>(sq.RunId);
40+
41+
Assert.IsType<bool>(sq.IsActive());
42+
43+
Assert.IsType<bool>(sq.AwaitTermination(10));
44+
45+
sq.Explain();
46+
47+
Assert.Null(sq.Exception());
48+
49+
sq.Stop();
50+
}
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using Microsoft.Spark.Interop.Ipc;
6+
using Microsoft.Spark.Sql;
7+
8+
namespace Microsoft.Spark.E2ETest.Utils
9+
{
10+
/// <summary>
11+
/// A source of continually arriving data for a streaming query.
12+
/// Produces value stored in memory as they are added by the user.
13+
/// </summary>
14+
/// <typeparam name="T">
15+
/// Specifies the type of the elements contained in the MemoryStream.
16+
/// </typeparam>
17+
internal class MemoryStream<T> : IJvmObjectReferenceProvider
18+
{
19+
private readonly JvmObjectReference _jvmObject;
20+
21+
internal MemoryStream(SparkSession sparkSession)
22+
{
23+
JvmObjectReference sparkSessionRef =
24+
((IJvmObjectReferenceProvider)sparkSession).Reference;
25+
26+
_jvmObject = (JvmObjectReference)sparkSessionRef.Jvm.CallStaticJavaMethod(
27+
"org.apache.spark.sql.test.TestUtils",
28+
"createMemoryStream",
29+
sparkSessionRef.Invoke("sqlContext"),
30+
typeof(T).Name);
31+
}
32+
33+
JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;
34+
35+
internal DataFrame ToDF() => new DataFrame((JvmObjectReference)_jvmObject.Invoke("toDF"));
36+
37+
// TODO: "addData" returns an Offset. Expose class if needed.
38+
internal void AddData(T[] data) => _jvmObject.Invoke("addData", data);
39+
}
40+
}

src/csharp/Microsoft.Spark/Sql/SparkSession.cs

+8
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,14 @@ public void Dispose()
128128
public RuntimeConfig Conf() =>
129129
new RuntimeConfig((JvmObjectReference)_jvmObject.Invoke("conf"));
130130

131+
/// <summary>
132+
/// Returns a <see cref="StreamingQueryManager"/> that allows managing all the
133+
/// <see cref="StreamingQuery"/> instances active on <c>this</c> context.
134+
/// </summary>
135+
/// <returns><see cref="StreamingQueryManager"/> object</returns>
136+
public StreamingQueryManager Streams() =>
137+
new StreamingQueryManager((JvmObjectReference)_jvmObject.Invoke("streams"));
138+
131139
/// <summary>
132140
/// Start a new session with isolated SQL configurations, temporary tables, registered
133141
/// functions are isolated, but sharing the underlying SparkContext and cached data.

src/csharp/Microsoft.Spark/Sql/Streaming/StreamingQuery.cs

+35
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for more information.
44

5+
using Microsoft.Spark.Interop.Internal.Scala;
56
using Microsoft.Spark.Interop.Ipc;
67

78
namespace Microsoft.Spark.Sql.Streaming
@@ -22,6 +23,24 @@ public sealed class StreamingQuery : IJvmObjectReferenceProvider
2223
/// </summary>
2324
public string Name => (string)_jvmObject.Invoke("name");
2425

26+
/// <summary>
27+
/// Returns the unique id of this query that persists across restarts from checkpoint data.
28+
/// That is, this id is generated when a query is started for the first time, and
29+
/// will be the same every time it is restarted from checkpoint data. Also see
30+
/// <see cref="RunId"/>.
31+
/// </summary>
32+
public string Id =>
33+
(string)((JvmObjectReference)_jvmObject.Invoke("id")).Invoke("toString");
34+
35+
/// <summary>
36+
/// Returns the unique id of this run of the query. That is, every start/restart of
37+
/// a query will generated a unique runId. Therefore, every time a query is restarted
38+
/// from checkpoint, it will have the same <see cref="Id"/> but different
39+
/// <see cref="RunId"/>s.
40+
/// </summary>
41+
public string RunId =>
42+
(string)((JvmObjectReference)_jvmObject.Invoke("runId")).Invoke("toString");
43+
2544
/// <summary>
2645
/// Returns true if this query is actively running.
2746
/// </summary>
@@ -67,5 +86,21 @@ public bool AwaitTermination(long timeoutMs) =>
6786
/// </summary>
6887
/// <param name="extended">Whether to do extended explain or not</param>
6988
public void Explain(bool extended = false) => _jvmObject.Invoke("explain", extended);
89+
90+
/// <summary>
91+
/// The <see cref="StreamingQueryException"/> if the query was terminated by an exception,
92+
/// null otherwise.
93+
/// </summary>
94+
public StreamingQueryException Exception()
95+
{
96+
var optionalException = new Option((JvmObjectReference)_jvmObject.Invoke("exception"));
97+
if (optionalException.IsDefined())
98+
{
99+
var exception = (JvmObjectReference)optionalException.Get();
100+
return new StreamingQueryException((string)exception.Invoke("toString"));
101+
}
102+
103+
return null;
104+
}
70105
}
71106
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using Microsoft.Spark.Interop.Ipc;
6+
7+
namespace Microsoft.Spark.Sql.Streaming
8+
{
9+
/// <summary>
10+
/// Exception that stopped a <see cref="StreamingQuery"/>.
11+
/// </summary>
12+
public class StreamingQueryException : JvmException
13+
{
14+
public StreamingQueryException(string message)
15+
: base(message)
16+
{
17+
}
18+
}
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using Microsoft.Spark.Interop.Ipc;
8+
9+
namespace Microsoft.Spark.Sql.Streaming
10+
{
11+
/// <summary>
12+
/// A class to manage all the <see cref="StreamingQuery"/> active
13+
/// in a <see cref="SparkSession"/>.
14+
/// </summary>
15+
public sealed class StreamingQueryManager : IJvmObjectReferenceProvider
16+
{
17+
private readonly JvmObjectReference _jvmObject;
18+
19+
internal StreamingQueryManager(JvmObjectReference jvmObject) => _jvmObject = jvmObject;
20+
21+
JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;
22+
23+
/// <summary>
24+
/// Returns a list of active queries associated with this SQLContext.
25+
/// </summary>
26+
/// <returns>Active queries associated with this SQLContext.</returns>
27+
public IEnumerable<StreamingQuery> Active() =>
28+
((JvmObjectReference[])_jvmObject.Invoke("active"))
29+
.Select(sq => new StreamingQuery(sq));
30+
31+
/// <summary>
32+
/// Returns an active query from this SQLContext or throws exception if an active
33+
/// query with this name doesn't exist.
34+
/// </summary>
35+
/// <param name="id">Id of the <see cref="StreamingQuery"/>.</param>
36+
/// <returns>
37+
/// <see cref="StreamingQuery"/> if there is an active query with the given id.
38+
/// </returns>
39+
public StreamingQuery Get(string id) =>
40+
new StreamingQuery((JvmObjectReference)_jvmObject.Invoke("get", id));
41+
42+
/// <summary>
43+
/// Wait until any of the queries on the associated SQLContext has terminated since the
44+
/// creation of the context, or since <see cref="ResetTerminated"/> was called. If any
45+
/// query was terminated with an exception, then the exception will be thrown.
46+
///
47+
/// If a query has terminated, then subsequent calls to <see cref="AwaitAnyTermination()"/>
48+
/// will either return immediately (if the query was terminated by
49+
/// <see cref="StreamingQuery.Stop"/>), or throw the exception immediately (if the query
50+
/// was terminated with exception). Use <see cref="ResetTerminated"/> to clear past
51+
/// terminations and wait for new terminations.
52+
///
53+
/// In the case where multiple queries have terminated since <see cref="ResetTerminated"/>
54+
/// was called, if any query has terminated with exception, then
55+
/// <see cref="AwaitAnyTermination()"/> will throw any of the exception. For correctly
56+
/// documenting exceptions across multiple queries, users need to stop all of them after
57+
/// any of them terminates with exception, and then check the
58+
/// <see cref="StreamingQuery.Exception"/> for each query.
59+
///
60+
/// Throws StreamingQueryException on the JVM if any query has terminated with an
61+
/// exception.
62+
/// </summary>
63+
public void AwaitAnyTermination() => _jvmObject.Invoke("awaitAnyTermination");
64+
65+
/// <summary>
66+
/// Wait until any of the queries on the associated SQLContext has terminated since the
67+
/// creation of the context, or since <see cref="ResetTerminated"/> was called. If any
68+
/// query was terminated with an exception, then the exception will be thrown.
69+
///
70+
/// If a query has terminated, then subsequent calls to <see cref="AwaitAnyTermination()"/>
71+
/// will either return immediately (if the query was terminated by
72+
/// <see cref="StreamingQuery.Stop"/>), or throw the exception immediately (if the query
73+
/// was terminated with exception). Use <see cref="ResetTerminated"/> to clear past
74+
/// terminations and wait for new terminations.
75+
///
76+
/// In the case where multiple queries have terminated since <see cref="ResetTerminated"/>
77+
/// was called, if any query has terminated with exception, then
78+
/// <see cref="AwaitAnyTermination()"/> will throw any of the exception. For correctly
79+
/// documenting exceptions across multiple queries, users need to stop all of them after
80+
/// any of them terminates with exception, and then check the
81+
/// <see cref="StreamingQuery.Exception"/> for each query.
82+
///
83+
/// Throws StreamingQueryException on the JVM if any query has terminated with an
84+
/// exception.
85+
/// </summary>
86+
/// <param name="timeoutMs">
87+
/// Milliseconds to wait for query to terminate. Returns whether the query has terminated
88+
/// or not.
89+
/// </param>
90+
public void AwaitAnyTermination(long timeoutMs) =>
91+
_jvmObject.Invoke("awaitAnyTermination", timeoutMs);
92+
93+
/// <summary>
94+
/// Forget about past terminated queries so that <see cref="AwaitAnyTermination()"/> can be
95+
/// used again to wait for new terminations
96+
/// </summary>
97+
public void ResetTerminated() => _jvmObject.Invoke("resetTerminated");
98+
}
99+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the .NET Foundation under one or more agreements.
3+
* The .NET Foundation licenses this file to you under the MIT license.
4+
* See the LICENSE file in the project root for more information.
5+
*/
6+
7+
package org.apache.spark.sql.test
8+
9+
import org.apache.spark.sql.SQLContext
10+
import org.apache.spark.sql.execution.streaming.MemoryStream
11+
12+
object TestUtils {
13+
14+
/**
15+
* Helper method to create typed MemoryStreams intended for use in unit tests.
16+
* @param sqlContext The SQLContext.
17+
* @param streamType The type of memory stream to create. This string is the `Name`
18+
* property of the dotnet type.
19+
* @return A typed MemoryStream.
20+
*/
21+
def createMemoryStream(implicit sqlContext: SQLContext, streamType: String): MemoryStream[_] = {
22+
import sqlContext.implicits._
23+
24+
streamType match {
25+
case "Int32" => MemoryStream[Int]
26+
case "String" => MemoryStream[String]
27+
case _ => throw new Exception(s"$streamType not supported")
28+
}
29+
}
30+
}

0 commit comments

Comments
 (0)