Himani Gadve

Real-time data streaming using a Kafka cluster and data transformation using Apache Flink



As the world becomes increasingly digital, businesses are constantly looking for new ways to analyze their data to gain a competitive advantage. Consider an e-commerce retailer selling fresh groceries online that was struggling to keep up with its competition because it lacked insight into its customers' behavior. It needed to better understand how customers were interacting with its website in order to optimize the user experience, increase sales, and retain customers. To do this, we need to collect and analyze customer clickstream data in real time, so we embarked on a data engineering project to build a real-time clickstream processing pipeline that delivers the needed insights from the customer data. In this post, we will explore what Apache Kafka and Apache Flink are, what clickstream data conversion is and why it is important, how to measure it, and a data engineering workflow that sets up data ingestion, data analysis, storage, and a dashboard for meaningful insights using Apache Kafka and Apache Flink.

What is Kafka?

Kafka is a distributed streaming platform designed to handle high-volume, real-time data streams. It is widely used in modern data engineering pipelines to efficiently manage and process large amounts of data in real-time. Kafka provides a high level of scalability, fault-tolerance, and durability, making it an ideal choice for applications that require continuous processing of streaming data. With Kafka, data can be published, consumed, and processed in real-time, providing businesses with a real-time view of their data.

What is Kafka Connect?

Kafka Connect is an open-source tool for scalable and reliable data streaming between Apache Kafka and other systems. It enables seamless data integration from various sources and sinks to Kafka. With Kafka Connect, users can easily build and deploy data pipelines for streaming data processing.

Kafka Connect S3 Sink is a Kafka Connect plugin that enables data to be written from Kafka topics to an Amazon S3 bucket. This is a powerful tool for data engineers as it provides an easy and efficient way to store data in S3, which can be further processed for analysis and insights. The Kafka Connect S3 Sink is highly scalable and fault-tolerant, making it an ideal solution for high-volume data processing pipelines.

What is Apache Flink?

Apache Flink is a distributed, open-source stream processing framework that provides a unified platform for processing both batch and real-time data. It is designed to handle complex, high-volume data streams with low latency and high throughput. Flink offers a variety of APIs and connectors for data processing, making it easy to integrate with other tools and systems in a data engineering pipeline. It also provides a rich set of features for stream processing, including state management, fault tolerance, and support for complex event processing. With Flink, businesses can process and analyze real-time data streams quickly and efficiently.

Project Architecture:


Design approach:

Step 1: Collecting Clickstream Data

To begin with, we need to collect clickstream data from the mobile API. For this, we will use a Kafka cluster as our messaging system. Kafka provides high-throughput, low-latency processing of streaming data.

from kafka import KafkaConsumer
import json
import boto3
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import *
from awsglue.gluetypes import *
from pyspark.sql.functions import *

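# Step 1: Collecting Clickstream Data
# Subscribe to the 'clickstream' topic and decode each message value from JSON
consumer = KafkaConsumer(
    'clickstream',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)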
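A Kafka consumer receives raw bytes unless it is told how to decode them. As a minimal sketch, assuming each click event is a JSON object with the fields used later in this post (user_id, page_visited, fallback_time, product_clicked, added_to_cart), the helpers below serialize and deserialize one event. The ClickEvent class and the two function names are hypothetical and chosen only for illustration.

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class ClickEvent:
    """One clickstream record; the field names follow the query in Step 4."""
    user_id: str
    page_visited: str
    fallback_time: float
    product_clicked: bool
    added_to_cart: bool


def serialize_event(event: ClickEvent) -> bytes:
    """Encode an event as UTF-8 JSON bytes, the form a Kafka producer would send."""
    return json.dumps(asdict(event)).encode("utf-8")


def deserialize_event(raw: bytes) -> ClickEvent:
    """Decode UTF-8 JSON bytes back into a ClickEvent."""
    return ClickEvent(**json.loads(raw.decode("utf-8")))


if __name__ == "__main__":
    # Sample values are made up for the demonstration
    event = ClickEvent("u-1", "/products/apples", 1.5, True, False)
    print(deserialize_event(serialize_event(event)))
```

A function like deserialize_event could be passed to KafkaConsumer through its value_deserializer argument, so that iterating over the consumer yields ClickEvent objects instead of bytes.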

Step 2: Storing Data in S3

Next, we will use Kafka Connect S3 sink to load the clickstream data into an S3 bucket. Kafka Connect S3 sink is a connector that can be used to write data from Kafka topics to S3 buckets. It provides a fault-tolerant, scalable, and efficient way of writing data to S3.

# Step 2: Register the S3 sink connector through the Kafka Connect REST API
import requests

url = 'http://localhost:8083/connectors'
data = {
    "name": "s3-sink",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "clickstream",
        "s3.bucket.name": "clickstream-analytics",
        "s3.region": "us-west-2",
        "s3.part.size": "5242880",
        "flush.size": "1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "schema.compatibility": "BACKWARD",
        "transforms": "HoistField",
        "transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.HoistField.field": "value"
    }
}

headers = {
    'Content-Type': 'application/json'
}

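response = requests.post(url, headers=headers, json=data)
# Kafka Connect answers 201 Created on success and 409 Conflict if the connector already exists
print(response.status_code, response.text)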
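With flush.size set to 1, the connector commits one S3 object per Kafka record. That is convenient for a demo but produces many tiny files at clickstream volumes. The sketch below builds the same kind of connector payload with a configurable batch size. The function name build_s3_sink_config and the default of 1000 records are assumptions for illustration, not values from the original project.

```python
import json


def build_s3_sink_config(topic: str, bucket: str, region: str,
                         flush_size: int = 1000) -> dict:
    """Return a Kafka Connect payload for the Confluent S3 sink connector.

    flush_size is the number of records written to a file before it is
    committed to S3; 1000 is an illustrative default, not a recommendation.
    """
    if flush_size < 1:
        raise ValueError("flush_size must be at least 1")
    return {
        "name": f"s3-sink-{topic}",
        "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "topics": topic,
            "s3.bucket.name": bucket,
            "s3.region": region,
            # Connector configuration values are sent as strings
            "flush.size": str(flush_size),
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        },
    }


if __name__ == "__main__":
    payload = build_s3_sink_config("clickstream", "clickstream-analytics", "us-west-2")
    print(json.dumps(payload, indent=2))
```

The returned dictionary can be posted to the /connectors endpoint in the same way as the hand-written payload.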

Step 3: Real-Time Processing using Apache Flink

While the raw data is being stored in S3, we will use Apache Flink to process the clickstream in real time. Apache Flink is an open-source stream processing framework that supports a wide range of data sources and sinks; here it reads the events from the Kafka topic.

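from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# Step 3: Real-Time Processing using Apache Flink
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# A kafka-python consumer is not a Flink source, so the topic is declared through
# Flink's Kafka SQL connector (needs the flink-sql-connector-kafka JAR).
# The column types and the group id are assumptions about the event schema.
t_env.execute_sql("""
    CREATE TABLE clickstream (
        user_id STRING, page_visited STRING, fallback_time DOUBLE,
        product_clicked BOOLEAN, added_to_cart BOOLEAN
    ) WITH (
        'connector' = 'kafka', 'topic' = 'clickstream', 'format' = 'json',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'clickstream-flink',
        'scan.startup.mode' = 'earliest-offset'
    )
""")
table = t_env.from_path('clickstream')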

Step 4: Data Analysis and Transformation

In this step, we will perform data analysis and transformation on the clickstream data using Apache Flink. We will calculate per-user behavior metrics: the number of pages visited, the average fallback time, the number of product clickthroughs, and the number of conversions from a click to adding a product to the cart.

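# Step 4: Data Analysis and Transformation (CASE WHEN needs SQL, not a Table API string)
t_env.create_temporary_view('clickstream_events', table)
result = t_env.sql_query("""
    SELECT user_id, COUNT(page_visited) AS page_visited_count, AVG(fallback_time) AS avg_fallback_time,
           SUM(CASE WHEN product_clicked = TRUE THEN 1 ELSE 0 END) AS clickthrough_rate,
           SUM(CASE WHEN added_to_cart = TRUE THEN 1 ELSE 0 END) AS conversion_rate
    FROM clickstream_events GROUP BY user_id
""")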
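The query above stores counts under the names clickthrough_rate and conversion_rate. To turn those counts into actual rates, one common definition (an assumption here, since the post does not give the formulas) divides product clicks by page views for the clickthrough rate, and add-to-cart events by product clicks for the conversion rate. The plain-Python sketch below applies that definition to a small in-memory list of events; the function name user_metrics and the sample data are hypothetical.

```python
from collections import defaultdict


def user_metrics(events):
    """Aggregate click events per user and derive rates from the counts.

    Each event is a dict with user_id, product_clicked and added_to_cart.
    A rate is None when its denominator is zero.
    """
    counts = defaultdict(lambda: {"views": 0, "clicks": 0, "carts": 0})
    for e in events:
        c = counts[e["user_id"]]
        c["views"] += 1
        c["clicks"] += 1 if e["product_clicked"] else 0
        c["carts"] += 1 if e["added_to_cart"] else 0

    metrics = {}
    for user_id, c in counts.items():
        metrics[user_id] = {
            "page_visited_count": c["views"],
            "clickthrough_rate": c["clicks"] / c["views"] if c["views"] else None,
            "conversion_rate": c["carts"] / c["clicks"] if c["clicks"] else None,
        }
    return metrics


if __name__ == "__main__":
    # Made-up sample: u-1 views four pages, clicks two products, adds one to the cart
    sample = [
        {"user_id": "u-1", "product_clicked": True, "added_to_cart": True},
        {"user_id": "u-1", "product_clicked": True, "added_to_cart": False},
        {"user_id": "u-1", "product_clicked": False, "added_to_cart": False},
        {"user_id": "u-1", "product_clicked": False, "added_to_cart": False},
        {"user_id": "u-2", "product_clicked": False, "added_to_cart": False},
    ]
    print(user_metrics(sample))
```

Returning None for an undefined rate keeps users who never clicked a product distinguishable from users whose conversion rate is genuinely zero.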

Step 5: Storing Processed Data in S3

Once the data is processed and analyzed, we will write the results back to an S3 bucket. This data will be used for further analysis and reporting.

# Step 5: Read the results file from S3
import sys
from pyspark.context import SparkContext

s3 = boto3.resource('s3')
bucket_name = 'clickstream-analytics'
key = 'clickstream-analytics.csv'
bucket = s3.Bucket(bucket_name)
obj = bucket.Object(key)
body = obj.get()['Body'].read().decode('utf-8')

# Step 5 (continued): Process the data with AWS Glue
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Read the data from the S3 bucket into a DynamicFrame
# (the connection type is 's3'; the file format is given separately)
dynamic_frame = glueContext.create_dynamic_frame_from_options(
    's3',
    {'paths': [f's3://{bucket_name}/{key}']},
    format='csv',
    format_options={'withHeader': True, 'separator': ','}
)

# Convert the DynamicFrame to a DataFrame
data_frame = dynamic_frame.toDF()

# Do some data transformations
data_frame = data_frame.withColumn('avg_fallback_time', data_frame['avg_fallback_time'].cast('float'))

# Convert the DataFrame back to a DynamicFrame
dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, 'dynamic_frame')

# Write the data back to S3 as a Parquet file
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type='s3',
    connection_options={
        'path': f's3://{bucket_name}/processed-data/',
        'partitionKeys': ['event_date']
    },
    format='parquet'
)
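Because partitionKeys is set to event_date, Glue writes the Parquet files under Hive-style folders, one per distinct date value. The short sketch below shows how such a prefix is formed, which is useful when listing or deleting a single day's output. The helper name partition_prefix and the sample date are hypothetical.

```python
from datetime import date


def partition_prefix(bucket: str, base: str, event_date: date) -> str:
    """Build the Hive-style S3 prefix for one event_date partition."""
    return f"s3://{bucket}/{base.strip('/')}/event_date={event_date.isoformat()}/"


if __name__ == "__main__":
    # The date is an arbitrary example value
    print(partition_prefix("clickstream-analytics", "processed-data", date(2023, 4, 1)))
```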

Step 6: Creating and Updating a Glue Catalog

Explanation: In this step, we create (or update) a Glue catalog table that points to the clean, transformed data in the S3 bucket, so that it can be queried with AWS Athena for ad hoc analysis.

Step 7: Creating a Dashboard

Finally, we will create a dashboard to provide insights into the clickstream data. The dashboard will be connected to AWS Athena.
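A dashboard backed by Athena ultimately issues SQL against the Glue table. As a rough sketch, the first function below builds one such query and the second submits it with boto3. The database and table names come from the Glue catalog script in this post; the function names, the default limit, and the results location supplied by the caller are assumptions.

```python
def top_users_query(database: str, table: str, limit: int = 10) -> str:
    """Build an Athena SQL query listing the users with the most page visits."""
    if limit < 1:
        raise ValueError("limit must be at least 1")
    return (
        f'SELECT user_id, page_visited_count, clickthrough_rate, conversion_rate '
        f'FROM "{database}"."{table}" '
        f'ORDER BY page_visited_count DESC '
        f'LIMIT {limit}'
    )


def run_query(sql: str, database: str, output_location: str) -> str:
    """Submit the query to Athena and return its execution id.

    Requires AWS credentials; output_location is an S3 URI chosen by the caller.
    """
    import boto3  # imported here so the query builder works without boto3 installed

    athena = boto3.client("athena")
    response = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]


if __name__ == "__main__":
    print(top_users_query("clickstream_db", "clickstream_table"))
```

Keeping the query builder separate from the submission call makes the SQL easy to review and test without touching AWS.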

Sample script for Step 6, in Python:

# Step 6: Creating a Glue Catalog Database and Table
import boto3

db_name = 'clickstream_db'
table_name = 'clickstream_table'

# Create the database if it doesn't already exist
glue = boto3.client('glue')
try:
    glue.get_database(Name=db_name)
except glue.exceptions.EntityNotFoundException:
    glue.create_database(DatabaseInput={'Name': db_name})

# Create the table schema
table_schema = {
    'columns': [
        {'name': 'user_id', 'type': 'string'},
        {'name': 'page_visited_count', 'type': 'int'},
        {'name': 'avg_fallback_time', 'type': 'float'},
        {'name': 'clickthrough_rate', 'type': 'int'},
        {'name': 'conversion_rate', 'type': 'int'},
        {'name': 'event_date', 'type': 'date'}
    ],
    'partitionKeys': ['event_date']
}

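# Create the table (saveAsTable registers it in the Glue Data Catalog only if the job runs with --enable-glue-datacatalog)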
glueContext.create_dynamic_frame.from_options(
    's3',
    {'paths': [f's3://{bucket_name}/processed-data/']},
    format='parquet'
).toDF().write.partitionBy('event_date').mode('overwrite').saveAsTable(f'{db_name}.{table_name}')

print(f"Glue Catalog database {db_name} and table {table_name} created successfully.")
