Program

How to Build Movie Recap Tool With Telegram Bot – Step by Step Guide

PSG  vs  ARS

In this step-by-step tutorial, I’ll show you how to build a Movie Recap Telegram Bot that sends movie recaps directly to Telegram users. You already have your own code — and here you’ll learn where to place it and how to make everything work.

What You’ll Need

  • A Telegram Bot Token (get it from @BotFather)
  • Hosting (local server, VPS, or free cloud hosting)
  • Python,

Step 1 – Get Your Telegram Bot Token

Open Telegram, search for BotFather, and send /newbot. Follow the instructions. Once done, you’ll receive a token. Save it — you’ll need it inside your code.

Step 2 – Setup your VPS server

  • Create folder in your SSH
  • Create new Virtual Environment
  • Create python and paste the following code

import os
import re
import uuid
import base64
import tempfile
import subprocess
from playwright.async_api import async_playwright
import asyncio
import itertools
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackQueryHandler, ContextTypes
import edge_tts
from google import genai
from google.genai import types
import database as db
import requests

# ========== CONFIGURATION ==========
TELEGRAM_TOKEN = “YOUR TELEGRAM BOT  TOKEN”

GEMINI_API_KEYS = [
“YOUR API 1”,
“YOUR API 2”,
“YOUR API 3”,
“YOUR API 4″,
]

key_cycle = itertools.cycle(GEMINI_API_KEYS)
current_key = next(key_cycle)

def get_next_api_key():
global current_key
current_key = next(key_cycle)
print(f”🔄 Using API key: {current_key[:10]}…”)
return current_key

SCRIPT_MODEL = “gemini-2.5-flash-lite”

VOICES = {
“en”: {“male”: “en-US-GuyNeural”, “female”: “en-US-JennyNeural”}
}

user_sessions = {}

# ========== Helper Functions ==========
def split_script_into_chunks(script: str, max_chars: int = 500) -> list:
sentences = script.replace(‘။’, ‘။\n’).split(‘\n’)
chunks = []
current_chunk = “”
for sentence in sentences:
if not sentence.strip():
continue
if len(current_chunk) + len(sentence) <= max_chars:
current_chunk += sentence
else:
if current_chunk:
chunks.append(current_chunk)
current_chunk = sentence
if current_chunk:
chunks.append(current_chunk)
return chunks if chunks else [script]

def get_duration(file_path):
try:
result = subprocess.run(
[‘ffprobe’, ‘-v’, ‘error’, ‘-show_entries’, ‘format=duration’,
‘-of’, ‘default=noprint_wrappers=1:nokey=1’, file_path],
capture_output=True, text=True, timeout=30
)
output = result.stdout.strip()
if output and output != ‘N/A’:
return float(output)
return 0.0
except:
return 0.0

def replace_audio_in_video(video_path: str, audio_path: str, output_path: str) -> str:
video_duration = get_duration(video_path)
audio_duration = get_duration(audio_path)

if video_duration >= audio_duration:
cmd = [‘ffmpeg’, ‘-i’, video_path, ‘-stream_loop’, ‘-1’, ‘-i’, audio_path,
‘-c:v’, ‘copy’, ‘-c:a’, ‘aac’, ‘-map’, ‘0:v:0’, ‘-map’, ‘1:a:0’,
‘-t’, str(video_duration), ‘-y’, output_path]
else:
cmd = [‘ffmpeg’, ‘-stream_loop’, ‘-1’, ‘-i’, video_path, ‘-i’, audio_path,
‘-c:v’, ‘libx264’, ‘-c:a’, ‘aac’, ‘-map’, ‘0:v:0’, ‘-map’, ‘1:a:0’,
‘-t’, str(audio_duration), ‘-y’, output_path]
subprocess.run(cmd, check=True, capture_output=True)
return output_path

def add_watermark(video_path: str, output_path: str) -> str:
watermark_text = “AMC Voice Cover”

filter_str = f”drawtext=text='{watermark_text}’:fontcolor=white:fontsize=20:box=1:[email protected]:boxborderw=5:x=w-tw-10:y=h-th-10″

cmd = [‘ffmpeg’, ‘-i’, video_path, ‘-vf’, filter_str, ‘-c:a’, ‘copy’, ‘-y’, output_path]

subprocess.run(cmd, check=True)
return output_path

async def local_chrome_render_subtitles(video_path, srt_path, user_id):
import json
current_dir = os.path.dirname(os.path.abspath(__file__)) if ‘__file__’ in locals() else os.getcwd()

