-
Notifications
You must be signed in to change notification settings - Fork 157
/
Copy pathmain.py
234 lines (206 loc) · 8.18 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import os
import traceback
import logging
from pyrogram import Client
from pyrogram import StopPropagation, filters
from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup
import config
from handlers.broadcast import broadcast
from handlers.check_user import handle_user_status
from handlers.database import Database
# Settings are read once from the project-local ``config`` module and bound
# to short module-level names.
DB_NAME = config.DB_NAME
DB_URL = config.DB_URL
AUTH_USERS = config.AUTH_USERS
LOG_CHANNEL = config.LOG_CHANNEL

# Single database handle shared by the code in this module.
db = Database(DB_URL, DB_NAME)

# Pyrogram client built from the credentials in ``config``.
Bot = Client(
    "BroadcastBot",
    api_id=config.API_ID,
    api_hash=config.API_HASH,
    bot_token=config.BOT_TOKEN,
)
@Bot.on_message(filters.private)
async def _(bot, cmd):
    """Forward every private message to ``handle_user_status``.

    Args:
        bot: The client that received the message.
        cmd: The incoming private message.
    """
    # This handler adds no logic of its own; it only delegates.
    await handle_user_status(bot, cmd)
@Bot.on_message(filters.command("start") & filters.private)
async def startprivate(client, message):
    """Handle ``/start`` in a private chat.

    On first contact the sender is added to the database and the event is
    announced: to ``LOG_CHANNEL`` when it is set, otherwise through
    ``logging``. The welcome text with its link buttons is sent on every
    ``/start``.

    Args:
        client: The client that received the command.
        message: The incoming ``/start`` message.

    Raises:
        StopPropagation: Always, once the welcome reply has been sent.
    """
    user = message.from_user
    chat_id = user.id
    if not await db.is_user_exist(chat_id):
        await db.add_user(chat_id)
        if LOG_CHANNEL:
            # The bot's own username is only needed for the channel
            # announcement, so the lookup is done here and skipped when no
            # log channel is configured.
            bot_username = (await client.get_me()).username
            await client.send_message(
                LOG_CHANNEL,
                f"#NEWUSER: \n\nNew User [{user.first_name}](tg://user?id={user.id}) started @{bot_username} !!",
            )
        else:
            # Lazy %-style arguments: the message is only formatted when the
            # record is actually emitted.
            logging.info(
                "#NewUser :- Name : %s ID : %s", user.first_name, user.id
            )

    join_button = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton("CHANNEL", url="https://t.me/nacbots"),
                InlineKeyboardButton(
                    "SUPPORT GROUP", url="https://t.me/n_a_c_bot_developers"
                ),
            ]
        ]
    )
    welcomed = f"Hey <b>{user.first_name}</b>\nI'm a simple Telegram bot that can broadcast messages and media to the bot subscribers. Made by @NACBOTS.\n\n 🎚 use /settings"
    await message.reply_text(welcomed, reply_markup=join_button)
    raise StopPropagation
@Bot.on_message(filters.command("settings"))
async def opensettings(bot, cmd):
    """Reply to ``/settings`` with the sender's notification flag and a toggle.

    Args:
        bot: The client that received the command.
        cmd: The incoming ``/settings`` message.
    """
    user_id = cmd.from_user.id
    # Read the flag once: the text and the button icon are then guaranteed to
    # show the same value, and the database is queried only one time.
    notif = await db.get_notif(user_id)
    bell = "🔔" if notif is True else "🔕"
    keyboard = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    f"NOTIFICATION {bell}",
                    callback_data="notifon",
                )
            ],
            [InlineKeyboardButton("❎", callback_data="closeMeh")],
        ]
    )
    await cmd.reply_text(
        f"`Here You Can Set Your Settings:`\n\nSuccessfully setted notifications to **{notif}**",
        reply_markup=keyboard,
    )
@Bot.on_message(filters.private & filters.command("broadcast"))
async def broadcast_handler_open(_, m):
    """Start a broadcast when an authorised user replies with ``/broadcast``.

    The command message is deleted instead when the sender is not listed in
    ``AUTH_USERS`` or when the command is not a reply to another message.

    Args:
        _: The client (unused).
        m: The incoming ``/broadcast`` message.
    """
    is_authorised = m.from_user.id in AUTH_USERS
    # ``and`` short-circuits, so the reply is only inspected for authorised
    # senders.
    if is_authorised and m.reply_to_message is not None:
        await broadcast(m, db)
        return
    await m.delete()
