From 74994121099e4ce97424dbe5995274eb1713f516 Mon Sep 17 00:00:00 2001
From: WillNigel23
Date: Tue, 16 Jan 2024 11:40:48 +0800
Subject: [PATCH 1/3] llm-188 Initial concurrency re-implementation

---
 configs/configs.py |  2 +-
 engine.py          | 32 ++++++++++++++++++++------------
 src/agents.py      | 13 +++++--------
 3 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/configs/configs.py b/configs/configs.py
index bb301ae..f246ee8 100644
--- a/configs/configs.py
+++ b/configs/configs.py
@@ -18,7 +18,7 @@
 NUM_AGENTS = int(os.getenv("NUM_AGENTS", default="0"))
 NUM_ZOMBIES = int(os.getenv("NUM_ZOMBIES", default="0"))
 WARP = int(os.getenv("WARP", default="0"))
-MAX_WORKERS = int(os.getenv("MAX_WORKERS", default="1"))
+MAX_WORKERS = int(os.getenv("MAX_WORKERS", default="0"))
 IGNORE_PROB = float(os.getenv("IGNORE_PROB", default="0.75"))
 ALLOW_MOVEMENT = int(os.getenv("ALLOW_MOVEMENT", default="1"))
 SLEEP_STEP = float(os.getenv("SLEEP_STEP", default=0))
diff --git a/engine.py b/engine.py
index b0e6bfa..dce8ac8 100755
--- a/engine.py
+++ b/engine.py
@@ -294,37 +294,42 @@ def run(self):
         self.sim_start_time = datetime.now()
         self.send_matrix_to_redis()
         for step in range(self.steps):
+            self.cur_step = step
             with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                 if self.status == "stop":
                     pd("stopping simulation")
                     break

                 start_time = datetime.now()
+                pd(f"Step {step + 1}:")
+
+                redis_log(self.get_arr_2D(), f"{self.id}:matrix_states")
+                #redis_connection.lpush(f"{self.id}:agent_conversations", json.dumps(f"Step: {step + 1} | {self.unix_to_strftime(self.unix_time)}"))
+                redis_connection.set(f"{self.id}:matrix_state", json.dumps(self.get_arr_2D()))
+                print_and_log(f"Step: {step + 1} | {self.unix_to_strftime(self.unix_time)}", f"{self.id}:agent_conversations")
-                pd(f"Step {step}:")
+                self.log_agents_to_redis()
-                redis_connection.lpush(f"{self.id}:agent_conversations", json.dumps(f"Step: {step + 1} | {self.unix_to_strftime(self.unix_time)}"))
+                for a in self.agents:
+                    print_and_log(f"Step: {step + 1} | {self.unix_to_strftime(self.unix_time)}", f"{self.id}:events:{a.name}")
+                    print_and_log(f"Step: {step + 1} | {self.unix_to_strftime(self.unix_time)}", f"{self.id}:conversations:{a.name}")

                 # Submit agent actions concurrently
                 if LLM_ACTION == 1:
-                    futures = [executor.submit(self.llm_action, agent, self.unix_time,step) for agent in self.agents]
+                    futures = [executor.submit(self.llm_action, agent, self.unix_time) for agent in self.agents]
                 else:
-                    futures = [executor.submit(self.agent_action, agent, self.unix_time) for agent in self.agents]
+                    futures = [executor.submit(self.agent_action, agent, self.unix_time) for agent in self.agents]
+
                 # Retrieve results from futures
                 updated_agents = [future.result() for future in futures]
-                # Update the agents' states. Reflect
-
-                #for i, updated_agent in enumerate(updated_agents):
-                #    self.agents[i].__dict__.update(updated_agent.__dict__)

                 if PRINT_MAP == 1:
                     self.print_matrix()
-                redis_connection.set(f"{self.id}:matrix_state", json.dumps(self.get_arr_2D()))

                 self.unix_time = self.unix_time + 10
                 end_time = datetime.now()
                 pd(f"LLm calls for Step {step}: {llm.call_counter - self.llm_calls} calls")
                 self.llm_calls = llm.call_counter
