forked from traveloka-archive/elasticsearch-snapshot-script
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
196 lines (168 loc) · 6.36 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import boto3
import requests
import time
import json
import thread
from requests_aws4auth import AWS4Auth
default_repo_name = "tvlk-repo"
###
# get ec2 iam keys
###
def ec2_auth():
    """Build request auth from this EC2 instance's IAM role credentials.

    Reads the role name from the instance metadata endpoint, fetches the
    credentials document for that role, and wraps its access key, secret key
    and session token in an ``AWS4Auth`` object for the ``es`` service in
    ``ap-southeast-1``.
    """
    metadata_url = "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
    role_name = requests.get(metadata_url).text
    # Decode the credentials document once and read all three fields from it.
    credentials = requests.get(metadata_url + role_name).json()
    return AWS4Auth(
        credentials["AccessKeyId"],
        credentials["SecretAccessKey"],
        "ap-southeast-1",
        'es',
        session_token=credentials["Token"],
    )
###
# Basic elasticsearch call
###
def create_repo(host, bucket, role, repo_name):
    """PUT an S3 snapshot repository definition to ``<host>_snapshot/<repo_name>``.

    ``host`` is joined to the path without a separator, so it must already
    end with a slash. ``bucket`` and ``role`` become the repository's
    ``bucket`` and ``role_arn`` settings; the region is fixed to
    ``ap-southeast-1``. Returns the ``requests`` response object.
    """
    endpoint = host + "_snapshot/" + repo_name
    body = json.dumps({
        "type": "s3",
        "settings": {
            "bucket": bucket,
            "region":"ap-southeast-1",
            "role_arn": role
        }
    })
    return requests.put(
        endpoint,
        auth=ec2_auth(),
        data=body,
        headers={"Content-Type": "application/json"},
    )
def create_snapshot(host, repo_name, snapshot_name):
    """PUT an empty JSON body to ``<host>_snapshot/<repo_name>/<snapshot_name>``.

    Prints the snapshot name before sending the request. ``host`` must
    already end with a slash. Returns the ``requests`` response object.
    """
    print("creating snapshot with name : " + snapshot_name)
    endpoint = host + "_snapshot/" + repo_name + "/" + snapshot_name
    json_headers = {"Content-Type": "application/json"}
    return requests.put(endpoint, auth=ec2_auth(), data='{}', headers=json_headers)
def restore(host, repo_name, snapshot_name):
    """POST to ``<host>_snapshot/<repo_name>/<snapshot_name>/_restore``.

    The request is sent without a body, as before; the previously assigned
    but never used ``payload`` local has been dropped. ``host`` must already
    end with a slash. Returns the ``requests`` response object.
    """
    url = host + "_snapshot/" + repo_name + "/" + snapshot_name + "/_restore"
    r = requests.post(url, auth=ec2_auth(), headers={"Content-Type": "application/json"})
    return r
def get_repo(host):
    """Send a signed GET to ``<host>_snapshot/_all``.

    ``host`` must already end with a slash. Returns the ``requests``
    response object.
    """
    json_headers = {"Content-Type": "application/json"}
    return requests.get(host + "_snapshot/_all", auth=ec2_auth(), headers=json_headers)
def get_repo_detail(host, repo_name):
    """Send a signed GET to ``<host>_snapshot/<repo_name>/_all``.

    Callers in this file read a ``"snapshots"`` list from the decoded
    response. ``host`` must already end with a slash. Returns the
    ``requests`` response object.
    """
    endpoint = "".join([host, "_snapshot/", repo_name, "/_all"])
    return requests.get(
        endpoint,
        auth=ec2_auth(),
        headers={"Content-Type": "application/json"},
    )
def delete_repo(host, repo_name):
    """Send a signed DELETE to ``<host>_snapshot/<repo_name>``.

    ``host`` must already end with a slash. Returns the ``requests``
    response object.
    """
    json_headers = {"Content-Type": "application/json"}
    response = requests.delete(
        host + "_snapshot/" + repo_name,
        auth=ec2_auth(),
        headers=json_headers,
    )
    return response
def get_aliases(host):
    """Send a signed GET to ``<host>_aliases``.

    ``host`` must already end with a slash. Returns the ``requests``
    response object.
    """
    return requests.get(
        host + "_aliases",
        auth=ec2_auth(),
        headers={"Content-Type": "application/json"},
    )
