added whitelist check

This commit is contained in:
Guillermo 2024-09-13 04:59:02 +02:00
parent 5ee04c855d
commit fd41e3b356

40
bot.py
View file

@ -5,7 +5,7 @@ from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandl
class Butler(object): class Butler(object):
def __init__(self, logger, token, whitelist): def __init__(self, logger, token, whitelist=[]):
self.logger = logger self.logger = logger
self.token = token self.token = token
self.whitelist = whitelist self.whitelist = whitelist
@ -15,10 +15,10 @@ class Butler(object):
# on different commands - answer in Telegram # on different commands - answer in Telegram
application.add_handler(CommandHandler("start", Butler.start)) application.add_handler(CommandHandler("start", Butler.start))
application.add_handler(CommandHandler("help", Butler.help_command)) application.add_handler(CommandHandler("help", self.help_command))
application.add_handler(CommandHandler("stop", self.stop)) application.add_handler(CommandHandler("stop", self.stop))
application.add_handler(CommandHandler("echo", Butler.echo)) application.add_handler(CommandHandler("echo", self.echo))
application.add_handler(CommandHandler("who", Butler.who)) application.add_handler(CommandHandler("who", self.who))
# on non command i.e message - echo the message on Telegram # on non command i.e message - echo the message on Telegram
# application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.echo)) # application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.echo))
@ -37,17 +37,37 @@ class Butler(object):
/who - Información sobre ti y tu mensaje (para debugging). /who - Información sobre ti y tu mensaje (para debugging).
""" """
def valid_ids(self, update: Update):
user = update.effective_user
chat = update.effective_chat
chat_name = chat.effective_name
ids = [user.id, chat.id]
if not self.whitelist:
return True
for id in ids:
if id in self.whitelist:
return True
self.logger.warning("User {} [{}] or chat {} [{}] are not in the whitelist."
.format(user.username, user.id, chat_name, chat.id))
return False
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /start is issued.""" """Send a message when the command /start is issued."""
user = update.effective_user user = update.effective_user
await update.message.reply_html(rf"Hi {user.mention_html()}!") await update.message.reply_html(rf"Hi {user.mention_html()}!")
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Send a message when the command /help is issued.""" """Send a message when the command /help is issued."""
if not self.valid_ids(update):
return
await update.message.reply_text(Butler.help_msg()) await update.message.reply_text(Butler.help_msg())
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def echo(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Echo the user message.""" """Echo the user message."""
if not self.valid_ids(update):
return
echo_text = update.message.text.split(maxsplit=1) echo_text = update.message.text.split(maxsplit=1)
await update.message.reply_text(echo_text[1]) await update.message.reply_text(echo_text[1])
@ -55,7 +75,10 @@ class Butler(object):
user = update.effective_user user = update.effective_user
self.logger.warning("I got blocked by user {} [{}]".format(user.username, user.id)) self.logger.warning("I got blocked by user {} [{}]".format(user.username, user.id))
async def who(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def who(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if not self.valid_ids(update):
return
mssg = update.message mssg = update.message
user = update.effective_user user = update.effective_user
chat = update.effective_chat chat = update.effective_chat
@ -74,5 +97,8 @@ class Butler(object):
def run(self) -> None: def run(self) -> None:
"""Start the bot.""" """Start the bot."""
if self.whitelist:
self.logger.info("Current whitelist: {}".format(self.whitelist))
# Run the bot until the user presses Ctrl-C # Run the bot until the user presses Ctrl-C
self.application.run_polling(allowed_updates=Update.ALL_TYPES) self.application.run_polling(allowed_updates=Update.ALL_TYPES)