Published on

Building a YouTube Analytics Backend: Integrating with the YouTube Data API and MySQL

Authors

Building a YouTube Analytics Backend: Integrating with the YouTube Data API and MySQL

In this tutorial, we will walk through the process of building a backend service for a hypothetical video analytics platform. This service will fetch video data from the YouTube Data API, process it to extract valuable insights, and store the results in a MySQL database. Additionally, we will set up a data pipeline to periodically update the data.

Table of Contents

  1. Project Setup
  2. API Integration
  3. Data Processing
  4. Database Setup
  5. Data Pipeline
  6. Conclusion

Project Setup

First, let's set up the project structure and dependencies. We'll use Python, Flask, SQLAlchemy, and MySQL for this project.

Directory Structure

youtube_analytics/
├── app/
│   ├── __init__.py
│   ├── api/
│   │   ├── __init__.py
│   │   └── youtube.py
│   ├── models/
│   │   ├── __init__.py
│   │   └── video.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── data_processor.py
│   │   └── youtube_service.py
│   ├── tasks/
│   │   ├── __init__.py
│   │   └── update_data.py
│   └── utils/
│       ├── __init__.py
│       └── database.py
├── config/
│   └── config.py
├── requirements.txt
├── README.md
└── main.py

Installing Dependencies

Create a requirements.txt file with the following dependencies:

Flask==2.2.2
SQLAlchemy==1.4.31
mysqlclient==2.1.0
google-api-python-client==2.37.0
redis==4.1.4
nltk==3.6.7
alembic==1.7.6
gunicorn==20.1.0
pymysql==1.0.2
apscheduler==3.9.1
Werkzeug==2.2.2

Install the dependencies using pip:

pip install -r requirements.txt

API Integration

We'll create a service to interact with the YouTube Data API to fetch video details.

app/services/youtube_service.py

import googleapiclient.discovery
from config.config import YOUTUBE_API_KEY

class YouTubeService:
    def __init__(self):
        self.youtube = googleapiclient.discovery.build(
            "youtube", "v3", developerKey=YOUTUBE_API_KEY
        )

    def get_channel_videos(self, channel_id, max_results=50):
        request = self.youtube.search().list(
            part="id,snippet",
            channelId=channel_id,
            type="video",
            order="date",
            maxResults=max_results
        )
        response = request.execute()
        return response.get('items', [])

    def get_video_details(self, video_id):
        request = self.youtube.videos().list(
            part="snippet,statistics",
            id=video_id
        )
        response = request.execute()
        items = response.get('items', [])
        if not items:
            raise ValueError("No video found with the provided video ID.")
        return items[0]

Data Processing

Next, we'll create a service to process the fetched data.

app/services/data_processor.py

from collections import Counter
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from datetime import datetime, timedelta

nltk.download('punkt')
nltk.download('stopwords')

