-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathlink_validator.py
executable file
·141 lines (116 loc) · 4.13 KB
/
link_validator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#! /usr/bin/env python3
import logging
import threading
from html.parser import HTMLParser
from urllib.request import urlopen, Request
from urllib.parse import urlparse, urljoin
from enum import Enum
class Result(Enum):
    """Outcome assigned to a single link after it has been checked."""

    # The link could be opened (and, for pages of the crawled site, parsed).
    GOOD = 0
    # The page was downloaded but processing its content raised an exception.
    BAD = 1
    # Opening the URL raised an exception.
    ERROR = 2
    # The response did not declare a character set, so it was not parsed.
    WARN = 3
# Root page of the site that is crawled when the script is run directly.
SITE_URL = "http://docs.openindiana.org/"

# URL schemes that are accepted as checkable links.
ACCEPTED_PROTOCOLS = ["http", "https", "ftp"]

# Emit "<LEVEL> <message>" lines, showing INFO and above by default.
logging.basicConfig(format='%(levelname)s %(message)s', level=logging.INFO)

# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)
def _to_absolute_url(url, base):
    """
    Resolve a link found in a web page to an absolute URL.

    :param str url: the link target as written in the page; it may be
        relative, absolute, or ``None`` (``HTMLParser`` reports an ``href``
        attribute that has no value as ``None``)
    :param str base: the URL the relative link is resolved against
    :return: the absolute URL, or ``None`` when the link has no target or
        uses a scheme that is not listed in ``ACCEPTED_PROTOCOLS``
    """
    # A valueless ``href`` has nothing to resolve; reject it explicitly
    # instead of relying on how ``urlparse`` happens to coerce ``None``.
    if url is None:
        return None

    scheme = urlparse(url).scheme
    if not scheme:
        # No scheme means the link is relative to the page it was found on.
        return urljoin(base, url)
    if scheme in ACCEPTED_PROTOCOLS:
        return url
    # Anything else is a scheme the validator does not follow.
    return None
class LinkGetter(HTMLParser):
    """
    HTML parser that gathers the targets of the ``<a href=...>`` tags it sees.

    Every ``href`` value is passed through ``_to_absolute_url`` together with
    ``base``; values for which that returns something falsy are dropped.
    """

    def __init__(self, base, *args, **kwargs):
        """
        :param str base: the URL that link targets are resolved against
        :param args: positional arguments forwarded to ``HTMLParser``
        :param kwargs: keyword arguments forwarded to ``HTMLParser``
        """
        super().__init__(*args, **kwargs)
        # URL handed to _to_absolute_url for every href.
        self.base = base
        # Absolute URLs collected so far; a set, so repeats collapse.
        self.links = set()

    def handle_starttag(self, tag, attrs):
        """Record the resolved target of each ``href`` on an ``<a>`` tag."""
        if tag != 'a':
            return
        for name, value in attrs:
            if name != 'href':
                continue
            absolute = _to_absolute_url(value, self.base)
            if absolute:
                self.links.add(absolute)
def check(url, max_workers=8):
    """
    Crawl the site rooted at ``url`` and validate every link found on it.

    A link whose URL starts with ``url`` is treated as part of the site: if
    it is an HTML page it is downloaded and its ``<a href>`` links are
    queued for checking as well. Any other link is only opened, never
    crawled. Each link whose outcome is not ``Result.GOOD`` is reported with
    a warning log line.

    :param str url: the URL of the start page; also the prefix that decides
        whether a link belongs to the site
    :param int max_workers: number of threads used to check links in parallel
    :return: a dict mapping every checked URL to its ``Result``
    """
    from concurrent.futures import ThreadPoolExecutor

    log.info("Base URL: %s", url)
    pending = {url}  # links discovered but not yet checked
    result = {}      # URL -> Result for every link already checked
    # Guards ``pending`` and ``result``: worker threads add newly found links
    # while the main thread drains the queue and records outcomes.
    lock = threading.Lock()

    def _check(link, base):
        """Open ``link``, classify it and queue the links of same-site pages."""
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.7) Gecko/2009021910 Firefox/3.0.7',
        }
        try:
            response = urlopen(Request(link, headers=headers))
        except Exception as ex:
            log.debug("Exception while opening URL: %s", ex)
            return Result.ERROR

        # The context manager closes the response on every exit path.
        with response:
            if not link.startswith(base):
                # if we receive the response externally, the link is alive
                return Result.GOOD

            # We're still in the same site. Only HTML can contain further
            # links; for any other resource a response is proof enough, so
            # it must not be flagged for lacking a character set.
            if response.headers.get_content_type() != 'text/html':
                return Result.GOOD

            charset = response.headers.get_param('charset')
            if charset is None:
                log.debug("Cannot get character set for %s", link)
                return Result.WARN

            try:
                # Reading and decoding are inside the try so that a broken
                # body or bogus charset marks this link BAD instead of
                # aborting the whole crawl.
                page = response.read().strip().decode(charset)
                parser = LinkGetter(response.geturl())
                parser.feed(page)
                parser.close()
            except Exception as ex:
                log.error("Exception: %s", ex)
                return Result.BAD

        with lock:
            pending.update(found for found in parser.links if found not in result)
        return Result.GOOD

    # Check the links in rounds: each round hands every queued link to the
    # pool, waits for all of them, then continues with whatever they found.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        while True:
            with lock:
                batch = [link for link in pending if link not in result]
                pending.clear()
            if not batch:
                break

            # Pass the callable and its arguments separately so that the
            # work actually runs on the pool's threads.
            futures = [(link, executor.submit(_check, link, url)) for link in batch]
            for link, future in futures:
                outcome = future.result()
                with lock:
                    result[link] = outcome
                if outcome != Result.GOOD:
                    log.warning("Bad link: %s - %s", link, outcome)

    log.info("Done validation")
    return result
def main():
    """Validate the links of ``SITE_URL`` with debug logging switched on."""
    # NOTE: the target site is hard-coded. Reading the URL from the command
    # line (via optparse, failing with "Where's my url?" when it is missing)
    # was sketched here but is currently disabled.
    log.setLevel(logging.DEBUG)
    check(SITE_URL)
# Start the validation only when this file is executed as a script, so that
# importing it as a module has no crawling side effect.
if __name__ == "__main__":
    main()