Skip to content

Broadcast encryption support #489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 110 commits into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from 108 commits
Commits
Show all changes
110 commits
Select commit Hold shift + click to select a range
cf9cf89
Broadcast encryption support plus test
Niharikadutta Apr 14, 2020
03b7939
Adding section for UDF serialization
Niharikadutta Apr 20, 2020
4ef693d
removing guides from master
Niharikadutta Apr 20, 2020
81145ca
Merge latest from master
Niharikadutta May 6, 2020
e4b81af
merging latest from master
Niharikadutta May 7, 2020
4c32173
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 2, 2020
4987a09
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 14, 2020
ca9612e
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 16, 2020
f581c86
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 20, 2020
79f6a6f
Adding ChunkedStream
Niharikadutta Jun 23, 2020
086b325
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jun 23, 2020
a8033a1
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Jun 26, 2020
ce135f0
pushing latest changes
Niharikadutta Jul 3, 2020
f8f0420
Adding chunkedstream logic
Niharikadutta Jul 7, 2020
2f72907
Merge remote-tracking branch 'upstream/master'
Niharikadutta Jul 25, 2020
6bab996
CountVectorizer
Jul 27, 2020
e2a566b
moving private methods to bottom
Jul 27, 2020
5f682a6
changing wrap method
Jul 28, 2020
31371db
setting min version required
Jul 31, 2020
60eb82f
undoing csproj change
Jul 31, 2020
ed36375
member doesnt need to be internal
Jul 31, 2020
c7baf72
too many lines
Jul 31, 2020
d13303c
removing whitespace change
Jul 31, 2020
f5b477c
removing whitespace change
Jul 31, 2020
73db52b
ionide
Jul 31, 2020
98f5e4d
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 7, 2020
4c5d502
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 10, 2020
a766146
Merge branch 'master' into ml/countvectorizer
GoEddie Aug 12, 2020
9c33ce8
changes
Niharikadutta Aug 12, 2020
ad6bced
Merge branch 'ml/countvectorizer' of https://github.com/GoEddie/spark
Niharikadutta Aug 13, 2020
1013ed5
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Aug 13, 2020
8e1685c
Revert "Merge branch 'master' into ml/countvectorizer"
Niharikadutta Aug 13, 2020
255515e
Revert "Merge branch 'ml/countvectorizer' of https://github.com/GoEdd…
Niharikadutta Aug 13, 2020
fc98715
Merge branch 'nidutta/BroadcastEncryptionSupport' of github.com:Nihar…
Niharikadutta Aug 13, 2020
a44c882
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 14, 2020
3c2c936
fixing merge errors
Niharikadutta Aug 14, 2020
88e834d
removing ionid
Niharikadutta Aug 20, 2020
59e7299
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 20, 2020
cf2d98f
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Aug 20, 2020
f8baee5
Working changes
Niharikadutta Aug 20, 2020
a13de2d
Merge branch 'master' of github.com:Niharikadutta/spark
Niharikadutta Aug 21, 2020
e77881e
Fixing worker unit tests failing
Niharikadutta Aug 22, 2020
e3ab1a7
Adding comments and cleaning code
Niharikadutta Aug 22, 2020
4eb0e5e
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Aug 22, 2020
0cb63ef
fixing pipeline hung
Niharikadutta Aug 22, 2020
b25d73c
PR review comments
Niharikadutta Aug 24, 2020
13d0e4a
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 24, 2020
7582c85
adding broadcast test to pipeline to filter out
Niharikadutta Aug 24, 2020
7de60e3
PR review changes
Niharikadutta Aug 26, 2020
edd098e
removing extra copy of file
Niharikadutta Aug 27, 2020
f8b9071
Removing ChunkedStream class
Niharikadutta Aug 29, 2020
f83ed47
removing unused library
Niharikadutta Aug 29, 2020
595b141
Merge remote-tracking branch 'upstream/master'
Niharikadutta Aug 29, 2020
27c9af2
PR review changes
Niharikadutta Aug 29, 2020
63a65ac
Variable name consistency
Niharikadutta Aug 29, 2020
73a4e83
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Aug 31, 2020
7555b93
nit
Niharikadutta Aug 31, 2020
1e5c81e
Merge branch 'nidutta/BroadcastEncryptionSupport' of github.com:Nihar…
Niharikadutta Aug 31, 2020
6c066e2
debugging pipeline hang spark 2.4.0 onwards - removed large broadcast…
Niharikadutta Aug 31, 2020
b51ebee
Debugging pipeline hang
Niharikadutta Sep 1, 2020
d4a587b
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Sep 1, 2020
87db025
PR review changes
Niharikadutta Sep 2, 2020
b21ab2d
Merge branch 'nidutta/BroadcastEncryptionSupport' of github.com:Nihar…
Niharikadutta Sep 2, 2020
ac87c46
PR review changes
Niharikadutta Sep 2, 2020
04e46e1
reverted changes
Niharikadutta Sep 2, 2020
049f137
fixing error
Niharikadutta Sep 2, 2020
996975e
PR review changes
Niharikadutta Sep 2, 2020
decfa48
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 2, 2020
fe5c6f4
re-triggering pipeline failing due to ActiveSession bug
Niharikadutta Sep 2, 2020
83815b8
debugging pipeline hanging
Niharikadutta Sep 3, 2020
8017605
adding TestDestroyWithEncryption
Niharikadutta Sep 4, 2020
25737cb
filtering test
Niharikadutta Sep 4, 2020
11b27c8
PR review changes
Niharikadutta Sep 8, 2020
6cd4f30
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Sep 8, 2020
ce694ff
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 8, 2020
2c14f88
pipeline hang
Niharikadutta Sep 9, 2020
a54b716
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
suhsteve Sep 9, 2020
8128ba0
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 12, 2020
c696fcd
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Sep 12, 2020
9b6190f
Merge branch 'nidutta/BroadcastEncryptionSupport' of github.com:Nihar…
Niharikadutta Sep 12, 2020
09c3944
PR review changes.
Niharikadutta Sep 13, 2020
605e305
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Sep 15, 2020
43f5054
PR review comments
Niharikadutta Sep 17, 2020
b381ab8
pipeline changes
Niharikadutta Sep 17, 2020
59d55fd
testing pipeline hanging reason
Niharikadutta Sep 17, 2020
a72225a
removing encryption from destroy and unpersist.
Niharikadutta Sep 17, 2020
b7aaa54
removing encryption false from TestBroadcastLargeFalse
Niharikadutta Sep 18, 2020
0ab0527
disabling all broadcast tests
Niharikadutta Sep 18, 2020
fd8ed11
Enabling multiple tests with encryption on and off
Niharikadutta Sep 18, 2020
e092505
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Sep 19, 2020
83d0acf
disabling tests again
Niharikadutta Sep 19, 2020
f140297
enabling all encryption false tests
Niharikadutta Sep 19, 2020
52f0a74
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 19, 2020
fdcd11d
enabling multiple broadcast encryption on(destroying broadcast variab…
Niharikadutta Sep 19, 2020
2c743e1
Enabling encryption on for TestDestroy
Niharikadutta Sep 19, 2020
e295875
disabling encryption for TestDestroy and enabling it for TestUnpersist
Niharikadutta Sep 19, 2020
3d0c7e2
enabling TestLargeBroadcastValue with encryption on
Niharikadutta Sep 20, 2020
e682bf5
Enabling TestLargeBroadcastValue with encryption off
Niharikadutta Sep 20, 2020
a579732
disabling TestLargeBroadcastValue encryption off
Niharikadutta Sep 20, 2020
6a89f01
Merge remote-tracking branch 'upstream/master'
Niharikadutta Sep 24, 2020
6ba78e1
Resolving merge conflicts
Niharikadutta Sep 24, 2020
7ff736e
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Sep 24, 2020
3cfdb6a
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Sep 25, 2020
e120cdf
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
Niharikadutta Sep 25, 2020
6be2f17
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
imback82 Sep 26, 2020
5024860
Merge branch 'master' into nidutta/BroadcastEncryptionSupport
imback82 Sep 28, 2020
228f2f3
PR comments
Niharikadutta Sep 28, 2020
63faa28
Merge branch 'nidutta/BroadcastEncryptionSupport' of github.com:Nihar…
Niharikadutta Sep 28, 2020
efa424c
Update src/csharp/Microsoft.Spark.Worker/Processor/BroadcastVariableP…
Niharikadutta Sep 28, 2020
d047ac2
Update src/csharp/Microsoft.Spark.Worker/Processor/BroadcastVariableP…
imback82 Sep 28, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ variables:
backwardCompatibleTestsToFilterOut: "(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameGroupedMapUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameVectorUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestDestroy)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestMultipleBroadcastWithoutEncryption)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestMultipleBroadcast)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestUnpersist)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithArrayType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithArrayOfArrayType)&\
Expand Down
23 changes: 17 additions & 6 deletions src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ public BroadcastTests(SparkFixture fixture)
/// <summary>
/// Test Broadcast support by using multiple broadcast variables in a UDF.
/// </summary>
[Fact]
public void TestMultipleBroadcastWithoutEncryption()
[Theory]
[InlineData("true")]
[InlineData("false")]
public void TestMultipleBroadcast(string isEncryptionEnabled)
{
_spark.SparkContext.GetConf().Set("spark.io.encryption.enabled", isEncryptionEnabled);
var obj1 = new TestBroadcastVariable(1, "first");
var obj2 = new TestBroadcastVariable(2, "second");
Broadcast<TestBroadcastVariable> bc1 = _spark.SparkContext.Broadcast(obj1);
Expand All @@ -49,15 +52,20 @@ public void TestMultipleBroadcastWithoutEncryption()

string[] actual = ToStringArray(_df.Select(udf(_df["_1"])));
Assert.Equal(expected, actual);
bc1.Destroy();
bc2.Destroy();
}

/// <summary>
/// Test Broadcast.Destroy() that destroys all data and metadata related to the broadcast
/// variable and makes it inaccessible from workers.
/// </summary>
[Fact]
public void TestDestroy()
[Theory]
[InlineData("true")]
[InlineData("false")]
public void TestDestroy(string isEncryptionEnabled)
{
_spark.SparkContext.GetConf().Set("spark.io.encryption.enabled", isEncryptionEnabled);
var obj1 = new TestBroadcastVariable(5, "destroy");
Broadcast<TestBroadcastVariable> bc1 = _spark.SparkContext.Broadcast(obj1);

Expand Down Expand Up @@ -96,9 +104,12 @@ public void TestDestroy()
/// Test Broadcast.Unpersist() deletes cached copies of the broadcast on the executors. If
/// the broadcast is used after unpersist is called, it is re-sent to the executors.
/// </summary>
[Fact]
public void TestUnpersist()
[Theory]
[InlineData("true")]
[InlineData("false")]
public void TestUnpersist(string isEncryptionEnabled)
{
_spark.SparkContext.GetConf().Set("spark.io.encryption.enabled", isEncryptionEnabled);
var obj = new TestBroadcastVariable(1, "unpersist");
Broadcast<TestBroadcastVariable> bc = _spark.SparkContext.Broadcast(obj);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Runtime.Serialization.Formatters.Binary;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Network;

namespace Microsoft.Spark.Worker.Processor
{
Expand All @@ -25,6 +28,7 @@ internal BroadcastVariableProcessor(Version version)
internal BroadcastVariables Process(Stream stream)
{
var broadcastVars = new BroadcastVariables();
ISocketWrapper socket = null;

if (_version >= new Version(Versions.V2_3_2))
{
Expand All @@ -37,7 +41,14 @@ internal BroadcastVariables Process(Stream stream)
{
broadcastVars.DecryptionServerPort = SerDe.ReadInt32(stream);
broadcastVars.Secret = SerDe.ReadString(stream);
// TODO: Handle the authentication.
if (broadcastVars.Count > 0)
{
socket = SocketFactory.CreateSocket();
socket.Connect(
IPAddress.Loopback,
broadcastVars.DecryptionServerPort,
broadcastVars.Secret);
}
}

var formatter = new BinaryFormatter();
Expand All @@ -48,8 +59,15 @@ internal BroadcastVariables Process(Stream stream)
{
if (broadcastVars.DecryptionServerNeeded)
{
throw new NotImplementedException(
"broadcastDecryptionServer is not implemented.");
long readBid = SerDe.ReadInt64(socket.InputStream);
if (bid != readBid)
{
throw new Exception($"The Broadcast Id received from the encryption" +
$" server {readBid} is different from the Broadcast Id received" +
$" from the payload {bid}.");
}
object value = formatter.Deserialize(socket.InputStream);
BroadcastRegistry.Add(bid, value);
}
else
{
Expand All @@ -66,6 +84,7 @@ internal BroadcastVariables Process(Stream stream)
BroadcastRegistry.Remove(bid);
}
}
socket?.Dispose();
return broadcastVars;
}
}
Expand Down
43 changes: 36 additions & 7 deletions src/csharp/Microsoft.Spark/Broadcast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Network;
using Microsoft.Spark.Services;


namespace Microsoft.Spark
{
/// <summary>
Expand Down Expand Up @@ -171,21 +172,49 @@ private JvmObjectReference CreateBroadcast_V2_3_2_AndAbove(
bool encryptionEnabled = bool.Parse(
sc.GetConf().Get("spark.io.encryption.enabled", "false"));

var _pythonBroadcast = (JvmObjectReference)javaSparkContext.Jvm.CallStaticJavaMethod(
"org.apache.spark.api.python.PythonRDD",
"setupBroadcast",
_path);

if (encryptionEnabled)
{
throw new NotImplementedException("Broadcast encryption is not supported yet.");
var pair = (JvmObjectReference[])_pythonBroadcast.Invoke("setupEncryptionServer");

using (ISocketWrapper socket = SocketFactory.CreateSocket())
{
socket.Connect(
IPAddress.Loopback,
(int)pair[0].Invoke("intValue"), // port number
(string)pair[1].Invoke("toString")); // secret
WriteToStream(value, socket.OutputStream);
}
_pythonBroadcast.Invoke("waitTillDataReceived");
}
else
{
WriteToFile(value);
}

var pythonBroadcast = (JvmObjectReference)javaSparkContext.Jvm.CallStaticJavaMethod(
"org.apache.spark.api.python.PythonRDD",
"setupBroadcast",
_path);
return (JvmObjectReference)javaSparkContext.Invoke("broadcast", _pythonBroadcast);
}

return (JvmObjectReference)javaSparkContext.Invoke("broadcast", pythonBroadcast);
/// <summary>
/// Sends the broadcast value over the given stream: first the length of the serialized
/// payload, then the payload itself, and finally an end-of-data marker.
/// </summary>
/// <remarks>
/// TODO: This is not performant in the case of Broadcast encryption, because the whole
/// value is serialized into memory before anything is written to the stream, instead of
/// being serialized and written in chunks like Python.
/// </remarks>
/// <param name="value">Broadcast value to be written to the stream</param>
/// <param name="stream">Stream to write value to</param>
private void WriteToStream(object value, Stream stream)
{
    using (var buffer = new MemoryStream())
    {
        // Serialize the complete value up front so its length is known
        // before any byte is sent.
        Dump(value, buffer);

        long payloadLength = buffer.Length;
        SerDe.Write(stream, payloadLength);
        buffer.WriteTo(stream);
    }

    // A length of -1 tells the receiving end that no more data follows.
    SerDe.Write(stream, -1);
}

/// <summary>
Expand Down