diff --git a/k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py b/k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py
new file mode 100644
index 0000000000..6350ac42d0
--- /dev/null
+++ b/k8s/examples/vineyard-on-fluid/linear-regression-with-vineyard.py
@@ -0,0 +1,182 @@
+"""Run a linear regression workflow on Fluid, using vineyard to share data.
+
+The script creates a Fluid dataset bound to a vineyard runtime, chains three
+tasks (data preprocessing -> model training -> model testing), runs them as
+one workflow and finally cleans up all resources. The train/test splits are
+passed between tasks through vineyard; the model and the test error are
+written to the volume mounted at /data.
+"""
+
+import fluid
+
+from fluid import constants
+from fluid import models
+from kubernetes.client import models as k8s_models
+
+# Use the default kubeconfig file to connect to the Fluid control plane
+# and create a Fluid client instance
+client_config = fluid.ClientConfig()
+fluid_client = fluid.FluidClient(client_config)
+
+# Create a dataset named "vineyard" in the default namespace
+fluid_client.create_dataset(
+    dataset_name="vineyard",
+    mount_name="dummy-mount-name",
+    mount_point="dummy-mount-point"
+)
+
+# Get the dataset instance of the "vineyard" dataset
+dataset = fluid_client.get_dataset(dataset_name="vineyard")
+
+# Init vineyard runtime configuration and bind the vineyard dataset instance to the runtime.
+# Replicas is 2, and the memory is 30Gi
+dataset.bind_runtime(
+    runtime_type=constants.VINEYARD_RUNTIME_KIND,
+    replicas=2,
+    cache_capacity_GiB=30,
+    cache_medium="MEM",
+    wait=True
+)
+
+# Each task script below is executed with bash: it installs the dependencies,
+# writes the Python program to a file through a here-document and runs it.
+
+# define the script of data preprocessing
+preprocess_data_script = """
+#!/bin/bash
+set -ex
+pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2
+
+cat <<EOF > ./preprocess.py
+from sklearn.model_selection import train_test_split
+
+import pandas as pd
+import vineyard
+
+df = pd.read_pickle('/data/df.pkl')
+
+# Preprocess Data
+df = df.drop(df[(df['GrLivArea']>4800)].index)
+X = df.drop('SalePrice', axis=1)  # Features
+y = df['SalePrice']  # Target variable
+
+del df
+
+X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
+
+del X, y
+
+vineyard.put(X_train, name="x_train", persist=True)
+vineyard.put(X_test, name="x_test", persist=True)
+vineyard.put(y_train, name="y_train", persist=True)
+vineyard.put(y_test, name="y_test", persist=True)
+
+EOF
+
+python3 ./preprocess.py
+"""
+
+# define the script of model training
+train_data_script = """
+#!/bin/bash
+set -ex
+pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2
+
+cat <<EOF > ./train.py
+from sklearn.linear_model import LinearRegression
+
+import joblib
+import pandas as pd
+import vineyard
+
+x_train_data = vineyard.get(name="x_train", fetch=True)
+y_train_data = vineyard.get(name="y_train", fetch=True)
+
+model = LinearRegression()
+model.fit(x_train_data, y_train_data)
+
+joblib.dump(model, '/data/model.pkl')
+
+EOF
+python3 ./train.py
+"""
+
+# define the script of model testing
+test_data_script = """
+#!/bin/bash
+set -ex
+pip3 install numpy pandas pyarrow requests vineyard scikit-learn==1.4.0 joblib==1.3.2
+
+cat <<EOF > ./test.py
+from sklearn.linear_model import LinearRegression
+from sklearn.metrics import mean_squared_error
+
+import vineyard
+import joblib
+import pandas as pd
+
+x_test_data = vineyard.get(name="x_test", fetch=True)
+y_test_data = vineyard.get(name="y_test", fetch=True)
+
+model = joblib.load("/data/model.pkl")
+y_pred = model.predict(x_test_data)
+
+err = mean_squared_error(y_test_data, y_pred)
+
+with open('/data/output.txt', 'a') as f:
+    f.write(str(err))
+
+EOF
+
+python3 ./test.py
+"""
+
+
+# define the template of the task processor and mount the OSS Volume
+def create_processor(script):
+    """Return a Fluid processor that runs ``script`` with bash.
+
+    The container uses the python:3.10 image and mounts the "pvc-oss"
+    persistent volume claim at /data.
+    """
+    return models.Processor(
+        # When enabling fuse affinity scheduling, add the following label
+        # to achieve the best performance of data processing
+        # pod_metadata=models.PodMetadata(
+        #     labels={"fuse.serverful.fluid.io/inject": "true"},
+        # ),
+        script=models.ScriptProcessor(
+            command=["bash"],
+            source=script,
+            image="python",
+            image_tag="3.10",
+            volumes=[k8s_models.V1Volume(
+                name="data",
+                persistent_volume_claim=k8s_models.V1PersistentVolumeClaimVolumeSource(
+                    claim_name="pvc-oss"
+                )
+            )],
+            volume_mounts=[k8s_models.V1VolumeMount(
+                name="data",
+                mount_path="/data"
+            )],
+        )
+    )
+
+
+preprocess_processor = create_processor(preprocess_data_script)
+train_processor = create_processor(train_data_script)
+test_processor = create_processor(test_data_script)
+
+# Create a linear regression model task workflow: data preprocessing -> model training -> model testing
+# The following mount path "/var/run" is the default path of the vineyard configuration file
+flow = dataset.process(processor=preprocess_processor, dataset_mountpath="/var/run") \
+    .process(processor=train_processor, dataset_mountpath="/var/run") \
+    .process(processor=test_processor, dataset_mountpath="/var/run")
+
+# Submit the linear regression model task workflow to the Fluid platform and start execution
+run = flow.run(run_id="linear-regression-with-vineyard")
+run.wait()
+
+# Clean up all resources
+dataset.clean_up(wait=True)
diff --git a/k8s/examples/vineyard-on-fluid/prepare-dataset.py b/k8s/examples/vineyard-on-fluid/prepare-dataset.py
new file mode 100644
index 0000000000..d6cad2c1a2
--- /dev/null
+++ b/k8s/examples/vineyard-on-fluid/prepare-dataset.py
@@ -0,0 +1,73 @@
+"""Generate a random dataframe and upload it, pickled, to OSS as df.pkl.
+
+The OSS credentials are read from the OSS_ACCESS_KEY_ID and
+OSS_ACCESS_KEY_SECRET environment variables.
+"""
+
+import io
+
+import numpy as np
+import oss2
+import pandas as pd
+from oss2.credentials import EnvironmentVariableCredentialsProvider
+
+# generate a dataframe with size around 22G
+num_rows = 6000 * 10000
+df = pd.DataFrame({
+    'Id': np.random.randint(1, 100000, num_rows),
+    'MSSubClass': np.random.randint(20, 201, size=num_rows),
+    'LotFrontage': np.random.randint(50, 151, size=num_rows),
+    'LotArea': np.random.randint(5000, 20001, size=num_rows),
+    'OverallQual': np.random.randint(1, 11, size=num_rows),
+    'OverallCond': np.random.randint(1, 11, size=num_rows),
+    'YearBuilt': np.random.randint(1900, 2022, size=num_rows),
+    'YearRemodAdd': np.random.randint(1900, 2022, size=num_rows),
+    'MasVnrArea': np.random.randint(0, 1001, size=num_rows),
+    'BsmtFinSF1': np.random.randint(0, 2001, size=num_rows),
+    'BsmtFinSF2': np.random.randint(0, 1001, size=num_rows),
+    'BsmtUnfSF': np.random.randint(0, 2001, size=num_rows),
+    'TotalBsmtSF': np.random.randint(0, 3001, size=num_rows),
+    '1stFlrSF': np.random.randint(500, 4001, size=num_rows),
+    '2andFlrSF': np.random.randint(0, 2001, size=num_rows),
+    'LowQualFinSF': np.random.randint(0, 201, size=num_rows),
+    'GrLivArea': np.random.randint(600, 5001, size=num_rows),
+    'BsmtFullBath': np.random.randint(0, 4, size=num_rows),
+    'BsmtHalfBath': np.random.randint(0, 3, size=num_rows),
+    'FullBath': np.random.randint(0, 5, size=num_rows),
+    'HalfBath': np.random.randint(0, 3, size=num_rows),
+    'BedroomAbvGr': np.random.randint(0, 11, size=num_rows),
+    'KitchenAbvGr': np.random.randint(0, 4, size=num_rows),
+    'TotRmsAbvGrd': np.random.randint(0, 16, size=num_rows),
+    'Fireplaces': np.random.randint(0, 4, size=num_rows),
+    'GarageYrBlt': np.random.randint(1900, 2022, size=num_rows),
+    'GarageCars': np.random.randint(0, 5, num_rows),
+    'GarageArea': np.random.randint(0, 1001, num_rows),
+    'WoodDeckSF': np.random.randint(0, 501, num_rows),
+    'OpenPorchSF': np.random.randint(0, 301, num_rows),
+    'EnclosedPorch': np.random.randint(0, 201, num_rows),
+    '3SsnPorch': np.random.randint(0, 101, num_rows),
+    'ScreenPorch': np.random.randint(0, 201, num_rows),
+    'PoolArea': np.random.randint(0, 301, num_rows),
+    'MiscVal': np.random.randint(0, 5001, num_rows),
+    'TotalRooms': np.random.randint(2, 11, num_rows),
+    "GarageAge": np.random.randint(1, 31, num_rows),
+    "RemodAge": np.random.randint(1, 31, num_rows),
+    "HouseAge": np.random.randint(1, 31, num_rows),
+    "TotalBath": np.random.randint(1, 5, num_rows),
+    "TotalPorchSF": np.random.randint(1, 1001, num_rows),
+    "TotalSF": np.random.randint(1000, 6001, num_rows),
+    "TotalArea": np.random.randint(1000, 6001, num_rows),
+    'MoSold': np.random.randint(1, 13, num_rows),
+    'YrSold': np.random.randint(2006, 2022, num_rows),
+    'SalePrice': np.random.randint(50000, 800001, num_rows),
+})
+
+# Please set your OSS accessKeyID and accessKeySecret as environment variables OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET
+auth = oss2.ProviderAuth(EnvironmentVariableCredentialsProvider())
+# Please replace OSS_ENDPOINT and BUCKET_NAME with your OSS Endpoint and Bucket
+bucket = oss2.Bucket(auth, 'OSS_ENDPOINT', 'BUCKET_NAME')
+
+# Pickle the dataframe into an in-memory buffer and upload it as "df.pkl"
+bytes_buffer = io.BytesIO()
+df.to_pickle(bytes_buffer)
+bucket.put_object("df.pkl", bytes_buffer.getvalue())