Description
I'm using pyspark to write to Event Hubs with this project. For local testing I'm using the new local Event Hubs emulator (https://github.com/Azure/azure-event-hubs-emulator-installer), but when I write through pyspark I get a com.microsoft.azure.eventhubs.CommunicationException: Connection refused error. The same emulator works fine with the Python azure-eventhub package.
Actual behavior
I get the following error:
Traceback (most recent call last):
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/send_via_spark.py", line 76, in <module>
    serialized_rows.select("body").write.format("eventhubs").options(**ehConf).save()
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/pyspark/sql/readwriter.py", line 1461, in save
    self._jwrite.save()
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o86.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 4.0 failed 1 times, most recent failure: Lost task 6.0 in stage 4.0 (TID 26) (192.168.68.105 executor driver): com.microsoft.azure.eventhubs.CommunicationException: Connection refused
Expected behavior
I should be able to point the connector at the emulator just by changing the connection string and consumer group, and it should behave the same as a remote/hosted/real Event Hub.
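For concreteness, the only intended difference between the two setups is the connection string (plus the consumer group). A sketch below; the hosted namespace name and key are placeholders, not values from a real environment:
# The connection string I pass for the local emulator (same as in the repro below):
EVENT_HUB_NAME = "eh1"
EMULATOR_CONN_STR = (
    "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;"
    f"SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath={EVENT_HUB_NAME}"
)

# Hypothetical connection string for a hosted namespace (placeholder name/key),
# which is the shape the connector already handles:
HOSTED_CONN_STR = (
    "Endpoint=sb://<namespace>.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    f"SharedAccessKey=<key>;EntityPath={EVENT_HUB_NAME}"
)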
Versions
Python: 3.10.13
Python virtualenv (pip freeze):
aiohappyeyeballs==2.4.0
aiohttp==3.10.5
aiosignal==1.3.1
async-timeout==4.0.3
attrs==24.2.0
azure-core==1.30.2
azure-eventhub==5.12.1
azure-eventhub-checkpointstoreblob-aio==1.1.4
certifi==2024.7.4
cffi==1.17.0
charset-normalizer==3.3.2
cryptography==43.0.0
frozenlist==1.4.1
idna==3.8
isodate==0.6.1
msrest==0.7.1
multidict==6.0.5
numpy==2.1.0
oauthlib==3.2.2
pandas==2.2.2
py4j==0.10.9.7
pycparser==2.22
pyspark==3.5.2
python-dateutil==2.9.0.post0
pytz==2024.1
requests==2.32.3
requests-oauthlib==2.0.0
six==1.16.0
typing_extensions==4.12.2
tzdata==2024.1
urllib3==2.2.2
wrapt==1.16.0
yarl==1.9.4
azure-event-hubs-spark: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22
Reproduce
Working: using the azure-eventhub Python package
I use 3 scripts to test.
- Get the local eventhub emulator running, following the instructions in the https://github.com/Azure/azure-event-hubs-emulator-installer repo.
- Run listen.py in one terminal.
import asyncio

from azure.eventhub.aio import EventHubConsumerClient

EVENT_HUB_CONNECTION_STR = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
EVENT_HUB_NAME = "eh1"
CONSUMER_GROUP = "cg1"


async def on_event(partition_context, event):
    # Print the event data.
    print(
        'Received the event: "{}" from the partition with ID: "{}"'.format(
            event.body_as_str(encoding="UTF-8"), partition_context.partition_id
        )
    )


async def main():
    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENT_HUB_NAME,
    )
    async with client:
        # Call the receive method. Read from the beginning of the
        # partition (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main())
- Run send.py in another terminal.
import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

EVENT_HUB_CONNECTION_STR = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
EVENT_HUB_NAME = "eh1"


async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
    )
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData("First event "))
        event_data_batch.add(EventData("Second event"))
        event_data_batch.add(EventData("Third event"))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)


asyncio.run(run())
- In the listen.py terminal I receive the events:
> python listen.py
Received the event: "First event " from the partition with ID: "0"
Received the event: "Second event" from the partition with ID: "0"
Received the event: "Third event" from the partition with ID: "0"
Not working: using pyspark and azure-event-hubs-spark
- Get the local eventhub emulator running, following the instructions in the https://github.com/Azure/azure-event-hubs-emulator-installer repo.
- Run the same listen.py script in one terminal.
- Run send_via_spark.py in another terminal.
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct, to_json
from pyspark import SparkContext as sc

EVENT_HUB_NAME = "eh1"
EVENT_HUB_CONNECTION_STR = f"Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath={EVENT_HUB_NAME}"

spark = (
    SparkSession.builder.appName("EventHubs")
    .config(
        "spark.jars.packages",
        "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22,org.apache.hadoop:hadoop-azure:3.4.0",  # noqa
    )
    .config("spark.sql.shuffle.partitions", 5)
    .config("spark.hadoop.fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    .config("spark.hadoop.fs.azure.account.auth.type", "OAuth")
    .config(
        "spark.hadoop.fs.azure.account.oauth.provider.type",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",  # noqa
    )
    .config("spark.hadoop.fs.azure.test.emulator", "true")
    .config(
        "fs.azure.storage.emulator.account.name",
        "devstoreaccount1.blob.windows.core.net",
    )
    .config(
        "spark.hadoop.fs.azure.storage.emulator.account.name",
        "devstoreaccount1.blob.windows.core.net",
    )
    .getOrCreate()
)

ehConf = {
    "eventhubs.connectionString": spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
        EVENT_HUB_CONNECTION_STR
    )
}
# ehConf["eventhubs.consumerGroup"] = "$Default"
ehConf["eventhubs.consumerGroup"] = "cg1"

df = spark.createDataFrame(
    [
        ("1234", "1", "2021-01-01 00:00:00"),
        ("1234", "2", "2021-01-01 00:00:01"),
        ("1234", "3", "2021-01-01 00:00:02"),
    ],
    ["cardNumber", "transactionId", "transactionTime"],
)

Schema = StructType(
    [
        StructField("cardNumber", StringType(), True),
        StructField("transactionId", StringType(), True),
        StructField("transactionTime", StringType(), True),
    ]
)

serialized_rows = df.select(to_json(struct([df[x] for x in df.columns])).alias("body"))
# print(serialized_rows.select("body").toPandas())
serialized_rows.show(truncate=False)
print([r["body"] for r in serialized_rows.select("body").collect()])
print(ehConf)

serialized_rows.select("body").write.format("eventhubs").options(**ehConf).save()
- In the listen.py terminal I receive nothing, and in the send_via_spark.py terminal I get a rather lengthy error message:
error.log
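As an extra diagnostic (not part of the repro above), something like the snippet below can be used to check whether the emulator's AMQP endpoint is reachable at all from the machine running Spark. Port 5672 is assumed here as the emulator's default plain-AMQP port and may differ per setup:
# Minimal TCP reachability check for the emulator's AMQP endpoint.
# Assumptions: emulator on localhost, default plain-AMQP port 5672.
import socket

try:
    with socket.create_connection(("localhost", 5672), timeout=5) as sock:
        print("TCP connect succeeded:", sock.getpeername())
except OSError as exc:
    print("TCP connect failed:", exc)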