1. Data Collection and Storage

Twitter API Data Fetching

To fetch tweets, a Twitter API key will be needed. We can obtain this from the Twitter Developer Portal.

Example Code:

import tweepy
import json
from datetime import datetime
import os

# Twitter API credentials (replace with your credentials)
API_KEY = 'your_api_key'
API_SECRET = 'your_api_secret'
ACCESS_TOKEN = 'your_access_token'
ACCESS_SECRET = 'your_access_secret'

# Set up the Twitter API client
auth = tweepy.OAuthHandler(API_KEY, API_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
api = tweepy.API(auth)

# Define the parameters for tweet fetching
query = "#machinelearning, #ai"  # or any keyword
language = "en"
batch_size = 100  # Number of tweets to fetch per batch

def fetch_tweets():
    tweets_data = []
    for tweet in tweepy.Cursor(api.search_tweets, q=query, lang=language, tweet_mode="extended").items(batch_size):
        tweet_info = {
            "text": tweet.full_text,
            "timestamp": tweet.created_at.isoformat(),
            "language": tweet.lang,
            "user_info": {"name": tweet.user.name, "location": tweet.user.location},
            # Add other fields as needed
        }
        tweets_data.append(tweet_info)
    return tweets_data

# Fetch a batch and print results
tweets = fetch_tweets()
print(tweets[:2])  # Print the first two for inspection

Save Tweets Locally in JSON Format

Save each batch of tweets to a unique JSON file with a timestamp in the filename.

Example Code:

def save_tweets_to_json(tweets):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"tweets_batch_{timestamp}.json"
    with open(filename, "w") as f:
        json.dump(tweets, f)
    print(f"Saved {len(tweets)} tweets to {filename}")
    return filename

# Save the fetched tweets
filename = save_tweets_to_json(tweets)

2. Batch Upload to AWS S3

We’ll need AWS credentials configured on the machine. Install boto3 (AWS SDK for Python) if there isn’t:

pip install boto3

Example Code:

import boto3

# Initialize S3 client
s3_client = boto3.client('s3')

# Define the S3 bucket name
bucket_name = 'your-s3-bucket-name'

def upload_to_s3(filename):
    try:
        s3_client.upload_file(filename, bucket_name, filename)
        print(f"Uploaded {filename} to S3 bucket {bucket_name}")
    except Exception as e:
        print(f"Error uploading to S3: {e}")

# Upload the file to S3
upload_to_s3(filename)

3. Simulated Real-Time Streaming (Kafka Setup)