Skip to content

Commit

Permalink
Added telegram id restriction; Added tweet length check
Browse files Browse the repository at this point in the history
Signed-off-by: avaakash <[email protected]>
  • Loading branch information
avaakash committed Jan 8, 2022
1 parent 5879945 commit 5792ac5
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 55 deletions.
3 changes: 2 additions & 1 deletion env.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
"TWITTER_ACCESS_TOKEN": "",
"TWITTER_ACCESS_SECRET": "",
"TWITTER_USERNAME": "",
"TELEGRAM_SECRET": ""
"TELEGRAM_SECRET": "",
"TELEGRAM_ALLOWED_IDS": "[]"
}
4 changes: 1 addition & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
from telegram_bot import bot

def main():
"""
main function to start the bot
"""
"""main function to start the bot"""
bot()

if __name__ == "__main__":
Expand Down
9 changes: 3 additions & 6 deletions settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@

# To get the file path in base dir
def base(filename):
"""
Returns the file path in base dir
"""
"""Returns the file path in base dir"""
return os.path.join(BASE_DIR, filename)

def get_env():
"""
Loads the environment variables
"""
"""Loads the environment variables"""
environ_secrets = {}
with open(base("env.json"), encoding='utf-8') as file:
env = json.load(file)
Expand All @@ -26,5 +22,6 @@ def get_env():
environ_secrets["twitter_secret"] = env.get("TWITTER_ACCESS_SECRET", "")
environ_secrets["twitter_username"] = env.get("TWITTER_USERNAME", "")
environ_secrets["telegram_secret"] = env.get("TELEGRAM_SECRET", "")
environ_secrets["telegram_allowed_ids"] = env.get("TELEGRAM_ALLOWED_IDS", "")

return environ_secrets
94 changes: 61 additions & 33 deletions telegram_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# Importing app modules
from settings import get_env
from twitter import twitter, models
import validation as check

