Patch summary
=============
Move the housing-prices example off its private IPC polling loop and onto the
shared `simple_loop` helper imported from `jzflowsdk`.

  * crates/jzflowsdk_py/loop.py
      Stamp `new_id` onto every payload before `client.submit_output(...)`.
  * script/housing-prices/Dockerfile
      Install the SDK from git instead of pinning `requests` and
      `requests-unixsocket2` directly (presumably the git package provides
      `jzflowsdk` and pulls those in itself -- confirm).
  * script/housing-prices/ipc.py
      Deleted.
  * script/housing-prices/regression.py
      Replace `main()` with the callback
      `train_house_price(root_input_dir, output_dir)`, which returns the
      submit payload, and run it through `simple_loop`.

Review fixes folded into this revision
--------------------------------------
  1. loop.py: the added lines were `data.id = new_id` in both branches. Inside
     `for item in data:` that targets the list instead of the item, and the
     payload built in regression.py is a dict, so attribute assignment raises
     AttributeError. They now read `item["id"] = new_id` / `data["id"] = new_id`.
  2. regression.py: `from ipc import IPCClient, IPCError` was kept as context
     although this patch deletes ipc.py and removes the only visible users of
     those names; it is now removed (hunk headers re-counted accordingly).
  3. regression.py: the re-added log lines reported `.sav` paths for files
     that are written as `.png`; the messages now match the saved file names.

NOTE(review): the original whitespace of this patch was lost. Indentation and
blank-line placement below are reconstructed to satisfy the hunk line counts;
run `git apply --check` against the target tree before applying. The `index`
blob ids predate the review fixes above.

diff --git a/crates/jzflowsdk_py/loop.py b/crates/jzflowsdk_py/loop.py
index 7cad961..4594f2a 100644
--- a/crates/jzflowsdk_py/loop.py
+++ b/crates/jzflowsdk_py/loop.py
@@ -35,8 +35,10 @@ def simple_loop(callback):
                 if data is not None:
                     if isinstance(data, list):
                         for item in data:
+                            item["id"] = new_id
                             client.submit_output(item)
                     else:
+                        data["id"] = new_id
                         client.submit_output(data)
                 logger.info("submit new data %s", time.time() - instant)
             else:
diff --git a/script/housing-prices/Dockerfile b/script/housing-prices/Dockerfile
index 5f8c73b..9b4d944 100644
--- a/script/housing-prices/Dockerfile
+++ b/script/housing-prices/Dockerfile
@@ -1,7 +1,6 @@
 FROM civisanalytics/datascience-python
 RUN pip install seaborn -i https://pypi.tuna.tsinghua.edu.cn/simple
-RUN pip install requests-unixsocket2==0.4.1 -i https://pypi.tuna.tsinghua.edu.cn/simple
-RUN pip install requests==2.32.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
+RUN pip install git+https://github.com/GitDataAI/jiaoziflow.git
 
 WORKDIR /workdir/
 COPY *.py /workdir/
\ No newline at end of file
diff --git a/script/housing-prices/ipc.py b/script/housing-prices/ipc.py
deleted file mode 100644
index 8089f22..0000000
--- a/script/housing-prices/ipc.py
+++ /dev/null
@@ -1,80 +0,0 @@
-import requests_unixsocket
-import json
-import logging
-
-# Setup logging
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-# NotReady = 1,
-# DataMissing = 2,
-# InComingFinish = 3,
-# AlreadyFinish = 4,
-# NoAvaiableData = 5,
-class IPCError(Exception):
-    def __init__(self, code: int, msg: str):
-        self.code = code
-        self.msg = msg
-        super().__init__(f"Error {code}: {msg}")
-
-class IPCClient:
-    def __init__(self):
-        #self.base_url = f'http+unix://%2Fhom e%2Fhunjixin%2Fcode%2Fjz-flow%2Ftest.d' unix_socket/compute_unit_runner_d
-        self.base_url = f'http+unix://%2Funix_socket%2Fcompute_unit_runner_d'
-        # Initialize the session
-        self.session = requests_unixsocket.Session()
-
-    def _send_request(self, method: str, endpoint: str, json_data: dict = None):
-        url = f"{self.base_url}{endpoint}"
-        headers = {"Content-Type": "application/json"}
-        response = self.session.request(method, url, headers=headers, json=json_data)
-
-        if response.status_code != 200:
-            try:
-                error_data = response.json()
-                code = error_data.get("code", 0)
-                message = error_data.get("msg", response.text)
-                raise IPCError(code, message)
-            except ValueError:
-                raise IPCError(0, response.text)
-
-        return response
-
-    def finish(self) -> None:
-        self._send_request("POST", "/api/v1/status")
-
-    def status(self) -> dict:
-        response = self._send_request("GET", "/api/v1/status")
-        return response.json()
-
-    def submit_output(self, req: dict) -> None:
-        self._send_request("POST", "/api/v1/submit", req)
-
-    def request_available_data(self, id: str = None) -> dict:
-        endpoint = "/api/v1/data"
-        if id:
-            endpoint += f"?id={id}"
-        response = self._send_request("GET", endpoint)
-        return response.json()
-
-    def complete_result(self, id: str) -> None:
-        req = {"id": id}
-        self._send_request("POST", "/api/v1/data", req)
-
-# Example usage
-if __name__ == "__main__":
-    client = IPCClient()
-
-    try:
-        client.finish()
-        status = client.status()
-        print("Status:", status)
-
-        available_data = client.request_available_data()
-        print("Available Data:", available_data)
-
-        client.complete_result('example-id')
-        client.submit_output({"new_id": "example-id", "timeout": 30, "flags": 0, "count": 0})
-
-    except IPCError as e:
-        logger.error(f"IPC Error: {e}")
diff --git a/script/housing-prices/regression.py b/script/housing-prices/regression.py
index ff413f2..9c6b172 100644
--- a/script/housing-prices/regression.py
+++ b/script/housing-prices/regression.py
@@ -11,6 +11,6 @@ import uuid
 
 from utils import plot_learning_curve
