Building a YouTube Analytics Backend: Integrating with the YouTube Data API and MySQL
In this tutorial, we will walk through the process of building a backend service for a hypothetical video analytics platform. This service will fetch video data from the YouTube Data API, process it to extract valuable insights, and store the results in a MySQL database. Additionally, we will set up a data pipeline to periodically update the data.
Table of Contents
Project Setup
API Integration
Data Processing
Database Setup
Data Pipeline
API Endpoints
Running the Application
Conclusion
Project Setup
First, let's set up the project structure and dependencies. We'll use Python, Flask, SQLAlchemy, and MySQL for this project.
Directory Structure
youtube_analytics/
├── app/
│ ├── __init__.py
│ ├── api/
│ │ ├── __init__.py
│ │ └── youtube.py
│ ├── models/
│ │ ├── __init__.py
│ │ └── video.py
│ ├── services/
│ │ ├── __init__.py
│ │ ├── data_processor.py
│ │ └── youtube_service.py
│ ├── tasks/
│ │ ├── __init__.py
│ │ └── update_data.py
│ └── utils/
│ ├── __init__.py
│ └── database.py
├── config/
│ └── config.py
├── requirements.txt
├── README.md
└── main.py
Installing Dependencies
Create a requirements.txt file with the following dependencies:
Flask==2.2.2
SQLAlchemy==1.4.31
mysqlclient==2.1.0
google-api-python-client==2.37.0
redis==4.1.4
nltk==3.6.7
alembic==1.7.6
gunicorn==20.1.0
pymysql==1.0.2
apscheduler==3.9.1
Werkzeug==2.2.2
Install the dependencies using pip:
pip install -r requirements.txt
API Integration
We'll create a service to interact with the YouTube Data API to fetch video details.
app/services/youtube_service.py
import googleapiclient.discovery

from config.config import YOUTUBE_API_KEY


class YouTubeService:
    def __init__(self):
        self.youtube = googleapiclient.discovery.build(
            "youtube", "v3", developerKey=YOUTUBE_API_KEY
        )

    def get_channel_videos(self, channel_id, max_results=50):
        # search().list returns at most 50 results per page.
        request = self.youtube.search().list(
            part="id,snippet",
            channelId=channel_id,
            type="video",
            order="date",
            maxResults=max_results
        )
        response = request.execute()
        return response.get('items', [])

    def get_video_details(self, video_id):
        request = self.youtube.videos().list(
            part="snippet,statistics",
            id=video_id
        )
        response = request.execute()
        items = response.get('items', [])
        if not items:
            raise ValueError("No video found with the provided video ID.")
        return items[0]
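A quick way to sanity-check the service from a Python shell (the channel ID here is the same example channel used later in this tutorial; any public channel ID works):

from app.services.youtube_service import YouTubeService

service = YouTubeService()
# Fetch the five most recent uploads from an example channel.
videos = service.get_channel_videos("UCjwVmfw_BO-59EMp3i7Z8yg", max_results=5)
for video in videos:
    details = service.get_video_details(video['id']['videoId'])
    print(details['snippet']['title'], details['statistics'].get('viewCount'))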
Data Processing
Next, we'll create a service to process the fetched data.
app/services/data_processor.py
from collections import Counter
from datetime import datetime

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Download tokenizer and stopword data on first run.
nltk.download('punkt')
nltk.download('stopwords')


class DataProcessor:
    def __init__(self):
        self.stop_words = set(stopwords.words('english'))

    def calculate_view_count_trend(self, video_data, days=7):
        """Approximate views per day over the last `days` days.

        Note: the API only exposes a lifetime view count, so this divides
        total views by the window length; it is a rough estimate rather
        than a true recent-views trend.
        """
        now = datetime.utcnow()
        publish_date = datetime.strptime(video_data['snippet']['publishedAt'], '%Y-%m-%dT%H:%M:%SZ')
        days_since_publish = (now - publish_date).days
        if days_since_publish < days:
            days = days_since_publish
        views_per_day = int(video_data['statistics']['viewCount']) / days if days > 0 else 0
        return views_per_day

    def extract_top_keywords(self, text, top_n=10):
        words = word_tokenize(text.lower())
        words = [word for word in words if word.isalnum() and word not in self.stop_words]
        return Counter(words).most_common(top_n)

    def analyze_engagement(self, video_data):
        likes = int(video_data['statistics'].get('likeCount', 0))
        comments = int(video_data['statistics'].get('commentCount', 0))
        views = int(video_data['statistics'].get('viewCount', 0))
        engagement_rate = (likes + comments) / views if views > 0 else 0
        return {
            'likes': likes,
            'comments': comments,
            'engagement_rate': engagement_rate
        }

    def categorize_video(self, video_data):
        view_count = int(video_data['statistics'].get('viewCount', 0))
        if view_count > 1000000:
            return 'Viral'
        elif view_count > 100000:
            return 'Popular'
        elif view_count > 10000:
            return 'Trending'
        else:
            return 'Normal'

    def extract_additional_insights(self, video_data_list):
        total_likes = 0
        total_comments = 0
        total_views = 0
        highest_engagement_rate = 0
        most_engaging_video = None
        for video_data in video_data_list:
            engagement = self.analyze_engagement(video_data)
            total_likes += engagement['likes']
            total_comments += engagement['comments']
            # The API returns counts as strings, so cast before summing.
            total_views += int(video_data['statistics'].get('viewCount', 0))
            if engagement['engagement_rate'] > highest_engagement_rate:
                highest_engagement_rate = engagement['engagement_rate']
                most_engaging_video = video_data
        average_engagement_rate = (total_likes + total_comments) / total_views if total_views > 0 else 0
        return {
            'most_engaging_video': most_engaging_video,
            'average_engagement_rate': average_engagement_rate
        }

    def process_video_data(self, video_data):
        trends = self.calculate_view_count_trend(video_data)
        keywords = self.extract_top_keywords(video_data['snippet']['title'] + ' ' + video_data['snippet']['description'])
        engagement = self.analyze_engagement(video_data)
        category = self.categorize_video(video_data)
        return {
            'trends': trends,
            'keywords': keywords,
            'engagement': engagement,
            'category': category
        }
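To see what the processor produces, here is an example run against a hand-built dictionary that mimics the shape of a videos().list response item (all values are made up for illustration):

from app.services.data_processor import DataProcessor

# A minimal stand-in for a YouTube API video resource; values are illustrative.
sample_video = {
    'snippet': {
        'title': 'Getting Started with Flask',
        'description': 'A beginner tutorial on building Flask applications.',
        'publishedAt': '2023-01-01T00:00:00Z'
    },
    'statistics': {
        'viewCount': '150000',
        'likeCount': '4200',
        'commentCount': '310'
    }
}

processor = DataProcessor()
result = processor.process_video_data(sample_video)
print(result['category'])       # 'Popular' (between 100k and 1M views)
print(result['engagement'])     # likes, comments, and engagement_rate
print(result['keywords'][:3])   # top keywords from title + description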
Database Setup
We'll use SQLAlchemy to define our database models and create the necessary tables.
app/models/video.py
from sqlalchemy import Column, Integer, String, DateTime, Float, Text
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Video(Base):
    __tablename__ = 'videos'

    # MySQL requires an explicit length for VARCHAR columns.
    id = Column(String(64), primary_key=True)
    title = Column(String(255))
    description = Column(Text)
    published_at = Column(DateTime)
    view_count = Column(Integer)
    like_count = Column(Integer)
    comment_count = Column(Integer)
    engagement_rate = Column(Float)
    top_keywords = Column(Text)
    # Set by the data pipeline ('Viral', 'Popular', 'Trending', or 'Normal').
    category = Column(String(32))

    def to_dict(self):
        return {
            'id': self.id,
            'title': self.title,
            'description': self.description,
            'published_at': self.published_at,
            'view_count': self.view_count,
            'like_count': self.like_count,
            'comment_count': self.comment_count,
            'engagement_rate': self.engagement_rate,
            'top_keywords': self.top_keywords,
            'category': self.category
        }
app/utils/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from config.config import DATABASE_URL

engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)


def get_session():
    return Session()
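get_session hands back a plain SQLAlchemy Session, so callers are responsible for closing it. A typical usage pattern looks like this (a small sketch):

from app.models.video import Video
from app.utils.database import get_session

session = get_session()
try:
    video_count = session.query(Video).count()
    print(f"{video_count} videos stored")
finally:
    # Always close the session to return its connection to the pool.
    session.close()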
config/config.py
import os

# YouTube API Configuration
YOUTUBE_API_KEY = os.environ.get('YOUTUBE_API_KEY', 'your-api-key-here')

# Database Configuration
DATABASE_URL = os.environ.get(
    'DATABASE_URL',
    'mysql+pymysql://user:password@localhost/youtube_analytics'
)

# Flask Configuration
# Environment variables are strings, so compare rather than pass the raw value.
DEBUG = os.environ.get('DEBUG', 'True') == 'True'
SECRET_KEY = os.environ.get('SECRET_KEY', 'your-secret-key-here')
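The defaults above are only placeholders; in practice, set the real values as environment variables before starting the app, for example:

export YOUTUBE_API_KEY="your-real-api-key"
export DATABASE_URL="mysql+pymysql://user:password@localhost/youtube_analytics"
export SECRET_KEY="a-long-random-string"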
Creating Database Tables
Create a script create_db.py to initialize the database:
from sqlalchemy import create_engine

from app.models.video import Base
from config.config import DATABASE_URL


def create_tables():
    engine = create_engine(DATABASE_URL)
    Base.metadata.create_all(engine)
    print("Tables created successfully.")


if __name__ == "__main__":
    create_tables()
Run the script to create the tables:
python create_db.py
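If you want to confirm the table exists without opening a MySQL shell, a quick check from Python using SQLAlchemy's inspector (a small sketch, assuming the same config):

from sqlalchemy import create_engine, inspect

from config.config import DATABASE_URL

engine = create_engine(DATABASE_URL)
# Should print a list containing 'videos' if create_db.py ran successfully.
print(inspect(engine).get_table_names())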
Data Pipeline
We'll create a scheduled job to update video data and recalculate insights every 5 minutes using APScheduler.
app/tasks/update_data.py
import json
from datetime import datetime

from apscheduler.schedulers.background import BackgroundScheduler

from app.models.video import Video
from app.services.data_processor import DataProcessor
from app.services.youtube_service import YouTubeService
from app.utils.database import get_session


def save_video_data(video_details):
    session = get_session()
    data_processor = DataProcessor()
    try:
        video_id = video_details['id']
        db_video = session.query(Video).get(video_id)
        if not db_video:
            db_video = Video(id=video_id)
        processed_data = data_processor.process_video_data(video_details)
        db_video.title = video_details['snippet']['title']
        db_video.description = video_details['snippet']['description']
        db_video.published_at = datetime.strptime(video_details['snippet']['publishedAt'], '%Y-%m-%dT%H:%M:%SZ')
        db_video.view_count = int(video_details['statistics'].get('viewCount', 0))
        db_video.like_count = processed_data['engagement']['likes']
        db_video.comment_count = processed_data['engagement']['comments']
        db_video.engagement_rate = processed_data['engagement']['engagement_rate']
        db_video.top_keywords = json.dumps(processed_data['keywords'])
        db_video.category = processed_data['category']
        session.add(db_video)
        session.commit()
    except Exception as e:
        session.rollback()
        raise e
    finally:
        session.close()


def update_video_data(channel_ids):
    print("Processing started")
    youtube_service = YouTubeService()
    try:
        for channel_id in channel_ids:
            videos = youtube_service.get_channel_videos(channel_id)
            print(f"Fetched {len(videos)} videos for channel {channel_id}")
            for video in videos:
                video_id = video['id']['videoId']
                video_details = youtube_service.get_video_details(video_id)
                save_video_data(video_details)
        return f"Successfully updated data for {len(channel_ids)} channels"
    except Exception as e:
        return f"Error updating data: {str(e)}"


def schedule_updates(channel_ids):
    scheduler = BackgroundScheduler()
    # Pass the channel IDs via `args` rather than a lambda for clarity.
    scheduler.add_job(update_video_data, 'interval', minutes=5, args=[channel_ids])
    scheduler.start()
    # Return the scheduler so callers can shut it down cleanly.
    return scheduler
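Because schedule_updates returns the scheduler, the caller can register a clean shutdown when the process exits. A small sketch using atexit (the channel ID is the same example used elsewhere in this tutorial):

import atexit

from app.tasks.update_data import schedule_updates

scheduler = schedule_updates(['UCjwVmfw_BO-59EMp3i7Z8yg'])
# Stop the background scheduler thread cleanly on process exit.
atexit.register(lambda: scheduler.shutdown(wait=False))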
API Endpoints
Let's create some basic API endpoints using Flask:
app/api/youtube.py
from flask import Blueprint, jsonify, request

from app.models.video import Video
from app.services.data_processor import DataProcessor
from app.services.youtube_service import YouTubeService
from app.tasks.update_data import update_video_data, save_video_data
from app.utils.database import get_session

youtube_bp = Blueprint('youtube', __name__)


@youtube_bp.route('/videos', methods=['GET'])
def get_videos():
    session = get_session()
    try:
        videos = session.query(Video).all()
        return jsonify([video.to_dict() for video in videos])
    finally:
        session.close()


@youtube_bp.route('/videos/<video_id>', methods=['GET'])
def get_video(video_id):
    session = get_session()
    try:
        video = session.query(Video).get(video_id)
        if video:
            return jsonify(video.to_dict())
        return jsonify({'error': 'Video not found'}), 404
    finally:
        session.close()


@youtube_bp.route('/update_videos', methods=['POST'])
def update_videos():
    channel_ids = request.json.get('channel_ids')
    if not channel_ids:
        return jsonify({'error': 'No channel IDs provided'}), 400
    update_video_data(channel_ids)
    return jsonify({'message': 'Video data update initiated'})


@youtube_bp.route('/youtube_video_details/<video_id>', methods=['GET'])
def get_youtube_video_details(video_id):
    youtube_service = YouTubeService()
    try:
        video_details = youtube_service.get_video_details(video_id)
        save_video_data(video_details)
        return jsonify(video_details)
    except Exception as e:
        return jsonify({'error': str(e)}), 500


@youtube_bp.route('/video_insights/<video_id>', methods=['GET'])
def get_video_insights(video_id):
    youtube_service = YouTubeService()
    data_processor = DataProcessor()
    try:
        video_details = youtube_service.get_video_details(video_id)
        insights = data_processor.process_video_data(video_details)
        return jsonify(insights)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
Running the Application
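The directory structure includes a main.py that we haven't written yet. A minimal sketch might look like the following (the Blueprint is mounted under /api so routes match the example requests below; the channel ID passed to schedule_updates is a placeholder):

from flask import Flask

from app.api.youtube import youtube_bp
from app.tasks.update_data import schedule_updates
from config.config import DEBUG, SECRET_KEY

app = Flask(__name__)
app.config['SECRET_KEY'] = SECRET_KEY
# Mount all endpoints under /api to match the curl examples below.
app.register_blueprint(youtube_bp, url_prefix='/api')

if __name__ == '__main__':
    # Placeholder channel ID; replace with the channels you want to track.
    schedule_updates(['UCjwVmfw_BO-59EMp3i7Z8yg'])
    # use_reloader=False prevents the debug reloader from starting the
    # background scheduler twice.
    app.run(debug=DEBUG, use_reloader=False)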
Start the application:
python main.py
Example Requests
Get All Videos:
curl http://localhost:5000/api/videos
Get Video Details:
curl http://localhost:5000/api/videos/VIDEO_ID
Update Videos:
curl -X POST -H "Content-Type: application/json" -d '{"channel_ids":["UCjwVmfw_BO-59EMp3i7Z8yg"]}' http://localhost:5000/api/update_videos
Get YouTube Video Details and Save to Database:
curl http://localhost:5000/api/youtube_video_details/VIDEO_ID
Get Video Insights:
curl http://localhost:5000/api/video_insights/VIDEO_ID
Replace VIDEO_ID with an actual YouTube video ID.
Conclusion
In this tutorial, you built a backend service that integrates with the YouTube Data API, processes video data to extract insights, and stores the results in a MySQL database. You also set up a scheduled data pipeline that refreshes the data every five minutes. This service can serve as the foundation of a more complete video analytics platform.