# Setting up the logger
logging.basicConfig(
Expand All @@ -19,51 +20,76 @@
tweet = models.Tweet(None, "", [])

def get_secret():
"""
Fetches the secrets
"""
"""Fetches the secrets"""
environ_secrets = get_env()
return environ_secrets["telegram_secret"]

def start(update: Update, context: CallbackContext):
"""
Start will listen to the /start command and send a message to the user
"""
context.bot.send_message(
def update_message(update: Update, context: CallbackContext, command: str = None):
"""sends a message if an old message was edited"""
if command is None:
message = "You edited an old message."
else:
message = f"You edited an old message with the /{command} command."
return context.bot.send_message(
chat_id=update.effective_chat.id,
text="I am the tweet bot. Send me a message and I will tweet it for you!"
text=message
)

def get_tweet_text(update: Update, context: CallbackContext):
"""
get_tweet_text will get the text from the message and tweet it
"""
print("Message: " + update.message.text)
try:
tweet.tweet_id, tweet.text = twitter.tweet(update.message.text)
message = "Tweeted: " + tweet.text
context.bot.send_message(chat_id=update.effective_chat.id, text=message)
except Exception as error:
print(error)
def start(update: Update, context: CallbackContext):
"""Start will listen to the /start command and send a message to the user"""
if update.message is not None:
context.bot.send_message(
chat_id=update.effective_chat.id,
text="I am the tweet bot. Send me a message and I will tweet it for you!"
)
else:
update_message(update, context, "start")

@check.restricted
def test(update: Update, context: CallbackContext):
"""Test function to test the bot"""
if update.message is not None:
print(check.check_tweet_length(update.message.text))
context.bot.send_message(
chat_id=update.effective_chat.id, text="Something went wrong!" + str(error))
chat_id=update.effective_chat.id, text="Test\n" + update.message.text)
else:
update_message(update, context, "test")


@check.restricted
def get_tweet_text(update: Update, context: CallbackContext):
"""get_tweet_text will get the text from the message and tweet it"""
if update.message is not None:
print("Message: " + update.message.text)
if not check.check_tweet_length(update.message.text):
context.bot.send_message(chat_id=update.effective_chat.id, text="Tweet too long!")
return
try:
tweet.tweet_id, tweet.text = twitter.tweet(update.message.text)
message = "Tweeted: " + tweet.text
context.bot.send_message(chat_id=update.effective_chat.id, text=message)
except Exception as error:
print(error)
context.bot.send_message(
chat_id=update.effective_chat.id, text="Something went wrong!" + str(error))
else:
update_message(update, context)

def delete_tweet(update: Update, context: CallbackContext):
"""
Delete a tweet command
"""
print(tweet.tweet_id, tweet.text, sep="\n")
if tweet.tweet_id is not None:
twitter.delete_tweet(tweet.tweet_id)
tweet.tweet_id = None
context.bot.send_message(chat_id=update.effective_chat.id, text="Tweet deleted!")
"""Delete a tweet command"""
if update.message is not None:
print(tweet.tweet_id, tweet.text, sep="\n")
if tweet.tweet_id is not None:
twitter.delete_tweet(tweet.tweet_id)
tweet.tweet_id = None
context.bot.send_message(chat_id=update.effective_chat.id, text="Tweet deleted!")
else:
context.bot.send_message(chat_id=update.effective_chat.id, text="No tweet to delete!")
else:
context.bot.send_message(chat_id=update.effective_chat.id, text="No tweet to delete!")
update_message(update, context, "delete")

def bot():
"""
Main function to start the bot
"""
"""Main function to start the bot"""
# Setting up the updater
updater = Updater(token=get_secret(), use_context=True)
# Setting up the dispatcher
Expand All @@ -73,11 +99,13 @@ def bot():
start_handler = CommandHandler('start', start)
message_handle = MessageHandler(Filters.text & ~Filters.command, get_tweet_text)
delete_handle = CommandHandler('delete', delete_tweet)
test_handle = CommandHandler('test', test)

# Adding handlers to the dispatcher
dispatcher.add_handler(start_handler)
dispatcher.add_handler(message_handle)
dispatcher.add_handler(delete_handle)
dispatcher.add_handler(test_handle)

# Starting the bot
updater.start_polling()
4 changes: 1 addition & 3 deletions twitter/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
from settings import get_env

def authenticate():
"""
Authenticates the twitter API
"""
"""Authenticates the twitter API"""
# Getting the tokens and keys
environ_secrets = get_env()
# Setting up variables
Expand Down
4 changes: 1 addition & 3 deletions twitter/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
Twitter response classes
"""
class Tweet:
"""
Tweet object
"""
""" Tweet object """
def __init__(self, tweet_id, text, hashtags):
self.tweet_id = tweet_id
self.text = text
Expand Down
8 changes: 2 additions & 6 deletions twitter/twitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@
api = authenticate()

def tweet(message) -> int:
"""
Tweet a message
"""
"""Tweet a message"""
tweet_data = api.update_status(message)
print(tweet_data.id, tweet_data.text, tweet_data.entities["hashtags"], sep="\n")
return tweet_data.id, tweet_data.text

def delete_tweet(tweet_id):
"""
Delete a tweet
"""
"""Delete a tweet"""
api.destroy_status(id=tweet_id)
26 changes: 26 additions & 0 deletions validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""
Validation and Security checks
"""
from functools import wraps

from settings import get_env

def restricted(func):
"""Restrict usage of func to allowed users only and replies if necessary"""
allowed_ids = get_env()["telegram_allowed_ids"]
@wraps(func)
def wrapped(update, context, *args, **kwargs):
user_id = str(update.effective_user.id)
if user_id not in allowed_ids:
print(f"WARNING: Unauthorized access denied for {user_id}")
update.message.reply_text('You are not allowed to use this bot! Please leave.')
return None # quit function
return func(update, context, *args, **kwargs)
return wrapped

def check_tweet_length(tweet):
""" Checks if the tweet is within the allowed length"""
print("Tweet Length: " + str(len(tweet)))
if len(tweet) > 240:
return False
return True

0 comments on commit 5792ac5

Please sign in to comment.