try:
cmd = [
‘ffprobe’, ‘-v’, ‘error’, ‘-select_streams’, ‘v:0’,
‘-show_entries’, ‘stream=width,height’, ‘-of’, ‘json’, video_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
probe_data = json.loads(result.stdout)
width = probe_data[‘streams’][0][‘width’]
height = probe_data[‘streams’][0][‘height’]
except Exception:
width, height = 1280, 720

unique_id = f”{user_id}_{uuid.uuid4().hex[:6]}”
output_video = os.path.join(current_dir, f”final_subtitled_{unique_id}.mp4″)
temp_html = os.path.join(current_dir, f”temp_{unique_id}.html”)
temp_mp4 = os.path.join(current_dir, f”temp_{unique_id}.mp4″)

# SRT  Function
def srt_time_to_seconds(time_str):
hours, minutes, seconds = time_str.strip().split(‘:’)
seconds, milliseconds = seconds.split(‘,’)
return int(hours) * 3600 + int(minutes) * 60 + int(seconds) + int(milliseconds) / 1000.0

print(f”⏳ Reading SRT for Local Rendering (User: {user_id})…”)
try:
with open(srt_path, ‘r’, encoding=’utf-8′) as f:
content = f.read().strip()

blocks = content.split(‘\n\n’)
subtitles = []

for block in blocks:
lines = block.split(‘\n’)
if len(lines) >= 3:
time_match = re.match(r'(\d+:\d+:\d+,\d+)\s*–>\s*(\d+:\d+:\d+,\d+)’, lines[1])
if time_match:
start_sec = srt_time_to_seconds(time_match.group(1))
end_sec = srt_time_to_seconds(time_match.group(2))
text = ” “.join(lines[2:])
subtitles.append({‘start’: start_sec, ‘end’: end_sec, ‘text’: text})

# 🎯 9:16 ,16:9 Responsive CSS
html_content = “””
<!DOCTYPE html>
<html>
<head>
<meta charset=”UTF-8″>
<style>
@font-face {
font-family: ‘PyidaungsuBold’;
src: url(‘file:///usr/share/fonts/truetype/Pyidaungsu-Bold.ttf’) format(‘truetype’);
}
html, body {
margin: 0; padding: 0; background: #000000;
width: 100%; height: 100%; overflow: hidden;
font-family: ‘PyidaungsuBold’, sans-serif;
}
.subtitle-container {
position: absolute;
bottom: 8%;
left: 5%;
width: 90%;
text-align: center;
font-size: 4.8vw;
font-weight: bold;
color: #FFFFFF;
text-shadow: 3px 3px 6px rgba(0, 0, 0, 1), -3px -3px 6px rgba(0, 0, 0, 1);
word-wrap: break-word;
word-break: normal;
white-space: normal;
}
</style>
</head>
<body>
<div class=”subtitle-container” id=”subtitles”></div>
<script>
const subs = “”” + str(subtitles) + “””;
const container = document.getElementById(‘subtitles’);

function render(time) {
let activeSub = subs.find(s => time >= s.start && time <= s.end);
if (activeSub) {
container.innerText = activeSub.text;
} else {
container.innerHTML = ”;
}
}
</script>
</body>
</html>
“””

with open(temp_html, ‘w’, encoding=’utf-8′) as f:
f.write(html_content)

print(f”🚀 Launching Headless Chrome for User: {user_id}…”)
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True, args=[“–no-sandbox”, “–disable-setuid-sandbox”])

# 🎯 Playwright Viewport Size
if height > width:
current_viewport = {“width”: 720, “height”: 1280}
else:
current_viewport = {“width”: 1280, “height”: 720}

page = await browser.new_page(viewport=current_viewport)
#…page.goto, render
file_url = f”file://{os.path.abspath(temp_html)}”
await page.goto(file_url)

cmd = f”ffmpeg -f image2pipe -vcodec png -r 25 -i – -vcodec libx264 -pix_fmt yuv420p \”{temp_mp4}\” -y”
process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE)

total_duration = subtitles[-1][‘end’] if subtitles else 10
frames = int(total_duration * 25)

for frame in range(frames):
current_time = frame / 25.0
await page.evaluate(f”render({current_time})”)
screenshot = await page.screenshot(type=”png”)
process.stdin.write(screenshot)

process.stdin.close()
process.wait()
await browser.close()

print(f”🎬 FFmpeg Removing Black Screen & Merging Video for User: {user_id}…”)
final_cmd = f”ffmpeg -i \”{video_path}\” -i \”{temp_mp4}\” -filter_complex \”[1:v]colorkey=0x000000:0.1:0.1[sub_trans];[0:v][sub_trans]overlay=0:0\” -c:a copy \”{output_video}\” -y”
subprocess.run(final_cmd, shell=True, check=True)

print(f”✅ Local Rendering Successful: {output_video}”)
return output_video

except Exception as e:
print(f”❌ Local Rendering failed: {e}”)
return None

finally:
if os.path.exists(temp_html): os.remove(temp_html)
if os.path.exists(temp_mp4): os.remove(temp_mp4)

# ========== Gemini Functions ==========
async def generate_recap_script(video_data_base64: str, language: str, mime_type: str = “video/mp4”) -> str:
prompt = “””
You are a professional video recap scriptwriter.
Analyze this video in detail and write an engaging story/recap voiceover script based on the visual events of the video in “English”.

[CRITICAL RULES]
1. Do NOT output headings like “Introduction”, “Middle”, “End”, or any conversational text.
2. Do NOT just transcribe on-screen text. Describe the story and whole events happening in the video smoothly.
3. The final output MUST be strictly formatted as a valid SRT subtitle file. Estimate the timelines to match the video duration.

[SRT Format Example]
1
00:00:01,000 –> 00:00:05,000
Once upon a time, there was a young shepherd boy living in a peaceful village.

2
00:00:05,200 –> 00:00:10,000
Every day, he took his sheep up to the hills to find fresh grass.

Output ONLY the raw SRT content. Do not include any other markdown or notes.
“””

for attempt in range(len(GEMINI_API_KEYS) * 2):
try:
print(f”🎯 Attempt {attempt+1}: Using API key: {current_key[:15]}…”)
client = genai.Client(api_key=current_key)

response = await asyncio.wait_for(
asyncio.to_thread(
client.models.generate_content,
model=SCRIPT_MODEL,
contents=[
prompt,
types.Part.from_bytes(
data=base64.b64decode(video_data_base64),
mime_type=mime_type
)
],
config=types.GenerateContentConfig(
temperature=0.7,
max_output_tokens=8192,
top_p=0.95
)
),
timeout=120.0
)

clean_text = response.text.replace(““`srt”, “”).replace(““`”, “”).strip()
return clean_text

except (asyncio.exceptions.TimeoutError, TimeoutError):
print(f”⚠️ Timeout on attempt {attempt+1}, retrying…”)
await asyncio.sleep(3)
continue
except Exception as e:
if “429” in str(e):
print(f”⚠️ Rate limit on {current_key[:15]}… switching”)
get_next_api_key()
await asyncio.sleep(2)
continue
raise e
raise Exception(“All API keys exhausted”)

def clean_srt_for_tts(srt_content: str) -> str:
lines = srt_content.split(‘\n’)
cleaned_lines = []
for line in lines:
line_str = line.strip()
if not line_str:
continue
if line_str.isdigit():
continue
if “–>” in line_str:
continue
cleaned_lines.append(line_str)
return ” “.join(cleaned_lines)

async def generate_audio_from_script(script: str, language: str, gender: str) -> str:
clean_text_only = clean_srt_for_tts(script)
chunks = split_script_into_chunks(clean_text_only, 500)
voice = VOICES[language][gender]

all_audio_paths = []
for chunk in chunks:
communicate = edge_tts.Communicate(chunk, voice, rate=”+0%”)
audio_file = tempfile.NamedTemporaryFile(delete=False, suffix=”.mp3″)
audio_file.close()
await communicate.save(audio_file.name)
all_audio_paths.append(audio_file.name)

if len(all_audio_paths) == 1:
return all_audio_paths[0]

final_audio = tempfile.NamedTemporaryFile(delete=False, suffix=”.mp3″)
final_audio.close()
concat_file = tempfile.NamedTemporaryFile(mode=’w’, delete=False, suffix=”.txt”)
for path in all_audio_paths:
concat_file.write(f”file ‘{path}’\n”)
concat_file.close()

cmd = [‘ffmpeg’, ‘-f’, ‘concat’, ‘-safe’, ‘0’, ‘-i’, concat_file.name, ‘-c’, ‘copy’, ‘-y’, final_audio.name]
subprocess.run(cmd, check=True, capture_output=True)

for path in all_audio_paths:
os.unlink(path)
os.unlink(concat_file.name)
return final_audio.name

def get_package_keyboard():
packages = db.get_packages()
keyboard = []
row = []
for i, (name, pkg) in enumerate(packages.items()):
row.append(InlineKeyboardButton(f”💰 {name.upper()} – {pkg[‘price’]:,}Ks”, callback_data=f”buy_{name}”))
if (i + 1) % 2 == 0:
keyboard.append(row)
row = []
if row:
keyboard.append(row)
return InlineKeyboardMarkup(keyboard)

# ========== Telegram Handlers ==========
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
username = update.effective_user.username
first_name = update.effective_user.first_name
last_name = update.effective_user.last_name

user = db.get_user(user_id)
if not user:
db.create_user(user_id, username, first_name, last_name)
user = db.get_user(user_id)

await update.message.reply_text(
f”🎬 *AMC Voice Cover*\n\n👋 Hello {first_name}!\n💰 you have Token `{user[‘tokens’]}` left.\n\n📹 If you send Video / I will out Script for you and merge in your video.\n\n📹 1 Video ‘1’ Token .”,
reply_markup=get_package_keyboard(),
parse_mode=’Markdown’
)

async def buy_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()

package_name = query.data.replace(“buy_”, “”)
packages = db.get_packages()

if package_name not in packages:
await query.message.reply_text(“❌ Package not found”)
return

pkg = packages[package_name]

context.user_data[‘pending_package’] = package_name
context.user_data[‘pending_tokens’] = pkg[‘tokens’]
context.user_data[‘pending_amount’] = pkg[‘price’]
context.user_data[‘waiting_for_transaction_id’] = True

await query.message.reply_text(
f”💳 *Payment Instructions*\n\n📦 Package: {package_name.upper()}\n💰 Price: {pkg[‘price’]:,} $\n🎫 Tokens: {pkg[‘tokens’]} tokens\n✅ Type your Transaction ID :”,
parse_mode=’Markdown’
)

async def handle_transaction_id(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
message_text = update.message.text.strip()

if not context.user_data.get(‘waiting_for_transaction_id’):
return

package_name = context.user_data.get(‘pending_package’)
tokens = context.user_data.get(‘pending_tokens’)
amount = context.user_data.get(‘pending_amount’)

if not package_name:
await update.message.reply_text(“Session expired. Use /start”)
context.user_data[‘waiting_for_transaction_id’] = False
return

db.add_pending_transaction(user_id, tokens, package_name, amount, message_text)

context.user_data[‘waiting_for_transaction_id’] = False
context.user_data[‘pending_package’] = None
context.user_data[‘pending_tokens’] = None
context.user_data[‘pending_amount’] = None

user = db.get_user(user_id)

await update.message.reply_text(
f”✅ Payment Checking! Pending confirmation.\nTransaction ID: {message_text}\nBalance: {user[‘tokens’]} tokens”,
parse_mode=’Markdown’
)

async def handle_video(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
video = update.message.video

if video.file_size > 10 * 1024 * 1024:
await update.message.reply_text(“Video must be under 10MB”)
return

user = db.get_user(user_id)
if not user or user[“tokens”] < 1:
await update.message.reply_text(“Insufficient tokens! Use /start to buy.”, reply_markup=get_package_keyboard())
return

processing_msg = await update.message.reply_text(“Processing…”)

try:
video_file = await video.get_file()
with tempfile.NamedTemporaryFile(delete=False, suffix=”.mp4″) as tmp:
await video_file.download_to_drive(tmp.name)
video_path = tmp.name

with open(video_path, “rb”) as f:
video_base64 = base64.b64encode(f.read()).decode()

user_sessions[user_id] = {
“video_base64”: video_base64,
“video_path”: video_path,
“mime_type”: video.mime_type
}

keyboard = [[InlineKeyboardButton(“🇲🇲 မြန်မာ”, callback_data=”lang_my”),
InlineKeyboardButton(“🇺🇸 English”, callback_data=”lang_en”)]]

await processing_msg.edit_text(“Select language:”, reply_markup=InlineKeyboardMarkup(keyboard))

except Exception as e:
await processing_msg.edit_text(f”Error: {e}”)

async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()

user_id = update.effective_user.id
data = query.data

if data.startswith(“buy_”):
await buy_callback(update, context)
return

if user_id not in user_sessions:
await query.message.reply_text(“Session expired. Send new video.”, reply_markup=get_package_keyboard())
return

if data.startswith(“lang_”):
language = data.split(“_”)[1]
user_sessions[user_id][“language”] = language
lang_name = “MY” if language == “my” else “English”

keyboard = [[InlineKeyboardButton(“👨 Male”, callback_data=f”gender_{language}_male”),
InlineKeyboardButton(“👩 Female”, callback_data=f”gender_{language}_female”)]]

await query.edit_message_text(f”Select voice ({lang_name}):”, reply_markup=InlineKeyboardMarkup(keyboard))

elif data.startswith(“gender_”):
parts = data.split(“_”)
language = parts[1]
gender = parts[2]
user_sessions[user_id][“gender”] = gender
lang_name = “MY” if language == “my” else “English”
gender_name = “Male” if gender == “male” else “Female”

await query.edit_message_text(f”Generating {lang_name} script with {gender_name} voice…”)

try:
script = await generate_recap_script(
user_sessions[user_id][“video_base64”],
language,
user_sessions[user_id][“mime_type”]
)
user_sessions[user_id][“script”] = script

keyboard = [
[InlineKeyboardButton(“🎬 Merge Video”, callback_data=”merge_video”)],
[InlineKeyboardButton(“💰 Buy Tokens”, callback_data=”show_packages”)]
]

if len(script) > 4000:
for i in range(0, len(script), 4000):
await query.message.reply_text(f”Script Part {i//4000+1}:\n\n{script[i:i+4000]}”)
else:
await query.message.reply_text(f”Script:\n\n{script}”)

await query.message.reply_text(“Choose action:”, reply_markup=InlineKeyboardMarkup(keyboard))
await query.delete_message()

except Exception as e:
await query.edit_message_text(f”Error: {e}”)

elif data == “generate_audio”:
await query.edit_message_text(“Generating audio…”)

try:
script = user_sessions[user_id][“script”]
language = user_sessions[user_id][“language”]
gender = user_sessions[user_id][“gender”]

audio_path = await generate_audio_from_script(script, language, gender)
user_sessions[user_id][“audio_path”] = audio_path

with open(audio_path, ‘rb’) as f:
await query.message.reply_voice(voice=f)

await query.delete_message()

except Exception as e:
await query.edit_message_text(f”Error: {e}”)

elif data == “merge_video”:
video_path = None
audio_path = None
temp_video = None
watermarked_video = None
srt_path = None
final_video_path = None

try:
await query.edit_message_text(“⏳ Processing audio and script…”)

video_path = user_sessions[user_id][“video_path”]
script_content = user_sessions[user_id][“script”]

if “audio_path” not in user_sessions[user_id]:
language = user_sessions[user_id][“language”]
gender = user_sessions[user_id][“gender”]
audio_path = await generate_audio_from_script(script_content, language, gender)
user_sessions[user_id][“audio_path”] = audio_path
else:
audio_path = user_sessions[user_id][“audio_path”]

temp_video = f”temp_{uuid.uuid4().hex[:6]}.mp4″
watermarked_video = f”watermarked_{uuid.uuid4().hex[:6]}.mp4″
srt_path = f”sub_{user_id}_{uuid.uuid4().hex[:6]}.srt”

await query.edit_message_text(“🔊 Merging Audio with Video Layout…”)
replace_audio_in_video(video_path, audio_path, temp_video)

await query.edit_message_text(“🖼️ Adding Watermark…”)
add_watermark(temp_video, watermarked_video)

with open(srt_path, “w”, encoding=”utf-8″) as srt_file:
srt_file.write(script_content)

await query.edit_message_text(“🎬 Chrome & FFmpeg are burning Subtitles locally…”)

final_video_path = await local_chrome_render_subtitles(watermarked_video, srt_path, user_id)

if final_video_path and os.path.exists(final_video_path):
db.deduct_token(user_id)
user = db.get_user(user_id)

await query.edit_message_text(“🚀 Sending Final subtitled video…”)
with open(final_video_path, ‘rb’) as f:
await query.message.reply_document(
document=f,
filename=os.path.basename(final_video_path),
caption=f”🎬 Video Ready!\n1 token deducted\nRemaining: {user[‘tokens’]} tokens”
)
await query.delete_message()

if os.path.exists(final_video_path): os.remove(final_video_path)

else:
await query.message.reply_text(“❌ The video captioning process failed. Your token has not been redeemed.”)

finally:
for path in [audio_path, temp_video, watermarked_video, srt_path, final_video_path]:
if path and os.path.exists(path):
try:
os.unlink(path)
print(f”🗑️ Deleted temp path: {path}”)
except:
pass

if video_path and os.path.exists(video_path):
try:
os.unlink(video_path)
except:
pass

if user_id in user_sessions:
del user_sessions[user_id]

# ========== MAIN ==========
def main():
try:
subprocess.run([‘ffmpeg’, ‘-version’], capture_output=True, check=True)
print(“✅ FFmpeg found”)
except:
print(“❌ FFmpeg not found”)
return

print(“=” * 60)
print(“🎬 Bot Started”)
print(f”🔑 API Keys loaded: {len(GEMINI_API_KEYS)} keys (Round Robin)”)
print(“🔄 Key rotation: ENABLED”)
print(“🎙️ TTS: Edge-TTS”)
print(“💳 Token System: ENABLED”)
print(“=” * 60)

app = Application.builder().token(TELEGRAM_TOKEN).build()

app.add_handler(CommandHandler(“start”, start))
app.add_handler(MessageHandler(filters.VIDEO, handle_video))
app.add_handler(CallbackQueryHandler(button_callback))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_transaction_id))

app.run_polling()

if __name__ == “__main__”:
main()

Run your python

python your_python_bot.py

Step 3 – Create database.py ( in that folder)

Paste this code in your database.py:

import sqlite3
import json
import glob
from datetime import datetime, date

DB_PATH = “/root/your-folder/users.db”

def init_db():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()

# Users table
c.execute(”’CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
first_name TEXT,
last_name TEXT,
tokens INTEGER DEFAULT 1,
total_tokens_used INTEGER DEFAULT 0,
total_tokens_purchased INTEGER DEFAULT 1,
created_at TEXT,
last_used TEXT
)”’)

# Transactions table (completed)
c.execute(”’CREATE TABLE IF NOT EXISTS transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
amount INTEGER,
tokens INTEGER,
package TEXT,
payment_method TEXT,
status TEXT,
transaction_id TEXT,
created_at TEXT
)”’)

# Pending transactions table
c.execute(”’CREATE TABLE IF NOT EXISTS pending_transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
tokens INTEGER,
package TEXT,
amount INTEGER,
transaction_id TEXT,
status TEXT DEFAULT ‘pending’,
created_at TEXT
)”’)

