generated from stratosphereips/awesome-code-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservice-finder.py
140 lines (115 loc) · 4.78 KB
/
service-finder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import argparse
import socket
import logging
import signal
import sys
from zeroconf import ServiceBrowser, ServiceListener, Zeroconf, BadTypeInNameException
from colorama import init, Fore, Style
try:
import nmb.NetBIOS as NetBIOS
except ImportError:
NetBIOS = None
init(autoreset=True)
class MyListener(ServiceListener):
def __init__(self, zeroconf, debug):
self.zeroconf = zeroconf
self.debug = debug
self.services = {}
self.max_name_length = 0
self.max_ip_length = 15 # IP addresses have a fixed length format
def remove_service(self, zeroconf, service_type, name):
if name in self.services:
del self.services[name]
print(f"{Fore.RED}Service {name} removed")
def add_service(self, zeroconf, service_type, name):
self.update_service(zeroconf, service_type, name)
def update_service(self, zeroconf, service_type, name):
try:
formatted_type = self.format_service_type(service_type)
formatted_name = self.format_service_name(name)
if self.debug:
print(f"{Fore.YELLOW}Attempting to get service info for type: {formatted_type}, name: {formatted_name}")
info = zeroconf.get_service_info(formatted_type, formatted_name)
if info:
address = socket.inet_ntoa(info.addresses[0]) if info.addresses else None
if address:
device_name = self.resolve_device_name(address)
self.services[name] = address
self.max_name_length = max(self.max_name_length, len(name))
print(self.format_output(name.strip(), address.strip(), device_name.strip()) + '\r')
else:
print(f"{Fore.YELLOW}Service {name} has no address")
else:
print(f"{Fore.YELLOW}No info found for service {name}")
except BadTypeInNameException as e:
if self.debug:
print(f"{Fore.RED}BadTypeInNameException for type: {formatted_type}, name: {formatted_name} - {e}")
def resolve_device_name(self, ip):
name = None
try:
# Reverse DNS lookup
name = socket.gethostbyaddr(ip)[0]
except (socket.herror, socket.gaierror):
pass
if not name and NetBIOS:
# NetBIOS lookup
nb = NetBIOS.NetBIOS()
try:
name = nb.queryIPForName(ip, timeout=1)
except Exception as e:
if self.debug:
print(f"{Fore.RED}NetBIOS lookup failed for IP {ip}: {e}")
finally:
nb.close()
return name if name else "Unknown"
def format_service_type(self, service_type):
if not service_type.endswith('.'):
service_type += '.'
return service_type
def format_service_name(self, service_name):
if not service_name.endswith('.'):
service_name += '.'
return service_name
def format_output(self, name, address, device_name):
name_spacing = ' ' * (60 - len(name) + 4)
return (f"{Fore.GREEN}Service {name}{name_spacing}"
f"{Fore.CYAN}IP: {address}{' ' * (self.max_ip_length - len(address) + 4)}"
f"{Fore.MAGENTA}Name: {device_name}")
class ServiceTypeListener(ServiceListener):
def __init__(self, zeroconf, debug):
self.zeroconf = zeroconf
self.debug = debug
def add_service(self, zeroconf, service_type, name):
formatted_type = self.format_service_type(name)
print(f"{Fore.BLUE}Discovered service type: {formatted_type}")
if self.debug:
print(f"{Fore.YELLOW}Creating browser for type: {formatted_type}")
ServiceBrowser(self.zeroconf, formatted_type, MyListener(self.zeroconf, self.debug))
def update_service(self, zeroconf, service_type, name):
pass
def remove_service(self, zeroconf, service_type, name):
pass
def format_service_type(self, service_type):
if not service_type.endswith('.'):
service_type += '.'
return service_type
def signal_handler(sig, frame):
print('Exiting...')
zeroconf.close()
sys.exit(0)
def main():
parser = argparse.ArgumentParser(description="Service Finder using Zeroconf")
parser.add_argument('--debug', action='store_true', help='Enable debug mode')
args = parser.parse_args()
debug = args.debug
if debug:
print("Debug mode enabled")
global zeroconf
zeroconf = Zeroconf()
type_listener = ServiceTypeListener(zeroconf, debug)
print("Waiting for services to appear...")
ServiceBrowser(zeroconf, "_services._dns-sd._udp.local.", type_listener)
signal.signal(signal.SIGINT, signal_handler)
signal.pause()
if __name__ == '__main__':
main()