diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameFunctionsTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameFunctionsTests.cs index aa7c83887..f982c721d 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameFunctionsTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameFunctionsTests.cs @@ -3,7 +3,9 @@ // See the LICENSE file in the project root for more information. using System.Collections.Generic; +using Microsoft.Spark.E2ETest.Utils; using Microsoft.Spark.Sql; +using static Microsoft.Spark.Sql.Functions; using Xunit; namespace Microsoft.Spark.E2ETest.IpcTests @@ -91,5 +93,20 @@ public void TestDataFrameStatFunctionSignatures() df = stat.SampleBy("age", new Dictionary { { 1, 0.5 } }, 100); } + + /// + /// Test signatures for APIs introduced in Spark 3.0.*. + /// + [SkipIfSparkVersionIsLessThan(Versions.V3_0_0)] + public void TestSignaturesV3_0_X() + { + DataFrameStatFunctions stat = _df.Stat(); + Column col = Column("age"); + + Assert.IsType(stat.SampleBy( + col, + new Dictionary { { 1, 0.5 } }, + 100)); + } } } diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs index 423f026ca..c8cd1f819 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs @@ -712,6 +712,18 @@ public void TestSignaturesV3_X_X() IEnumerable actual = df.ToLocalIterator(true).ToArray(); IEnumerable expected = data.Select(r => new Row(r.Values, schema)); Assert.Equal(expected, actual); + + Assert.IsType(df.Observe("metrics", Count("Name").As("CountNames"))); + + Assert.IsType(_df.Tail(1).ToArray()); + + _df.PrintSchema(1); + + _df.Explain("simple"); + _df.Explain("extended"); + _df.Explain("codegen"); + _df.Explain("cost"); + _df.Explain("formatted"); } } } diff --git a/src/csharp/Microsoft.Spark/Sql/DataFrame.cs 
b/src/csharp/Microsoft.Spark/Sql/DataFrame.cs index 1c4d1de8d..587b3d351 100644 --- a/src/csharp/Microsoft.Spark/Sql/DataFrame.cs +++ b/src/csharp/Microsoft.Spark/Sql/DataFrame.cs @@ -70,6 +70,16 @@ public void PrintSchema() => Console.WriteLine( (string)((JvmObjectReference)_jvmObject.Invoke("schema")).Invoke("treeString")); + /// + /// Prints the schema up to the given level to the console in a nice tree format. + /// + [Since(Versions.V3_0_0)] + public void PrintSchema(int level) + { + var schema = (JvmObjectReference)_jvmObject.Invoke("schema"); + Console.WriteLine((string)schema.Invoke("treeString", level)); + } + /// /// Prints the plans (logical and physical) to the console for debugging purposes. /// @@ -80,6 +90,30 @@ public void Explain(bool extended = false) Console.WriteLine((string)execution.Invoke(extended ? "toString" : "simpleString")); } + /// + /// Prints the plans (logical and physical) with a format specified by a given explain + /// mode. + /// + /// + /// Specifies the expected output format of plans. + /// 1. `simple` Print only a physical plan. + /// 2. `extended`: Print both logical and physical plans. + /// 3. `codegen`: Print a physical plan and generated codes if they are available. + /// 4. `cost`: Print a logical plan and statistics if they are available. + /// 5. `formatted`: Split explain output into two sections: a physical plan outline and + /// node details. + /// + [Since(Versions.V3_0_0)] + public void Explain(string mode) + { + var execution = (JvmObjectReference)_jvmObject.Invoke("queryExecution"); + var explainMode = (JvmObjectReference)_jvmObject.Jvm.CallStaticJavaMethod( + "org.apache.spark.sql.execution.ExplainMode", + "fromString", + mode); + Console.WriteLine((string)execution.Invoke("explainString", explainMode)); + } + /// /// Returns all column names and their data types as an IEnumerable of Tuples. 
/// @@ -480,6 +514,27 @@ public RelationalGroupedDataset Cube(string column, params string[] columns) => public DataFrame Agg(Column expr, params Column[] exprs) => WrapAsDataFrame(_jvmObject.Invoke("agg", expr, exprs)); + /// + /// Define (named) metrics to observe on the Dataset. This method returns an 'observed' + /// DataFrame that returns the same result as the input, with the following guarantees: + /// + /// 1. It will compute the defined aggregates (metrics) on all the data that is flowing + /// through the Dataset at that point. + /// 2. It will report the value of the defined aggregate columns as soon as we reach a + /// completion point. A completion point is either the end of a query (batch mode) or the end + /// of a streaming epoch. The value of the aggregates only reflects the data processed + /// since the previous completion point. + /// + /// Please note that continuous execution is currently not supported. + /// + /// Named metrics to observe + /// Defined aggregate to observe + /// Defined aggregates to observe + /// DataFrame object + [Since(Versions.V3_0_0)] + public DataFrame Observe(string name, Column expr, params Column[] exprs) => + WrapAsDataFrame(_jvmObject.Invoke("observe", name, expr, exprs)); + /// + /// Returns a new `DataFrame` by taking the first `number` rows. + /// @@ -702,6 +757,17 @@ public DataFrame Summary(params string[] statistics) => /// First `n` rows public IEnumerable Take(int n) => Head(n); + /// + /// Returns the last `n` rows in the `DataFrame`. + /// + /// Number of rows + /// Last `n` rows + [Since(Versions.V3_0_0)] + public IEnumerable Tail(int n) + { + return GetRows("tailToPython", n); + } + /// + /// Returns an array that contains all rows in this `DataFrame`. + /// @@ -929,16 +995,15 @@ public DataStreamWriter WriteStream() => new DataStreamWriter((JvmObjectReference)_jvmObject.Invoke("writeStream"), this); /// - /// Returns row objects based on the function (either "toPythonIterator" or - /// "collectToPython"). 
+ /// Returns row objects based on the function (either "toPythonIterator", + /// "collectToPython", or "tailToPython"). /// - /// - /// The name of the function to call, either "toPythonIterator" or "collectToPython". - /// - /// objects - private IEnumerable GetRows(string funcName) + /// String name of function to call + /// Arguments to the function + /// IEnumerable of Rows from Spark + private IEnumerable GetRows(string funcName, params object[] args) { - (int port, string secret, _) = GetConnectionInfo(funcName); + (int port, string secret, _) = GetConnectionInfo(funcName, args); using ISocketWrapper socket = SocketFactory.CreateSocket(); socket.Connect(IPAddress.Loopback, port, secret); foreach (Row row in new RowCollector().Collect(socket)) @@ -952,9 +1017,11 @@ private IEnumerable GetRows(string funcName) /// used for connecting with Spark to receive rows for this `DataFrame`. /// /// A tuple of port number, secret string, and JVM socket auth server. - private (int, string, JvmObjectReference) GetConnectionInfo(string funcName) + private (int, string, JvmObjectReference) GetConnectionInfo( + string funcName, + params object[] args) { - object result = _jvmObject.Invoke(funcName); + object result = _jvmObject.Invoke(funcName, args); Version version = SparkEnvironment.SparkVersion; return (version.Major, version.Minor, version.Build) switch { diff --git a/src/csharp/Microsoft.Spark/Sql/DataFrameStatFunctions.cs b/src/csharp/Microsoft.Spark/Sql/DataFrameStatFunctions.cs index 45c4621c2..492163a2b 100644 --- a/src/csharp/Microsoft.Spark/Sql/DataFrameStatFunctions.cs +++ b/src/csharp/Microsoft.Spark/Sql/DataFrameStatFunctions.cs @@ -121,6 +121,22 @@ public DataFrame SampleBy( long seed) => WrapAsDataFrame(_jvmObject.Invoke("sampleBy", columnName, fractions, seed)); + /// + /// Returns a stratified sample without replacement based on the fraction given + /// on each stratum. 
+ /// + /// Stratum type + /// Column that defines strata + /// + /// Sampling fraction for each stratum. If a stratum is not specified, we treat + /// its fraction as zero. + /// + /// Random seed + /// DataFrame object + [Since(Versions.V3_0_0)] + public DataFrame SampleBy(Column column, IDictionary fractions, long seed) => + WrapAsDataFrame(_jvmObject.Invoke("sampleBy", column, fractions, seed)); + private DataFrame WrapAsDataFrame(object obj) => new DataFrame((JvmObjectReference)obj); } } diff --git a/src/csharp/Microsoft.Spark/Sql/SparkSession.cs b/src/csharp/Microsoft.Spark/Sql/SparkSession.cs index f0eab693f..a41f585ec 100644 --- a/src/csharp/Microsoft.Spark/Sql/SparkSession.cs +++ b/src/csharp/Microsoft.Spark/Sql/SparkSession.cs @@ -255,6 +255,30 @@ public DataFrame CreateDataFrame(IEnumerable data) => public DataFrame Sql(string sqlText) => new DataFrame((JvmObjectReference)_jvmObject.Invoke("sql", sqlText)); + /// + /// Execute an arbitrary string command inside an external execution engine rather than + /// Spark. This could be useful when a user wants to execute some commands out of Spark. For + /// example, executing custom DDL/DML command for JDBC, creating index for ElasticSearch, + /// creating cores for Solr and so on. + /// The command will be eagerly executed after this method is called and the returned + /// DataFrame will contain the output of the command (if any). + /// + /// The class name of the runner that implements + /// `ExternalCommandRunner` + /// The target command to be executed + /// The options for the runner + /// DataFrame object + [Since(Versions.V3_0_0)] + public DataFrame ExecuteCommand( + string runner, + string command, + Dictionary options) => + new DataFrame((JvmObjectReference)_jvmObject.Invoke( + "executeCommand", + runner, + command, + options)); + /// + /// Returns a DataFrameReader that can be used to read non-streaming data in + /// as a DataFrame.