# Daily usage table
c.execute(”’CREATE TABLE IF NOT EXISTS daily_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
date TEXT,
tokens_used INTEGER DEFAULT 0,
videos_processed INTEGER DEFAULT 0
)”’)

# Settings table
c.execute(”’CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT
)”’)

packages = {
“basic”: {“price”: 5, “tokens”: 5},
“standard”: {“price”: 10, “tokens”: 12},
“pro”: {“price”: 20, “tokens”: 28},
“heavy”: {“price”: 50, “tokens”: 80}
}

c.execute(“INSERT OR IGNORE INTO settings (key, value) VALUES (‘packages’, ?)”, (json.dumps(packages),))
c.execute(“INSERT OR IGNORE INTO settings (key, value) VALUES (‘free_tokens’, ‘1’)”)

conn.commit()
conn.close()
print(“✅ Database initialized”)

def get_user(user_id):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT * FROM users WHERE user_id = ?”, (user_id,))
user = c.fetchone()
conn.close()
if user:
return {
“user_id”: user[0],
“username”: user[1],
“first_name”: user[2],
“last_name”: user[3],
“tokens”: user[4],
“total_tokens_used”: user[5],
“total_tokens_purchased”: user[6],
“created_at”: user[7],
“last_used”: user[8]
}
return None

