Step 3: Set Up Kafka Environment

  1. Install Libraries for Kafka Consumer:

    pip install kafka-python requests
    
  2. Kafka Consumer Script:

    from kafka import KafkaConsumer, KafkaProducer
    import json
    import requests
    
    # Address of the Kafka broker shared by the producer and the consumer.
    kafka_broker = 'localhost:9092'
    # Topics the consumer subscribes to.
    source_topics = ['news_english', 'news_spanish']
    # Topic that receives the translated articles.
    translated_topic = 'news_translated'
    # Must be a bare URL: wrapping it in '<...>' (autolink markup) makes it an
    # invalid URL and every request to the translation service fails.
    translation_service_url = 'http://localhost:8000/translate'

    # Producer that serializes each value to JSON and encodes it as UTF-8 bytes.
    producer = KafkaProducer(
        bootstrap_servers=kafka_broker,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    
    def translate_article(text, source_lang, target_lang="en", timeout=10):
        """Translate text by POSTing it to the translation service.

        Args:
            text: The text to translate.
            source_lang: Sent to the service as 'source_language'.
            target_lang: Sent to the service as 'target_language'; defaults
                to "en".
            timeout: Seconds to wait for the service before giving up, so a
                stalled service cannot block the consumer forever.

        Returns:
            The 'translated_text' field of the JSON response, or None when
            the request fails, the status code is not 200, or the response
            body is not a JSON object.
        """
        payload = {
            'text': text,
            'source_language': source_lang,
            'target_language': target_lang
        }
        try:
            response = requests.post(
                translation_service_url, json=payload, timeout=timeout
            )
        except requests.RequestException as exc:
            # Connection errors and timeouts are reported like any other
            # failed translation instead of crashing the caller.
            print(f"Translation request error ({exc}) for: {text}")
            return None

        if response.status_code != 200:
            print(f"Translation failed for: {text}")
            return None

        try:
            body = response.json()
        except ValueError:
            # A 200 response whose body is not valid JSON.
            print(f"Translation response was not valid JSON for: {text}")
            return None

        if not isinstance(body, dict):
            print(f"Translation response was not a JSON object for: {text}")
            return None
        return body.get('translated_text')
    
    def process_message(message):
        """Translate one consumed article and publish the result.

        Reads the article from message.value, translates its
        ['data']['summary'] from the article's 'language', and sends a record
        holding the original article plus the translated text to
        translated_topic. Articles missing the required fields, and articles
        whose translation comes back empty or None, are skipped.

        Args:
            message: A consumed record whose .value is the decoded article.
        """
        article = message.value
        try:
            source_language = article['language']
            text_to_translate = article['data']['summary']
        except (KeyError, TypeError):
            # One malformed message must not stop the whole consumer loop.
            print(f"Skipping malformed article: {article!r}")
            return

        translated_text = translate_article(text_to_translate, source_language)
        if not translated_text:
            return

        translated_article = {
            'original': article,
            'translated_text': translated_text
        }
        producer.send(translated_topic, translated_article)
        # The title is only used for logging, so tolerate its absence rather
        # than raising after the message has already been sent.
        title = article['data'].get('title', '<untitled>')
        print(f"Translated: {title}")
    
    # Consumer subscribed to every source topic; values are decoded from
    # UTF-8 JSON bytes into Python objects.
    consumer = KafkaConsumer(
        *source_topics,
        bootstrap_servers=kafka_broker,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    # Main loop: iterate over the consumer and process each message in turn.
    try:
        for message in consumer:
            process_message(message)
    except KeyboardInterrupt:
        print("Stopping consumer...")
    finally:
        # producer.send() is asynchronous; flush so translations that are
        # still buffered are delivered before the process exits.
        consumer.close()
        producer.flush()
        producer.close()