
Commit 63faa28

Merge branch 'nidutta/BroadcastEncryptionSupport' of github.com:Niharikadutta/spark into nidutta/BroadcastEncryptionSupport

2 parents: 228f2f3 + 5024860

File tree: 22 files changed (+571, -121 lines)


src/csharp/Extensions/Microsoft.Spark.Extensions.Azure.Synapse.Analytics/Microsoft.Spark.Extensions.Azure.Synapse.Analytics.csproj

This file was deleted (-18 lines).

src/csharp/Extensions/Microsoft.Spark.Extensions.Azure.Synapse.Analytics/Utils/TokenLibrary.cs

This file was deleted (-22 lines).

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs

+2

@@ -628,6 +628,8 @@ public void TestSignaturesV2_3_X()
 
             Assert.IsType<Row>(_df.First());
 
+            Assert.IsType<DataFrame>(_df.Transform(df => df.Drop("age")));
+
             Assert.IsType<Row[]>(_df.Take(3).ToArray());
 
             Assert.IsType<Row[]>(_df.Collect().ToArray());

src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/SparkSessionTests.cs

+2 -1

@@ -2,7 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 // See the LICENSE file in the project root for more information.
 
-using System;
 using System.Collections.Generic;
 using System.Linq;
 using Microsoft.Spark.E2ETest.Utils;

@@ -42,6 +41,8 @@ public void TestSignaturesV2_3_X()
 
             Assert.IsType<RuntimeConfig>(_spark.Conf());
 
+            Assert.IsType<StreamingQueryManager>(_spark.Streams());
+
             Assert.IsType<SparkSession>(_spark.NewSession());
 
             Assert.IsType<DataFrameReader>(_spark.Read());
New file (+57 lines)

@@ -0,0 +1,57 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Linq;
using Microsoft.Spark.E2ETest.Utils;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
    [Collection("Spark E2E Tests")]
    public class StreamingQueryManagerTests
    {
        private readonly SparkSession _spark;

        public StreamingQueryManagerTests(SparkFixture fixture)
        {
            _spark = fixture.Spark;
        }

        /// <summary>
        /// Test signatures for APIs up to Spark 2.3.*.
        /// The purpose of this test is to ensure that JVM calls can be successfully made.
        /// Note that this is not testing functionality of each function.
        /// </summary>
        [Fact]
        public void TestSignaturesV2_3_X()
        {
            var intMemoryStream = new MemoryStream<int>(_spark);
            StreamingQuery sq1 = intMemoryStream
                .ToDF().WriteStream().QueryName("intQuery").Format("console").Start();
            string id1 = sq1.Id;

            var stringMemoryStream = new MemoryStream<string>(_spark);
            StreamingQuery sq2 = stringMemoryStream
                .ToDF().WriteStream().QueryName("stringQuery").Format("console").Start();
            string id2 = sq2.Id;

            StreamingQueryManager sqm = _spark.Streams();

            StreamingQuery[] streamingQueries = sqm.Active().ToArray();
            Assert.Equal(2, streamingQueries.Length);

            Assert.IsType<StreamingQuery>(sqm.Get(id1));
            Assert.IsType<StreamingQuery>(sqm.Get(id2));

            sqm.ResetTerminated();

            sqm.AwaitAnyTermination(10);

            sq1.Stop();
            sq2.Stop();
        }
    }
}
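For orientation only, here is a minimal sketch of how the StreamingQueryManager members exercised above (Streams, Active, plus the StreamingQuery properties tested in the next file) could be used outside a test. This is not part of the commit; the StreamingDiagnostics helper is hypothetical.

// Hypothetical helper, not part of this diff: enumerates the queries that the
// session's StreamingQueryManager currently reports as active.
using System;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;

internal static class StreamingDiagnostics
{
    internal static void PrintActiveQueries(SparkSession spark)
    {
        StreamingQueryManager sqm = spark.Streams();
        foreach (StreamingQuery query in sqm.Active())
        {
            // Name, Id, and IsActive() are the same members exercised by the tests.
            Console.WriteLine($"{query.Name} ({query.Id}): active={query.IsActive()}");
        }
    }
}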
New file (+52 lines)

@@ -0,0 +1,52 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Linq;
using Microsoft.Spark.E2ETest.Utils;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
    [Collection("Spark E2E Tests")]
    public class StreamingQueryTests
    {
        private readonly SparkSession _spark;

        public StreamingQueryTests(SparkFixture fixture)
        {
            _spark = fixture.Spark;
        }

        /// <summary>
        /// Test signatures for APIs up to Spark 2.3.*.
        /// The purpose of this test is to ensure that JVM calls can be successfully made.
        /// Note that this is not testing functionality of each function.
        /// </summary>
        [Fact]
        public void TestSignaturesV2_3_X()
        {
            var intMemoryStream = new MemoryStream<int>(_spark);
            StreamingQuery sq = intMemoryStream
                .ToDF().WriteStream().QueryName("testQuery").Format("console").Start();

            Assert.IsType<string>(sq.Name);

            Assert.IsType<string>(sq.Id);

            Assert.IsType<string>(sq.RunId);

            Assert.IsType<bool>(sq.IsActive());

            Assert.IsType<bool>(sq.AwaitTermination(10));

            sq.Explain();

            Assert.Null(sq.Exception());

            sq.Stop();
        }
    }
}
New file (+40 lines)

@@ -0,0 +1,40 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Sql;

namespace Microsoft.Spark.E2ETest.Utils
{
    /// <summary>
    /// A source of continually arriving data for a streaming query.
    /// Produces values stored in memory as they are added by the user.
    /// </summary>
    /// <typeparam name="T">
    /// Specifies the type of the elements contained in the MemoryStream.
    /// </typeparam>
    internal class MemoryStream<T> : IJvmObjectReferenceProvider
    {
        private readonly JvmObjectReference _jvmObject;

        internal MemoryStream(SparkSession sparkSession)
        {
            JvmObjectReference sparkSessionRef =
                ((IJvmObjectReferenceProvider)sparkSession).Reference;

            _jvmObject = (JvmObjectReference)sparkSessionRef.Jvm.CallStaticJavaMethod(
                "org.apache.spark.sql.test.TestUtils",
                "createMemoryStream",
                sparkSessionRef.Invoke("sqlContext"),
                typeof(T).Name);
        }

        JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;

        internal DataFrame ToDF() => new DataFrame((JvmObjectReference)_jvmObject.Invoke("toDF"));

        // TODO: "addData" returns an Offset. Expose class if needed.
        internal void AddData(T[] data) => _jvmObject.Invoke("addData", data);
    }
}
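AddData is declared here but not exercised by the tests above. As a rough illustration of how a test body (like the ones above, with access to _spark and xunit's Assert) could push a batch through the stream and read it back, here is a minimal sketch. It assumes the "memory" sink and StreamingQuery.ProcessAllAvailable() behave as in the Scala streaming API; neither the query name "memQuery" nor this flow is part of the commit.

// Illustrative sketch only (not part of this diff). Assumes a "memory" sink and
// StreamingQuery.ProcessAllAvailable(), mirroring the Scala streaming API.
var stream = new MemoryStream<int>(_spark);
StreamingQuery query = stream
    .ToDF()
    .WriteStream()
    .QueryName("memQuery")   // the memory sink exposes results as a table with this name
    .Format("memory")
    .Start();

stream.AddData(new[] { 1, 2, 3 });  // push a batch into the in-memory source
query.ProcessAllAvailable();        // wait until the batch is processed (assumed API)

DataFrame results = _spark.Sql("SELECT * FROM memQuery");
Assert.Equal(3L, results.Count());  // DataFrame.Count() returns a long

query.Stop();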

src/csharp/Microsoft.Spark.sln

+2 -9

@@ -33,13 +33,11 @@
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Extensions.Delta.E2ETest", "Extensions\Microsoft.Spark.Extensions.Delta.E2ETest\Microsoft.Spark.Extensions.Delta.E2ETest.csproj", "{206E16CA-ED59-4F5E-8EA1-9BB7BEEACB63}"
 EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Extensions.Azure.Synapse.Analytics", "Extensions\Microsoft.Spark.Extensions.Azure.Synapse.Analytics\Microsoft.Spark.Extensions.Azure.Synapse.Analytics.csproj", "{47652C7D-B076-4FD9-98AC-959E38BE18E3}"
-EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Extensions.DotNet.Interactive", "Extensions\Microsoft.Spark.Extensions.DotNet.Interactive\Microsoft.Spark.Extensions.DotNet.Interactive.csproj", "{9C32014D-8C0C-40F1-9ABA-C3BF19687E5C}"
 EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest", "Extensions\Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest\Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest.csproj", "{7BDE09ED-04B3-41B2-A466-3D6F7225291E}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest", "Extensions\Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest\Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest.csproj", "{7BDE09ED-04B3-41B2-A466-3D6F7225291E}"
 EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Spark.Extensions.Hyperspace", "Extensions\Microsoft.Spark.Extensions.Hyperspace\Microsoft.Spark.Extensions.Hyperspace.csproj", "{70DDA4E9-1195-4A29-9AA1-96A8223A6D4F}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Extensions.Hyperspace", "Extensions\Microsoft.Spark.Extensions.Hyperspace\Microsoft.Spark.Extensions.Hyperspace.csproj", "{70DDA4E9-1195-4A29-9AA1-96A8223A6D4F}"
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Extensions.Hyperspace.E2ETest", "Extensions\Microsoft.Spark.Extensions.Hyperspace.E2ETest\Microsoft.Spark.Extensions.Hyperspace.E2ETest.csproj", "{C6019E44-C777-4DE2-B70E-EA025B7D044D}"
 EndProject

@@ -93,10 +91,6 @@ Global
     {206E16CA-ED59-4F5E-8EA1-9BB7BEEACB63}.Debug|Any CPU.Build.0 = Debug|Any CPU
     {206E16CA-ED59-4F5E-8EA1-9BB7BEEACB63}.Release|Any CPU.ActiveCfg = Release|Any CPU
     {206E16CA-ED59-4F5E-8EA1-9BB7BEEACB63}.Release|Any CPU.Build.0 = Release|Any CPU
-    {47652C7D-B076-4FD9-98AC-959E38BE18E3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
-    {47652C7D-B076-4FD9-98AC-959E38BE18E3}.Debug|Any CPU.Build.0 = Debug|Any CPU
-    {47652C7D-B076-4FD9-98AC-959E38BE18E3}.Release|Any CPU.ActiveCfg = Release|Any CPU
-    {47652C7D-B076-4FD9-98AC-959E38BE18E3}.Release|Any CPU.Build.0 = Release|Any CPU
     {9C32014D-8C0C-40F1-9ABA-C3BF19687E5C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
     {9C32014D-8C0C-40F1-9ABA-C3BF19687E5C}.Debug|Any CPU.Build.0 = Debug|Any CPU
     {9C32014D-8C0C-40F1-9ABA-C3BF19687E5C}.Release|Any CPU.ActiveCfg = Release|Any CPU

@@ -122,7 +116,6 @@
     {4E379DB3-7741-43C2-B32D-17AD96FEA7D0} = {C8C53525-4FEB-4B5B-91A2-619566C72F3E}
     {2048446B-45AB-4304-B230-50EDF6E8E6A4} = {71A19F75-8279-40AB-BEA0-7D4B153FC416}
     {206E16CA-ED59-4F5E-8EA1-9BB7BEEACB63} = {71A19F75-8279-40AB-BEA0-7D4B153FC416}
-    {47652C7D-B076-4FD9-98AC-959E38BE18E3} = {71A19F75-8279-40AB-BEA0-7D4B153FC416}
     {9C32014D-8C0C-40F1-9ABA-C3BF19687E5C} = {71A19F75-8279-40AB-BEA0-7D4B153FC416}
     {7BDE09ED-04B3-41B2-A466-3D6F7225291E} = {71A19F75-8279-40AB-BEA0-7D4B153FC416}
     {70DDA4E9-1195-4A29-9AA1-96A8223A6D4F} = {71A19F75-8279-40AB-BEA0-7D4B153FC416}

src/csharp/Microsoft.Spark/Sql/DataFrame.cs

+9

@@ -750,6 +750,15 @@ public DataFrame Summary(params string[] statistics) =>
         /// <returns>First row</returns>
         public Row First() => Head();
 
+        /// <summary>
+        /// Concise syntax for chaining custom transformations.
+        /// </summary>
+        /// <param name="func">
+        /// A function that takes and returns a <see cref="DataFrame"/>
+        /// </param>
+        /// <returns>Transformed DataFrame object.</returns>
+        public DataFrame Transform(Func<DataFrame, DataFrame> func) => func(this);
+
         /// <summary>
         /// Returns the first `n` rows in the `DataFrame`.
         /// </summary>
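The new Transform method simply returns func(this), so custom steps can be chained left to right instead of being nested. A minimal usage sketch follows; it is illustrative only and not part of this diff. WithUpperName is a hypothetical helper, "people.json" with "name" and "age" columns mirrors the data used by the E2E tests, and an active SparkSession named spark is assumed.

// Illustrative only. Transform(func) just applies func to the DataFrame,
// which lets named and inline transformations read as one chain.
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

static DataFrame WithUpperName(DataFrame df) =>        // hypothetical helper
    df.WithColumn("name_upper", Upper(Col("name")));

DataFrame people = spark.Read().Json("people.json");   // assumes an active SparkSession `spark`
DataFrame result = people
    .Transform(WithUpperName)                          // named custom step
    .Transform(df => df.Drop("age"));                  // inline step, as in the new test
result.Show();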