def create_user(user_id, username, first_name, last_name):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT value FROM settings WHERE key = ‘free_tokens'”)
free_tokens = int(c.fetchone()[0])
c.execute(“””INSERT INTO users
(user_id, username, first_name, last_name, tokens, total_tokens_used, total_tokens_purchased, created_at, last_used)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)”””,
(user_id, username, first_name, last_name, free_tokens, 0, free_tokens,
datetime.now().isoformat(), datetime.now().isoformat()))
conn.commit()
conn.close()

def deduct_token(user_id):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()

c.execute(“SELECT tokens FROM users WHERE user_id = ?”, (user_id,))
row = c.fetchone()
if not row or row[0] < 1:
conn.close()
return False

c.execute(“UPDATE users SET tokens = tokens – 1, total_tokens_used = total_tokens_used + 1, last_used = ? WHERE user_id = ?”,
(datetime.now().isoformat(), user_id))

today = date.today().isoformat()
c.execute(“SELECT id FROM daily_usage WHERE user_id = ? AND date = ?”, (user_id, today))
exists = c.fetchone()

if exists:
c.execute(“UPDATE daily_usage SET tokens_used = tokens_used + 1, videos_processed = videos_processed + 1 WHERE user_id = ? AND date = ?”,
(user_id, today))
else:
c.execute(“INSERT INTO daily_usage (user_id, date, tokens_used, videos_processed) VALUES (?, ?, 1, 1)”,
(user_id, today))