def get_indices(host):
    """Send a signed GET to ``<host>_cat/indices``.

    ``host`` must already end with a slash. Returns the ``requests``
    response object.
    """
    endpoint = host + "_cat/indices"
    json_headers = {"Content-Type": "application/json"}
    return requests.get(endpoint, auth=ec2_auth(), headers=json_headers)
def delete_index(host, index):
    """Send a signed DELETE to ``<host><index>``.

    ``host`` and ``index`` are concatenated as-is. Returns the ``requests``
    response object.
    """
    target_url = host + index
    return requests.delete(
        target_url,
        auth=ec2_auth(),
        headers={"Content-Type": "application/json"},
    )
def get_index_detail(host, index):
    """Send a signed GET to ``<host><index>``.

    ``host`` and ``index`` are concatenated as-is. Returns the ``requests``
    response object.
    """
    json_headers = {"Content-Type": "application/json"}
    response = requests.get(host + index, auth=ec2_auth(), headers=json_headers)
    return response
def search(host, target):
    """Send a signed GET to ``<host><target>/_search``.

    ``host`` and ``target`` are concatenated as-is before ``/_search`` is
    appended. Returns the ``requests`` response object.
    """
    search_url = host + target +"/_search"
    return requests.get(
        search_url,
        auth=ec2_auth(),
        headers={"Content-Type": "application/json"},
    )
def adv_search(host, target):
    """Send a signed GET to ``<host><target>`` with nothing appended.

    Unlike ``search``, no ``/_search`` suffix is added, so ``target``
    carries the full path. Returns the ``requests`` response object.
    """
    json_headers = {"Content-Type": "application/json"}
    return requests.get(host + target, auth=ec2_auth(), headers=json_headers)
def post(host, json):
    """Send a signed POST to ``host`` with ``json`` as the request body.

    ``json`` is passed through unchanged as the ``data`` argument. Returns
    the ``requests`` response object.
    """
    json_headers = {"Content-Type": "application/json"}
    return requests.post(host, auth=ec2_auth(), data=json, headers=json_headers)
def get(host):
    """Send a signed GET to ``host`` and return the ``requests`` response."""
    return requests.get(
        host,
        auth=ec2_auth(),
        headers={"Content-Type": "application/json"},
    )
def put(host, json):
    """Send a signed PUT to ``host`` with ``json`` as the request body.

    ``json`` is passed through unchanged as the ``data`` argument. Returns
    the ``requests`` response object.
    """
    response = requests.put(
        host,
        auth=ec2_auth(),
        data=json,
        headers={"Content-Type": "application/json"},
    )
    return response
def delete(host, json):
    """Send a signed DELETE to ``host`` with ``json`` as the request body.

    ``json`` is passed through unchanged as the ``data`` argument. Returns
    the ``requests`` response object.
    """
    json_headers = {"Content-Type": "application/json"}
    return requests.delete(host, auth=ec2_auth(), data=json, headers=json_headers)
###
# job wrapper
###
def get_snapshot_status_from_list_snapshots(json, snapshot_name):
    """Find the entry named ``snapshot_name`` in a decoded snapshot listing.

    ``json`` is a mapping with a ``"snapshots"`` list whose items carry a
    ``"snapshot"`` name. The list is scanned up to five times, printing a
    "try" line before each pass. Returns the first matching entry, or
    ``None`` when no pass finds one.
    """
    # NOTE(review): every pass scans the same already-decoded data, so passes
    # after the first cannot find anything the first one missed.
    attempt = 0
    while attempt < 5:
        print("try "+ str(attempt) + " " + snapshot_name)
        match = next(
            (entry for entry in json["snapshots"] if entry["snapshot"] == snapshot_name),
            None,
        )
        if match is not None:
            return match
        attempt += 1
    return None
def snapshot_checker_worker(identifier, host, snapshot_name):
    """Poll the default repository until ``snapshot_name`` leaves IN_PROGRESS.

    Every five seconds the repository listing is fetched from ``host`` and
    the snapshot's ``"state"`` is read. While it is ``"IN_PROGRESS"`` a
    progress line is printed; once it is anything else a final line with
    that state is printed and the function returns.

    If the snapshot is not present in the listing, a message is printed and
    the function returns instead of failing on ``None["state"]``.
    """
    while True:
        # Fetch the listing once per iteration so the state that is printed
        # is the same one the loop decision is based on.
        listing = get_repo_detail(host, default_repo_name).json()
        status = get_snapshot_status_from_list_snapshots(listing, snapshot_name)
        if status is None:
            print("Snapshoting on "+host+" could not find snapshot "+snapshot_name)
            return
        state = status["state"]
        if state != "IN_PROGRESS":
            break
        print("Snapshoting on "+identifier+" "+state)
        time.sleep(5)
    print("Snapshoting on "+host+" finished with status "+state)