class DataProcessor:
    def __init__(self):
        self.stop_words = set(stopwords.words('english'))

    def calculate_view_count_trend(self, video_data, days=7):
        now = datetime.utcnow()
        publish_date = datetime.strptime(video_data['snippet']['publishedAt'], '%Y-%m-%dT%H:%M:%SZ')
        days_since_publish = (now - publish_date).days

        if days_since_publish < days:
            days = days_since_publish

        views_per_day = int(video_data['statistics']['viewCount']) / days if days > 0 else 0
        return views_per_day

    def extract_top_keywords(self, text, top_n=10):
        words = word_tokenize(text.lower())
        words = [word for word in words if word.isalnum() and word not in self.stop_words]
        return Counter(words).most_common(top_n)

    def analyze_engagement(self, video_data):
        likes = int(video_data['statistics'].get('likeCount', 0))
        comments = int(video_data['statistics'].get('commentCount', 0))
        views = int(video_data['statistics'].get('viewCount', 0))
        engagement_rate = (likes + comments) / views if views > 0 else 0
        return {
            'likes': likes,
            'comments': comments,
            'engagement_rate': engagement_rate
        }

    def categorize_video(self, video_data):
        view_count = int(video_data['statistics'].get('viewCount', 0))
        if view_count > 1000000:
            return 'Viral'
        elif view_count > 100000:
            return 'Popular'
        elif view_count > 10000:
            return 'Trending'
        else:
            return 'Normal'

    def extract_additional_insights(self, video_data_list):
        total_likes = 0
        total_comments = 0
        total_views = 0
        highest_engagement_rate = 0
        most_engaging_video = None

        for video_data in video_data_list:
            engagement = self.analyze_engagement(video_data)
            total_likes += engagement['likes']
            total_comments += engagement['comments']
            total_views += video_data['statistics'].get('viewCount', 0)

            if engagement['engagement_rate'] > highest_engagement_rate:
                highest_engagement_rate = engagement['engagement_rate']
                most_engaging_video = video_data

        average_engagement_rate = (total_likes + total_comments) / total_views if total_views > 0 else 0

        return {
            'most_engaging_video': most_engaging_video,
            'average_engagement_rate': average_engagement_rate
        }

    def process_video_data(self, video_data):
        trends = self.calculate_view_count_trend(video_data)
        keywords = self.extract_top_keywords(video_data['snippet']['title'] + ' ' + video_data['snippet']['description'])
        engagement = self.analyze_engagement(video_data)
        category = self.categorize_video(video_data)
        
        return {
            'trends': trends,
            'keywords': keywords,
            'engagement': engagement,
            'category': category
        }

Database Setup

We'll use SQLAlchemy to define our database models and create the necessary tables.

app/models/video.py

from sqlalchemy import Column, Integer, String, DateTime, Float, Text
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Video(Base):
    __tablename__ = 'videos'

    id = Column(String, primary_key=True)
    title = Column(String)
    description = Column(String)
    published_at = Column(DateTime)
    view_count = Column(Integer)
    like_count = Column(Integer)
    comment_count = Column(Integer)
    engagement_rate = Column(Float)
    top_keywords = Column(Text)

    def to_dict(self):
        return {
            'id': self.id,
            'title': self.title,
            'description': self.description,
            'published_at': self.published_at,
            'view_count': self.view_count,
            'like_count': self.like_count,
            'comment_count': self.comment_count,
            'engagement_rate': self.engagement_rate,
            'top_keywords': self.top_keywords
        }

app/utils/database.py

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from config.config import DATABASE_URL

engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)

def get_session():
    return Session()

config/config.py

import os

# YouTube API Configuration
YOUTUBE_API_KEY = os.environ.get('YOUTUBE_API_KEY', 'your-api-key-here')

