Integrating Kafka with PySpark

Karthik Sharma
5 min readJan 16, 2021

In this blog we are going to discuss about how to integrate Apache Kafka with Spark using Python and its required configuration.

How Kafka works ?

Kafka is a distributed system consisting of servers and clients that communicate via a high-performance TCP network protocol. It can be deployed on bare-metal hardware, virtual machines, and containers in on-premise as well as cloud environments.

Topics

Brokers

Producers

Consumers

Role of Zookeeper in Kafka

Recap

Version compatibility to integrate Kafka with Spark

A python version with Kafka is compatible with version above 2.7

In order to integrate Kafka with Spark we need to use spark-streaming-kafka packages. The below are the version available for this packages.

It clearly shows that in spark-streaming-kafka-0–10 version the Direct Dstream is available. Using this version we can fetch the data in the form of Data Frame directly.

Therefore in order to run the structured streaming we need to use below mentioned package.

/usr/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark_structured_streaming_kafka_demo.py

Let us see how we decide the package because if we don’t choose the correct package we might face compatibility issues.

-- packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0.

In this package, 0–10 refers to the spark-streaming-kafka version. If we choose to use structured streaming go with 0–10 version and if we choose to go with createStream functions we need to choose 0–8 version.

2.11 refers to the Scala version and 2.3.0 refers to the spark version.

Now based on your configuration choose the corresponding package, otherwise we will face the compatibility issues.

Integrating Kafka with Spark using Python sample code

Kafka Sample commands

Starting the server:

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

kafka-server-start /usr/local/etc/kafka/server.properties

Topic creating and its use:

kafka-topics --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test

kafka-topic --zookeeper localhost:2181 --list

kafka-topic --zookeeper localhost:2181 --topic test --describe

kafka-topic --zookeeper localhost:2181 --topic second --delete

kafka-console-producer --bootstrap-server localhost:9092 --topic test

kafka-console-consumer --bootstrap-server localhost:9092 -- topic test

Producing Data using Python

Consuming Data using Python

Spark code for integration with Kafka

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import math
import string
import random

KAFKA_INPUT_TOPIC_NAME_CONS = “inputmallstream”
KAFKA_OUTPUT_TOPIC_NAME_CONS = “outputmallstream”
KAFKA_BOOTSTRAP_SERVERS_CONS = ‘localhost:9092’
MALL_LONGITUDE=78.446841
MALL_LATITUDE=17.427229
MALL_THRESHOLD_DISTANCE=100

if __name__ == “__main__”:

print(“PySpark Structured Streaming with Kafka Application Started …”)

spark = SparkSession \
.builder \
.appName(“PySpark Structured Streaming with Kafka”) \
.master(“local[*]”) \
.getOrCreate()

# One way of having the customer data in a table
# customer_data_df=spark.sql(“SELECT * FROM SPA_ASSGN_WAREHOUSE.MYMALL_CUSTOMER_DETAILS”)
# Other way is to create a dataframe from the csv file placed in HDFS location

customer_data_df=spark.read.format(“csv”).option(“header”,”true”).option(“delimiter”,”,”).load(“/home/cloudera/workspace/projects/SPAAssignment/data/customer.csv”)

print(“Printing Schema of customer_data_df: “)
customer_data_df.printSchema()

# Construct a streaming DataFrame that reads from testtopic
stream_detail_df = spark \
.readStream \
.format(“kafka”) \
.option(“kafka.bootstrap.servers”, KAFKA_BOOTSTRAP_SERVERS_CONS) \
.option(“subscribe”, KAFKA_INPUT_TOPIC_NAME_CONS) \
.option(“startingOffsets”, “latest”) \
.load()

stream_detail_df = stream_detail_df.selectExpr(“CAST(key AS STRING)”,”CAST(value AS STRING)”, “timestamp”)

split_col = split(stream_detail_df[‘value’], ‘,’)

stream_detail_df = stream_detail_df.withColumn(‘Cust_id’, split_col.getItem(0))

stream_detail_df = stream_detail_df.withColumn(‘IP address’, split_col.getItem(1))

stream_detail_df = stream_detail_df.withColumn(‘Latitude’, split_col.getItem(2).cast(“float”))

stream_detail_df = stream_detail_df.withColumn(‘Longitude’, split_col.getItem(3).cast(“float”))

print(“Printing Schema of stream_detail_df: “)
stream_detail_df.printSchema()

#Obtaining distance in meters using longitude and latitude

stream_detail_df = stream_detail_df.withColumn(‘a’, (
pow(sin(radians(col(“Latitude”) — lit(MALL_LATITUDE)) / 2), 2) +
cos(radians(lit(MALL_LATITUDE))) * cos(radians(col(“Latitude”))) *
pow(sin(radians(col(“Longitude”) — lit(MALL_LONGITUDE)) / 2), 2)
)).withColumn(“distance”, atan2(sqrt(col(“a”)), sqrt(-col(“a”) + 1)) * 12742000)

#Filtering customers based on distance

stream_detail_df = stream_detail_df.drop(“a”)

stream_detail_df = stream_detail_df.filter(col(“distance”) <= MALL_THRESHOLD_DISTANCE)

#Joining Customer stream data with customer dataset

stream_detail_df = stream_detail_df.join(customer_data_df,stream_detail_df.Cust_id == customer_data_df.CustomerID)

#Discount and Coupon generation

stream_detail_df = stream_detail_df.withColumn(“Discount”, when( (col(“Spending Score”) >= 85) | ((col(“Annual Income”) >=30000) & (col(“Spending Score”) >= 65)) , ‘40%’).when(((col(“Spending Score”) >= 65 ) & ( col(“Spending Score”) < 85 )) | ((col(“Annual Income”) >=20000) & (col(“Annual Income”) < 30000) & (col(“Spending Score”) >=45)), ‘30%’).when(((col(“Spending Score”) >= 45 ) & ( col(“Spending Score”) < 65)) | (col(“Annual Income”) >= 15000), ‘20%’).otherwise(“10%”))

stream_detail_df = stream_detail_df.withColumn(“Coupon_Code”, lit(‘’.join(random.choice(string.ascii_uppercase+string.ascii_lowercase+string.digits) for _ in range(16))))

# Write final result into console for debugging purpose
customer_detail_write_stream = stream_detail_df \
.writeStream \
.trigger(processingTime=’1 seconds’) \
.outputMode(“update”) \
.option(“truncate”, “false”)\
.format(“console”) \
.start()

# Output topic dataframe creation by selecting required columns

final_stream_df = stream_detail_df.selectExpr(“CustomerID”,”distance”,”Discount”,”Coupon_Code”)

final_stream_df = final_stream_df.withColumn(“key”,rand()*3)

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option

customer_detail_write_stream_1 = final_stream_df \
.selectExpr(“CAST(key AS STRING)”, “to_json(struct(*)) AS value”) \
.writeStream \
.format(“kafka”) \
.option(“kafka.bootstrap.servers”, KAFKA_BOOTSTRAP_SERVERS_CONS) \
.option(“topic”, KAFKA_OUTPUT_TOPIC_NAME_CONS) \
.trigger(processingTime=’1 seconds’) \
.outputMode(“update”) \
.option(“checkpointLocation”, “/home/cloudera/workspace/SPAAssignment/chkpoint”) \
.start()

customer_detail_write_stream.awaitTermination()

print(“PySpark Structured Streaming with Kafka Application Completed.”)

Hope you liked this blog. Happy Learning!!!!!

--

--