In this step-by-step tutorial, I’ll show you how to build a Movie Recap Telegram Bot that sends movie recaps directly to Telegram users. You already have your own code — and here you’ll learn where to place it and how to make everything work.
What You’ll Need
- A Telegram Bot Token (get it from @BotFather)
- Hosting (local server, VPS, or free cloud hosting)
- Python,
Step 1 – Get Your Telegram Bot Token
Open Telegram, search for BotFather, and send /newbot. Follow the instructions. Once done, you’ll receive a token. Save it — you’ll need it inside your code.
Step 2 – Setup your VPS server
- Create folder in your SSH
- Create new Virtual Environment
- Create python and paste the following code
import os
import re
import uuid
import base64
import tempfile
import subprocess
from playwright.async_api import async_playwright
import asyncio
import itertools
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackQueryHandler, ContextTypes
import edge_tts
from google import genai
from google.genai import types
import database as db
import requests# ========== CONFIGURATION ==========
TELEGRAM_TOKEN = “YOUR TELEGRAM BOT TOKEN”GEMINI_API_KEYS = [
“YOUR API 1”,
“YOUR API 2”,
“YOUR API 3”,
“YOUR API 4″,
]key_cycle = itertools.cycle(GEMINI_API_KEYS)
current_key = next(key_cycle)def get_next_api_key():
global current_key
current_key = next(key_cycle)
print(f”🔄 Using API key: {current_key[:10]}…”)
return current_keySCRIPT_MODEL = “gemini-2.5-flash-lite”
VOICES = {
“en”: {“male”: “en-US-GuyNeural”, “female”: “en-US-JennyNeural”}
}user_sessions = {}
# ========== Helper Functions ==========
def split_script_into_chunks(script: str, max_chars: int = 500) -> list:
sentences = script.replace(‘။’, ‘။\n’).split(‘\n’)
chunks = []
current_chunk = “”
for sentence in sentences:
if not sentence.strip():
continue
if len(current_chunk) + len(sentence) <= max_chars:
current_chunk += sentence
else:
if current_chunk:
chunks.append(current_chunk)
current_chunk = sentence
if current_chunk:
chunks.append(current_chunk)
return chunks if chunks else [script]def get_duration(file_path):
try:
result = subprocess.run(
[‘ffprobe’, ‘-v’, ‘error’, ‘-show_entries’, ‘format=duration’,
‘-of’, ‘default=noprint_wrappers=1:nokey=1’, file_path],
capture_output=True, text=True, timeout=30
)
output = result.stdout.strip()
if output and output != ‘N/A’:
return float(output)
return 0.0
except:
return 0.0def replace_audio_in_video(video_path: str, audio_path: str, output_path: str) -> str:
video_duration = get_duration(video_path)
audio_duration = get_duration(audio_path)if video_duration >= audio_duration:
cmd = [‘ffmpeg’, ‘-i’, video_path, ‘-stream_loop’, ‘-1’, ‘-i’, audio_path,
‘-c:v’, ‘copy’, ‘-c:a’, ‘aac’, ‘-map’, ‘0:v:0’, ‘-map’, ‘1:a:0’,
‘-t’, str(video_duration), ‘-y’, output_path]
else:
cmd = [‘ffmpeg’, ‘-stream_loop’, ‘-1’, ‘-i’, video_path, ‘-i’, audio_path,
‘-c:v’, ‘libx264’, ‘-c:a’, ‘aac’, ‘-map’, ‘0:v:0’, ‘-map’, ‘1:a:0’,
‘-t’, str(audio_duration), ‘-y’, output_path]
subprocess.run(cmd, check=True, capture_output=True)
return output_pathdef add_watermark(video_path: str, output_path: str) -> str:
watermark_text = “AMC Voice Cover”filter_str = f”drawtext=text='{watermark_text}’:fontcolor=white:fontsize=20:box=1:[email protected]:boxborderw=5:x=w-tw-10:y=h-th-10″
cmd = [‘ffmpeg’, ‘-i’, video_path, ‘-vf’, filter_str, ‘-c:a’, ‘copy’, ‘-y’, output_path]
subprocess.run(cmd, check=True)
return output_pathasync def local_chrome_render_subtitles(video_path, srt_path, user_id):
import json
current_dir = os.path.dirname(os.path.abspath(__file__)) if ‘__file__’ in locals() else os.getcwd()try:
cmd = [
‘ffprobe’, ‘-v’, ‘error’, ‘-select_streams’, ‘v:0’,
‘-show_entries’, ‘stream=width,height’, ‘-of’, ‘json’, video_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
probe_data = json.loads(result.stdout)
width = probe_data[‘streams’][0][‘width’]
height = probe_data[‘streams’][0][‘height’]
except Exception:
width, height = 1280, 720unique_id = f”{user_id}_{uuid.uuid4().hex[:6]}”
output_video = os.path.join(current_dir, f”final_subtitled_{unique_id}.mp4″)
temp_html = os.path.join(current_dir, f”temp_{unique_id}.html”)
temp_mp4 = os.path.join(current_dir, f”temp_{unique_id}.mp4″)# SRT Function
def srt_time_to_seconds(time_str):
hours, minutes, seconds = time_str.strip().split(‘:’)
seconds, milliseconds = seconds.split(‘,’)
return int(hours) * 3600 + int(minutes) * 60 + int(seconds) + int(milliseconds) / 1000.0print(f”⏳ Reading SRT for Local Rendering (User: {user_id})…”)
try:
with open(srt_path, ‘r’, encoding=’utf-8′) as f:
content = f.read().strip()blocks = content.split(‘\n\n’)
subtitles = []for block in blocks:
lines = block.split(‘\n’)
if len(lines) >= 3:
time_match = re.match(r'(\d+:\d+:\d+,\d+)\s*–>\s*(\d+:\d+:\d+,\d+)’, lines[1])
if time_match:
start_sec = srt_time_to_seconds(time_match.group(1))
end_sec = srt_time_to_seconds(time_match.group(2))
text = ” “.join(lines[2:])
subtitles.append({‘start’: start_sec, ‘end’: end_sec, ‘text’: text})# 🎯 9:16 ,16:9 Responsive CSS
html_content = “””
<!DOCTYPE html>
<html>
<head>
<meta charset=”UTF-8″>
<style>
@font-face {
font-family: ‘PyidaungsuBold’;
src: url(‘file:///usr/share/fonts/truetype/Pyidaungsu-Bold.ttf’) format(‘truetype’);
}
html, body {
margin: 0; padding: 0; background: #000000;
width: 100%; height: 100%; overflow: hidden;
font-family: ‘PyidaungsuBold’, sans-serif;
}
.subtitle-container {
position: absolute;
bottom: 8%;
left: 5%;
width: 90%;
text-align: center;
font-size: 4.8vw;
font-weight: bold;
color: #FFFFFF;
text-shadow: 3px 3px 6px rgba(0, 0, 0, 1), -3px -3px 6px rgba(0, 0, 0, 1);
word-wrap: break-word;
word-break: normal;
white-space: normal;
}
</style>
</head>
<body>
<div class=”subtitle-container” id=”subtitles”></div>
<script>
const subs = “”” + str(subtitles) + “””;
const container = document.getElementById(‘subtitles’);function render(time) {
let activeSub = subs.find(s => time >= s.start && time <= s.end);
if (activeSub) {
container.innerText = activeSub.text;
} else {
container.innerHTML = ”;
}
}
</script>
</body>
</html>
“””with open(temp_html, ‘w’, encoding=’utf-8′) as f:
f.write(html_content)print(f”🚀 Launching Headless Chrome for User: {user_id}…”)
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True, args=[“–no-sandbox”, “–disable-setuid-sandbox”])# 🎯 Playwright Viewport Size
if height > width:
current_viewport = {“width”: 720, “height”: 1280}
else:
current_viewport = {“width”: 1280, “height”: 720}page = await browser.new_page(viewport=current_viewport)
#…page.goto, render
file_url = f”file://{os.path.abspath(temp_html)}”
await page.goto(file_url)cmd = f”ffmpeg -f image2pipe -vcodec png -r 25 -i – -vcodec libx264 -pix_fmt yuv420p \”{temp_mp4}\” -y”
process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE)total_duration = subtitles[-1][‘end’] if subtitles else 10
frames = int(total_duration * 25)for frame in range(frames):
current_time = frame / 25.0
await page.evaluate(f”render({current_time})”)
screenshot = await page.screenshot(type=”png”)
process.stdin.write(screenshot)process.stdin.close()
process.wait()
await browser.close()print(f”🎬 FFmpeg Removing Black Screen & Merging Video for User: {user_id}…”)
final_cmd = f”ffmpeg -i \”{video_path}\” -i \”{temp_mp4}\” -filter_complex \”[1:v]colorkey=0x000000:0.1:0.1[sub_trans];[0:v][sub_trans]overlay=0:0\” -c:a copy \”{output_video}\” -y”
subprocess.run(final_cmd, shell=True, check=True)print(f”✅ Local Rendering Successful: {output_video}”)
return output_videoexcept Exception as e:
print(f”❌ Local Rendering failed: {e}”)
return Nonefinally:
if os.path.exists(temp_html): os.remove(temp_html)
if os.path.exists(temp_mp4): os.remove(temp_mp4)# ========== Gemini Functions ==========
async def generate_recap_script(video_data_base64: str, language: str, mime_type: str = “video/mp4”) -> str:
prompt = “””
You are a professional video recap scriptwriter.
Analyze this video in detail and write an engaging story/recap voiceover script based on the visual events of the video in “English”.[CRITICAL RULES]
1. Do NOT output headings like “Introduction”, “Middle”, “End”, or any conversational text.
2. Do NOT just transcribe on-screen text. Describe the story and whole events happening in the video smoothly.
3. The final output MUST be strictly formatted as a valid SRT subtitle file. Estimate the timelines to match the video duration.[SRT Format Example]
1
00:00:01,000 –> 00:00:05,000
Once upon a time, there was a young shepherd boy living in a peaceful village.2
00:00:05,200 –> 00:00:10,000
Every day, he took his sheep up to the hills to find fresh grass.Output ONLY the raw SRT content. Do not include any other markdown or notes.
“””for attempt in range(len(GEMINI_API_KEYS) * 2):
try:
print(f”🎯 Attempt {attempt+1}: Using API key: {current_key[:15]}…”)
client = genai.Client(api_key=current_key)response = await asyncio.wait_for(
asyncio.to_thread(
client.models.generate_content,
model=SCRIPT_MODEL,
contents=[
prompt,
types.Part.from_bytes(
data=base64.b64decode(video_data_base64),
mime_type=mime_type
)
],
config=types.GenerateContentConfig(
temperature=0.7,
max_output_tokens=8192,
top_p=0.95
)
),
timeout=120.0
)clean_text = response.text.replace(““`srt”, “”).replace(““`”, “”).strip()
return clean_textexcept (asyncio.exceptions.TimeoutError, TimeoutError):
print(f”⚠️ Timeout on attempt {attempt+1}, retrying…”)
await asyncio.sleep(3)
continue
except Exception as e:
if “429” in str(e):
print(f”⚠️ Rate limit on {current_key[:15]}… switching”)
get_next_api_key()
await asyncio.sleep(2)
continue
raise e
raise Exception(“All API keys exhausted”)def clean_srt_for_tts(srt_content: str) -> str:
lines = srt_content.split(‘\n’)
cleaned_lines = []
for line in lines:
line_str = line.strip()
if not line_str:
continue
if line_str.isdigit():
continue
if “–>” in line_str:
continue
cleaned_lines.append(line_str)
return ” “.join(cleaned_lines)async def generate_audio_from_script(script: str, language: str, gender: str) -> str:
clean_text_only = clean_srt_for_tts(script)
chunks = split_script_into_chunks(clean_text_only, 500)
voice = VOICES[language][gender]all_audio_paths = []
for chunk in chunks:
communicate = edge_tts.Communicate(chunk, voice, rate=”+0%”)
audio_file = tempfile.NamedTemporaryFile(delete=False, suffix=”.mp3″)
audio_file.close()
await communicate.save(audio_file.name)
all_audio_paths.append(audio_file.name)if len(all_audio_paths) == 1:
return all_audio_paths[0]final_audio = tempfile.NamedTemporaryFile(delete=False, suffix=”.mp3″)
final_audio.close()
concat_file = tempfile.NamedTemporaryFile(mode=’w’, delete=False, suffix=”.txt”)
for path in all_audio_paths:
concat_file.write(f”file ‘{path}’\n”)
concat_file.close()cmd = [‘ffmpeg’, ‘-f’, ‘concat’, ‘-safe’, ‘0’, ‘-i’, concat_file.name, ‘-c’, ‘copy’, ‘-y’, final_audio.name]
subprocess.run(cmd, check=True, capture_output=True)for path in all_audio_paths:
os.unlink(path)
os.unlink(concat_file.name)
return final_audio.namedef get_package_keyboard():
packages = db.get_packages()
keyboard = []
row = []
for i, (name, pkg) in enumerate(packages.items()):
row.append(InlineKeyboardButton(f”💰 {name.upper()} – {pkg[‘price’]:,}Ks”, callback_data=f”buy_{name}”))
if (i + 1) % 2 == 0:
keyboard.append(row)
row = []
if row:
keyboard.append(row)
return InlineKeyboardMarkup(keyboard)# ========== Telegram Handlers ==========
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
username = update.effective_user.username
first_name = update.effective_user.first_name
last_name = update.effective_user.last_nameuser = db.get_user(user_id)
if not user:
db.create_user(user_id, username, first_name, last_name)
user = db.get_user(user_id)await update.message.reply_text(
f”🎬 *AMC Voice Cover*\n\n👋 Hello {first_name}!\n💰 you have Token `{user[‘tokens’]}` left.\n\n📹 If you send Video / I will out Script for you and merge in your video.\n\n📹 1 Video ‘1’ Token .”,
reply_markup=get_package_keyboard(),
parse_mode=’Markdown’
)async def buy_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()package_name = query.data.replace(“buy_”, “”)
packages = db.get_packages()if package_name not in packages:
await query.message.reply_text(“❌ Package not found”)
returnpkg = packages[package_name]
context.user_data[‘pending_package’] = package_name
context.user_data[‘pending_tokens’] = pkg[‘tokens’]
context.user_data[‘pending_amount’] = pkg[‘price’]
context.user_data[‘waiting_for_transaction_id’] = Trueawait query.message.reply_text(
f”💳 *Payment Instructions*\n\n📦 Package: {package_name.upper()}\n💰 Price: {pkg[‘price’]:,} $\n🎫 Tokens: {pkg[‘tokens’]} tokens\n✅ Type your Transaction ID :”,
parse_mode=’Markdown’
)async def handle_transaction_id(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
message_text = update.message.text.strip()if not context.user_data.get(‘waiting_for_transaction_id’):
returnpackage_name = context.user_data.get(‘pending_package’)
tokens = context.user_data.get(‘pending_tokens’)
amount = context.user_data.get(‘pending_amount’)if not package_name:
await update.message.reply_text(“Session expired. Use /start”)
context.user_data[‘waiting_for_transaction_id’] = False
returndb.add_pending_transaction(user_id, tokens, package_name, amount, message_text)
context.user_data[‘waiting_for_transaction_id’] = False
context.user_data[‘pending_package’] = None
context.user_data[‘pending_tokens’] = None
context.user_data[‘pending_amount’] = Noneuser = db.get_user(user_id)
await update.message.reply_text(
f”✅ Payment Checking! Pending confirmation.\nTransaction ID: {message_text}\nBalance: {user[‘tokens’]} tokens”,
parse_mode=’Markdown’
)async def handle_video(update: Update, context: ContextTypes.DEFAULT_TYPE):
user_id = update.effective_user.id
video = update.message.videoif video.file_size > 10 * 1024 * 1024:
await update.message.reply_text(“Video must be under 10MB”)
returnuser = db.get_user(user_id)
if not user or user[“tokens”] < 1:
await update.message.reply_text(“Insufficient tokens! Use /start to buy.”, reply_markup=get_package_keyboard())
returnprocessing_msg = await update.message.reply_text(“Processing…”)
try:
video_file = await video.get_file()
with tempfile.NamedTemporaryFile(delete=False, suffix=”.mp4″) as tmp:
await video_file.download_to_drive(tmp.name)
video_path = tmp.namewith open(video_path, “rb”) as f:
video_base64 = base64.b64encode(f.read()).decode()user_sessions[user_id] = {
“video_base64”: video_base64,
“video_path”: video_path,
“mime_type”: video.mime_type
}keyboard = [[InlineKeyboardButton(“🇲🇲 မြန်မာ”, callback_data=”lang_my”),
InlineKeyboardButton(“🇺🇸 English”, callback_data=”lang_en”)]]await processing_msg.edit_text(“Select language:”, reply_markup=InlineKeyboardMarkup(keyboard))
except Exception as e:
await processing_msg.edit_text(f”Error: {e}”)async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()user_id = update.effective_user.id
data = query.dataif data.startswith(“buy_”):
await buy_callback(update, context)
returnif user_id not in user_sessions:
await query.message.reply_text(“Session expired. Send new video.”, reply_markup=get_package_keyboard())
returnif data.startswith(“lang_”):
language = data.split(“_”)[1]
user_sessions[user_id][“language”] = language
lang_name = “MY” if language == “my” else “English”keyboard = [[InlineKeyboardButton(“👨 Male”, callback_data=f”gender_{language}_male”),
InlineKeyboardButton(“👩 Female”, callback_data=f”gender_{language}_female”)]]await query.edit_message_text(f”Select voice ({lang_name}):”, reply_markup=InlineKeyboardMarkup(keyboard))
elif data.startswith(“gender_”):
parts = data.split(“_”)
language = parts[1]
gender = parts[2]
user_sessions[user_id][“gender”] = gender
lang_name = “MY” if language == “my” else “English”
gender_name = “Male” if gender == “male” else “Female”await query.edit_message_text(f”Generating {lang_name} script with {gender_name} voice…”)
try:
script = await generate_recap_script(
user_sessions[user_id][“video_base64”],
language,
user_sessions[user_id][“mime_type”]
)
user_sessions[user_id][“script”] = scriptkeyboard = [
[InlineKeyboardButton(“🎬 Merge Video”, callback_data=”merge_video”)],
[InlineKeyboardButton(“💰 Buy Tokens”, callback_data=”show_packages”)]
]if len(script) > 4000:
for i in range(0, len(script), 4000):
await query.message.reply_text(f”Script Part {i//4000+1}:\n\n{script[i:i+4000]}”)
else:
await query.message.reply_text(f”Script:\n\n{script}”)await query.message.reply_text(“Choose action:”, reply_markup=InlineKeyboardMarkup(keyboard))
await query.delete_message()except Exception as e:
await query.edit_message_text(f”Error: {e}”)elif data == “generate_audio”:
await query.edit_message_text(“Generating audio…”)try:
script = user_sessions[user_id][“script”]
language = user_sessions[user_id][“language”]
gender = user_sessions[user_id][“gender”]audio_path = await generate_audio_from_script(script, language, gender)
user_sessions[user_id][“audio_path”] = audio_pathwith open(audio_path, ‘rb’) as f:
await query.message.reply_voice(voice=f)await query.delete_message()
except Exception as e:
await query.edit_message_text(f”Error: {e}”)elif data == “merge_video”:
video_path = None
audio_path = None
temp_video = None
watermarked_video = None
srt_path = None
final_video_path = Nonetry:
await query.edit_message_text(“⏳ Processing audio and script…”)video_path = user_sessions[user_id][“video_path”]
script_content = user_sessions[user_id][“script”]if “audio_path” not in user_sessions[user_id]:
language = user_sessions[user_id][“language”]
gender = user_sessions[user_id][“gender”]
audio_path = await generate_audio_from_script(script_content, language, gender)
user_sessions[user_id][“audio_path”] = audio_path
else:
audio_path = user_sessions[user_id][“audio_path”]temp_video = f”temp_{uuid.uuid4().hex[:6]}.mp4″
watermarked_video = f”watermarked_{uuid.uuid4().hex[:6]}.mp4″
srt_path = f”sub_{user_id}_{uuid.uuid4().hex[:6]}.srt”await query.edit_message_text(“🔊 Merging Audio with Video Layout…”)
replace_audio_in_video(video_path, audio_path, temp_video)await query.edit_message_text(“🖼️ Adding Watermark…”)
add_watermark(temp_video, watermarked_video)with open(srt_path, “w”, encoding=”utf-8″) as srt_file:
srt_file.write(script_content)await query.edit_message_text(“🎬 Chrome & FFmpeg are burning Subtitles locally…”)
final_video_path = await local_chrome_render_subtitles(watermarked_video, srt_path, user_id)
if final_video_path and os.path.exists(final_video_path):
db.deduct_token(user_id)
user = db.get_user(user_id)await query.edit_message_text(“🚀 Sending Final subtitled video…”)
with open(final_video_path, ‘rb’) as f:
await query.message.reply_document(
document=f,
filename=os.path.basename(final_video_path),
caption=f”🎬 Video Ready!\n1 token deducted\nRemaining: {user[‘tokens’]} tokens”
)
await query.delete_message()if os.path.exists(final_video_path): os.remove(final_video_path)
else:
await query.message.reply_text(“❌ The video captioning process failed. Your token has not been redeemed.”)finally:
for path in [audio_path, temp_video, watermarked_video, srt_path, final_video_path]:
if path and os.path.exists(path):
try:
os.unlink(path)
print(f”🗑️ Deleted temp path: {path}”)
except:
passif video_path and os.path.exists(video_path):
try:
os.unlink(video_path)
except:
passif user_id in user_sessions:
del user_sessions[user_id]# ========== MAIN ==========
def main():
try:
subprocess.run([‘ffmpeg’, ‘-version’], capture_output=True, check=True)
print(“✅ FFmpeg found”)
except:
print(“❌ FFmpeg not found”)
returnprint(“=” * 60)
print(“🎬 Bot Started”)
print(f”🔑 API Keys loaded: {len(GEMINI_API_KEYS)} keys (Round Robin)”)
print(“🔄 Key rotation: ENABLED”)
print(“🎙️ TTS: Edge-TTS”)
print(“💳 Token System: ENABLED”)
print(“=” * 60)app = Application.builder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler(“start”, start))
app.add_handler(MessageHandler(filters.VIDEO, handle_video))
app.add_handler(CallbackQueryHandler(button_callback))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_transaction_id))app.run_polling()
if __name__ == “__main__”:
main()
Run your python
python your_python_bot.py
Step 3 – Create database.py ( in that folder)
Paste this code in your database.py:
import sqlite3
import json
import glob
from datetime import datetime, dateDB_PATH = “/root/your-folder/users.db”
def init_db():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()# Users table
c.execute(”’CREATE TABLE IF NOT EXISTS users (
user_id INTEGER PRIMARY KEY,
username TEXT,
first_name TEXT,
last_name TEXT,
tokens INTEGER DEFAULT 1,
total_tokens_used INTEGER DEFAULT 0,
total_tokens_purchased INTEGER DEFAULT 1,
created_at TEXT,
last_used TEXT
)”’)# Transactions table (completed)
c.execute(”’CREATE TABLE IF NOT EXISTS transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
amount INTEGER,
tokens INTEGER,
package TEXT,
payment_method TEXT,
status TEXT,
transaction_id TEXT,
created_at TEXT
)”’)# Pending transactions table
c.execute(”’CREATE TABLE IF NOT EXISTS pending_transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
tokens INTEGER,
package TEXT,
amount INTEGER,
transaction_id TEXT,
status TEXT DEFAULT ‘pending’,
created_at TEXT
)”’)# Daily usage table
c.execute(”’CREATE TABLE IF NOT EXISTS daily_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
date TEXT,
tokens_used INTEGER DEFAULT 0,
videos_processed INTEGER DEFAULT 0
)”’)# Settings table
c.execute(”’CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT
)”’)packages = {
“basic”: {“price”: 5, “tokens”: 5},
“standard”: {“price”: 10, “tokens”: 12},
“pro”: {“price”: 20, “tokens”: 28},
“heavy”: {“price”: 50, “tokens”: 80}
}c.execute(“INSERT OR IGNORE INTO settings (key, value) VALUES (‘packages’, ?)”, (json.dumps(packages),))
c.execute(“INSERT OR IGNORE INTO settings (key, value) VALUES (‘free_tokens’, ‘1’)”)conn.commit()
conn.close()
print(“✅ Database initialized”)def get_user(user_id):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT * FROM users WHERE user_id = ?”, (user_id,))
user = c.fetchone()
conn.close()
if user:
return {
“user_id”: user[0],
“username”: user[1],
“first_name”: user[2],
“last_name”: user[3],
“tokens”: user[4],
“total_tokens_used”: user[5],
“total_tokens_purchased”: user[6],
“created_at”: user[7],
“last_used”: user[8]
}
return Nonedef create_user(user_id, username, first_name, last_name):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT value FROM settings WHERE key = ‘free_tokens'”)
free_tokens = int(c.fetchone()[0])
c.execute(“””INSERT INTO users
(user_id, username, first_name, last_name, tokens, total_tokens_used, total_tokens_purchased, created_at, last_used)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)”””,
(user_id, username, first_name, last_name, free_tokens, 0, free_tokens,
datetime.now().isoformat(), datetime.now().isoformat()))
conn.commit()
conn.close()def deduct_token(user_id):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()c.execute(“SELECT tokens FROM users WHERE user_id = ?”, (user_id,))
row = c.fetchone()
if not row or row[0] < 1:
conn.close()
return Falsec.execute(“UPDATE users SET tokens = tokens – 1, total_tokens_used = total_tokens_used + 1, last_used = ? WHERE user_id = ?”,
(datetime.now().isoformat(), user_id))today = date.today().isoformat()
c.execute(“SELECT id FROM daily_usage WHERE user_id = ? AND date = ?”, (user_id, today))
exists = c.fetchone()if exists:
c.execute(“UPDATE daily_usage SET tokens_used = tokens_used + 1, videos_processed = videos_processed + 1 WHERE user_id = ? AND date = ?”,
(user_id, today))
else:
c.execute(“INSERT INTO daily_usage (user_id, date, tokens_used, videos_processed) VALUES (?, ?, 1, 1)”,
(user_id, today))conn.commit()
conn.close()
return Truedef add_tokens(user_id, amount, package, payment_method, transaction_id):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()packages = get_packages()
price = packages.get(package, {}).get(“price”, 0)c.execute(“UPDATE users SET tokens = tokens + ?, total_tokens_purchased = total_tokens_purchased + ? WHERE user_id = ?”,
(amount, amount, user_id))
c.execute(“””INSERT INTO transactions (user_id, amount, tokens, package, payment_method, status, transaction_id, created_at)
VALUES (?, ?, ?, ?, ?, ‘completed’, ?, ?)”””,
(user_id, price, amount, package, payment_method, transaction_id, datetime.now().isoformat()))conn.commit()
conn.close()def add_pending_transaction(user_id, tokens, package, amount, transaction_id):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“””INSERT INTO pending_transactions (user_id, tokens, package, amount, transaction_id, status, created_at)
VALUES (?, ?, ?, ?, ?, ‘pending’, ?)”””,
(user_id, tokens, package, amount, transaction_id, datetime.now().isoformat()))
conn.commit()
conn.close()def confirm_pending_transaction(transaction_id):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()c.execute(“SELECT user_id, tokens, package, amount FROM pending_transactions WHERE transaction_id = ? AND status = ‘pending'”, (transaction_id,))
pending = c.fetchone()if pending:
user_id, tokens, package, amount = pending
c.execute(“UPDATE users SET tokens = tokens + ?, total_tokens_purchased = total_tokens_purchased + ? WHERE user_id = ?”,
(tokens, tokens, user_id))
c.execute(“””INSERT INTO transactions (user_id, amount, tokens, package, payment_method, status, transaction_id, created_at)
VALUES (?, ?, ?, ?, ‘manual’, ‘completed’, ?, ?)”””,
(user_id, amount, tokens, package, transaction_id, datetime.now().isoformat()))
c.execute(“UPDATE pending_transactions SET status = ‘completed’ WHERE transaction_id = ?”, (transaction_id,))
conn.commit()
conn.close()
return (True, user_id, tokens, package) # Return user info for notification
conn.close()
return (False, None, None, None)conn.close()
return Falsedef get_pending_transactions():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT id, user_id, tokens, package, amount, transaction_id, created_at FROM pending_transactions WHERE status = ‘pending’ ORDER BY created_at DESC”)
pending = c.fetchall()
conn.close()
return pendingdef get_packages():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT value FROM settings WHERE key = ‘packages'”)
result = c.fetchone()
conn.close()
if result:
return json.loads(result[0])
return {
“basic”: {“price”: 5, “tokens”: 5},
“standard”: {“price”: 10, “tokens”: 12},
“pro”: {“price”: 20, “tokens”: 28},
“heavy”: {“price”: 50, “tokens”: 80}
}def get_all_users():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT user_id, username, first_name, tokens, total_tokens_used FROM users”)
users = c.fetchall()
conn.close()
return usersdef get_total_stats():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT COUNT(*) FROM users”)
total_users = c.fetchone()[0]
c.execute(“SELECT SUM(total_tokens_used) FROM users”)
total_used = c.fetchone()[0] or 0
c.execute(“SELECT SUM(amount) FROM transactions WHERE status = ‘completed'”)
total_revenue = c.fetchone()[0] or 0
conn.close()
return {“total_users”: total_users, “total_tokens_used”: total_used, “total_revenue”: total_revenue}def get_daily_stats():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
today = date.today().isoformat()
c.execute(“SELECT SUM(tokens_used), SUM(videos_processed) FROM daily_usage WHERE date = ?”, (today,))
result = c.fetchone()
conn.close()
return {“today_tokens”: result[0] or 0, “today_videos”: result[1] or 0}def get_pending_count():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT COUNT(*) FROM pending_transactions WHERE status = ‘pending'”)
count = c.fetchone()[0]
conn.close()
return countdef cleanup_temp_files():
“””Delete all temp files on startup”””
temp_files = glob.glob(“/root/your-folder/*.mp3”) + glob.glob(“/root/tts-bot/*.mp4”)
temp_files += glob.glob(“/root/your-folder/temp_*.mp4”) + glob.glob(“/root/your-folder/final_*.mp4”)
temp_files += glob.glob(“/root/your-folder/audio_*.mp3″)for f in temp_files:
try:
os.unlink(f)
print(f”🗑️ Cleaned up: {f}”)
except:
pass# Call this at startup
cleanup_temp_files()# Initialize
init_db()
print(“✅ Database ready”)
Run your Database.py
python databse.py
Step 4 – Create admin_panel.py
paste this code in your admin_panel.py
from flask import Flask, render_template, request, jsonify
import database as db
import sqlite3
import requestsapp = Flask(__name__)
TELEGRAM_TOKEN = “Paste your bot TOKEN”
DB_PATH = “/root/your_folder/users.db”
@app.route(‘/’)
def admin():
return render_template(‘admin.html’)@app.route(‘/api/stats’)
def stats():
total = db.get_total_stats()
daily = db.get_daily_stats()
pending_count = db.get_pending_count()
return jsonify({**total, **daily, “pending_count”: pending_count})@app.route(‘/api/users’)
def users():
users = db.get_all_users()
return jsonify([{“user_id”: u[0], “username”: u[1], “first_name”: u[2], “tokens”: u[3], “total_used”: u[4]} for u in users])@app.route(‘/api/pending’)
def pending():
pending = db.get_pending_transactions()
return jsonify([{
“id”: p[0],
“user_id”: p[1],
“tokens”: p[2],
“package”: p[3],
“amount”: p[4],
“transaction_id”: p[5],
“created_at”: p[6]
} for p in pending])@app.route(‘/api/confirm’, methods=[‘POST’])
def confirm():
data = request.json
transaction_id = data.get(‘transaction_id’)
action = data.get(‘action’, ‘confirm’) # ‘confirm’ or ‘reject’# Get pending transaction details
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“SELECT user_id, tokens, package FROM pending_transactions WHERE transaction_id = ? AND status = ‘pending'”, (transaction_id,))
pending = c.fetchone()
conn.close()if not pending:
return jsonify({“success”: False, “error”: “Transaction not found”})user_id, tokens, package = pending
if action == ‘confirm’:
# Confirm transaction
if db.confirm_pending_transaction(transaction_id):
# Send notification to user
try:
url = f”https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage”
payload = {
“chat_id”: user_id,
“text”: f”✅ *Payment Confirmed!*\n\n📦 Package: *{package.upper()}*\n🎫 Tokens: *{tokens} tokens added*\n\n💰 Use /start to check balance”,
“parse_mode”: “Markdown”
}
requests.post(url, json=payload, timeout=10)
print(f”✅ Notification sent to user {user_id}”)
except Exception as e:
print(f”Notification error: {e}”)
return jsonify({“success”: True, “action”: “confirmed”})elif action == ‘reject’:
# Reject transaction – delete from pending
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute(“UPDATE pending_transactions SET status = ‘rejected’ WHERE transaction_id = ?”, (transaction_id,))
conn.commit()
conn.close()# Send rejection notification
try:
url = f”https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage”
payload = {
“chat_id”: user_id,
“text”: f”❌ *Payment Rejected!*\n\n📦 Package: *{package.upper()}*\n🔍 Transaction ID: `{transaction_id}`\n\n⚠️ Please check your payment and try again with correct transaction ID.”,
“parse_mode”: “Markdown”
}
requests.post(url, json=payload, timeout=10)
print(f”❌ Rejection notification sent to user {user_id}”)
except Exception as e:
print(f”Notification error: {e}”)
return jsonify({“success”: True, “action”: “rejected”})return jsonify({“success”: False, “error”: “Unknown action”})
@app.route(‘/api/add_tokens’, methods=[‘POST’])
def add_tokens():
data = request.json
db.add_tokens(data[‘user_id’], data[‘tokens’], “admin”, “admin”, f”admin_{data[‘user_id’]}_{data[‘tokens’]}”)
return jsonify({“success”: True})if __name__ == ‘__main__’:
app.run(host=’0.0.0.0′, port=5000, debug=False)
Run this python
admin_panel.py
Step 5 – Create admin.html on port 5000 or as you like
Paste this code on your admin.html
<!DOCTYPE html>
<html>
<head>
<title>Voice Cover Admin</title>
<style>
body { font-family: Arial; background: #1a1a2e; color: white; padding: 20px; }
.stats { display: flex; gap: 20px; margin-bottom: 30px; }
.card { background: rgba(255,255,255,0.1); padding: 20px; border-radius: 10px; }
table { width: 100%; border-collapse: collapse; }
th, td { padding: 10px; text-align: left; border-bottom: 1px solid #333; }
button { background: #00d4ff; border: none; padding: 5px 15px; border-radius: 5px; cursor: pointer; }
input { padding: 5px; margin-right: 10px; }
</style>
</head>
<body>
<h1>🎬 Voice Cover Admin</h1>
<div class=”stats” id=”stats”></div>
<h2>Users</h2>
<table id=”users”>
<thead><tr><th>ID</th><th>Username</th><th>Name</th><th>Tokens</th><th>Used</th><th>Action</th></tr></thead>
<tbody></tbody>
</table>
<script>
fetch(‘/api/stats’).then(r=>r.json()).then(d=>{
document.getElementById(‘stats’).innerHTML = `
<div class=’card’><h3>Total Users</h3><h2>${d.total_users}</h2></div>
<div class=’card’><h3>Tokens Used</h3><h2>${d.total_tokens_used}</h2></div>
<div class=’card’><h3>Revenue ($)</h3><h2>${d.total_revenue}</h2></div>
`;
});fetch(‘/api/users’).then(r=>r.json()).then(users=>{
const tbody = document.querySelector(‘#users tbody’);
users.forEach(u=>{
const row = tbody.insertRow();
row.insertCell(0).innerText = u.user_id;
row.insertCell(1).innerText = u.username || ‘-‘;
row.insertCell(2).innerText = u.first_name || ”;
row.insertCell(3).innerText = u.tokens;
row.insertCell(4).innerText = u.total_used;
row.insertCell(5).innerHTML = `<input type=’number’ id=’tokens_${u.user_id}’ placeholder=’Amount’><button onclick=’addTokens(${u.user_id})’>Add</button>`;
});
});async function addTokens(userId) {
const tokens = document.getElementById(`tokens_${userId}`).value;
if(!tokens) return;
await fetch(‘/api/add_tokens’, {method:’POST’, headers:{‘Content-Type’:’application/json’}, body:JSON.stringify({user_id:userId, tokens:parseInt(tokens)})});
alert(‘Added!’);
location.reload();
}
</script>
</body>
</html>
