Here's a detailed Python script implementing Step 1: Data Collection and Storage with RSSHub and Kafka. The script polls RSSHub for multilingual news feeds, parses each entry, serializes it to JSON, streams it to Kafka, and archives each batch locally and in S3.
Start RSSHub on port 1200:
docker run -d --name rsshub -p 1200:1200 diygod/rsshub
This command pulls the RSSHub Docker image and runs it in detached mode, exposing the service at http://localhost:1200.
With RSSHub running, routes like the following expose the multilingual feeds used below:
http://localhost:1200/bbc/english
http://localhost:1200/lemonde
http://localhost:1200/el_pais/internacional
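Before wiring these routes into Kafka, it can be worth confirming that RSSHub actually returns parseable XML. Here is a minimal check with feedparser, using the BBC route above as an example (any of the three works the same way):

import feedparser

# Quick sanity check: parse one RSSHub route and print a few titles
feed = feedparser.parse('http://localhost:1200/bbc/english')
print(f"bozo={feed.bozo}, entries={len(feed.entries)}")  # bozo=1 signals a malformed feed
for entry in feed.entries[:3]:
    print('-', entry.get('title', '(no title)'))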
Install Required Python Packages:
pip install feedparser kafka-python boto3
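The script below assumes a Kafka broker is already listening on localhost:9092; broker setup is outside the scope of this step, but if you have Docker available, one quick way to get a single-node broker for local testing is the official apache/kafka image (this particular image and tag are an assumption, not part of the original setup):

docker run -d --name kafka -p 9092:9092 apache/kafka:latest

Also note that boto3 picks up AWS credentials from the environment or your ~/.aws configuration, so have those in place before the S3 upload step runs.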
Create the Python Script for RSS Feeds (1-minute test run):
import feedparser
import json
import os
import time
from datetime import datetime
from kafka import KafkaProducer
import boto3
# Configuration for RSS feeds, Kafka, and S3
feeds = {
    'english': 'http://localhost:1200/bbc/english',
    'french': 'http://localhost:1200/lemonde',
    'spanish': 'http://localhost:1200/el_pais/internacional',
}
kafka_topic_prefix = 'news_'
kafka_broker = 'localhost:9092'
local_storage_dir = './data/'
bucket_name = 'your-s3-bucket-name'  # Replace with your actual S3 bucket name
# Set up Kafka and S3 clients
producer = KafkaProducer(
    bootstrap_servers=kafka_broker,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
s3_client = boto3.client('s3')
os.makedirs(local_storage_dir, exist_ok=True)
# Save locally and upload to S3
def save_locally_and_upload(language, articles):
    filename = f"{local_storage_dir}{language}_news_{datetime.now().strftime('%Y%m%d%H%M%S')}.json"
    with open(filename, 'w') as f:
        json.dump(articles, f, indent=4)
    print(f"Saved batch locally: {filename}")
    # Upload to S3
    s3_key = f"{language}/{os.path.basename(filename)}"
    s3_client.upload_file(filename, bucket_name, s3_key)
    print(f"Uploaded to S3: {s3_key}")
    # Optionally delete the local file after upload
    os.remove(filename)
# Fetch, send to Kafka, and store
def fetch_feed(url):
    feed = feedparser.parse(url)
    articles = []
    for entry in feed.entries:
        # Not every feed provides summary/published, so fall back to empty strings
        article = {
            'title': entry.get('title', ''),
            'summary': entry.get('summary', ''),
            'published': entry.get('published', ''),
            'link': entry.get('link', '')
        }
        articles.append(article)
    return articles
def send_to_kafka(language, articles):
    topic = f"{kafka_topic_prefix}{language}"
    for article in articles:
        producer.send(topic, {'language': language, 'data': article})
        print(f"Sent to {topic}: {article['title']}")
    producer.flush()  # send() is asynchronous; flush so nothing is lost when the script exits
# Main function with 1-minute timer
def main():
    start_time = time.time()
    run_duration = 60  # Run for 1 minute (60 seconds)
    while time.time() - start_time < run_duration:
        for language, url in feeds.items():
            articles = fetch_feed(url)
            send_to_kafka(language, articles)
            save_locally_and_upload(language, articles)
        time.sleep(5)  # Adjust this to control fetch frequency within the minute
    print("Stopped streaming after 1 minute.")
if __name__ == "__main__":
    main()
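To verify that messages actually reached Kafka, a short consumer sketch like the following can be run after the one-minute test. The topic name and the ten-second timeout are illustrative assumptions; the topic follows the kafka_topic_prefix + language convention used above.

import json
from kafka import KafkaConsumer

# Read back whatever the test run produced on one of the topics
consumer = KafkaConsumer(
    'news_english',                       # assumed topic: kafka_topic_prefix + language
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',         # start from the beginning of the topic
    consumer_timeout_ms=10000,            # stop iterating after 10s with no new messages
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    print(message.value['language'], '-', message.value['data']['title'])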
##### Code for full streaming (continuous polling, every 5 minutes)
import feedparser
import json
import os
from kafka import KafkaProducer
from datetime import datetime
import time
import boto3
# Configuration
feeds = {
    'english': 'http://localhost:1200/bbc/english',
    'french': 'http://localhost:1200/lemonde',
    'spanish': 'http://localhost:1200/el_pais/internacional',
}
kafka_topic_prefix = 'news_'
kafka_broker = 'localhost:9092'
local_storage_dir = './data/'
bucket_name = 'your-s3-bucket-name' # Replace with your actual S3 bucket name
# Set up Kafka and S3 clients
producer = KafkaProducer(
    bootstrap_servers=kafka_broker,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
s3_client = boto3.client('s3')
os.makedirs(local_storage_dir, exist_ok=True)
# Save locally and upload to S3
def save_locally_and_upload(language, articles):
    filename = f"{local_storage_dir}{language}_news_{datetime.now().strftime('%Y%m%d%H%M%S')}.json"
    with open(filename, 'w') as f:
        json.dump(articles, f, indent=4)
    print(f"Saved batch locally: {filename}")
    # Upload to S3
    s3_key = f"{language}/{os.path.basename(filename)}"
    s3_client.upload_file(filename, bucket_name, s3_key)
    print(f"Uploaded to S3: {s3_key}")
    # Optionally delete the local file after upload
    os.remove(filename)
# Fetch, send to Kafka, and store
def fetch_feed(url):
    feed = feedparser.parse(url)
    articles = []
    for entry in feed.entries:
        # Not every feed provides summary/published, so fall back to empty strings
        article = {
            'title': entry.get('title', ''),
            'summary': entry.get('summary', ''),
            'published': entry.get('published', ''),
            'link': entry.get('link', '')
        }
        articles.append(article)
    return articles
def send_to_kafka(language, articles):
    topic = f"{kafka_topic_prefix}{language}"
    for article in articles:
        producer.send(topic, {'language': language, 'data': article})
        print(f"Sent to {topic}: {article['title']}")
    producer.flush()  # send() is asynchronous; flush so every batch is delivered before the next sleep
# Main function
def main():
    while True:
        for language, url in feeds.items():
            articles = fetch_feed(url)
            send_to_kafka(language, articles)
            save_locally_and_upload(language, articles)
        time.sleep(300)  # Fetch every 5 minutes
if __name__ == "__main__":
    main()
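One limitation of the loop above: every polling cycle re-fetches the full feed, so the same articles are sent to Kafka and re-uploaded to S3 over and over. A minimal way to deduplicate, assuming the article link is a stable identifier (this helper is an illustrative addition, not part of the original script):

# Hypothetical in-memory dedup: keep only articles whose link has not been seen yet
seen_links = set()

def filter_new(articles):
    new_articles = [a for a in articles if a['link'] not in seen_links]
    seen_links.update(a['link'] for a in new_articles)
    return new_articles

# Inside main(), replace the direct calls with:
#     articles = filter_new(fetch_feed(url))
#     if articles:
#         send_to_kafka(language, articles)
#         save_locally_and_upload(language, articles)

An in-memory set resets when the process restarts; for anything durable you would persist the seen links, or key the Kafka messages by link and deduplicate downstream.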