-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcluster.py
130 lines (97 loc) · 2.97 KB
/
cluster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import sys
import time
import logging
from collections import Counter
from redisearch import Client
import redisearch
import json
import ray
import os
@ray.remote
class RecordTracker:
    """Ray actor that keeps a running count of processed records."""

    def __init__(self):
        # BUG FIX: was misspelled `__int__`, so the initializer never ran,
        # `total` was never created, and the first `inc()` raised AttributeError.
        self.total = 0

    def inc(self):
        """Increment the record count by one."""
        self.total += 1

    def counts(self):
        """Return the number of records counted so far."""
        return self.total
@ray.remote
class RecordLoader:
    """Ray actor intended to load patent records (currently just logs them)."""

    def __init__(self):
        # BUG FIX: was misspelled `__int__`, so this never ran as the initializer.
        print("file")

    def loadFile(self, file):
        """Log the file to be loaded.

        BUG FIX: `self` was missing, so a plain call bound the actor
        instance to `file` and any real argument raised TypeError.
        """
        print(file)

    def loadRecord(self, record):
        """Log a single record.

        BUG FIX: `self` was missing (same problem as `loadFile`).
        """
        print(record)
# Load the patent JSON once at import time and publish it to the Ray object
# store so remote tasks can read it without re-parsing the file.
# BUG FIX: manual open/close leaked the file handle if json.load() raised;
# a context manager guarantees the file is closed either way.
# NOTE(review): hard-coded absolute path — should come from config/env var.
with open(
        '/Users/tiagoooliveira/Documents/dev/scala/akka-http-quickstart-scala/src/main/resources/patent-13062022-1.json') as file:
    data = json.load(file)
database_object_ref = ray.put(data)
@ray.remote
def get_host_name(x):
    """Return the tuple *x* extended with this worker's hostname.

    Sleeps briefly so tasks spread across workers instead of all landing
    on the first one to become free.
    """
    import platform
    import time

    time.sleep(0.01)
    hostname = platform.node()
    return x + (hostname,)
@ray.remote
def f(x):
    """Echo *x* back after simulating one second of work."""
    work_seconds = 1
    time.sleep(work_seconds)
    return x
@ray.remote
def send_to_redis(record):
    """Placeholder task that will eventually push *record* into RediSearch.

    Client API reference: https://pypi.org/project/redisearch/
    """
    print("sending")
@ray.remote
def retrieve_task(item):
    """Look up *item* in the shared patent data and return ``(item, value)``.

    Reads the module-level ``database_object_ref`` from the Ray object store.
    BUG FIX: the sleep was unconditional `time.sleep(item / 10.)`, which
    raised TypeError for the string patent ids this task is actually called
    with (e.g. "CN112310387B"); it now only sleeps for numeric items.
    """
    obj_store_data = ray.get(database_object_ref)
    if isinstance(item, (int, float)):
        # Simulate per-item work for the numeric benchmarking case.
        time.sleep(item / 10.)
    return item, obj_store_data[item]
def wait_for_nodes(expected):
    """Block until at least *expected* nodes have joined the Ray cluster.

    Polls once per second and reports progress to stdout while waiting.
    """
    while True:
        joined = len(ray.nodes())
        if joined >= expected:
            return
        remaining = expected - joined
        print(
            "{} nodes have joined so far, waiting for {} more.".format(
                joined, remaining
            )
        )
        sys.stdout.flush()
        time.sleep(1)
def print_runtime(input_data, start_time, decimals=1):
print(f'Runtime: {time.time() - start_time:.{decimals}f} seconds, data:')
print(*input_data, sep='\n')
def main():
    """Fan out four `f` tasks in parallel and block until all complete."""
    print("initialized go")
    # With at least 4 cores the four 1-second tasks run concurrently,
    # so the whole batch takes about 1 second.
    result_ids = [f.remote(i) for i in range(4)]
    results = ray.get(result_ids)  # [0, 1, 2, 3]
    # Exploration notes kept from the original author:
    # https://realpython.com/python-redis/
    # wait_for_nodes(4)
    # curl -O --insecure -I https://files.pythonhosted.org/packages/ba/7d/2ae672b176e675519c0f5d2cb46f023e37be3754a59f7307756e3fdf7552/redisearch-2.1.1-py3-none-any.whl
    # r = redis.Redis(host=os.environ["REDIS_ENDPOINT"])
    # loading from file
    # making database available for actors
    # iterate over remote method/actor (responsible to send json to Redis) at the limit of actors available
if __name__ == "__main__":
    # BUG FIX: `ray.is_initialized` is a function; without the call
    # parentheses the bare function object is always truthy, so shutdown
    # ran unconditionally.
    if ray.is_initialized():
        ray.shutdown()
    ray.init(logging_level=logging.ERROR)
    start = time.time()
    # data_references = [retrieve_task.remote(item) for item in range(8)]
    data_references = retrieve_task.remote("CN112310387B")
    all_data = []
    # BUG FIX: ray.wait() requires a list of ObjectRefs; passing a single
    # ref raised TypeError.
    finished, data_references = ray.wait([data_references])
    data = ray.get(finished)
    print_runtime(data, start, 3)
    # all_data.extend(data)
    print("Success!")
    sys.stdout.flush()
    # Keep the driver alive briefly so the Ray dashboard/logs can be inspected.
    time.sleep(20)