Below is a detailed Python implementation of Step 1: Data Collection and Storage, using RSSHub and Kafka. The script polls RSSHub for multilingual news feeds, parses the entries into JSON, streams them to Kafka, and archives each batch locally and in S3.


Step 1: Set Up RSSHub for Aggregating News Feeds

  1. Install Docker (if not already installed).
  2. Run RSSHub with Docker (a sample command is shown after this list).
  3. Select news outlets and languages: RSSHub exposes per-outlet routes (for example Le Monde, El País, and BBC) that the scripts below poll at http://localhost:1200/<route>.
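
A minimal setup sketch, assuming the official diygod/rsshub Docker image and the default port 1200 (the same port the feed URLs in the scripts below point at):

    # Pull and start RSSHub in the background on port 1200
    docker run -d --name rsshub -p 1200:1200 diygod/rsshub

    # Quick smoke test against one of the routes used below
    curl http://localhost:1200/lemonde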

Step 2: Fetch RSS Feeds and Stream to Kafka

  1. Install Required Python Packages:

    pip install feedparser kafka-python boto3

  2. Create the Python Script for RSS Feeds (this first version runs for one minute as a quick end-to-end test; the continuously running version follows further below):

import feedparser
import json
import os
import time
from datetime import datetime
from kafka import KafkaProducer
import boto3

# Configuration for RSS feeds, Kafka, and S3
feeds = {
    'french': 'http://localhost:1200/lemonde',
    'spanish': 'http://localhost:1200/el_pais/internacional',
}
kafka_topic_prefix = 'news_'
kafka_broker = 'localhost:9092'
local_storage_dir = './data/'
bucket_name = 'your-s3-bucket-name'

# Set up Kafka and S3 clients
producer = KafkaProducer(
    bootstrap_servers=kafka_broker,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
s3_client = boto3.client('s3')  # Picks up AWS credentials from the environment, ~/.aws, or an IAM role
os.makedirs(local_storage_dir, exist_ok=True)

# Save locally and upload to S3
def save_locally_and_upload(language, articles):
    filename = f"{local_storage_dir}{language}_news_{datetime.now().strftime('%Y%m%d%H%M%S')}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        # ensure_ascii=False keeps accented characters in the multilingual titles readable
        json.dump(articles, f, ensure_ascii=False, indent=4)
    print(f"Saved batch locally: {filename}")
    # Upload to S3
    s3_key = f"{language}/{os.path.basename(filename)}"
    s3_client.upload_file(filename, bucket_name, s3_key)
    print(f"Uploaded to S3: {s3_key}")
    # Optionally delete the local file after upload
    os.remove(filename)

# Fetch, send to Kafka, and store
def fetch_feed(url):
    feed = feedparser.parse(url)
    articles = []
    for entry in feed.entries:
        # Not every feed provides every field, so fall back to empty strings
        article = {
            'title': entry.get('title', ''),
            'summary': entry.get('summary', ''),
            'published': entry.get('published', ''),
            'link': entry.get('link', '')
        }
        articles.append(article)
    return articles

def send_to_kafka(language, articles):
    topic = f"{kafka_topic_prefix}{language}"
    for article in articles:
        producer.send(topic, {'language': language, 'data': article})
        print(f"Sent to {topic}: {article['title']}")

# Main function with 1-minute timer
def main():
    start_time = time.time()
    run_duration = 60  # Run for 1 minute (60 seconds)

    while time.time() - start_time < run_duration:
        for language, url in feeds.items():
            articles = fetch_feed(url)
            send_to_kafka(language, articles)
            save_locally_and_upload(language, articles)
        time.sleep(5)  # Adjust this to control fetch frequency within the minute

    print("Stopped streaming after 1 minute.")

if __name__ == "__main__":
    main()
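
To confirm that articles are actually landing in Kafka, a quick consumer check helps. This is a minimal verification sketch using kafka-python against the configuration above; it assumes the broker either auto-creates topics (the Apache Kafka default) or that the news_french / news_spanish topics were created beforehand.

import json
from kafka import KafkaConsumer

# Read whatever is currently on one topic, print each title, and stop after
# 10 seconds without new messages.
consumer = KafkaConsumer(
    'news_french',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    consumer_timeout_ms=10000,
)

for message in consumer:
    article = message.value
    print(f"{article['language']}: {article['data']['title']}")

consumer.close()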

##### Code for full streaming (runs continuously, fetching every 5 minutes)

import feedparser
import json
import os
from kafka import KafkaProducer
from datetime import datetime
import time
import boto3

# Configuration
feeds = {
    'english': 'http://localhost:1200/bbc/english',
    'spanish': 'http://localhost:1200/el_pais/internacional',
}
kafka_topic_prefix = 'news_'
kafka_broker = 'localhost:9092'
local_storage_dir = './data/'
bucket_name = 'your-s3-bucket-name'  # Replace with your actual S3 bucket name

# Set up Kafka and S3 clients
producer = KafkaProducer(
    bootstrap_servers=kafka_broker,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
s3_client = boto3.client('s3')
os.makedirs(local_storage_dir, exist_ok=True)

# Save locally and upload to S3
def save_locally_and_upload(language, articles):
    filename = f"{local_storage_dir}{language}_news_{datetime.now().strftime('%Y%m%d%H%M%S')}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(articles, f, ensure_ascii=False, indent=4)
    print(f"Saved batch locally: {filename}")
    # Upload to S3
    s3_key = f"{language}/{os.path.basename(filename)}"
    s3_client.upload_file(filename, bucket_name, s3_key)
    print(f"Uploaded to S3: {s3_key}")
    # Optionally delete the local file after upload
    os.remove(filename)

# Fetch, send to Kafka, and store
def fetch_feed(url):
    feed = feedparser.parse(url)
    articles = []
    for entry in feed.entries:
        article = {
            'title': entry.get('title', ''),
            'summary': entry.get('summary', ''),
            'published': entry.get('published', ''),
            'link': entry.get('link', '')
        }
        articles.append(article)
    return articles

def send_to_kafka(language, articles):
    topic = f"{kafka_topic_prefix}{language}"
    for article in articles:
        producer.send(topic, {'language': language, 'data': article})
        print(f"Sent to {topic}: {article['title']}")

# Main function
def main():
    while True:
        for language, url in feeds.items():
            articles = fetch_feed(url)
            send_to_kafka(language, articles)
            save_locally_and_upload(language, articles)
        time.sleep(300)  # Fetch every 5 minutes

if __name__ == "__main__":
    main()
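
Because the full streaming loop only stops when the process is interrupted, it is worth flushing the producer on shutdown so buffered messages are not lost. One possible sketch (an assumption, not part of the original script) replaces the entry point above with:

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C: stop polling and fall through to the cleanup below
        print("Shutting down...")
    finally:
        producer.flush()   # Deliver anything still buffered
        producer.close()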