1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
import sqlite3
from datetime import datetime, timedelta
from telegram import Update
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
# Database setup
def setup_database():
    """Create the bot's SQLite schema if it does not exist yet.

    Opens ``telegram_bot.db`` (relative to the current working directory) and
    creates the ``users`` and ``visits`` tables. Both statements use
    ``CREATE TABLE IF NOT EXISTS``, so calling this repeatedly is harmless.

    Raises:
        sqlite3.Error: if the database cannot be opened or written.
    """
    conn = sqlite3.connect('telegram_bot.db')
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does NOT close the connection -- hence the surrounding try/finally.
        with conn:
            # One row per Telegram user, keyed by the Telegram user id.
            conn.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    user_id INTEGER PRIMARY KEY,
                    username TEXT,
                    first_name TEXT,
                    last_name TEXT,
                    registration_date TEXT,
                    last_visit TEXT,
                    is_paid BOOLEAN DEFAULT 0,
                    subscription_end_date TEXT
                )
            ''')
            # One row per recorded interaction.
            conn.execute('''
                CREATE TABLE IF NOT EXISTS visits (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    visit_date TEXT,
                    FOREIGN KEY (user_id) REFERENCES users (user_id)
                )
            ''')
    finally:
        conn.close()
# User tracking functions
async def track_visit(user_id):
    """Record one visit for ``user_id``.

    Stamps the current local time into ``users.last_visit`` and appends a row
    to ``visits``. Both writes happen in one transaction, so they are either
    both stored or both rolled back.

    The ``UPDATE`` is a no-op when ``user_id`` has no row in ``users``; the
    ``visits`` row is inserted regardless.

    Args:
        user_id: Telegram user id of the visitor.

    Raises:
        sqlite3.Error: if the database cannot be opened or written.
    """
    visited_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    conn = sqlite3.connect('telegram_bot.db')
    try:
        # ``with conn`` commits or rolls back; closing is done in ``finally``
        # so the connection is released even when a statement raises.
        with conn:
            conn.execute(
                'UPDATE users SET last_visit = ? WHERE user_id = ?',
                (visited_at, user_id),
            )
            conn.execute(
                'INSERT INTO visits (user_id, visit_date) VALUES (?, ?)',
                (user_id, visited_at),
            )
    finally:
        conn.close()
async def register_user(user_id, username, first_name, last_name):
    """Insert a ``users`` row for ``user_id`` unless one already exists.

    A new row gets the current local time as both ``registration_date`` and
    ``last_visit``. An existing row is left completely untouched, so calling
    this for an already-registered user is a no-op.

    Args:
        user_id: Telegram user id (primary key of ``users``).
        username: Telegram username; stored as given.
        first_name: First name; stored as given.
        last_name: Last name; stored as given.

    Raises:
        sqlite3.Error: if the database cannot be opened or written.
    """
    registered_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    conn = sqlite3.connect('telegram_bot.db')
    try:
        with conn:
            # ``OR IGNORE`` skips the insert when the primary key already
            # exists, replacing the separate SELECT-then-INSERT check with a
            # single statement.
            conn.execute(
                '''
                INSERT OR IGNORE INTO users
                    (user_id, username, first_name, last_name, registration_date, last_visit)
                VALUES (?, ?, ?, ?, ?, ?)
                ''',
                (user_id, username, first_name, last_name, registered_at, registered_at),
            )
    finally:
        conn.close()
async def process_payment(user_id, months=1):
    """Mark ``user_id`` as paid and extend the subscription by ``months``.

    A month is counted as 30 days. If the stored ``subscription_end_date`` is
    still in the future the extension is added to it; otherwise (expired, or
    never subscribed) the new period starts from the current local time.

    If ``user_id`` has no row in ``users`` the ``UPDATE`` matches nothing and
    no data is stored.

    Args:
        user_id: Telegram user id of the paying user.
        months: Number of 30-day periods to add. Defaults to 1.

    Raises:
        sqlite3.Error: if the database cannot be opened or written.
        ValueError: if the stored end date is not in
            ``'%Y-%m-%d %H:%M:%S'`` format.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    conn = sqlite3.connect('telegram_bot.db')
    try:
        with conn:
            row = conn.execute(
                'SELECT subscription_end_date FROM users WHERE user_id = ?',
                (user_id,),
            ).fetchone()

            # Take one snapshot of "now" so the comparison and the new end
            # date are computed from the same instant.
            now = datetime.now()
            period_start = now
            if row and row[0]:
                current_end = datetime.strptime(row[0], timestamp_format)
                # An active subscription is extended from its current end so
                # the user does not lose the remaining days.
                if current_end > now:
                    period_start = current_end

            new_end_date = period_start + timedelta(days=30 * months)
            conn.execute(
                'UPDATE users SET is_paid = 1, subscription_end_date = ? WHERE user_id = ?',
                (new_end_date.strftime(timestamp_format), user_id),
            )
    finally:
        conn.close()
# Analytics functions
def get_analytics():
    """Collect usage and subscription counters from the database.

    Returns:
        dict with integer values under these keys:

        - ``total_visitors``: distinct ``user_id`` values in ``visits``.
        - ``registered_users``: rows in ``users``.
        - ``paid_users``: users with ``is_paid = 1``.
        - ``unpaid_users``: ``registered_users - paid_users``.
        - ``total_visits``: rows in ``visits``.
        - ``expired_subscriptions``: users still flagged paid whose
          ``subscription_end_date`` is earlier than now.
        - ``non_renewed_subscriptions``: unpaid users whose
          ``subscription_end_date`` falls within the last 30 days.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
    """
    now = datetime.now()
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    current_date = now.strftime(timestamp_format)
    one_month_ago = (now - timedelta(days=30)).strftime(timestamp_format)

    conn = sqlite3.connect('telegram_bot.db')
    try:
        def scalar(sql, params=()):
            # Every query here is a single COUNT, so the first column of the
            # first row is the whole result.
            return conn.execute(sql, params).fetchone()[0]

        total_visitors = scalar('SELECT COUNT(DISTINCT user_id) FROM visits')
        registered_users = scalar('SELECT COUNT(*) FROM users')
        paid_users = scalar('SELECT COUNT(*) FROM users WHERE is_paid = 1')
        total_visits = scalar('SELECT COUNT(*) FROM visits')
        # Timestamps are stored as 'YYYY-MM-DD HH:MM:SS' text, which sorts
        # chronologically, so plain string comparison in SQL is sufficient.
        expired_subscriptions = scalar(
            'SELECT COUNT(*) FROM users WHERE is_paid = 1 AND subscription_end_date < ?',
            (current_date,),
        )
        non_renewed = scalar(
            'SELECT COUNT(*) FROM users '
            'WHERE is_paid = 0 AND subscription_end_date BETWEEN ? AND ?',
            (one_month_ago, current_date),
        )
    finally:
        # Read-only work: nothing to commit, but always release the handle.
        conn.close()

    return {
        "total_visitors": total_visitors,
        "registered_users": registered_users,
        "paid_users": paid_users,
        "unpaid_users": registered_users - paid_users,
        "total_visits": total_visits,
        "expired_subscriptions": expired_subscriptions,
        "non_renewed_subscriptions": non_renewed,
    }
# Check for expired subscriptions daily
async def check_expired_subscriptions(context: CallbackContext):
    """Flag lapsed subscriptions as unpaid and notify the affected users.

    Selects every user with ``is_paid = 1`` whose ``subscription_end_date``
    is earlier than the current local time, sets ``is_paid = 0`` for each of
    them, and then sends each one an expiry message via ``context.bot``.

    The database work is committed and the connection closed *before* any
    message is sent. Keeping a write transaction open across the awaited
    network calls would leave the database locked while other coroutines on
    the same event loop try to write to it.

    Args:
        context: Callback context supplied by the job queue; only
            ``context.bot.send_message`` is used.

    Raises:
        sqlite3.Error: if the database cannot be opened or written.
    """
    current_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    conn = sqlite3.connect('telegram_bot.db')
    try:
        with conn:
            expired_user_ids = [
                row[0]
                for row in conn.execute(
                    'SELECT user_id FROM users '
                    'WHERE is_paid = 1 AND subscription_end_date < ?',
                    (current_date,),
                )
            ]
            conn.executemany(
                'UPDATE users SET is_paid = 0 WHERE user_id = ?',
                [(expired_id,) for expired_id in expired_user_ids],
            )
    finally:
        conn.close()

    for user_id in expired_user_ids:
        # Notification is best-effort: a failure for one user (for example
        # the user blocked the bot) must not stop the remaining ones.
        try:
            await context.bot.send_message(
                chat_id=user_id,
                text="Your subscription has expired. Please renew to continue using premium features."
            )
        except Exception as e:
            print(f"Failed to notify user {user_id}: {e}")
# Command handlers
async def start(update: Update, context: CallbackContext):
    """Handle ``/start``: register the sender, log the visit, and greet them."""
    sender = update.effective_user
    await register_user(
        sender.id,
        sender.username,
        sender.first_name,
        sender.last_name,
    )
    await track_visit(sender.id)
    greeting = f'Hello {sender.first_name}! Welcome to our bot.'
    await update.message.reply_text(greeting)
async def stats(update: Update, context: CallbackContext):
    """Handle ``/stats``: send the analytics summary to the admin only.

    Any sender whose id differs from the hard-coded admin id gets a
    permission-denied reply and no statistics.
    """
    # Only allow admin to see stats (you would need to define admin_user_id)
    admin_user_id = 12345  # Replace with your admin user ID
    requester_id = update.effective_user.id
    if requester_id != admin_user_id:
        await update.message.reply_text("You don't have permission to view stats.")
        return

    analytics = get_analytics()
    # The empty first and last items reproduce the leading and trailing
    # newline of the message.
    report_lines = [
        "",
        "📊 *Bot Analytics*",
        "👥 *Users*:",
        f"- Visitors: {analytics['total_visitors']}",
        f"- Registered: {analytics['registered_users']}",
        "💰 *Payments*:",
        f"- Paid users: {analytics['paid_users']}",
        f"- Unpaid users: {analytics['unpaid_users']}",
        "📈 *Activity*:",
        f"- Total visits: {analytics['total_visits']}",
        "🔄 *Subscriptions*:",
        f"- Expired subscriptions: {analytics['expired_subscriptions']}",
        f"- Non-renewed last month: {analytics['non_renewed_subscriptions']}",
        "",
    ]
    stats_message = "\n".join(report_lines)
    await update.message.reply_text(stats_message, parse_mode='Markdown')
async def handle_message(update: Update, context: CallbackContext):
    """Handle a plain text message: log the visit, then acknowledge it."""
    await track_visit(update.effective_user.id)
    # Process the message (your bot logic here)
    acknowledgement = "I received your message!"
    await update.message.reply_text(acknowledgement)
# For demonstration: command to simulate payment
async def pay(update: Update, context: CallbackContext):
    """Handle ``/pay``: simulate a successful one-month payment.

    The sender is registered first (a no-op when they already exist).
    ``process_payment`` only updates an existing ``users`` row, so without
    this step a user who never sent ``/start`` would be told the payment
    succeeded while nothing was stored.
    """
    user = update.effective_user
    await register_user(user.id, user.username, user.first_name, user.last_name)
    # In a real bot, this would be connected to an actual payment processor
    await process_payment(user.id)
    await update.message.reply_text("Payment processed successfully! Your subscription is now active.")
def main():
    """Set up the database, wire the handlers, and run the bot until stopped.

    Registers the ``/start``, ``/stats`` and ``/pay`` commands, a catch-all
    handler for non-command text, and a daily job at 00:00 that expires
    lapsed subscriptions.

    Raises:
        RuntimeError: if the application was built without a job queue, so
            the daily expiry check cannot be scheduled.
    """
    # This module imports the ``datetime`` *class*, so ``datetime.time`` is
    # the instance method ``datetime.datetime.time`` and calling it with
    # ``hour=``/``minute=`` raises TypeError. Import the ``time`` type itself.
    from datetime import time as dt_time

    # Set up the database
    setup_database()

    # Create the application
    application = Application.builder().token("YOUR_BOT_TOKEN").build()

    # Add handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("stats", stats))
    application.add_handler(CommandHandler("pay", pay))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    # Schedule daily check for expired subscriptions
    job_queue = application.job_queue
    if job_queue is None:
        # Fail with an actionable message instead of an AttributeError on None.
        raise RuntimeError(
            "JobQueue is not available; install python-telegram-bot[job-queue] "
            "to schedule the daily subscription check."
        )
    job_queue.run_daily(check_expired_subscriptions, time=dt_time(hour=0, minute=0))

    # Start the Bot
    application.run_polling()
# Start the bot only when this file is executed as a script.
if __name__ == '__main__':
    main()
# Run the code to generate an output.