@Bot.on_message(filters.private & filters.command("stats"))
async def sts(c, m):
    """Reply to ``/stats`` with the user counts read from the database.

    Senders not listed in ``AUTH_USERS`` get no reply; their command message
    is deleted.

    Args:
        c: The client (unused).
        m: The incoming ``/stats`` message.
    """
    if m.from_user.id in AUTH_USERS:
        total_users = await db.total_users_count()
        notif_users = await db.total_notif_users_count()
        await m.reply_text(
            text=f"**Total Users in Database 📂:** `{total_users}`\n\n**Total Users with Notification Enabled 🔔 :** `{notif_users}`",
            quote=True,
        )
    else:
        await m.delete()
@Bot.on_message(filters.private & filters.command("ban_user"))
async def ban(c, m):
    """Ban a user: ``/ban_user user_id ban_duration ban_reason``.

    Only senders listed in ``AUTH_USERS`` may use the command; anyone else
    has the command message deleted. With no arguments a usage text is sent.
    Otherwise the target is notified (best effort), the ban is stored through
    ``db.ban_user`` and a summary is printed and sent back to the caller.

    Args:
        c: The client, used to message the banned user.
        m: The incoming ``/ban_user`` message.
    """
    if m.from_user.id not in AUTH_USERS:
        await m.delete()
        return

    if len(m.command) == 1:
        await m.reply_text(
            "Use this command to ban 🛑 any user from the bot 🤖.\n\nUsage:\n\n`/ban_user user_id ban_duration ban_reason`\n\nEg: `/ban_user 1234567 28 You misused me.`\n This will ban user with id `1234567` for `28` days for the reason `You misused me`.",
            quote=True,
        )
        return

    # ``Exception`` rather than ``BaseException``: cancellation
    # (``asyncio.CancelledError``), ``KeyboardInterrupt`` and ``SystemExit``
    # must keep propagating so the process can shut down or cancel the task.
    try:
        user_id = int(m.command[1])
        ban_duration = int(m.command[2])
        ban_reason = " ".join(m.command[3:])
        ban_log_text = f"Banning user {user_id} for {ban_duration} days for the reason {ban_reason}."

        # Notifying the user is best effort; a failure is recorded in the
        # summary but does not stop the ban from being stored.
        try:
            await c.send_message(
                user_id,
                f"You are Banned 🚫 to use this bot for **{ban_duration}** day(s) for the reason __{ban_reason}__ \n\n**Message from the admin 🤠**",
            )
            ban_log_text += "\n\nUser notified successfully!"
        except Exception:
            traceback.print_exc()
            ban_log_text += (
                f"\n\n ⚠️ User notification failed! ⚠️ \n\n`{traceback.format_exc()}`"
            )

        await db.ban_user(user_id, ban_duration, ban_reason)
        print(ban_log_text)
        await m.reply_text(ban_log_text, quote=True)
    except Exception:
        # Covers malformed arguments (missing or non-numeric) as well as
        # database or send failures; the traceback goes back to the caller.
        traceback.print_exc()
        await m.reply_text(
            f"Error occoured ⚠️! Traceback given below\n\n`{traceback.format_exc()}`",
            quote=True,
        )
