Now, we’ll create a Python script to read each row of your dataset and send it to a Kafka topic at 30-second intervals. Let’s go through each part of the script in detail.
We need pandas for data handling and kafka-python for interacting with Kafka. Install them using the following command:
pip install pandas kafka-python
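To confirm that both packages are importable before going further, a quick check like the following may help. It is only a minimal sketch: the helper name missing_packages is made up for illustration, and note that kafka-python is imported under the module name kafka.

```python
import importlib.util


def missing_packages(module_names):
    """Return the module names that cannot be found in the current environment."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # kafka-python installs a module called "kafka"
    missing = missing_packages(["pandas", "kafka"])
    if missing:
        print(f"Missing modules: {missing}")
    else:
        print("pandas and kafka-python are both available.")
```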
Before running the script, you need a running Kafka broker and a topic to publish data to. Here’s how to set them up:
Download and Start Kafka:
# Start ZooKeeper (in one terminal)
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start Kafka broker (in another terminal)
bin/kafka-server-start.sh config/server.properties
Create a Kafka Topic: Create a topic named tweets_raw.
bin/kafka-topics.sh --create --topic tweets_raw --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Here’s the detailed breakdown of the script:
import json
import time

import pandas as pd
from kafka import KafkaProducer

# Step 1: Load the dataset
df = pd.read_csv('path_to_your_dataset.csv')  # Replace with the path to your dataset file

# Step 2: Configure the Kafka producer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # Kafka broker address
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Convert data to JSON bytes
)

# Step 3: Stream data to Kafka with a 30-second delay
for _, row in df.iterrows():
    # Prepare the message payload
    message = {
        'text': row['text'],
        'timestamp': row['timestamp'],
        'language': row['language'],
        'user_info': row.get('user_info', 'unknown')  # Default to 'unknown' if user_info is missing
    }

    # Send message to Kafka topic (send() is asynchronous)
    producer.send('tweets_raw', value=message)
    print(f"Sent message: {message}")

    # Delay to simulate real-time streaming
    time.sleep(30)

# Make sure any buffered messages are delivered before the script exits
producer.flush()
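One thing to watch for: values read by pandas can be NaN (for empty cells) or NumPy scalar types, and json.dumps may reject these or emit a bare NaN, which is not valid JSON. The sketch below shows one possible way to clean a payload before sending it. The helper names to_json_safe and clean_message are hypothetical and not part of the script above, the sample values are invented, and the .item() check assumes NumPy-style scalars.

```python
import json
import math


def to_json_safe(value):
    """Convert a single cell value into something json.dumps can handle."""
    # NumPy scalars (e.g. numpy.int64) expose .item(), which returns a plain Python value
    if hasattr(value, "item") and callable(value.item):
        value = value.item()
    # Missing cells show up as float NaN; map them to None (JSON null)
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


def clean_message(message):
    """Apply to_json_safe to every value in a flat message dict."""
    return {key: to_json_safe(val) for key, val in message.items()}


if __name__ == "__main__":
    # Invented sample payload with a missing 'language' cell
    raw = {"text": "hello", "timestamp": "2024-01-01 00:00:00",
           "language": float("nan"), "user_info": "unknown"}
    print(json.dumps(clean_message(raw)))
```

In the streaming loop, this would mean passing clean_message(message) as the value in producer.send instead of the raw dictionary.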
The dataset is loaded with pandas.
KafkaProducer sends messages to the Kafka broker, specifying bootstrap_servers as localhost:9092, where Kafka is running, and value_serializer to convert the data to JSON before sending.
Each row is turned into a dictionary (message) containing text, timestamp, language, and optionally user_info.
Each message is sent to the tweets_raw Kafka topic using producer.send.
time.sleep(30) introduces a 30-second delay between messages to simulate live streaming.
Run the script with python data_loader_script.py (assuming you save it as data_loader_script.py).
You should see output (Sent message: ...) every 30 seconds in your console.
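To check that the messages actually reach the topic, you could read them back with a small consumer. This is a sketch under assumptions: it uses KafkaConsumer from kafka-python with the same broker address and topic as the producer, and the function names decode_message and print_messages are illustrative, not part of the original script.

```python
import json


def decode_message(raw_bytes):
    """Reverse the producer's value_serializer: UTF-8 bytes -> Python object."""
    return json.loads(raw_bytes.decode("utf-8"))


def print_messages(topic="tweets_raw", bootstrap_servers="localhost:9092"):
    """Print every message on the topic, starting from the earliest offset."""
    # Imported here so decode_message can be used without kafka-python installed
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # start from the beginning when no offset is stored
        value_deserializer=decode_message,
    )
    # Iterating over the consumer blocks and waits for new messages
    for record in consumer:
        print(f"Received message: {record.value}")
```

Calling print_messages() in a separate terminal while the producer script runs should print one message roughly every 30 seconds; stop it with Ctrl+C.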