Skip to content

Commit

Permalink
feat: add python simple sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 22, 2024
1 parent f17cacd commit b171e34
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 156 deletions.
2 changes: 2 additions & 0 deletions crates/jzflowsdk_py/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ def simple_loop(callback):
if data is not None:
if isinstance(data, list):
for item in data:
data.id = new_id
client.submit_output(item)
else:
data.id = new_id
client.submit_output(data)
logger.info("submit new data %s", time.time() - instant)
else:
Expand Down
3 changes: 1 addition & 2 deletions script/housing-prices/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
FROM civisanalytics/datascience-python
RUN pip install seaborn -i https://pypi.tuna.tsinghua.edu.cn/simple
RUN pip install requests-unixsocket2==0.4.1 -i https://pypi.tuna.tsinghua.edu.cn/simple
RUN pip install requests==2.32.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
RUN pip install git+https://github.com/GitDataAI/jiaoziflow.git

WORKDIR /workdir/
COPY *.py /workdir/
80 changes: 0 additions & 80 deletions script/housing-prices/ipc.py

This file was deleted.

107 changes: 33 additions & 74 deletions script/housing-prices/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import uuid
from utils import plot_learning_curve
from ipc import IPCClient, IPCError
from jzflowsdk import simple_loop

from sklearn.model_selection import ShuffleSplit
from sklearn import datasets, ensemble, linear_model
Expand Down Expand Up @@ -72,88 +73,46 @@ def create_learning_curve(estimator, features, targets):
plot_learning_curve(estimator, title, features, targets,
ylim=(0.5, 1.01), cv=cv, n_jobs=4)

def main():
client = IPCClient()
tmp_path = "/app/tmp"
def train_house_price(root_input_dir, output_dir):
args = parser.parse_args()

while True:
instant = time.time()
try:
available_data = client.request_available_data()
if available_data is not None:
id = available_data['id']
path_str = os.path.join(tmp_path, id)
root_input_dir = path_str

new_id = str(uuid.uuid4())
output_dir = os.path.join(tmp_path, new_id, "images")
os.makedirs(output_dir, exist_ok=True)

for dirpath, dirs, files in os.walk(root_input_dir):
input_files = [ os.path.join(dirpath, filename) for filename in files if filename.endswith('.csv') ]
logger.info(f"Datasets: {input_files}")
for filename in input_files:

experiment_name = os.path.basename(os.path.splitext(filename)[0])
# Data loading and Exploration
data, features, targets = load_data(filename, args.target_col)
create_pairplot(data)
plt.savefig(path.join(output_dir, experiment_name + '_pairplot.png'))
logger.info("save : {}".format(path.join(output_dir, experiment_name + '_pairplot.sav')))

create_corr_matrix(data)
plt.savefig(path.join(output_dir, experiment_name + '_corr_matrix.png'))
logger.info("save : {}".format(path.join(output_dir, experiment_name + '_corr_matrix.sav')))

# Fit model
reg = train_model(features, targets)
create_learning_curve(reg, features, targets)
plt.savefig(path.join(output_dir, experiment_name + '_cv_reg_output.png'))
logger.info("save : {}".format(path.join(output_dir, experiment_name + '_cv_reg_output.sav')))

# Save model
joblib.dump(reg, path.join(output_dir, experiment_name + '_model.sav'))
logger.info("save : {}".format(path.join(output_dir, experiment_name + '_model.sav')))


logger.info("process data %s", time.time() - instant)

client.complete_result(id)
# Submit directory after completing a batch
data = {
"id": new_id,
output_dir = os.path.join(output_dir, "images")
os.makedirs(output_dir, exist_ok=True)

for dirpath, dirs, files in os.walk(root_input_dir):
input_files = [ os.path.join(dirpath, filename) for filename in files if filename.endswith('.csv') ]
logger.info(f"Datasets: {input_files}")
for filename in input_files:

experiment_name = os.path.basename(os.path.splitext(filename)[0])
# Data loading and Exploration
data, features, targets = load_data(filename, args.target_col)
create_pairplot(data)
plt.savefig(path.join(output_dir, experiment_name + '_pairplot.png'))
logger.info("save : {}".format(path.join(output_dir, experiment_name + '_pairplot.sav')))

create_corr_matrix(data)
plt.savefig(path.join(output_dir, experiment_name + '_corr_matrix.png'))
logger.info("save : {}".format(path.join(output_dir, experiment_name + '_corr_matrix.sav')))

# Fit model
reg = train_model(features, targets)
create_learning_curve(reg, features, targets)
plt.savefig(path.join(output_dir, experiment_name + '_cv_reg_output.png'))
logger.info("save : {}".format(path.join(output_dir, experiment_name + '_cv_reg_output.sav')))

# Save model
joblib.dump(reg, path.join(output_dir, experiment_name + '_model.sav'))
logger.info("save : {}".format(path.join(output_dir, experiment_name + '_model.sav')))
data = {
"size": 4,
"data_flag": {
"is_keep_data": False,
"is_transparent_data": False
},
"priority": 0,
}
client.submit_output(data)
logger.info("submit new data %s", time.time() - instant)
else:
time.sleep(2)
continue
return data

except IPCError as e:
if e.code == 1:
time.sleep(2)
continue
elif e.code == 3:
client.finish()
logger.info(f"incoming data finish {e.msg}")
return
elif e.code == 4:
logger.info(f"receive AlreadyFinish {e.msg}")
time.sleep(60*60*24*365) # alway wait here to provent container restart
return
elif e.code == 5:
logger.info(f"no avaiable data {e.msg}")
time.sleep(2)
continue
else:
logger.error("got unknown error %s", e)
time.sleep(5)
if __name__ == "__main__":
main()
simple_loop(train_house_price)

0 comments on commit b171e34

Please sign in to comment.