# Database Configuration
DATABASE_URL = os.environ.get('DATABASE_URL', 'mysql

+pymysql://user:password@localhost/youtube_analytics')

# Flask Configuration
DEBUG = os.environ.get('DEBUG', True)
SECRET_KEY = os.environ.get('SECRET_KEY', 'your-secret-key-here')

Creating Database Tables

Create a script create_db.py to initialize the database:

from sqlalchemy import create_engine
from app.models.video import Base
from config.config import DATABASE_URL

def create_tables():
    engine = create_engine(DATABASE_URL)
    Base.metadata.create_all(engine)
    print("Tables created successfully.")

if __name__ == "__main__":
    create_tables()

Run the script to create the tables:

python create_db.py

Data Pipeline

We'll create a scheduled job to update video data and recalculate insights every 5 minutes using APScheduler.

app/tasks/update_data.py

from app.services.youtube_service import YouTubeService
from app.services.data_processor import DataProcessor
from app.models.video import Video
from app.utils.database import get_session
import json
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler

def save_video_data(video_details):
    session = get_session()
    data_processor = DataProcessor()
    try:
        video_id = video_details['id']
        
        db_video = session.query(Video).get(video_id)
        if not db_video:
            db_video = Video(id=video_id)

        processed_data = data_processor.process_video_data(video_details)
        
        db_video.title = video_details['snippet']['title']
        db_video.description = video_details['snippet']['description']
        db_video.published_at = datetime.strptime(video_details['snippet']['publishedAt'], '%Y-%m-%dT%H:%M:%SZ')
        db_video.view_count = int(video_details['statistics'].get('viewCount', 0))
        db_video.like_count = processed_data['engagement']['likes']
        db_video.comment_count = processed_data['engagement']['comments']
        db_video.engagement_rate = processed_data['engagement']['engagement_rate']
        db_video.top_keywords = json.dumps(processed_data['keywords'])
        db_video.category = processed_data['category']

        session.add(db_video)
        session.commit()
    except Exception as e:
        session.rollback()
        raise e
    finally:
        session.close()

def update_video_data(channel_ids):
    print("Processing started")
    youtube_service = YouTubeService()

    try:
        for channel_id in channel_ids:
            videos = youtube_service.get_channel_videos(channel_id)
            print(len(videos))
            for video in videos:
                video_id = video['id']['videoId']
                video_details = youtube_service.get_video_details(video_id)
                save_video_data(video_details)
        return f"Successfully updated data for {len(channel_ids)} channels"
    except Exception as e:
        return f"Error updating data: {str(e)}"

def schedule_updates(channel_ids):
    scheduler = BackgroundScheduler()
    scheduler.add_job(lambda: update_video_data(channel_ids), 'interval', minutes=5)
    scheduler.start()

API Endpoints

Let's create some basic API endpoints using Flask:

app/api/youtube.py

from flask import Blueprint, jsonify, request
from app.models.video import Video
from app.utils.database import get_session
from app.tasks.update_data import update_video_data, save_video_data
from app.services.youtube_service import YouTubeService

youtube_bp = Blueprint('youtube', __name__)

@youtube_bp.route('/videos', methods=['GET'])
def get_videos():
    session = get_session()
    videos = session.query(Video).all()
    return jsonify([video.to_dict() for video in videos])

@youtube_bp.route('/videos/<video_id>', methods=['GET'])
def get_video(video_id):
    session = get_session()
    video = session.query(Video).get(video_id)
    if video:
        return jsonify(video.to_dict())
    return jsonify({'error': 'Video not found'}), 404

@youtube_bp.route('/update_videos', methods=['POST'])
def update_videos():
    channel_ids = request.json.get('channel_ids')
    if not channel_ids:
        return jsonify({'error': 'No channel IDs provided'}), 400

    update_video_data(channel_ids)
    return jsonify({'message': 'Video data update initiated'})

@youtube_bp.route('/youtube_video_details/<video_id>', methods=['GET'])
def get_youtube_video_details(video_id):
    youtube_service = YouTubeService()
    try:
        video_details = youtube_service.get_video_details(video_id)
        save_video_data(video_details)
        return jsonify(video_details)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@youtube_bp.route('/video_insights/<video_id>', methods=['GET'])
def get_video_insights(video_id):
    youtube_service = YouTubeService()
    data_processor = DataProcessor()
    try:
        video_details = youtube_service.get_video_details(video_id)
        insights = data_processor.process_video_data(video_details)
        return jsonify(insights)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

Running the Application

Ensure your application is running:

python main.py

Example Requests

Get All Videos:

curl http://localhost:5000/api/videos

Get Video Details:

curl http://localhost:5000/api/videos/VIDEO_ID

Update Videos:

curl -X POST -H "Content-Type: application/json" -d '{"channel_ids":["UCjwVmfw_BO-59EMp3i7Z8yg"]}' http://localhost:5000/api/update_videos

Get YouTube Video Details and Save to Database:

curl http://localhost:5000/api/youtube_video_details/VIDEO_ID

Get Video Insights:

curl http://localhost:5000/api/video_insights/VIDEO_ID

Replace VIDEO_ID with an actual YouTube video ID.

Conclusion

By following this article, you have built a backend service that integrates with the YouTube Data API, processes video data to extract insights, and stores the results in a MySQL database. Additionally, you have set up a data pipeline to periodically update the data. This backend service forms the foundation of a robust video analytics platform.