@Bot.on_message(filters.private & filters.command("unban_user"))
async def unban(c, m):
    """Lift a ban: ``/unban_user user_id``.

    Only senders listed in ``AUTH_USERS`` may use the command; anyone else
    has the command message deleted. With no arguments a usage text is sent.
    Otherwise the target is notified (best effort), the ban is removed
    through ``db.remove_ban`` and a summary is printed and sent back.

    Args:
        c: The client, used to message the unbanned user.
        m: The incoming ``/unban_user`` message.
    """
    if m.from_user.id not in AUTH_USERS:
        await m.delete()
        return

    if len(m.command) == 1:
        await m.reply_text(
            "Use this command to unban 😃 any user.\n\nUsage:\n\n`/unban_user user_id`\n\nEg: `/unban_user 1234567`\n This will unban user with id `1234567`.",
            quote=True,
        )
        return

    # ``Exception`` rather than ``BaseException``: cancellation
    # (``asyncio.CancelledError``), ``KeyboardInterrupt`` and ``SystemExit``
    # must keep propagating so the process can shut down or cancel the task.
    try:
        user_id = int(m.command[1])
        unban_log_text = f"Unbanning user 🤪 {user_id}"

        # Notifying the user is best effort; a failure is recorded in the
        # summary but does not stop the ban from being removed.
        try:
            await c.send_message(user_id, "Your ban was lifted!")
            unban_log_text += "\n\n✅ User notified successfully! ✅"
        except Exception:
            traceback.print_exc()
            unban_log_text += (
                f"\n\n⚠️ User notification failed! ⚠️\n\n`{traceback.format_exc()}`"
            )

        await db.remove_ban(user_id)
        print(unban_log_text)
        await m.reply_text(unban_log_text, quote=True)
    except Exception:
        # Covers a non-numeric user id as well as database or send failures;
        # the traceback goes back to the caller.
        traceback.print_exc()
        await m.reply_text(
            f"⚠️ Error occoured ⚠️! Traceback given below\n\n`{traceback.format_exc()}`",
            quote=True,
        )
@Bot.on_message(filters.private & filters.command("banned_users"))
async def _banned_usrs(c, m):
    """Reply to ``/banned_users`` with a report of every banned user.

    Only senders listed in ``AUTH_USERS`` may use the command; anyone else
    has the command message deleted. A report longer than 4096 characters is
    sent as the document ``banned-users.txt`` instead of as a text message.

    Args:
        c: The client (unused).
        m: The incoming ``/banned_users`` message.
    """
    if m.from_user.id not in AUTH_USERS:
        await m.delete()
        return

    all_banned_users = await db.get_all_banned_users()

    # Collect the lines and join once instead of growing a string with ``+=``.
    entries = []
    async for banned_user in all_banned_users:
        user_id = banned_user["id"]
        ban_status = banned_user["ban_status"]
        ban_duration = ban_status["ban_duration"]
        banned_on = ban_status["banned_on"]
        ban_reason = ban_status["ban_reason"]
        entries.append(
            f"> **User_id**: `{user_id}`, **Ban Duration**: `{ban_duration}`, **Banned on**: `{banned_on}`, **Reason**: `{ban_reason}`\n\n"
        )

    reply_text = f"Total banned user(s) 🤭: `{len(entries)}`\n\n{''.join(entries)}"

    # 4096 is presumably Telegram's text-message length limit - TODO confirm.
    if len(reply_text) > 4096:
        report_path = "banned-users.txt"
        # Explicit UTF-8: the report contains an emoji, which a platform
        # default encoding such as cp1252 cannot encode.
        with open(report_path, "w", encoding="utf-8") as f:
            f.write(reply_text)
        try:
            await m.reply_document(report_path, True)
        finally:
            # Remove the temporary file even when the upload fails.
            os.remove(report_path)
        return

    await m.reply_text(reply_text, True)
@Bot.on_callback_query()
async def callback_handlers(bot: Client, cb: CallbackQuery):
    """Handle presses on the inline buttons of the settings message.

    ``"notifon"`` flips the user's notification flag, rewrites the settings
    message to show the new state and answers the callback. Any other
    callback data deletes the message the button is attached to.

    Args:
        bot: The client that received the callback.
        cb: The callback query raised by the button press.
    """
    user_id = cb.from_user.id

    if cb.data != "notifon":
        await cb.message.delete(True)
        return

    # Only a literal ``True`` counts as "on", so any other stored value is
    # switched on.
    currently_on = (await db.get_notif(user_id)) is True
    await db.set_notif(user_id, notif=not currently_on)

    # Re-read once after the write and reuse the value everywhere, instead of
    # querying the database separately for the text, the icon and the answer.
    notif = await db.get_notif(user_id)
    bell = "🔔" if notif is True else "🔕"

    await cb.message.edit(
        f"`Here You Can Set Your Settings:`\n\nSuccessfully setted notifications to **{notif}**",
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        f"NOTIFICATION {bell}",
                        callback_data="notifon",
                    )
                ],
                [InlineKeyboardButton("❎", callback_data="closeMeh")],
            ]
        ),
    )
    await cb.answer(f"Successfully setted notifications to {notif}")
# Start the bot. This is a top-level statement, so it executes whenever this
# module is loaded, not only when it is run as a script.
Bot.run()