Integrating Kafka with PySpark
In this blog we will discuss how to integrate Apache Kafka with Spark using Python, along with the configuration required to make the integration work.
How Kafka works
Kafka is a distributed system consisting of servers and clients that communicate via a high-performance TCP network protocol. It can be deployed on bare-metal hardware, virtual machines, and containers in on-premise as well as cloud environments.
Kafka is built around a few core concepts:
Topics: named, partitioned streams to which records are published and from which they are read.
Brokers: the Kafka servers that store topic partitions and serve producer and consumer requests.
Producers: client applications that write (publish) records to topics.
Consumers: client applications that read and process records from the topics they subscribe to.
Role of Zookeeper in Kafka: ZooKeeper coordinates the brokers, handling cluster membership, controller election and topic configuration.
Recap: producers write records to topics stored on brokers, consumers read those records, and ZooKeeper keeps the cluster coordinated.
Version compatibility to integrate Kafka with Spark
On the Python side, Kafka integration requires Python 2.7 or above.
In order to integrate Kafka with Spark we need to use the spark-streaming-kafka packages. This package comes in two variants: spark-streaming-kafka-0-8 (the older connector, which provides the createStream API) and spark-streaming-kafka-0-10 (which provides only the Direct DStream approach and is built against the newer Kafka client API). The 0-10 line, through its Structured Streaming counterpart spark-sql-kafka-0-10, lets us read Kafka data directly as a DataFrame.
Therefore, in order to run Structured Streaming we need to submit the job with the package shown below.
/usr/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark_structured_streaming_kafka_demo.py
Let us see how to decide on the package, because choosing the wrong one leads to compatibility issues.
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
In this package name, 0-10 refers to the Kafka API version (0.10 or higher) that the connector is built against. For Structured Streaming, go with the 0-10 connector; if you want to use the older createStream (DStream) API, choose the 0-8 connector instead.
2.11 refers to the Scala version of the Spark build, and 2.3.0 refers to the Spark version.
So choose the package that matches your own Spark and Scala versions; otherwise you will run into compatibility issues.
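If you are not sure which versions you are running, check them before picking the package: spark-submit --version prints both the Spark and Scala versions, and the Spark version is also available directly from a PySpark session. A quick sketch:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# e.g. prints '2.3.0' -> pick the 2.3.0 connector built for your Scala version
print(spark.version)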
Integrating Kafka with Spark using Python sample code
Kafka Sample commands
Starting ZooKeeper and the Kafka server:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
kafka-server-start /usr/local/etc/kafka/server.properties
Creating a topic and working with it:
kafka-topics --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test
kafka-topics --zookeeper localhost:2181 --list
kafka-topics --zookeeper localhost:2181 --topic test --describe
kafka-topics --zookeeper localhost:2181 --topic second --delete
kafka-console-producer --bootstrap-server localhost:9092 --topic test
kafka-console-consumer --bootstrap-server localhost:9092 --topic test
Producing Data using Python
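A minimal producer sketch using the kafka-python client (an assumption; any Kafka Python client would do) that publishes records in the cust_id,ip,latitude,longitude CSV format expected by the Spark job below:
import time
import random
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

# Publish a few sample customer-location records to the input topic.
# Each value is a CSV string: cust_id,ip_address,latitude,longitude
for cust_id in range(1, 6):
    record = "{},192.168.0.{},{:.6f},{:.6f}".format(
        cust_id, cust_id,
        17.427229 + random.uniform(-0.0005, 0.0005),   # near the mall latitude
        78.446841 + random.uniform(-0.0005, 0.0005))   # near the mall longitude
    producer.send("inputmallstream", value=record.encode("utf-8"))
    time.sleep(1)

producer.flush()
producer.close()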
Consuming Data using Python
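And a matching consumer sketch, again assuming kafka-python, that reads the JSON messages the Spark job writes to the output topic:
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "outputmallstream",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="latest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")))

# Each message value is the JSON document produced by to_json(struct(*)) in the Spark job.
for message in consumer:
    print(message.value)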
Spark code for integration with Kafka
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import math
import string
import random
KAFKA_INPUT_TOPIC_NAME_CONS = "inputmallstream"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "outputmallstream"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092'
MALL_LONGITUDE=78.446841
MALL_LATITUDE=17.427229
MALL_THRESHOLD_DISTANCE=100
if __name__ == "__main__":
    print("PySpark Structured Streaming with Kafka Application Started ...")

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka") \
        .master("local[*]") \
        .getOrCreate()

    # One way of having the customer data in a table
    # customer_data_df = spark.sql("SELECT * FROM SPA_ASSGN_WAREHOUSE.MYMALL_CUSTOMER_DETAILS")
    # Other way is to create a dataframe from the csv file placed in HDFS location
    customer_data_df = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("/home/cloudera/workspace/projects/SPAAssignment/data/customer.csv")

    print("Printing Schema of customer_data_df: ")
    customer_data_df.printSchema()
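    # The customer CSV is expected to contain at least CustomerID, Annual Income
    # and Spending Score columns; these are used for the join and the discount rules below.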
    # Construct a streaming DataFrame that reads from the Kafka input topic
    stream_detail_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
        .option("subscribe", KAFKA_INPUT_TOPIC_NAME_CONS) \
        .option("startingOffsets", "latest") \
        .load()

    stream_detail_df = stream_detail_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")

    split_col = split(stream_detail_df['value'], ',')
    stream_detail_df = stream_detail_df.withColumn('Cust_id', split_col.getItem(0))
    stream_detail_df = stream_detail_df.withColumn('IP address', split_col.getItem(1))
    stream_detail_df = stream_detail_df.withColumn('Latitude', split_col.getItem(2).cast("float"))
    stream_detail_df = stream_detail_df.withColumn('Longitude', split_col.getItem(3).cast("float"))

    print("Printing Schema of stream_detail_df: ")
    stream_detail_df.printSchema()
    # Obtaining distance in metres from the mall using latitude and longitude
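    # This is the haversine formula:
    #   a = sin^2(dLat/2) + cos(lat_mall) * cos(lat_cust) * sin^2(dLon/2)
    #   distance = 2 * R * atan2(sqrt(a), sqrt(1 - a))
    # with R = 6371 km, so the constant 12742000 below is 2 * R expressed in metres.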
    stream_detail_df = stream_detail_df.withColumn('a', (
        pow(sin(radians(col("Latitude") - lit(MALL_LATITUDE)) / 2), 2) +
        cos(radians(lit(MALL_LATITUDE))) * cos(radians(col("Latitude"))) *
        pow(sin(radians(col("Longitude") - lit(MALL_LONGITUDE)) / 2), 2)
    )).withColumn("distance", atan2(sqrt(col("a")), sqrt(-col("a") + 1)) * 12742000)

    # Filtering customers based on distance
    stream_detail_df = stream_detail_df.drop("a")
    stream_detail_df = stream_detail_df.filter(col("distance") <= MALL_THRESHOLD_DISTANCE)
    # Joining Customer stream data with customer dataset
    stream_detail_df = stream_detail_df.join(customer_data_df, stream_detail_df.Cust_id == customer_data_df.CustomerID)

    # Discount and Coupon generation
    stream_detail_df = stream_detail_df.withColumn("Discount",
        when((col("Spending Score") >= 85) | ((col("Annual Income") >= 30000) & (col("Spending Score") >= 65)), '40%')
        .when(((col("Spending Score") >= 65) & (col("Spending Score") < 85)) | ((col("Annual Income") >= 20000) & (col("Annual Income") < 30000) & (col("Spending Score") >= 45)), '30%')
        .when(((col("Spending Score") >= 45) & (col("Spending Score") < 65)) | (col("Annual Income") >= 15000), '20%')
        .otherwise("10%"))
    stream_detail_df = stream_detail_df.withColumn("Coupon_Code", lit(''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(16))))
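    # Note: lit(...) evaluates the random string once on the driver when the query is
    # defined, so every row receives the same Coupon_Code; per-row codes would need a UDF.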
    # Write final result into console for debugging purpose
    customer_detail_write_stream = stream_detail_df \
        .writeStream \
        .trigger(processingTime='1 seconds') \
        .outputMode("update") \
        .option("truncate", "false") \
        .format("console") \
        .start()
    # Output topic dataframe creation by selecting required columns
    final_stream_df = stream_detail_df.selectExpr("CustomerID", "distance", "Discount", "Coupon_Code")
    final_stream_df = final_stream_df.withColumn("key", rand() * 3)
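    # The random numeric key (cast to a string below) only serves to spread records
    # across the output topic's partitions via Kafka's default key-based partitioner.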
    # Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    customer_detail_write_stream_1 = final_stream_df \
        .selectExpr("CAST(key AS STRING)", "to_json(struct(*)) AS value") \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
        .option("topic", KAFKA_OUTPUT_TOPIC_NAME_CONS) \
        .trigger(processingTime='1 seconds') \
        .outputMode("update") \
        .option("checkpointLocation", "/home/cloudera/workspace/SPAAssignment/chkpoint") \
        .start()
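    # awaitTermination() below blocks on the console query; the Kafka sink query started
    # above keeps running in the same application until it is stopped or fails.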
    customer_detail_write_stream.awaitTermination()

    print("PySpark Structured Streaming with Kafka Application Completed.")
Hope you liked this blog. Happy Learning!!!!!