-from ipc import IPCClient, IPCError
+from jzflowsdk import simple_loop
 
 from sklearn.model_selection import ShuffleSplit
 from sklearn import datasets, ensemble, linear_model
@@ -72,57 +72,38 @@ def create_learning_curve(estimator, features, targets):
     plot_learning_curve(estimator, title, features, targets, ylim=(0.5, 1.01), cv=cv, n_jobs=4)
 
 
-def main():
-    client = IPCClient()
-    tmp_path = "/app/tmp"
+def train_house_price(root_input_dir, output_dir):
     args = parser.parse_args()
 
-    while True:
-        instant = time.time()
-        try:
-            available_data = client.request_available_data()
-            if available_data is not None:
-                id = available_data['id']
-                path_str = os.path.join(tmp_path, id)
-                root_input_dir = path_str
-
-                new_id = str(uuid.uuid4())
-                output_dir = os.path.join(tmp_path, new_id, "images")
-                os.makedirs(output_dir, exist_ok=True)
-
-                for dirpath, dirs, files in os.walk(root_input_dir):
-                    input_files = [ os.path.join(dirpath, filename) for filename in files if filename.endswith('.csv') ]
-                    logger.info(f"Datasets: {input_files}")
-                    for filename in input_files:
-
-                        experiment_name = os.path.basename(os.path.splitext(filename)[0])
-                        # Data loading and Exploration
-                        data, features, targets = load_data(filename, args.target_col)
-                        create_pairplot(data)
-                        plt.savefig(path.join(output_dir, experiment_name + '_pairplot.png'))
-                        logger.info("save : {}".format(path.join(output_dir, experiment_name + '_pairplot.sav')))
-
-                        create_corr_matrix(data)
-                        plt.savefig(path.join(output_dir, experiment_name + '_corr_matrix.png'))
-                        logger.info("save : {}".format(path.join(output_dir, experiment_name + '_corr_matrix.sav')))
-
-                        # Fit model
-                        reg = train_model(features, targets)
-                        create_learning_curve(reg, features, targets)
-                        plt.savefig(path.join(output_dir, experiment_name + '_cv_reg_output.png'))
-                        logger.info("save : {}".format(path.join(output_dir, experiment_name + '_cv_reg_output.sav')))
-
-                        # Save model
-                        joblib.dump(reg, path.join(output_dir, experiment_name + '_model.sav'))
-                        logger.info("save : {}".format(path.join(output_dir, experiment_name + '_model.sav')))
-
-
-                logger.info("process data %s", time.time() - instant)
-
-                client.complete_result(id)
-                # Submit directory after completing a batch
-                data = {
-                    "id": new_id,
+    output_dir = os.path.join(output_dir, "images")
+    os.makedirs(output_dir, exist_ok=True)
+
+    for dirpath, dirs, files in os.walk(root_input_dir):
+        input_files = [ os.path.join(dirpath, filename) for filename in files if filename.endswith('.csv') ]
+        logger.info(f"Datasets: {input_files}")
+        for filename in input_files:
+
+            experiment_name = os.path.basename(os.path.splitext(filename)[0])
+            # Data loading and Exploration
+            data, features, targets = load_data(filename, args.target_col)
+            create_pairplot(data)
+            plt.savefig(path.join(output_dir, experiment_name + '_pairplot.png'))
+            logger.info("save : {}".format(path.join(output_dir, experiment_name + '_pairplot.png')))
+
+            create_corr_matrix(data)
+            plt.savefig(path.join(output_dir, experiment_name + '_corr_matrix.png'))
+            logger.info("save : {}".format(path.join(output_dir, experiment_name + '_corr_matrix.png')))
+
+            # Fit model
+            reg = train_model(features, targets)
+            create_learning_curve(reg, features, targets)
+            plt.savefig(path.join(output_dir, experiment_name + '_cv_reg_output.png'))
+            logger.info("save : {}".format(path.join(output_dir, experiment_name + '_cv_reg_output.png')))
+
+            # Save model
+            joblib.dump(reg, path.join(output_dir, experiment_name + '_model.sav'))
+            logger.info("save : {}".format(path.join(output_dir, experiment_name + '_model.sav')))
+    data = {
                     "size": 4,
                     "data_flag": {
                         "is_keep_data": False,
@@ -130,30 +111,7 @@ def main():
                     },
                     "priority": 0,
                 }
-                client.submit_output(data)
-                logger.info("submit new data %s", time.time() - instant)
-            else:
-                time.sleep(2)
-                continue
+    return data
 
-        except IPCError as e:
-            if e.code == 1:
-                time.sleep(2)
-                continue
-            elif e.code == 3:
-                client.finish()
-                logger.info(f"incoming data finish {e.msg}")
-                return
-            elif e.code == 4:
-                logger.info(f"receive AlreadyFinish {e.msg}")
-                time.sleep(60*60*24*365) # alway wait here to provent container restart
-                return
-            elif e.code == 5:
-                logger.info(f"no avaiable data {e.msg}")
-                time.sleep(2)
-                continue
-            else:
-                logger.error("got unknown error %s", e)
-                time.sleep(5)
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    simple_loop(train_house_price)
\ No newline at end of file