Spark 3.0 readiness part 1 #647


Merged: 51 commits, merged Sep 18, 2020
Commits
03b7939 Adding section for UDF serialization (Niharikadutta, Apr 20, 2020)
4ef693d removing guides from master (Niharikadutta, Apr 20, 2020)
81145ca Merge latest from master (Niharikadutta, May 6, 2020)
e4b81af merging latest from master (Niharikadutta, May 7, 2020)
4c32173 Merge remote-tracking branch 'upstream/master' (Niharikadutta, Jun 2, 2020)
4987a09 Merge remote-tracking branch 'upstream/master' (Niharikadutta, Jun 14, 2020)
ca9612e Merge remote-tracking branch 'upstream/master' (Niharikadutta, Jun 16, 2020)
f581c86 Merge remote-tracking branch 'upstream/master' (Niharikadutta, Jun 20, 2020)
086b325 Merge remote-tracking branch 'upstream/master' (Niharikadutta, Jun 23, 2020)
2f72907 Merge remote-tracking branch 'upstream/master' (Niharikadutta, Jul 25, 2020)
6bab996 CountVectorizer (Jul 27, 2020)
e2a566b moving private methods to bottom (Jul 27, 2020)
5f682a6 changing wrap method (Jul 28, 2020)
31371db setting min version required (Jul 31, 2020)
60eb82f undoing csproj change (Jul 31, 2020)
ed36375 member doesnt need to be internal (Jul 31, 2020)
c7baf72 too many lines (Jul 31, 2020)
d13303c removing whitespace change (Jul 31, 2020)
f5b477c removing whitespace change (Jul 31, 2020)
73db52b ionide (Jul 31, 2020)
98f5e4d Merge remote-tracking branch 'upstream/master' (Niharikadutta, Aug 7, 2020)
4c5d502 Merge remote-tracking branch 'upstream/master' (Niharikadutta, Aug 10, 2020)
a766146 Merge branch 'master' into ml/countvectorizer (GoEddie, Aug 12, 2020)
ad6bced Merge branch 'ml/countvectorizer' of https://github.com/GoEddie/spark (Niharikadutta, Aug 13, 2020)
8e1685c Revert "Merge branch 'master' into ml/countvectorizer" (Niharikadutta, Aug 13, 2020)
255515e Revert "Merge branch 'ml/countvectorizer' of https://github.com/GoEdd… (Niharikadutta, Aug 13, 2020)
a44c882 Merge remote-tracking branch 'upstream/master' (Niharikadutta, Aug 14, 2020)
3c2c936 fixing merge errors (Niharikadutta, Aug 14, 2020)
88e834d removing ionid (Niharikadutta, Aug 20, 2020)
a13de2d Merge branch 'master' of github.com:Niharikadutta/spark (Niharikadutta, Aug 21, 2020)
13d0e4a Merge remote-tracking branch 'upstream/master' (Niharikadutta, Aug 24, 2020)
595b141 Merge remote-tracking branch 'upstream/master' (Niharikadutta, Aug 29, 2020)
9cf3cd9 changes (Niharikadutta, Aug 30, 2020)
e6046ca first commit (Niharikadutta, Sep 1, 2020)
decfa48 Merge remote-tracking branch 'upstream/master' (Niharikadutta, Sep 2, 2020)
aeb84f7 changes (Niharikadutta, Sep 3, 2020)
c7a23cc Changes (Niharikadutta, Sep 4, 2020)
63e4081 Adding all tests and APIs (Niharikadutta, Sep 4, 2020)
edaafcd Merge branch 'master' into nidutta/spark3.0readiness_part1 (Niharikadutta, Sep 4, 2020)
1827175 removing unused library (Niharikadutta, Sep 4, 2020)
6e55b81 Merge branch 'nidutta/spark3.0readiness_part1' of github.com:Niharika… (Niharikadutta, Sep 4, 2020)
ce694ff Merge remote-tracking branch 'upstream/master' (Niharikadutta, Sep 8, 2020)
8128ba0 Merge remote-tracking branch 'upstream/master' (Niharikadutta, Sep 12, 2020)
005c818 resolving merge conflicts (Niharikadutta, Sep 13, 2020)
9d9cb47 PR review comments (Niharikadutta, Sep 15, 2020)
23d9b5b Merge branch 'master' into nidutta/spark3.0readiness_part1 (Niharikadutta, Sep 15, 2020)
d951c66 PR review comments (Niharikadutta, Sep 15, 2020)
c93ca6c removing *ActiveSession APIs (Niharikadutta, Sep 15, 2020)
187c71a PR review comments (Niharikadutta, Sep 16, 2020)
451bc26 PR review comments (Niharikadutta, Sep 18, 2020)
4e9a99c PR review comments (Niharikadutta, Sep 18, 2020)
@@ -3,7 +3,9 @@
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using Microsoft.Spark.E2ETest.Utils;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
@@ -91,5 +93,20 @@ public void TestDataFrameStatFunctionSignatures()

df = stat.SampleBy("age", new Dictionary<int, double> { { 1, 0.5 } }, 100);
}

/// <summary>
/// Test signatures for APIs introduced in Spark 3.0.*.
/// </summary>
[SkipIfSparkVersionIsLessThan(Versions.V3_0_0)]
public void TestSignaturesV3_0_X()
{
DataFrameStatFunctions stat = _df.Stat();
Column col = Column("age");

Assert.IsType<DataFrame>(stat.SampleBy(
col,
new Dictionary<int, double> { { 1, 0.5 } },
100));
}
}
}
12 changes: 12 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs
@@ -712,6 +712,18 @@ public void TestSignaturesV3_X_X()
IEnumerable<Row> actual = df.ToLocalIterator(true).ToArray();
IEnumerable<Row> expected = data.Select(r => new Row(r.Values, schema));
Assert.Equal(expected, actual);

Assert.IsType<DataFrame>(df.Observe("metrics", Count("Name").As("CountNames")));

Assert.IsType<Row[]>(_df.Tail(1).ToArray());

_df.PrintSchema(1);

_df.Explain("simple");
_df.Explain("extended");
_df.Explain("codegen");
_df.Explain("cost");
_df.Explain("formatted");
}
}
}
87 changes: 77 additions & 10 deletions src/csharp/Microsoft.Spark/Sql/DataFrame.cs
@@ -70,6 +70,16 @@ public void PrintSchema() =>
Console.WriteLine(
(string)((JvmObjectReference)_jvmObject.Invoke("schema")).Invoke("treeString"));

/// <summary>
/// Prints the schema up to the given level to the console in a nice tree format.
/// </summary>
[Since(Versions.V3_0_0)]
public void PrintSchema(int level)
{
var schema = (JvmObjectReference)_jvmObject.Invoke("schema");
Console.WriteLine((string)schema.Invoke("treeString", level));
}
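
A minimal usage sketch for the new overload (the `spark` session and `people.json` file below are assumptions for illustration):

```csharp
// Print the full schema tree, then only its first level (Spark 3.0+).
DataFrame df = spark.Read().Json("people.json"); // assumes an existing SparkSession 'spark'
df.PrintSchema();  // full tree, as before
df.PrintSchema(1); // truncates nested fields below level 1
```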

/// <summary>
/// Prints the plans (logical and physical) to the console for debugging purposes.
/// </summary>
@@ -80,6 +90,30 @@ public void Explain(bool extended = false)
Console.WriteLine((string)execution.Invoke(extended ? "toString" : "simpleString"));
}

/// <summary>
/// Prints the plans (logical and physical) with a format specified by the given explain
/// mode.
/// </summary>
/// <param name="mode">Specifies the expected output format of plans.
/// 1. `simple`: Print only a physical plan.
/// 2. `extended`: Print both logical and physical plans.
/// 3. `codegen`: Print a physical plan and generated code if they are available.
/// 4. `cost`: Print a logical plan and statistics if they are available.
/// 5. `formatted`: Split explain output into two sections: a physical plan outline and
/// node details.
/// </param>
[Since(Versions.V3_0_0)]
public void Explain(string mode)
{
var execution = (JvmObjectReference)_jvmObject.Invoke("queryExecution");
var explainMode = (JvmObjectReference)_jvmObject.Jvm.CallStaticJavaMethod(
"org.apache.spark.sql.execution.ExplainMode",
"fromString",
mode);
Console.WriteLine((string)execution.Invoke("explainString", explainMode));
}
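
A short sketch of the new mode-based overload, assuming an existing DataFrame `df`; each mode prints a differently formatted plan to the console:

```csharp
// Compare two explain modes on a small aggregation query.
DataFrame counts = df.GroupBy("age").Count();
counts.Explain("simple");    // physical plan only
counts.Explain("formatted"); // physical plan outline followed by node details
```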

/// <summary>
/// Returns all column names and their data types as an IEnumerable of Tuples.
/// </summary>
@@ -480,6 +514,27 @@ public RelationalGroupedDataset Cube(string column, params string[] columns) =>
public DataFrame Agg(Column expr, params Column[] exprs) =>
WrapAsDataFrame(_jvmObject.Invoke("agg", expr, exprs));

/// <summary>
/// Defines (named) metrics to observe on the DataFrame. This method returns an
/// 'observed' DataFrame that returns the same result as the input, with the following
/// guarantees:
///
/// 1. It will compute the defined aggregates (metrics) on all the data that is flowing
/// through the DataFrame at that point.
/// 2. It will report the value of the defined aggregate columns as soon as we reach a
/// completion point. A completion point is either the end of a query (batch mode) or
/// the end of a streaming epoch. The value of the aggregates only reflects the data
/// processed since the previous completion point.
///
/// Please note that continuous execution is currently not supported.
/// </summary>
/// <param name="name">Name of the observed metrics</param>
/// <param name="expr">Aggregate column to observe</param>
/// <param name="exprs">Additional aggregate columns to observe</param>
/// <returns>DataFrame object</returns>
[Since(Versions.V3_0_0)]
public DataFrame Observe(string name, Column expr, params Column[] exprs) =>
WrapAsDataFrame(_jvmObject.Invoke("observe", name, expr, exprs));

/// <summary>
/// Returns a new `DataFrame` by taking the first `number` rows.
/// </summary>
@@ -702,6 +757,17 @@ public DataFrame Summary(params string[] statistics) =>
/// <returns>First `n` rows</returns>
public IEnumerable<Row> Take(int n) => Head(n);

/// <summary>
/// Returns the last `n` rows in the `DataFrame`.
/// </summary>
/// <param name="n">Number of rows</param>
/// <returns>Last `n` rows</returns>
[Since(Versions.V3_0_0)]
public IEnumerable<Row> Tail(int n)
{
return GetRows("tailToPython", n);
}
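
A small sketch, assuming an existing DataFrame `df`; like `Head`, `Tail` moves rows into the driver process, so it should only be used with a small `n`:

```csharp
// Fetch and print the last three rows (Spark 3.0+).
IEnumerable<Row> lastRows = df.Tail(3);
foreach (Row row in lastRows)
{
    Console.WriteLine(row);
}
```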

/// <summary>
/// Returns an array that contains all rows in this `DataFrame`.
/// </summary>
@@ -929,16 +995,15 @@ public DataStreamWriter WriteStream() =>
new DataStreamWriter((JvmObjectReference)_jvmObject.Invoke("writeStream"), this);

/// <summary>
/// Returns row objects based on the function (either "toPythonIterator",
/// "collectToPython", or "tailToPython").
/// </summary>
/// <param name="funcName">String name of the function to call</param>
/// <param name="args">Arguments to the function</param>
/// <returns>IEnumerable of Rows from Spark</returns>
private IEnumerable<Row> GetRows(string funcName, params object[] args)
{
(int port, string secret, _) = GetConnectionInfo(funcName, args);
using ISocketWrapper socket = SocketFactory.CreateSocket();
socket.Connect(IPAddress.Loopback, port, secret);
foreach (Row row in new RowCollector().Collect(socket))
@@ -952,9 +1017,11 @@ private IEnumerable<Row> GetRows(string funcName)
/// used for connecting with Spark to receive rows for this `DataFrame`.
/// </summary>
/// <returns>A tuple of port number, secret string, and JVM socket auth server.</returns>
private (int, string, JvmObjectReference) GetConnectionInfo(
    string funcName,
    params object[] args)
{
object result = _jvmObject.Invoke(funcName, args);
Version version = SparkEnvironment.SparkVersion;
return (version.Major, version.Minor, version.Build) switch
{
16 changes: 16 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/DataFrameStatFunctions.cs
@@ -121,6 +121,22 @@ public DataFrame SampleBy<T>(
long seed) =>
WrapAsDataFrame(_jvmObject.Invoke("sampleBy", columnName, fractions, seed));

/// <summary>
/// Returns a stratified sample without replacement based on the fraction given
/// on each stratum.
/// </summary>
/// <typeparam name="T">Stratum type</typeparam>
/// <param name="column">Column that defines strata</param>
/// <param name="fractions">
/// Sampling fraction for each stratum. If a stratum is not specified, we treat
/// its fraction as zero.
/// </param>
/// <param name="seed">Random seed</param>
/// <returns>DataFrame object</returns>
[Since(Versions.V3_0_0)]
public DataFrame SampleBy<T>(Column column, IDictionary<T, double> fractions, long seed) =>
WrapAsDataFrame(_jvmObject.Invoke("sampleBy", column, fractions, seed));

private DataFrame WrapAsDataFrame(object obj) => new DataFrame((JvmObjectReference)obj);
}
}
24 changes: 24 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/SparkSession.cs
@@ -255,6 +255,30 @@ public DataFrame CreateDataFrame(IEnumerable<Timestamp> data) =>
public DataFrame Sql(string sqlText) =>
new DataFrame((JvmObjectReference)_jvmObject.Invoke("sql", sqlText));

/// <summary>
/// Executes an arbitrary string command inside an external execution engine rather
/// than Spark. This could be useful when a user wants to execute some commands outside
/// of Spark, for example executing a custom DDL/DML command for JDBC, creating an
/// index for Elasticsearch, or creating cores for Solr.
/// The command is eagerly executed after this method is called, and the returned
/// DataFrame will contain the output of the command (if any).
/// </summary>
/// <param name="runner">The class name of the runner that implements
/// `ExternalCommandRunner`</param>
/// <param name="command">The target command to be executed</param>
/// <param name="options">The options for the runner</param>
/// <returns>DataFrame object</returns>
[Since(Versions.V3_0_0)]
public DataFrame ExecuteCommand(
string runner,
string command,
Dictionary<string, string> options) =>
new DataFrame((JvmObjectReference)_jvmObject.Invoke(
"executeCommand",
runner,
command,
options));
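
A hypothetical sketch only: the runner class name, command, and options below are placeholders, not a real `ExternalCommandRunner` implementation:

```csharp
// The runner (a JVM-side class) interprets the command; Spark itself does not.
DataFrame result = spark.ExecuteCommand(
    "org.example.MyCommandRunner",          // assumed runner class name
    "CREATE INDEX idx_age ON people (age)", // command for the external engine
    new Dictionary<string, string>());      // runner-specific options, none here
result.Show(); // the returned DataFrame holds the command's output, if any
```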

/// <summary>
/// Returns a DataFrameReader that can be used to read non-streaming data in
/// as a DataFrame.