news_spanish topic).
Topics: news_spanish (raw articles) and processed_news_spanish (filtered/transformed articles).
Flow: news_spanish → processed_news_spanish, then processed_news_spanish → translated_news_spanish_to_english.
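The topic names suggest a two-stage chain: one step reads news_spanish and writes processed_news_spanish, and a second step reads processed_news_spanish and writes translated_news_spanish_to_english. This reading of the layout is an assumption. The in-memory sketch below uses no Kafka at all. It only shows which stage reads and writes which topic. The process and translate functions are hypothetical placeholders, and translate does not actually translate anything.

```python
from collections import defaultdict

# In-memory stand-in for Kafka topics: topic name -> list of messages (illustration only)
topics = defaultdict(list)


def run_stage(source, sink, transform):
    """Read every message from source, apply transform, append non-None results to sink."""
    for message in topics[source]:
        result = transform(message)
        if result is not None:  # None means "filtered out"
            topics[sink].append(result)


# Hypothetical processing rule: drop articles without a title, strip surrounding whitespace
def process(article):
    title = (article.get("title") or "").strip()
    return {**article, "title": title} if title else None


# Hypothetical placeholder for translation: only tags the record, no real translation
def translate(article):
    return {**article, "lang": "en"}


def run_pipeline():
    # Stage 1: raw articles -> filtered/transformed articles
    run_stage("news_spanish", "processed_news_spanish", process)
    # Stage 2: processed articles -> translated articles
    run_stage("processed_news_spanish", "translated_news_spanish_to_english", translate)
```

In the real deployment each stage is a separate Kafka client, so the stages run independently and continuously instead of in one function call.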
Install Python and required libraries:
sudo apt update
sudo apt install -y python3 python3-pip
pip3 install kafka-python requests
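Before you start the producer, you may want to confirm that the machine can reach the broker's port at all. The helper below is a hypothetical name, not part of kafka-python. It is a minimal sketch that only attempts a TCP connection, using the standard library's socket.create_connection.

```python
import socket


def broker_reachable(host: str, port: int = 9092, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection resolves the host and connects; the socket is closed on exit
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers DNS failures, refused connections, and timeouts
        return False
```

For example, call broker_reachable("<kafka-instance-1-private-ip>") with the same address used in KAFKA_BROKER. A successful TCP connection does not prove that the Kafka client will work. After bootstrapping, clients connect to the addresses the broker advertises (its advertised.listeners setting), so those addresses must be reachable as well.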
Create kafka_producer.py:
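import json
import xml.etree.ElementTree as ET
from time import sleep

import requests
from kafka import KafkaProducer

# Kafka configuration
KAFKA_BROKER = "<kafka-instance-1-private-ip>:9092"
KAFKA_TOPIC = "news_spanish"

# RSS feed URL
RSS_FEED_URL = "https://rsshub.app/elpais"

# Initialize Kafka producer (message values are sent as UTF-8 encoded JSON)
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Fetch Spanish news articles from the RSS feed
def fetch_news():
    try:
        response = requests.get(RSS_FEED_URL, timeout=10)
    except requests.RequestException as exc:
        print(f"Failed to fetch RSS feed: {exc}")
        return []
    if response.status_code != 200:
        print(f"Failed to fetch RSS feed: {response.status_code}")
        return []
    # An RSS feed is XML, not JSON: turn each <item> into a {tag: text} dict
    try:
        root = ET.fromstring(response.content)
    except ET.ParseError as exc:
        print(f"Failed to parse RSS feed: {exc}")
        return []
    return [{child.tag: child.text for child in item} for item in root.iter("item")]

# Stream articles to Kafka
def stream_to_kafka():
    articles = fetch_news()
    for article in articles:
        # Format the article (RSS 2.0 carries the summary in <description>)
        formatted_article = {
            "title": article.get("title"),
            "summary": article.get("description"),
            "published": article.get("pubDate"),
            "link": article.get("link")
        }
        # Send to Kafka (send() is asynchronous; messages are buffered and batched)
        producer.send(KAFKA_TOPIC, formatted_article)
        print(f"Sent to Kafka: {formatted_article['title']}")
        sleep(1)  # Simulate real-time streaming
    # Block until all buffered messages have been delivered
    producer.flush()

if __name__ == "__main__":
    while True:
        stream_to_kafka()
        sleep(300)  # Fetch news every 5 minutes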
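The loop re-fetches the whole feed every five minutes. Feeds usually keep recent items listed for a while, so the same articles are sent to news_spanish again on every cycle. One option is to remember what has already been sent, as in the minimal sketch below. The names filter_new_articles and SEEN_LINKS are hypothetical, and the sketch assumes the article link (or the title, if there is no link) identifies an article.

```python
def filter_new_articles(articles, seen):
    """Return only articles not sent before, recording their keys in `seen`.

    `seen` is a set kept across polling cycles. It lives in memory only,
    so it is empty again after the producer restarts, and it grows without bound.
    """
    fresh = []
    for article in articles:
        key = article.get("link") or article.get("title")
        if key is None or key in seen:  # skip unidentifiable items and repeats
            continue
        seen.add(key)
        fresh.append(article)
    return fresh
```

To use it, define SEEN_LINKS = set() at module level and change the first line of stream_to_kafka to articles = filter_new_articles(fetch_news(), SEEN_LINKS). For deduplication that survives restarts, the keys would need to be persisted, or the downstream consumers would need to tolerate duplicates.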
Run the producer:
python3 kafka_producer.py
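Each record the producer writes to news_spanish is the article dict after the value_serializer has run: JSON text encoded as UTF-8 bytes. Any consumer has to reverse that step. The sketch below repeats the producer's transformation as a function and pairs it with its inverse. Both function names are illustrative. json.dumps escapes non-ASCII characters by default (ensure_ascii=True), so "ñ" travels as \u00f1 but decodes back exactly.

```python
import json


def serialize(article: dict) -> bytes:
    # Same transformation as the producer's value_serializer
    return json.dumps(article).encode("utf-8")


def deserialize(payload: bytes) -> dict:
    # Inverse transformation a consumer would apply to each record value
    return json.loads(payload.decode("utf-8"))
```

With kafka-python, deserialize can be passed as KafkaConsumer(..., value_deserializer=deserialize). A Kafka Streams application reading the same topic needs an equivalent JSON deserializer on the Java side.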
Install Java and Maven:
sudo apt update
sudo apt install -y openjdk-11-jdk maven
Create and configure a Kafka Streams application (SpanishNewsStreamProcessor).