-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtelegram.py
51 lines (42 loc) · 2.07 KB
/
telegram.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from fail2ban.server.actions import ActionBase
import requests, json
class telegramAction(ActionBase):
    """Fail2ban Python action that reports jail events to a Telegram chat.

    Settings are read from ``telegram_config.json`` inside ``installpath``.
    Keys used by this class:

    * ``telegram_api_token`` -- bot token used in every Telegram API URL.
    * ``telegram_chat_id``   -- destination chat; when empty or missing it is
      discovered through ``getUpdates`` and written back to the config file.
    * ``receive_banned`` / ``receive_unbanned`` -- truthy to enable the
      corresponding notification (a missing key is treated as disabled).
    """

    # Seconds to wait for any single HTTP request, so that a stalled remote
    # endpoint cannot block the action indefinitely.
    REQUEST_TIMEOUT = 10
    # Maximum number of tries for a Telegram API call before giving up.
    MAX_ATTEMPTS = 3

    def __init__(self, jail, name):
        """Load the JSON configuration and resolve the target chat ID.

        Raises:
            OSError: the configuration file cannot be opened or read.
            ValueError: the file is not valid JSON, or its top-level value is
                not a JSON object.
            KeyError: ``telegram_api_token`` is missing from the configuration.
        """
        # Let the base class perform its own initialisation (it provides the
        # `_logSys` logger used throughout this class).
        super().__init__(jail, name)
        self.installpath = '/etc/fail2ban/action.d/'
        self.config_path = self.installpath + 'telegram_config.json'
        try:
            with open(self.config_path, 'r', encoding='utf-8') as config_file:
                self.config = json.load(config_file)
        except (OSError, ValueError) as e:
            # Fail here with the real cause instead of a later, unrelated
            # AttributeError on `self.config`.
            self._logSys.error('Cannot load %s: %s', self.config_path, e)
            raise
        if not isinstance(self.config, dict):
            raise ValueError(
                f'{self.config_path} must contain a JSON object, '
                f'got {type(self.config).__name__}'
            )
        self.token = self.config['telegram_api_token']
        self.chat_id = None
        self.get_chat_id()

    def _redact(self, error):
        """Return ``str(error)`` with the bot token masked.

        The token is part of every Telegram API URL, and `requests` exceptions
        usually embed the URL in their message.
        """
        return str(error).replace(str(self.token), '<token>')

    def _fetch_chat_id(self):
        """Ask Telegram for pending updates and return the first chat ID found.

        Returns ``None`` when the response carries no update with a
        ``message.chat.id`` field.  Network, HTTP and JSON decoding errors
        propagate to the caller.
        """
        response = requests.get(
            f'https://api.telegram.org/bot{self.token}/getUpdates',
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        payload = response.json()
        updates = payload.get('result') if isinstance(payload, dict) else None
        for update in updates or []:
            # Not every update is a plain message; skip the ones that are not.
            message = update.get('message') if isinstance(update, dict) else None
            chat = message.get('chat') if isinstance(message, dict) else None
            if isinstance(chat, dict) and chat.get('id') is not None:
                return chat['id']
        return None

    def get_chat_id(self):
        """Set ``self.chat_id`` from the configuration or from Telegram.

        A configured, non-empty ``telegram_chat_id`` is used as is.  Otherwise
        up to ``MAX_ATTEMPTS`` ``getUpdates`` calls are made; a discovered ID
        is stored in ``self.config`` and written back to the config file.  If
        discovery fails, ``self.chat_id`` stays ``None`` and an error is logged.
        """
        self.chat_id = self.config.get('telegram_chat_id')
        if self.chat_id is None or self.chat_id == '':
            self.chat_id = None
            # Bounded loop instead of unbounded self-recursion on failure.
            for attempt in range(1, self.MAX_ATTEMPTS + 1):
                try:
                    self.chat_id = self._fetch_chat_id()
                except (requests.RequestException, ValueError) as e:
                    self._logSys.warning(
                        'getUpdates attempt %d/%d failed: %s',
                        attempt, self.MAX_ATTEMPTS, self._redact(e),
                    )
                    continue
                if self.chat_id is not None:
                    break
                self._logSys.warning(
                    'getUpdates attempt %d/%d returned no message with a chat ID',
                    attempt, self.MAX_ATTEMPTS,
                )
            if self.chat_id is None:
                self._logSys.error(
                    'Could not determine the Telegram chat ID; '
                    'notifications are disabled until it is configured'
                )
                return
            self.config['telegram_chat_id'] = self.chat_id
            try:
                # Serialise the dict itself so the file stays loadable JSON.
                with open(self.config_path, 'w', encoding='utf-8') as config_file:
                    json.dump(self.config, config_file, indent=4)
            except OSError as e:
                # The ID is still usable for this run even if it cannot be saved.
                self._logSys.warning(
                    'Could not save chat ID to %s: %s', self.config_path, e
                )
        self._logSys.info('Got chat ID: %s', self.chat_id)

    def send_message(self, message):
        """Send ``message`` to the configured chat, trying up to ``MAX_ATTEMPTS`` times.

        Delivery is best effort: failures are logged and never raised.
        """
        if self.chat_id is None or self.chat_id == '':
            self._logSys.warning('No Telegram chat ID available; message not sent')
            return
        url = f'https://api.telegram.org/bot{self.token}/sendMessage'
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            try:
                response = requests.post(
                    url,
                    params={"chat_id": self.chat_id, "text": message},
                    timeout=self.REQUEST_TIMEOUT,
                )
                # Treat an HTTP error status as a failed delivery too.
                response.raise_for_status()
                return
            except requests.RequestException as e:
                self._logSys.warning(
                    'sendMessage attempt %d/%d failed: %s',
                    attempt, self.MAX_ATTEMPTS, self._redact(e),
                )
        self._logSys.error(
            'Giving up on Telegram message after %d attempts', self.MAX_ATTEMPTS
        )

    def start(self):
        """Announce that fail2ban has started."""
        self.send_message("Fail2ban started.")

    def stop(self):
        """Announce that fail2ban has stopped."""
        self.send_message("Fail2ban stopped.")

    def _notify_ip_event(self, aInfo, config_key, verb):
        """Send '<ip> from <country> <verb>' when ``config_key`` is enabled."""
        if not self.config.get(config_key, False):
            return
        ip = aInfo['ip']
        location = self.get_location(ip)
        self.send_message(f"{ip} from {location} {verb}")

    def ban(self, aInfo):
        """Report a ban of ``aInfo['ip']`` if ``receive_banned`` is enabled."""
        self._notify_ip_event(aInfo, 'receive_banned', 'banned')

    def unban(self, aInfo):
        """Report an unban of ``aInfo['ip']`` if ``receive_unbanned`` is enabled."""
        self._notify_ip_event(aInfo, 'receive_unbanned', 'unbanned')

    def get_location(self, ip):
        """Return the country name ipapi.co reports for ``ip``, or ``'Unknown'``.

        ``'Unknown'`` is returned when the request fails, the body is not
        JSON, or the payload has no non-empty ``country_name`` field.
        """
        try:
            response = requests.get(
                f'https://ipapi.co/{ip}/json/', timeout=self.REQUEST_TIMEOUT
            )
            details = response.json()
        except (requests.RequestException, ValueError):
            return 'Unknown'
        if not isinstance(details, dict):
            return 'Unknown'
        # Payloads without `country_name` must not raise KeyError here.
        return details.get('country_name') or 'Unknown'
# Expose the class under the module-level name `Action`.
# NOTE(review): presumably this is the name fail2ban's Python-action loader
# looks up in the module -- confirm against the fail2ban documentation.
Action = telegramAction