
Custom Partitioning and Analysis using Kafka SQL Windowing

Overview

By default, Apache Kafka distributes messages across a topic's partitions in a round-robin fashion. The custom partitioning technique instead routes each type of message to a defined partition, so that it is consumed by a particular consumer; this gives us control over where the produced messages land. Windowing enables event-time-driven analysis by grouping data within time limits. The three types of windowing are Tumbling, Session, and Hopping.

In this blog, we will discuss processing Citibike Trip data in the following ways:

  • Partitioning trip data by user type using the custom partitioning technique.
  • Analyzing trip details on the stream using Kafka SQL Windowing.

Pre-requisites

Install the following:

  • Scala
  • Java
  • Kafka
  • Confluent
  • KSQL

Data Description

The Citi Bike trip dataset for March 2017 is used as the source data. It contains basic details such as trip duration, start time, stop time, station name, station ID, station latitude, and station longitude.

Sample Dataset

Use Case

  • Process Citi Bike trip data into two different partitions, each led by a different broker, by routing the messages according to user type (Subscriber or Customer).
  • Use Kafka SQL Windowing concepts to analyze the following details:
    • Number of trips started within fixed time limits, using a Tumbling Window.
    • Number of trips started within overlapping windows advanced by a time interval, using a Hopping Window.
    • Number of trips started within session intervals, using a Session Window.

Synopsis

  • Set up Kafka cluster
  • Produce and consume trip details using custom partitioning
  • Create trip data stream
  • Perform streaming analytics using Window Tumbling
  • Perform streaming analytics using Window Session
  • Perform streaming analytics using Window Hopping

Setting Up Kafka Cluster

To set up the cluster on a single server, with the brokers distinguished by their ports, perform the following steps:

  • Run ZooKeeper on default port 2181.
    The ZooKeeper data will be stored by default in /tmp/data.
  • Change the default path (/tmp/data) to another path with enough space for non-disrupted producing and consuming.
  • Edit the ZooKeeper configuration in the zookeeper.properties file under the Confluent base path (etc/kafka/zookeeper.properties), as shown below.
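For example, the relevant settings might look like the following sketch; the replacement data directory is an assumption, so use any path with sufficient space:

# etc/kafka/zookeeper.properties
# Store ZooKeeper data outside /tmp for non-disrupted producing and consuming
dataDir=/data/zookeeper
clientPort=2181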

  • Start the ZooKeeper using the following command:
./bin/zookeeper-server-start etc/kafka/zookeeper.properties
ZooKeeper starts up, listening on port 2181.

  • Start the 1st broker in the cluster by running the default Kafka broker on port 9092 with broker ID 0.
    The default log path is /tmp/kafka-logs.
  • Edit the default log path (/tmp/kafka-logs) for the 1st broker in the server.properties file under the Confluent base path:
    vi etc/kafka/server.properties
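The key settings for the 1st broker might look like this (the log directory path is an assumption):

# etc/kafka/server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
# Changed from the default /tmp/kafka-logs
log.dirs=/data/kafka-logs-0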

  • Start the broker using the following command:
./bin/kafka-server-start etc/kafka/server.properties
The 1st broker starts up with broker ID 0 on port 9092.

  • Configure the 2nd broker in the cluster by copying server.properties to server1.properties under etc/kafka/.
  • Edit server1.properties:
    vi etc/kafka/server1.properties
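For the 2nd broker, the broker ID, port, and log directory must all differ from the 1st broker's (the log path is again an assumption):

# etc/kafka/server1.properties
broker.id=1
listeners=PLAINTEXT://localhost:9093
log.dirs=/data/kafka-logs-1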

  • Start the broker using the following command:
./bin/kafka-server-start etc/kafka/server1.properties
The 2nd broker starts up with broker ID 1 on port 9093.

  • List the brokers available in the cluster using the following command:
./bin/zookeeper-shell localhost:2181 ls /brokers/ids
The shell lists the broker IDs registered in the cluster, in this case 0 and 1.

In the above case, the two brokers are started on the same node. If the brokers run on different nodes, parallel message processing is faster, and memory issues with large message volumes are avoided because the messages are shared across the nodes' memory.

Producing and Consuming Trip Details Using Custom Partitioning

To produce and consume trip details using custom partitioning, perform the following steps:

  • Create the topic trip-data with two partitions using the following command:
./bin/kafka-topics --create --zookeeper localhost:2181 --topic trip-data --replication-factor 1 --partitions 2
  • Describe the topic to view the leaders of the created partitions, using the command below.
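The standard describe option of the same CLI prints each partition's leader, replicas, and in-sync replicas:

./bin/kafka-topics --describe --zookeeper localhost:2181 --topic trip-data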

Broker 0 is the leader for partition 0 and broker 1 is the leader for partition 1, each handling the message transfer for its partition.

  • Use the custom partitioner technique to produce messages.
  • Create a CustomPartitioner class that implements Kafka's Partitioner interface and overrides its partition method, as shown below:

import java.util.{Map => JMap}

import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

class CustomPartitioner extends Partitioner {

  override def configure(configs: JMap[String, _]): Unit = {}

  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
      value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
    var partition = 0
    val tripData = value.asInstanceOf[String]
    // Gets the user type from the message produced (13th CSV field)
    val userType = tripData.split(",")(12)
    // Assigns the partition to the message based on the user type
    if ("Subscriber".equalsIgnoreCase(userType)) {
      partition = 0
    } else if ("Customer".equalsIgnoreCase(userType)) {
      partition = 1
    }
    println("Partition for message " + value + " is " + partition)
    partition
  }

  override def close(): Unit = {}
}

Subscriber messages are thus produced into partition 0 and Customer messages into partition 1.

  • Register the CustomPartitioner class in the producer properties as shown below:
// Routes messages to their assigned partitions
props.put("partitioner.class", "com.treselle.core.CustomPartitioner");
  • In each consumer, assign the topic partition to listen to, so that different consumers read different partitions, as shown below:
val topicPartition = new TopicPartition(TOPIC,partition)
consumer.assign(Collections.singletonList(topicPartition))
  • Pass the partition number as an input argument to the consumer when running multiple consumers, with each consumer listening to a different partition.
  • Start multiple consumers with different partitions.
  • Start Consumer1 using the below command:
java -cp custom_partitioner.jar com.treselle.core.ConsumerBasedOnPartition trip-data localhost:9092 0
  • Start Consumer2 using the below command:
java -cp custom_partitioner.jar com.treselle.core.ConsumerBasedOnPartition trip-data localhost:9092 1
  • Produce the trip details through the custom partitioner using the below command; a minimal sketch of such a producer follows the command:
java -cp custom_partitioner.jar com.treselle.core.CustomPartionedProducer trip-data localhost:9092
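The producer itself might look like the following sketch, assuming string keys and values and a local CSV file of trip records (the file name, serializer settings, and keying scheme are assumptions; only the partitioner.class property comes from the steps above):

import java.util.Properties

import scala.io.Source

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object CustomPartionedProducer {
  def main(args: Array[String]): Unit = {
    val topic = args(0)   // e.g. trip-data
    val broker = args(1)  // e.g. localhost:9092

    val props = new Properties()
    props.put("bootstrap.servers", broker)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Routes messages to their assigned partitions
    props.put("partitioner.class", "com.treselle.core.CustomPartitioner")

    val producer = new KafkaProducer[String, String](props)
    // Hypothetical input file: one CSV trip record per line, header skipped
    val lines = Source.fromFile("201703-citibike-tripdata.csv").getLines().drop(1)
    for ((line, index) <- lines.zipWithIndex) {
      producer.send(new ProducerRecord[String, String](topic, index.toString, line))
    }
    producer.close()
  }
}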
Consumer 1 consumes only Subscriber messages from partition 0, and consumer 2 consumes only Customer messages from partition 1.

Consumer1

Consumer2

  • Check the log size of the brokers after both consumers have consumed all the messages.

The sizes of the brokers' log directories show how the messages, and hence the memory, are shared between the brokers.

Here, the Subscriber messages are stored on broker localhost:9092 (partition 0) and the Customer messages on broker localhost:9093 (partition 1). As there are fewer Customer messages, less space is occupied in the kafka-logs of localhost:9093.

Creating Trip Data Stream

In KSQL, there is no option to consume messages from specific partitions: a stream or table is created from all the partitions of the given topic.

To create trip data stream, perform the following steps:

  • Separate the Subscriber and Customer data using filter conditions for window processing.
  • Create trip_data_stream with the columns of the produced trip data, using the following command:
CREATE STREAM
trip_data_stream
(
tripduration BIGINT,
starttime VARCHAR,
stoptime VARCHAR,
start_station_id BIGINT,
start_station_name VARCHAR,
start_station_latitude DOUBLE,
start_station_longitude DOUBLE,
end_station_id BIGINT,
end_station_name VARCHAR,
end_station_latitude DOUBLE,
end_station_longitude DOUBLE,
bikeid INT,
usertype VARCHAR,
birth_year VARCHAR,
gender VARCHAR
)
WITH
(
kafka_topic='trip-data',
value_format='DELIMITED'
);
  • Extract the Unix timestamp from each trip's start time for windowing.
  • Set the extracted start-time Unix timestamp as the TIMESTAMP property of the stream, so that windowing uses the trip start times instead of the message-produced times.
  • Create the stream with the extracted Unix timestamp and only the Subscriber messages, for finding the trip details of the subscribers, using the below command:
CREATE STREAM
subscribers_trip_data_stream
WITH
(
TIMESTAMP='startime_timestamp',
PARTITIONS=2
) AS
select
STRINGTOTIMESTAMP(starttime, 'yyyy-MM-dd HH:mm:ss') AS startime_timestamp,
tripduration,
starttime,
usertype
FROM TRIP_DATA_STREAM
where usertype='Subscriber';
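To spot-check the new stream, you can query a few rows before moving on to windowing (a hypothetical verification step; KSQL supports LIMIT on queries):

SELECT startime_timestamp, tripduration, starttime, usertype
FROM subscribers_trip_data_stream
LIMIT 5;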

Performing Streaming Analytics Using Window Tumbling

Window tumbling groups the data into non-overlapping, fixed-size windows of a given interval. It can be used for anomaly detection over a stream at fixed time intervals. For example, consider tumbling with a time interval of 5 minutes.

To find the number of trips started by subscribers in each 5-minute interval, execute the following command:

SELECT
COUNT(*),
starttime
FROM subscribers_trip_data_stream
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY usertype;

From the above result, it is evident that 19 trips started by the end of the 4th minute, 25 trips by the end of the 9th minute, and 26 trips by the end of the 14th minute. Thus, the started trips are counted within each given interval of time.

Performing Streaming Analytics Using Window Session

In a session window, data is grouped into sessions of activity. For example, with a session interval of 1 minute, if no new data arrives within 1 minute of the previous record, the current session is closed and a new session is started for grouping the subsequent data.

To group the subscribers' trip starts into sessions, set the session interval to 20 seconds using the below command:

SELECT
count(*),
starttime
FROM subscribers_trip_data_stream
WINDOW SESSION (20 SECOND)
GROUP BY usertype;

From the above result, it is evident that the data is grouped into sessions of activity. Whenever no data arrives within a 20-second interval, a new session is started for grouping the data.

For example, consider the time range between 00:01:09 and 00:01:57. Between 00:01:09 and 00:01:33, no gap of 20 seconds or more occurs between records, so the trip counts keep incrementing. Between 00:01:33 and 00:01:57, there is an inactivity gap of more than 20 seconds, so a new session is started from the 57th second.

Performing Streaming Analytics Using Window Hopping

In window hopping, data is grouped into fixed-size windows that advance by a given interval, producing overlapping windows. For example, consider a window of 5 minutes advancing by 1 minute.

To group the trip starts into 5-minute windows advanced by 1 minute, execute the following command for hopping window analysis:

SELECT
count(*),
starttime
FROM subscribers_trip_data_stream
WINDOW HOPPING (SIZE 5 MINUTE, ADVANCE BY 1 MINUTE)
GROUP BY usertype;

From the above result, it is evident that each record is counted in 5 windows when the window size is 5 minutes and the advance is 1 minute. The number of windows a record falls into depends on the window size and the advance interval (size divided by advance).

In the above example, consider the record at 00:02:12 to check how hopping works with a 5-minute size and a 1-minute advance. The 00:02:12 record appears in five windows, with trip counts 7, 7, 7, 6, and 1. Only two 1-minute advances occur in the first 2 minutes of data, so the first three windows all cover 00:00:00 to 00:02:12, in which 7 trips started. The 4th window starts one advance later, covering 00:01:00 to 00:02:12 with 6 trips; the 5th window starts another advance later, covering 00:02:00 to 00:02:12 with only 1 trip.

Conclusion

In this blog, we discussed the custom partitioning technique for splitting the trip details into two partitions by user type. We also discussed the Kafka windowing concepts of Window Tumbling, Window Session, and Window Hopping, and how they work on the trip start times, to understand the differences between the types of windowing.

References

  • Sample Citibike Trip Data
  • Apache Kafka Custom Partitioner
  • KSQL Concepts

