Kafka + Spark
Kafka is installed on our Cluster. We will be using it for an assignment, and it could be used for your Project.
Spark
In Spark, you can read a Kafka stream from our cluster and convert the byte strings Kafka produces into usable strings like this:
messages = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'node1.local:9092,node2.local:9092') \
    .option('subscribe', topic).load()
# Kafka message values arrive as bytes: cast them to strings before use.
values = messages.select(messages['value'].cast('string'))
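If the messages carry structured data, you'll usually parse the string values further. A minimal sketch, assuming (purely for illustration) that each value holds two space-separated numbers; the x and y column names are hypothetical:
from pyspark.sql import functions
# Hypothetical format: each value is 'x y' with two numbers (an assumption
# for this sketch, not the actual message format).
split_value = functions.split(values['value'], ' ')
points = values.select(
    split_value.getItem(0).cast('float').alias('x'),
    split_value.getItem(1).cast('float').alias('y'))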
Please be cautious about how long your job runs. You likely don't want a process that listens forever, so start the stream with a timeout like this:
stream = streaming_df.….start()
stream.awaitTermination(600)  # block for at most 600 seconds
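For reference, a minimal sketch of a complete start-and-wait pattern, continuing the values DataFrame from above and assuming a console sink (an example choice, not a requirement):
stream = values.writeStream.format('console') \
    .outputMode('append').start()
stream.awaitTermination(600)  # return after at most 600 seconds
stream.stop()                 # then explicitly stop the query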
And you can run the job with the necessary Spark Kafka package with a command line like:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 job.py
(The 2.12 and 3.5.0 in the package name must match your Spark installation's Scala and Spark versions.)
Non-Spark Python
You can also access Kafka with the kafka-python package like this:
from kafka import KafkaConsumer
# With no port given, kafka-python assumes the default Kafka port (9092).
consumer = KafkaConsumer(topic, bootstrap_servers=['node1.local', 'node2.local'])
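The consumer blocks and yields messages as they arrive; a minimal sketch of a consuming loop (decoding and printing are just for illustration):
for message in consumer:
    # message.value is raw bytes, like the values Spark receives.
    print(message.topic, message.value.decode('utf-8'))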