-                pd(f"Step {step} ran in {end_time - start_time}")
+                pd(f"Step {step + 1} ran in {end_time - start_time}")

                 if SLEEP_STEP and SLEEP_STEP > 0:
                     time.sleep(SLEEP_STEP)

@@ -863,8 +868,11 @@ def main():

     # Run
     start_time = datetime.now()
-    #matrix.run()
-    matrix.run_singlethread()
+    if MAX_WORKERS == 0:
+        matrix.run_singlethread()
+    else:
+        matrix.run()
+
     end_time = datetime.now()

     matrix.run_interviews()
diff --git a/src/agents.py b/src/agents.py
index 3b9e669..7b5416b 100644
--- a/src/agents.py
+++ b/src/agents.py
@@ -252,7 +252,7 @@ def summarize_conversation(self, timestamp):
         }

         msg = llm.prompt(prompt_name="summarize_conversation", variables=variables)
-        interaction = f"{timestamp} - {self} summarized their conversation with {self.last_conversation.other_agent.name}: {msg}"
+        interaction = f"{timestamp} - {self} summarized the conversation: {msg}"

         if self.matrix is not None:
             print_and_log(interaction, f"{self.matrix.id}:events:{self.name}")
@@ -305,9 +305,6 @@ def talk(self, opts={}):
             relevant_memories_string = relevant_memories_string + "\n".join([mem for mem in self.short_memory[-5:] if "said" not in mem])

         previous_conversations = "\n".join([convo for convo in self.last_conversation.messages])