def initiate_snapshot_async(tuples, s3, role):
    """Start a snapshot on every host and watch each one in its own thread.

    ``tuples`` maps an identifier to a host URL. For each pair the default
    repository is created with bucket ``s3`` and role ``role``, a snapshot
    named ``<identifier>-<local timestamp>`` is requested, and
    ``snapshot_checker_worker`` is started with ``thread.start_new_thread``.
    The function does not wait for those workers.

    A failure while creating the snapshot or starting the thread is reported
    for that identifier and the loop moves on to the next host.
    """
    for identifier, host in tuples.items():
        create_repo(host, s3, role, default_repo_name)
        # NOTE(review): presumably gives the cluster time to register the
        # repository before it is used - confirm.
        time.sleep(5)
        time_string = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
        snapshot_name = identifier+"-"+time_string
        try:
            print("Create snapshot response : " + create_snapshot(host, default_repo_name, snapshot_name).text)
            time.sleep(5)
            thread.start_new_thread(snapshot_checker_worker, (identifier, host, snapshot_name))
        except Exception as exc:
            # Catch Exception rather than everything so Ctrl-C / SystemExit
            # still stop the script, and report the cause with the message.
            print("Error: unable to start snapshot or thread on "+ identifier + " (" + repr(exc) + ")")
def initiate_snapshot(tuples, s3, role):
    """Snapshot every host in turn, waiting for each snapshot to finish.

    ``tuples`` maps an identifier to a host URL. For each pair the default
    repository is created with bucket ``s3`` and role ``role``, a snapshot
    named ``<identifier>-<local timestamp>`` is requested, and
    ``snapshot_checker_worker`` is called directly, so it finishes polling
    before the next host is processed.

    A failure while creating or polling the snapshot is reported for that
    identifier and the loop moves on to the next host.
    """
    for identifier, host in tuples.items():
        print("Create repo response : " + create_repo(host, s3, role, default_repo_name).text)
        time.sleep(2)
        time_string = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
        snapshot_name = identifier+"-"+time_string
        print("Create snapshot with name " + snapshot_name)
        try:
            print("Create snapshot response : " + create_snapshot(host, default_repo_name, snapshot_name).text)
            time.sleep(2)
            snapshot_checker_worker(identifier, host, snapshot_name)
        except Exception as exc:
            # Catch Exception rather than everything so Ctrl-C / SystemExit
            # still stop the script, and report the cause with the message.
            print("Error: unable to start snapshot or thread on "+ identifier + " (" + repr(exc) + ")")
def get_latest_snapshot(json, identifier):
    """Return the newest snapshot entry whose name starts with ``identifier``.

    ``json`` is a decoded listing with a ``"snapshots"`` list. Entries are
    compared by ``"start_time_in_millis"``; on a tie the earlier entry in
    the list is kept. Returns ``None`` when no name matches.
    """
    latest = None
    for entry in json["snapshots"]:
        if not entry["snapshot"].startswith(identifier):
            continue
        # The first match is taken as-is; later ones must be strictly newer.
        if latest is None or entry["start_time_in_millis"] > latest["start_time_in_millis"]:
            latest = entry
    return latest
def initiate_restore(tuples, s3, role):
    """Restore the newest matching snapshot on every host.

    ``tuples`` maps an identifier to a host URL. For each pair the default
    repository is created with bucket ``s3`` and role ``role``, its snapshot
    listing is fetched and printed, and the newest snapshot whose name
    starts with the identifier is restored. The host's index listing is
    printed afterwards.

    Hosts with no matching snapshot are skipped. The ``None`` result is
    checked before ``"snapshot"`` is read from it, so the skip actually
    happens instead of a TypeError.
    """
    for identifier, host in tuples.items():
        print("Create repo response : " + create_repo(host, s3, role, default_repo_name).text)
        time.sleep(2)
        # One request serves both the log line and the snapshot lookup.
        repo_detail = get_repo_detail(host, default_repo_name)
        print("Get repo response : " + repo_detail.text)
        latest = get_latest_snapshot(repo_detail.json(), identifier)
        if latest is None:
            continue
        snapshot_name = latest["snapshot"]
        print("Restoring "+snapshot_name+" for " + identifier)
        print("Restoring response " + restore(host, default_repo_name, snapshot_name).text)
        print(get_indices(host).text)