conn.commit()
conn.close()
return True

def add_tokens(user_id, amount, package, payment_method, transaction_id):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()

packages = get_packages()
price = packages.get(package, {}).get(“price”, 0)

c.execute(“UPDATE users SET tokens = tokens + ?, total_tokens_purchased = total_tokens_purchased + ? WHERE user_id = ?”,
(amount, amount, user_id))
c.execute(“””INSERT INTO transactions (user_id, amount, tokens, package, payment_method, status, transaction_id, created_at)
VALUES (?, ?, ?, ?, ?, ‘completed’, ?, ?)”””,
(user_id, price, amount, package, payment_method, transaction_id, datetime.now().isoformat()))

conn.commit()
conn.close()

def add_pending_transaction(user_id, tokens, package, amount, transaction_id):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“””INSERT INTO pending_transactions (user_id, tokens, package, amount, transaction_id, status, created_at)
VALUES (?, ?, ?, ?, ?, ‘pending’, ?)”””,
(user_id, tokens, package, amount, transaction_id, datetime.now().isoformat()))
conn.commit()
conn.close()

def confirm_pending_transaction(transaction_id):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()

c.execute(“SELECT user_id, tokens, package, amount FROM pending_transactions WHERE transaction_id = ? AND status = ‘pending'”, (transaction_id,))
pending = c.fetchone()

if pending:
user_id, tokens, package, amount = pending
c.execute(“UPDATE users SET tokens = tokens + ?, total_tokens_purchased = total_tokens_purchased + ? WHERE user_id = ?”,
(tokens, tokens, user_id))
c.execute(“””INSERT INTO transactions (user_id, amount, tokens, package, payment_method, status, transaction_id, created_at)
VALUES (?, ?, ?, ?, ‘manual’, ‘completed’, ?, ?)”””,
(user_id, amount, tokens, package, transaction_id, datetime.now().isoformat()))
c.execute(“UPDATE pending_transactions SET status = ‘completed’ WHERE transaction_id = ?”, (transaction_id,))
conn.commit()
conn.close()
return (True, user_id, tokens, package) # Return user info for notification
conn.close()
return (False, None, None, None)

conn.close()
return False

def get_pending_transactions():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT id, user_id, tokens, package, amount, transaction_id, created_at FROM pending_transactions WHERE status = ‘pending’ ORDER BY created_at DESC”)
pending = c.fetchall()
conn.close()
return pending

