forked from MalakaDevs/CFPiHole
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcloudflare_config.py
102 lines (69 loc) · 3.12 KB
/
cloudflare_config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from typing import List
from logger_config import CustomFormatter
import cloudflare_api
import time
# Configure logging
logger = CustomFormatter.configure_logger("cloudflare_setup")
def get_block_lists(name_prefix: str):
return cloudflare_api.get_lists(name_prefix)
def get_gateway_policies(name_prefix: str):
"""Gets blocking policies with defined name prefix in cloudflare_api."""
cf_policies = cloudflare_api.get_firewall_policies(name_prefix)
return cf_policies, len(cf_policies)
def create_firewall_policy(
name_prefix: str, list_ids: List[str] = None, regex_tld: str = None
):
"""Creates or updates a blocking policy in cloudflare_api."""
cf_policies, num_policies = get_gateway_policies(name_prefix)
if "TLDs" in name_prefix:
# Remove duplicates, sort, and create regex
unique_tlds = sorted(set(tld.replace(".", "") for tld in list_ids or []))
regex_tld = rf"[.](|{"|".join(unique_tlds)})$"
list_ids = None
if num_policies == 0:
cloudflare_api.create_gateway_policy(
name_prefix, list_ids=list_ids, regex_tld=regex_tld
)
elif num_policies == 1:
cloudflare_api.delete_firewall_policy(name_prefix, cf_policies[0]["id"])
cloudflare_api.create_gateway_policy(
name_prefix, list_ids=list_ids, regex_tld=regex_tld
)
else:
logger.error("One or more than one firewall policy found")
raise Exception("More than one firewall policy found")
def delete_firewall_policy(name_prefix: str):
"""Deletes a blocking policy from Cloudflare."""
cf_policies, num_policies = get_gateway_policies(name_prefix)
if num_policies == 0:
logger.info(f"No firewall policy {name_prefix} found to delete")
return []
elif num_policies != 1:
logger.error("One or more than one firewall policy found")
raise Exception("More than one firewall policy found")
cloudflare_api.delete_firewall_policy(name_prefix, cf_policies[0]["id"])
def delete_lists_policy(name_prefix: str, cf_lists: List[str]):
"""Deletes the blocking policy and then the lists in cloudflare_api."""
delete_firewall_policy(name_prefix)
for l in cf_lists:
cloudflare_api.delete_list(l["id"], l["name"])
# Sleep to prevent rate limit
time.sleep(1)
def create_lists_policy(name_prefix: str, unique_domains: List[str]):
"""Creates new lists with chunking and handles rate limits."""
# Sleep to prevent rate limit
logger.warning("Pausing for 120 seconds to prevent rate limit, please wait")
time.sleep(120)
logger.info("Creating lists, please wait")
cf_lists = []
# Chunk the domains into lists of 1000 and create them
for chunk in chunk_list(unique_domains, 1000):
list_name = f"{name_prefix} {len(cf_lists) + 1}"
_list = cloudflare_api.create_list(list_name, chunk)
cf_lists.append(_list)
# Sleep to prevent rate limit
time.sleep(1)
create_firewall_policy(name_prefix, [l["id"] for l in cf_lists])
def chunk_list(_list: List[str], n: int):
for i in range(0, len(_list), n):
yield _list[i : i + n]