From e7c783c3e87422aadcdf61b6ea0f09e1124acc44 Mon Sep 17 00:00:00 2001
From: unknown <katharina.goenner@tik.uni-stuttgart.de>
Date: Thu, 2 Jun 2022 11:28:01 +0200
Subject: [PATCH] Receive preparation messages from websocket-api. Then if
 environment is container, load image

---
 amqp_messager.py  |  9 ++++---
 backend.py        | 68 +++++++++++++++++++++++++++++++----------------
 config.sample.ini |  2 +-
 3 files changed, 52 insertions(+), 27 deletions(-)

diff --git a/amqp_messager.py b/amqp_messager.py
index 18de360..1cf0b24 100644
--- a/amqp_messager.py
+++ b/amqp_messager.py
@@ -7,13 +7,14 @@
 
 class AMQPMessager(MessagingHandler):    
     def __init__(self, server, receiver_queues, sender_queue, task_queue,
-                 result_queue):
+                 result_queue, preparation_queue):
         super(AMQPMessager, self).__init__()
         self.server = server
         self.receiver_queues = receiver_queues
         self.sender_queue = sender_queue
         self.tasks = task_queue
         self.results = result_queue
+        self.preparations = preparation_queue
 
     def on_start(self, event):
         conn = event.container.connect(self.server)
@@ -28,8 +29,10 @@ def on_start(self, event):
 
     def on_message(self, event):
         # ToDO: ignore duplicate message
-        # print(event.message.body)
-        self.tasks.put(event.message.body)
+        if event.message.address != "preparations":
+            self.tasks.put(event.message.body)
+        elif event.message.address == "preparations":
+            self.preparations.put(event.message.body)
     
     def on_result(self, event):
         # check if we are finished
diff --git a/backend.py b/backend.py
index 7f86e51..3579851 100644
--- a/backend.py
+++ b/backend.py
@@ -8,6 +8,7 @@
 import re
 import json
 import tempfile
+from xml.etree.ElementPath import prepare_star
 import docker
 import requests
 import time
@@ -34,6 +35,7 @@ def __init__(self, config_file):
             print("Using env AMQPServer %s"%os.getenv('AMQPServer'))
         self.tasks = multiprocessing.Queue(3)
         self.results = multiprocessing.Queue()
+        self.preparations = multiprocessing.Queue()
         self.running_computations = {}
         self.client = docker.from_env()
         # ToDO: store errors and send them within result-message back
@@ -46,7 +48,8 @@ def __init__(self, config_file):
                         self.config.getlist("AMQP", "computationqueues"),
                         self.config["AMQP"]["resultqueue"],
                         self.tasks,
-                        self.results))
+                        self.results,
+                        self.preparations))
         self.messager_process = multiprocessing.Process(target=messager.run)
         self.messager_process.start()
             
@@ -103,6 +106,20 @@ def main(self):
                     comp2trash.append(key)
             for key in comp2trash:
                 del self.running_computations[key]
+
+            # prepare containers if available
+            # TODO also prepare matlab
+            try:
+                prepare_task = self.preparations.get(block=True, timeout=1)
+            except Empty:
+                pass
+            else:
+                print("--- Got preparation task. ---")
+                json_prepare_task = json.loads(prepare_task)
+                tmp_dir, files = self._prepare_all_environments(json_prepare_task)
+                if json_prepare_task["environment"] == "Container":
+                    comp_conf = ConfigurationContainerSchema().load(json_prepare_task["configuration"])
+                    self.load_image(comp_conf, tmp_dir.name)
                 
     def _prepare_all_environments(self, computation):
         # create tmp-dir for this computation and store files there
@@ -137,7 +154,33 @@ def _prepare_container_backend(self, computation, tmp_dir):
         # ToDO: create in-between status messages for frontend
         comp_conf = ConfigurationContainerSchema().load(
             computation["configuration"])
+        
         # load image
+        image_filename, image_id = self.load_image(comp_conf, tmp_dir)
+        
+        # create container
+        if comp_conf["volume"] is not None:
+            print("Creating volume ...")
+            volume = self.client.volumes.create(labels={"computation": computation['identifier'].hex})
+
+        print("Creating container ...")
+        container = self.client.containers.create(
+            image_id,
+            command=comp_conf["command_line_arguments"], 
+            auto_remove=False,
+            cpu_quota=100000*comp_conf["num_cpus"], 
+            detach=True,
+            entrypoint=comp_conf["entrypoint"],
+            mem_limit=comp_conf["memory"],
+            mounts=[Mount(comp_conf["volume"],volume.id)] \
+                if comp_conf["volume"] is not None else None)
+        print("... Done.")
+        if comp_conf["volume"] is not None:
+            return container, image_filename, volume
+        else:
+            return container, image_filename, None
+
+    def load_image(self, comp_conf, tmp_dir):
         image_filename = None
         image_uri = comp_conf["image"]
         if image_uri.startswith("file"):
@@ -175,27 +218,7 @@ def _prepare_container_backend(self, computation, tmp_dir):
             with open(os.path.join(tmp_dir, image_filename), 'rb') as bf:
                 image_id = self.client.images.load(bf)[0].id
         
-        # create container
-        if comp_conf["volume"] is not None:
-            print("Creating volume ...")
-            volume = self.client.volumes.create(labels={"computation": computation['identifier'].hex})
-
-        print("Creating container ...")
-        container = self.client.containers.create(
-            image_id,
-            command=comp_conf["command_line_arguments"], 
-            auto_remove=False,
-            cpu_quota=100000*comp_conf["num_cpus"], 
-            detach=True,
-            entrypoint=comp_conf["entrypoint"],
-            mem_limit=comp_conf["memory"],
-            mounts=[Mount(comp_conf["volume"],volume.id)] \
-                if comp_conf["volume"] is not None else None)
-        print("... Done.")
-        if comp_conf["volume"] is not None:
-            return container, image_filename, volume
-        else:
-            return container, image_filename, None
+        return image_filename, image_id
 
     def copy_to_container(self, ip_add, basepath, files):
         print(basepath, files)
@@ -210,7 +233,6 @@ def add_mimetypes(self):
         mimetypes.add_type("application/x-vgf", ".vgf")
         mimetypes.add_type("application/x-vgf3", ".vgf3")
         mimetypes.add_type("application/x-vgfc", ".vgfc")
-                
 
 class ResultStreamer(Thread):
     def __init__(self, stream, tmp_dir, files, result_queue, computation_id, sidekick):
diff --git a/config.sample.ini b/config.sample.ini
index a6c65d9..257f7e3 100644
--- a/config.sample.ini
+++ b/config.sample.ini
@@ -5,5 +5,5 @@ KeepContainer = no # yes for debugging
 
 [AMQP]
 Server = localhost:5672
-ComputationQueues = ["computations", "computations/container"]
+ComputationQueues = ["computations", "computations/container", "preparations"]
 ResultQueue = results
\ No newline at end of file