def get_packages():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT value FROM settings WHERE key = ‘packages'”)
result = c.fetchone()
conn.close()
if result:
return json.loads(result[0])
return {
“basic”: {“price”: 5, “tokens”: 5},
“standard”: {“price”: 10, “tokens”: 12},
“pro”: {“price”: 20, “tokens”: 28},
“heavy”: {“price”: 50, “tokens”: 80}
}

def get_all_users():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT user_id, username, first_name, tokens, total_tokens_used FROM users”)
users = c.fetchall()
conn.close()
return users

def get_total_stats():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT COUNT(*) FROM users”)
total_users = c.fetchone()[0]
c.execute(“SELECT SUM(total_tokens_used) FROM users”)
total_used = c.fetchone()[0] or 0
c.execute(“SELECT SUM(amount) FROM transactions WHERE status = ‘completed'”)
total_revenue = c.fetchone()[0] or 0
conn.close()
return {“total_users”: total_users, “total_tokens_used”: total_used, “total_revenue”: total_revenue}

def get_daily_stats():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
today = date.today().isoformat()
c.execute(“SELECT SUM(tokens_used), SUM(videos_processed) FROM daily_usage WHERE date = ?”, (today,))
result = c.fetchone()
conn.close()
return {“today_tokens”: result[0] or 0, “today_videos”: result[1] or 0}

def get_pending_count():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT COUNT(*) FROM pending_transactions WHERE status = ‘pending'”)
count = c.fetchone()[0]
conn.close()
return count

def cleanup_temp_files():
“””Delete all temp files on startup”””
temp_files = glob.glob(“/root/your-folder/*.mp3”) + glob.glob(“/root/tts-bot/*.mp4”)
temp_files += glob.glob(“/root/your-folder/temp_*.mp4”) + glob.glob(“/root/your-folder/final_*.mp4”)
temp_files += glob.glob(“/root/your-folder/audio_*.mp3″)

for f in temp_files:
try:
os.unlink(f)
print(f”🗑️ Cleaned up: {f}”)
except:
pass

# Call this at startup
cleanup_temp_files()

# Initialize
init_db()
print(“✅ Database ready”)

Run your Database.py

python databse.py

Step 4 – Create admin_panel.py

paste this code in your admin_panel.py

from flask import Flask, render_template, request, jsonify
import database as db
import sqlite3
import requests

app = Flask(__name__)

TELEGRAM_TOKEN = “Paste your bot TOKEN”

DB_PATH = “/root/your_folder/users.db”

@app.route(‘/’)
def admin():
return render_template(‘admin.html’)

@app.route(‘/api/stats’)
def stats():
total = db.get_total_stats()
daily = db.get_daily_stats()
pending_count = db.get_pending_count()
return jsonify({**total, **daily, “pending_count”: pending_count})

@app.route(‘/api/users’)
def users():
users = db.get_all_users()
return jsonify([{“user_id”: u[0], “username”: u[1], “first_name”: u[2], “tokens”: u[3], “total_used”: u[4]} for u in users])

@app.route(‘/api/pending’)
def pending():
pending = db.get_pending_transactions()
return jsonify([{
“id”: p[0],
“user_id”: p[1],
“tokens”: p[2],
“package”: p[3],
“amount”: p[4],
“transaction_id”: p[5],
“created_at”: p[6]
} for p in pending])

@app.route(‘/api/confirm’, methods=[‘POST’])
def confirm():
data = request.json
transaction_id = data.get(‘transaction_id’)
action = data.get(‘action’, ‘confirm’) # ‘confirm’ or ‘reject’

# Get pending transaction details
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT user_id, tokens, package FROM pending_transactions WHERE transaction_id = ? AND status = ‘pending'”, (transaction_id,))
pending = c.fetchone()
conn.close()

if not pending:
return jsonify({“success”: False, “error”: “Transaction not found”})

user_id, tokens, package = pending

if action == ‘confirm’:
# Confirm transaction
if db.confirm_pending_transaction(transaction_id):
# Send notification to user
try:
url = f”https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage”
payload = {
“chat_id”: user_id,
“text”: f”✅ *Payment Confirmed!*\n\n📦 Package: *{package.upper()}*\n🎫 Tokens: *{tokens} tokens added*\n\n💰 Use /start to check balance”,
“parse_mode”: “Markdown”
}
requests.post(url, json=payload, timeout=10)
print(f”✅ Notification sent to user {user_id}”)
except Exception as e:
print(f”Notification error: {e}”)
return jsonify({“success”: True, “action”: “confirmed”})

elif action == ‘reject’:
# Reject transaction – delete from pending
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“UPDATE pending_transactions SET status = ‘rejected’ WHERE transaction_id = ?”, (transaction_id,))
conn.commit()
conn.close()

# Send rejection notification
try:
url = f”https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage”
payload = {
“chat_id”: user_id,
“text”: f”❌ *Payment Rejected!*\n\n📦 Package: *{package.upper()}*\n🔍 Transaction ID: `{transaction_id}`\n\n⚠️ Please check your payment and try again with correct transaction ID.”,
“parse_mode”: “Markdown”
}
requests.post(url, json=payload, timeout=10)
print(f”❌ Rejection notification sent to user {user_id}”)
except Exception as e:
print(f”Notification error: {e}”)
return jsonify({“success”: True, “action”: “rejected”})

return jsonify({“success”: False, “error”: “Unknown action”})

@app.route(‘/api/add_tokens’, methods=[‘POST’])
def add_tokens():
data = request.json
db.add_tokens(data[‘user_id’], data[‘tokens’], “admin”, “admin”, f”admin_{data[‘user_id’]}_{data[‘tokens’]}”)
return jsonify({“success”: True})

if __name__ == ‘__main__’:
app.run(host=’0.0.0.0′, port=5000, debug=False)

Run this python

admin_panel.py

Step 5 – Create admin.html on port 5000 or as you like

Paste this code on your admin.html

<!DOCTYPE html>
<html>
<head>
<title>Voice Cover Admin</title>
<style>
body { font-family: Arial; background: #1a1a2e; color: white; padding: 20px; }
.stats { display: flex; gap: 20px; margin-bottom: 30px; }
.card { background: rgba(255,255,255,0.1); padding: 20px; border-radius: 10px; }
table { width: 100%; border-collapse: collapse; }
th, td { padding: 10px; text-align: left; border-bottom: 1px solid #333; }
button { background: #00d4ff; border: none; padding: 5px 15px; border-radius: 5px; cursor: pointer; }
input { padding: 5px; margin-right: 10px; }
</style>
</head>
<body>
<h1>🎬 Voice Cover Admin</h1>
<div class=”stats” id=”stats”></div>
<h2>Users</h2>
<table id=”users”>
<thead><tr><th>ID</th><th>Username</th><th>Name</th><th>Tokens</th><th>Used</th><th>Action</th></tr></thead>
<tbody></tbody>
</table>
<script>
fetch(‘/api/stats’).then(r=>r.json()).then(d=>{
document.getElementById(‘stats’).innerHTML = `
<div class=’card’><h3>Total Users</h3><h2>${d.total_users}</h2></div>
<div class=’card’><h3>Tokens Used</h3><h2>${d.total_tokens_used}</h2></div>
<div class=’card’><h3>Revenue ($)</h3><h2>${d.total_revenue}</h2></div>
`;
});

fetch(‘/api/users’).then(r=>r.json()).then(users=>{
const tbody = document.querySelector(‘#users tbody’);
users.forEach(u=>{
const row = tbody.insertRow();
row.insertCell(0).innerText = u.user_id;
row.insertCell(1).innerText = u.username || ‘-‘;
row.insertCell(2).innerText = u.first_name || ”;
row.insertCell(3).innerText = u.tokens;
row.insertCell(4).innerText = u.total_used;
row.insertCell(5).innerHTML = `<input type=’number’ id=’tokens_${u.user_id}’ placeholder=’Amount’><button onclick=’addTokens(${u.user_id})’>Add</button>`;
});
});

async function addTokens(userId) {
const tokens = document.getElementById(`tokens_${userId}`).value;
if(!tokens) return;
await fetch(‘/api/add_tokens’, {method:’POST’, headers:{‘Content-Type’:’application/json’}, body:JSON.stringify({user_id:userId, tokens:parseInt(tokens)})});
alert(‘Added!’);
location.reload();
}
</script>
</body>
</html>

 

Leave a Reply

Your email address will not be published. Required fields are marked *