Update apache-spark-sql-connector.md #127981
@@ -1,157 +1,189 @@
---
title: Azure SQL and SQL Server
description: This article provides information on how to use the connector for moving data between Azure MS SQL and serverless Apache Spark pools.
author: ms-arali
ms.author: arali
ms.service: azure-synapse-analytics
ms.topic: overview
ms.subservice: spark
ms.date: 05/19/2020
ms.custom: has-adal-ref
title: Spark connector for SQL databases
description: Learn how to use the Spark connector to connect to Azure SQL databases from the Synapse Spark runtime.
author: eric-urban
ms.author: eur
ms.reviewer: arali
ms.topic: how-to
ms.date: 10/01/2025
---
# Azure SQL Database and SQL Server connector for Apache Spark

The Apache Spark connector for Azure SQL Database and SQL Server enables these databases to act as input data sources and output data sinks for Apache Spark jobs. It allows you to use real-time transactional data in big data analytics and persist results for ad-hoc queries or reporting.

# Spark connector for SQL databases (Preview)
Compared to the built-in JDBC connector, this connector provides the ability to bulk insert data into SQL databases. Bulk insert can be 10 to 20 times faster than row-by-row insertion. The Spark connector for SQL Server and Azure SQL Database also supports Microsoft Entra [authentication](/sql/connect/spark/connector#azure-active-directory-authentication), enabling you to connect securely to your Azure SQL databases from Azure Synapse Analytics.

> [!IMPORTANT]
> This feature is in preview.

This article covers how to use the DataFrame API to connect to SQL databases by using the MS SQL connector, with detailed examples in the PySpark API. For all of the supported arguments and samples for connecting to SQL databases with the MS SQL connector, see [Azure Data SQL samples](https://github.com/microsoft/sql-server-samples#azure-data-sql-samples-repository).

The Spark connector for SQL databases is a high-performance library that lets you read from and write to SQL Server, Azure SQL databases, and Fabric SQL databases. The connector offers the following capabilities:

* Use Spark to run large write and read operations on Azure SQL Database, Azure SQL Managed Instance, SQL Server on Azure VM, and Fabric SQL databases.
* When you use a table or a view, the connector supports security models set at the SQL engine level. These models include object-level security (OLS), row-level security (RLS), and column-level security (CLS).
## Connection details

In this example, we'll use the Microsoft Spark utilities to facilitate acquiring secrets from a preconfigured Key Vault. To learn more about Microsoft Spark utilities, visit [introduction to Microsoft Spark Utilities](../microsoft-spark-utilities.md).

The connector is preinstalled in the Synapse Spark 3.5 runtime, so you don't need to install it separately.
```python
# The servername is in the format "jdbc:sqlserver://<AzureSQLServerName>.database.windows.net:1433"
servername = "<< server name >>"
dbname = "<< database name >>"
url = servername + ";" + "databaseName=" + dbname + ";"
dbtable = "<< table name >>"
user = "<< username >>"
principal_client_id = "<< service principal client id >>"
principal_secret = "<< service principal secret >>"
password = mssparkutils.credentials.getSecret('azure key vault name', 'secret name')
```
## Authentication

Microsoft Entra authentication is integrated with Azure Synapse:

- When you sign in to the Synapse workspace and use it in a notebook, your credentials are automatically passed to the SQL engine for authentication and authorization.
- Microsoft Entra ID must be enabled and configured on your SQL database engine.
- No extra configuration is needed in your Spark code if Microsoft Entra ID is set up. The credentials are automatically mapped.

You can also use SQL authentication (by specifying a SQL username and password) or a service principal (by providing an Azure access token for app-based authentication).
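The following sketch, assuming placeholder values for the server, database, table, and credentials, shows roughly what each of these three patterns can look like in PySpark. The option names (`user`, `password`, `accessToken`) and the `mssql()` method follow the examples later in this article; treat this as an illustration rather than a definitive reference.

```python
url = "jdbc:sqlserver://<server>.database.windows.net:1433;databaseName=<database>;"

# 1) Microsoft Entra pass-through (Synapse notebook): no credentials in code;
#    the signed-in identity is mapped to the SQL engine automatically.
entra_df = spark.read.option("url", url).mssql("dbo.publicExample")

# 2) SQL authentication: supply a SQL username and password.
sql_auth_df = (spark.read
    .format("com.microsoft.sqlserver.jdbc.spark")
    .option("url", url)
    .option("dbtable", "dbo.publicExample")
    .option("user", "<< username >>")
    .option("password", "<< password >>")
    .load())

# 3) Service principal: pass an access token acquired for https://database.windows.net/.
token_df = (spark.read
    .format("com.microsoft.sqlserver.jdbc.spark")
    .option("url", url)
    .option("dbtable", "dbo.publicExample")
    .option("accessToken", "<< access token >>")
    .load())
```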
### Permissions

To use the Spark connector, your identity (whether a user or an app) must have the necessary database permissions on the target SQL engine. These permissions are required to read from or write to tables and views.

For Azure SQL Database, Azure SQL Managed Instance, and SQL Server on Azure VM:

- The identity running the operation typically needs the `db_datawriter` and `db_datareader` roles, and optionally `db_owner` for full control.

For Fabric SQL databases:

- The identity typically needs the `db_datawriter` and `db_datareader` roles, and optionally `db_owner`.
- The identity also needs at least read permission on the Fabric SQL database at the item level.
> [!NOTE]
> Currently, there's no linked service or Microsoft Entra pass-through support with the Azure SQL connector.
> If you use a service principal, it can run as an app (no user context) or as a user if user impersonation is enabled. The service principal must have the required database permissions for the operations you want to perform.

## Use the Azure SQL and SQL Server connector

## Usage and code examples
### Read data

```python
# Read from a SQL table by using the MS SQL connector
print("read data from SQL Server table")
jdbcDF = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .load()

jdbcDF.show(5)
```

In this section, we provide code examples to demonstrate how to use the Spark connector for SQL databases effectively. These examples cover various scenarios, including reading from and writing to SQL tables, and configuring the connector options.
### Write data

```python
try:
    df.write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .mode("overwrite") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .save()
    print("MSSQL Connector write(overwrite) succeeded")
except ValueError as error:
    print("MSSQL Connector write failed", error)
```
### Append data

```python
try:
    df.write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .mode("append") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .save()
except ValueError as error:
    print("Connector write failed", error)
```
<a name='azure-active-directory-authentication'></a>

## Microsoft Entra authentication

### Python example with service principal

```python
import msal

# Located in App Registrations in the Azure portal
tenant_id = "<< tenant id >>"

# Resource URI for Azure SQL Database
resource_app_id_url = "https://database.windows.net/"

# Define the scope of the service for the app registration before requesting the token from Microsoft Entra ID
scope = "https://database.windows.net/.default"

# Authority
authority = "https://login.microsoftonline.com/" + tenant_id

# Get the service principal credentials from Key Vault
service_principal_id = mssparkutils.credentials.getSecret('azure key vault name', 'principal_client_id')
service_principal_secret = mssparkutils.credentials.getSecret('azure key vault name', 'principal_secret')

context = msal.ConfidentialClientApplication(
    service_principal_id, service_principal_secret, authority
)
token = context.acquire_token_for_client(scopes=[scope])
access_token = token["access_token"]

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("accessToken", access_token) \
    .option("encrypt", "true") \
    .option("hostNameInCertificate", "*.database.windows.net") \
    .load()
```

### Python example with Active Directory password

### Supported options

The minimal required option is `url`, in the form `"jdbc:sqlserver://<server>:<port>;database=<database>;"`; alternatively, set `spark.mssql.connector.default.url`.

- When the `url` option is provided:
  - The connector always uses `url` as the first preference.
  - If `spark.mssql.connector.default.url` isn't set, the connector sets it and reuses it for later operations.
- When the `url` option isn't provided:
  - If `spark.mssql.connector.default.url` is set, the connector uses the value from the Spark configuration, as shown in the sketch that follows.
  - If `spark.mssql.connector.default.url` isn't set, an error is thrown because the required connection details aren't available.
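To make the fallback behavior concrete, here's a minimal sketch. It assumes the setting can be applied through `spark.conf.set` at runtime (it can also be supplied as a Spark pool or session configuration), and the server, database, and table names are placeholders.

```python
# Set the default connection URL once for the session.
spark.conf.set(
    "spark.mssql.connector.default.url",
    "jdbc:sqlserver://<server>:1433;database=<database>;"
)

# No url option here, so the connector falls back to spark.mssql.connector.default.url.
spark.read.mssql("dbo.publicExample").show()

# An explicit url option still takes precedence and becomes the new default for later calls.
url2 = "jdbc:sqlserver://<server>:1433;database=<database2>;"
spark.read.option("url", url2).mssql("dbo.tableInDatabase2").show()
```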
This connector supports the options defined here: [SQL DataSource JDBC Options](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)

The connector also supports the following options:

| Option | Default value | Description |
| ----- | ----- | ----- |
| `reliabilityLevel` | "BEST_EFFORT" | Controls the reliability of insert operations. Possible values: `BEST_EFFORT` (default, fastest, might result in duplicate rows if an executor restarts), `NO_DUPLICATES` (slower, ensures no duplicate rows are inserted even if an executor restarts). Choose based on your tolerance for duplicates and performance needs. |
| `isolationLevel` | "READ_COMMITTED" | Sets the transaction isolation level for SQL operations. Possible values: `READ_COMMITTED` (default, prevents reading uncommitted data), `READ_UNCOMMITTED`, `REPEATABLE_READ`, `SNAPSHOT`, `SERIALIZABLE`. Higher isolation levels can reduce concurrency but improve data consistency. |
| `tableLock` | "false" | Controls whether the SQL Server TABLOCK table-level lock hint is used during insert operations. Possible values: `true` (enables TABLOCK, which can improve bulk write performance), `false` (default, doesn't use TABLOCK). Setting to `true` might increase throughput for large inserts but can reduce concurrency for other operations on the table. |
| `schemaCheckEnabled` | "true" | Controls whether strict schema validation is enforced between your Spark `DataFrame` and the SQL table. Possible values: `true` (default, enforces strict schema matching), `false` (allows more flexibility and might skip some schema checks). Setting to `false` can help with schema mismatches but might lead to unexpected results if the structures differ significantly. |

Other [Bulk API options](/sql/connect/jdbc/using-bulk-copy-with-the-jdbc-driver?view=azuresqldb-current#sqlserverbulkcopyoptions&preserve-view=true) can be set as options on the `DataFrame` and are passed to the bulk copy APIs on write.
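As a rough illustration of how these options can combine on a write, the following sketch appends a DataFrame while disallowing duplicates, taking a table lock, and setting the standard JDBC `batchsize` option. The table name and option values are illustrative, not recommendations, and `df` and `url` are assumed to exist from the earlier examples.

```python
# df is an existing DataFrame and url is a JDBC connection string, as in the earlier examples.
(df.write
    .format("com.microsoft.sqlserver.jdbc.spark")
    .mode("append")
    .option("url", url)
    .option("dbtable", "dbo.publicExample")
    # Slower than BEST_EFFORT, but avoids duplicate rows if an executor restarts.
    .option("reliabilityLevel", "NO_DUPLICATES")
    # Use the TABLOCK hint for faster bulk inserts at the cost of concurrency.
    .option("tableLock", "true")
    # Standard Spark JDBC write option controlling rows per batch.
    .option("batchsize", "100000")
    .save())
```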
### Write and read example

The following code shows how to write and read data by using the `mssql("<schema>.<table>")` method with automatic Microsoft Entra ID authentication.

> [!TIP]
> Data is created inline for demonstration purposes. In a production scenario, you would typically read data from an existing source or create a more complex `DataFrame`.

# [PySpark](#tab/pyspark)

```python
url = "jdbc:sqlserver://<server>:<port>;database=<database>;"
row_data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
column_header = ["Name", "Age"]
df = spark.createDataFrame(row_data, column_header)
df.write.mode("overwrite").option("url", url).mssql("dbo.publicExample")
spark.read.option("url", url).mssql("dbo.publicExample").show()

url = "jdbc:sqlserver://<server>:<port>;database=<database2>;"  # a different database
df.write.mode("overwrite").option("url", url).mssql("dbo.tableInDatabase2")  # the default url is updated
spark.read.mssql("dbo.tableInDatabase2").show()  # no url option specified, so database2 is used
```
# [Scala Spark](#tab/scalaspark)

```scala
import com.microsoft.sqlserver.jdbc.spark.SparkSqlImplicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val url = "jdbc:sqlserver://<server>:<port>;database=<database>;"
val row_data = Seq(Row("Alice", 1), Row("Bob", 2), Row("Charlie", 3))
val schema = StructType(Seq(StructField("Name", StringType), StructField("Age", IntegerType)))
val df = spark.createDataFrame(spark.sparkContext.parallelize(row_data), schema)
df.write.mode("overwrite").option("url", url).mssql("dbo.publicExample")
spark.read.option("url", url).mssql("dbo.publicExample").show()
```
Copilot (AI) commented on Dec 4, 2025:

[nitpick] The option name `accesstoken` is inconsistent with typical casing conventions. Consider using `accessToken` (camelCase) to match the option naming pattern shown in the legacy code examples and common convention.

```suggestion
df.write.mode("overwrite").option("url", url).option("accessToken", token).mssql("dbo.publicExample")
spark.read.option("accessToken", token).mssql("dbo.publicExample").show()
```