Step 2: Data Loader and Kafka Producer

Now, we’ll create a Python script to read each row of your dataset and send it to a Kafka topic at 30-second intervals. Let’s go through each part of the script in detail.

2.1 Install Necessary Libraries

We need pandas for data handling and kafka-python for interacting with Kafka. Install both with the following command:

pip install pandas kafka-python
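
To confirm the installation succeeded, you can run a quick sanity check (a minimal sketch; both libraries expose a __version__ attribute):

import pandas
import kafka

# Print the installed versions to confirm both imports work
print(pandas.__version__)
print(kafka.__version__)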

2.2 Set Up Kafka Broker and Topic

Before running the script, you need a Kafka broker and a topic to publish data. Here’s how you can set it up:

  1. Download and Start Kafka: Download Apache Kafka, extract the archive, and run the following from the installation directory (this uses the ZooKeeper-based setup):

    # Start ZooKeeper (in one terminal)
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
    # Start Kafka broker (in another terminal)
    bin/kafka-server-start.sh config/server.properties
    
  2. Create a Kafka Topic:

    bin/kafka-topics.sh --create --topic tweets_raw --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
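
Once the topic exists, you can verify that the broker sees it from Python as well. A minimal sketch using kafka-python (KafkaConsumer.topics() returns the set of topic names known to the broker):

from kafka import KafkaConsumer

# Connect to the broker and check that our topic is listed
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print('tweets_raw' in consumer.topics())  # should print True
consumer.close()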
    

2.3 Write the Data Loader and Kafka Producer Script

Here’s the full script, followed by a breakdown of each step:

import pandas as pd
import time
from kafka import KafkaProducer
import json

# Step 1: Load the dataset
df = pd.read_csv('path_to_your_dataset.csv')  # Replace with the path to your dataset file

# Step 2: Configure the Kafka producer
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # Kafka broker address
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize each dict to UTF-8 JSON bytes
)

# Step 3: Stream data to Kafka with a 30-second delay between messages
for _, row in df.iterrows():
    # Prepare the message payload
    message = {
        'text': row['text'],
        'timestamp': row['timestamp'],
        'language': row['language'],
        # Note: Series.get only falls back to 'unknown' when the column is
        # missing entirely; a NaN in an existing column passes through as-is
        'user_info': row.get('user_info', 'unknown')
    }

    # send() is asynchronous; messages are batched and sent in the background
    producer.send('tweets_raw', value=message)
    print(f"Sent message: {message}")

    # Delay to simulate real-time streaming
    time.sleep(30)

# Deliver any buffered messages before the script exits
producer.flush()
producer.close()
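
Because producer.send() is asynchronous, it returns a future rather than blocking until delivery. If you want to confirm that the broker acknowledged each message, you can block on that future. A minimal sketch of this pattern, reusing the producer and message variables from the script above:

from kafka.errors import KafkaError

future = producer.send('tweets_raw', value=message)
try:
    # Block until the broker acknowledges the message, or time out
    metadata = future.get(timeout=10)
    print(f"Delivered to {metadata.topic}[{metadata.partition}] at offset {metadata.offset}")
except KafkaError as e:
    print(f"Delivery failed: {e}")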

Explanation of Each Step:

  1. Loading the Dataset: We load the CSV file containing the tweet data into a pandas DataFrame using pd.read_csv.
  2. Setting Up the Producer: The KafkaProducer sends messages to the Kafka broker, specifying the broker address (bootstrap_servers='localhost:9092') and a value_serializer that turns each Python dict into UTF-8 encoded JSON bytes.
  3. Streaming Data: The loop walks through the DataFrame one row at a time, builds a message payload, sends it to the tweets_raw topic, and sleeps for 30 seconds to mimic tweets arriving in real time.
  4. Running the Script: Save the script (for example, as producer.py), make sure the broker from step 2.2 is running, and start it with python producer.py. You should see one "Sent message" line every 30 seconds; the consumer sketch below shows how to confirm the messages reach the topic.
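
To verify end to end that messages are arriving, you can run a small consumer in a separate terminal while the producer is running. A minimal sketch using kafka-python that reads the topic from the beginning and prints each decoded message:

from kafka import KafkaConsumer
import json

# Subscribe to the topic and decode each message back from JSON
consumer = KafkaConsumer(
    'tweets_raw',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',  # start from the oldest available message
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for msg in consumer:
    print(msg.value)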