-
-
-
         variables = {
             'selfContext': self.getSelfContext(),
             'relevant_memories': relevant_memories_string,
@@ -333,10 +330,10 @@ def talk(self, opts={}):
             print_and_log(interaction, f"{self.matrix.id}:conversations:{other_agent.name}")
             print_and_log(interaction, f"{self.matrix.id}:agent_conversations")
         # Temporarily here
-        #self.short_memory.append(interaction)
-        #self.add_short_memory(interaction, timestamp)
-        self.last_conversation.messages.append(interaction)
-        other_agent.last_conversation.messages.append(interaction)
+        if self.last_conversation is not None:
+            self.last_conversation.messages.append(interaction)
+        if other_agent.last_conversation is not None:
+            other_agent.last_conversation.messages.append(interaction)

     def talk_many(self, perceived_agents, timestamp):
         relevant_memories = self.getMemories(f"{[a.name for a in perceived_agents]}", timestamp)

From 398ec97243db66af167b49351fc7c27e3a2e61e3 Mon Sep 17 00:00:00 2001
From: WillNigel23
Date: Wed, 17 Jan 2024 14:20:06 +0800
Subject: [PATCH 2/3] llm-188 Concurrency test runner that compares times between different setups

---
 concurrency.py | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 engine.py      |  1 +
 utils/utils.py | 13 +++++++++++++
 3 files changed, 77 insertions(+)
 create mode 100644 concurrency.py

diff --git a/concurrency.py b/concurrency.py
new file mode 100644
index 0000000..ea2f9d6
--- /dev/null
+++ b/concurrency.py
@@ -0,0 +1,63 @@
+import threading
+import concurrent.futures
+from concurrent.futures import ThreadPoolExecutor
+import time
+from engine import Matrix
+
+steps = 5
+variation = { "steps": steps, "allow_plan": 1, "allow_reflect": 1, "allow_observance": 1, "allow_meta": 1, "llm_action": 1, "llm_importance": 0 }
+scenario = "configs/christmas_party_situation.json"
+id = "test_concurrency"
+time_scores = []
+
+# Run Single Thread 3 times and grab scores
+for i in range(3):
+    params = { **variation, "scenario": scenario, "id": id }
+    matrix = Matrix(params)
+
+    start_time = time.time()
+    for step in range(steps):
+        for a in matrix.agents:
+            matrix.llm_action(a, matrix.unix_time)
+
+        matrix.unix_time += 10
+
+    end_time = time.time()
+
+    time_scores.append(end_time - start_time)
+
+# Run 3 Threads 3 times and grab scores
+for i in range(3):
+    params = { **variation, "scenario": scenario, "id": id, "max_workers": 3 }
+    matrix = Matrix(params)
+
+    start_time = time.time()
+    for step in range(steps):
+        with concurrent.futures.ThreadPoolExecutor(max_workers=matrix.max_workers) as executor:
+            futures = [executor.submit(matrix.llm_action, agent, matrix.unix_time) for agent in matrix.agents]
+            updated_agents = [future.result() for future in futures]
+
+        matrix.unix_time += 10
+
+    end_time = time.time()
+
+    time_scores.append(end_time - start_time)
+
+# Run 5 Threads 3 times and grab scores
+for i in range(3):
+    params = { **variation, "scenario": scenario, "id": id, "max_workers": 5 }
+    matrix = Matrix(params)
+
+    start_time = time.time()
+    for step in range(steps):
+        with concurrent.futures.ThreadPoolExecutor(max_workers=matrix.max_workers) as executor:
+            futures = [executor.submit(matrix.llm_action, agent, matrix.unix_time) for agent in matrix.agents]
+            updated_agents = [future.result() for future in futures]
+
+        matrix.unix_time += 10
+
+    end_time = time.time()
+
+    time_scores.append(end_time - start_time)
+
+print(time_scores)
diff --git a/engine.py b/engine.py
index dce8ac8..83b078c 100755
--- a/engine.py
+++ b/engine.py
@@ -30,6 +30,7 @@ def __init__(self, matrix_data={}):
         self.allow_reflect_flag = matrix_data.get("allow_reflect", ALLOW_REFLECT)
         self.allow_meta_flag = matrix_data.get("allow_meta", ALLOW_META)
         self.allow_observance_flag = matrix_data.get("allow_observance", ALLOW_OBSERVANCE)
+        self.max_workers = matrix_data.get("max_workers", MAX_WORKERS)

         self.id = matrix_data.get("id", str(uuid.uuid4()))
         self.scenario_file = matrix_data.get("scenario", "configs/def.json")
diff --git a/utils/utils.py b/utils/utils.py
index e91cfdf..dd9d920 100644
--- a/utils/utils.py
+++ b/utils/utils.py
@@ -141,6 +141,19 @@ def generate(self, prompt, fallback="Llm Error"):
                 self.call_counter += 1
             else:
                 msg = fallback
+        elif self.model == "vllm":
+            data = {
+                "prompt": prompt,
+                "use_beam_search": True,
+                "n": 4,
+                "temperature": 0
+            }
+            response = requests.post(f"{VLLM_URL}/generate", json=data, timeout=LLAMA_TIMEOUT)
+            if response.status_code == 200:
+                msg = response.json()['text'][0]
+                self.call_counter += 1
+            else:
+                msg = fallback + ": " + response.text
         else:
             response = requests.post(f"{current_url}/generate", json=data, timeout=LLAMA_TIMEOUT)

From 774d2bdad6b9dff39cd3470e30fcdaec689a7606 Mon Sep 17 00:00:00 2001
From: WillNigel23
Date: Wed, 17 Jan 2024 14:21:17 +0800
Subject: [PATCH 3/3] Configs changed for vllm url

---
 configs/configs.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/configs/configs.py b/configs/configs.py
index f246ee8..4508699 100644
--- a/configs/configs.py
+++ b/configs/configs.py
@@ -6,6 +6,7 @@
 # DECLARE PARAMS HERE
 DEBUG = os.getenv("DEBUG", default="1")
 LLAMA_URL = os.getenv("LLAMA_URL", default="http://localhost:11434/api")
+VLLM_URL = os.getenv("VLLM_URL", default="http://localhost:8000")
 POWERINF_URL = os.getenv("POWERINF_URL", default="http://localhost:8080")
 LLAMA_TIMEOUT = int(os.getenv("LLAMA_TIMEOUT", default="600"))
 REDIS_URL = os.getenv("REDIS_URL", default="redis://localhost:6379")