Step 3: Set Up Kafka Environment
Install Libraries for Kafka Consumer:
pip install kafka-python requests
Kafka Consumer Script:
from kafka import KafkaConsumer, KafkaProducer
import json
import requests
kafka_broker = 'localhost:9092'
source_topics = ['news_english', 'news_spanish']
translated_topic = 'news_translated'
translation_service_url = '<http://localhost:8000/translate>'
producer = KafkaProducer(
bootstrap_servers=kafka_broker,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def translate_article(text, source_lang, target_lang="en"):
payload = {
'text': text,
'source_language': source_lang,
'target_language': target_lang
}
response = requests.post(translation_service_url, json=payload)
if response.status_code == 200:
return response.json().get('translated_text')
else:
print(f"Translation failed for: {text}")
return None
def process_message(message):
article = message.value
source_language = article['language']
text_to_translate = article['data']['summary']
translated_text = translate_article(text_to_translate, source_language)
if translated_text:
translated_article = {
'original': article,
'translated_text': translated_text
}
producer.send(translated_topic, translated_article)
print(f"Translated: {translated_article['original']['data']['title']}")
consumer = KafkaConsumer(
*source_topics,
bootstrap_servers=kafka_broker,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for message in consumer:
process_message(message)