From f6be5adae4d6293293f888ae3cd8e17776519ad8 Mon Sep 17 00:00:00 2001 From: sumana sree Date: Fri, 4 Oct 2024 17:22:20 +0530 Subject: [PATCH 1/6] Added examples for tensorflow types in Datatypes and IO section Signed-off-by: sumana sree --- .../data_types_and_io/tensorflow_type.py | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 examples/data_types_and_io/data_types_and_io/tensorflow_type.py diff --git a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py new file mode 100644 index 000000000..00d99c905 --- /dev/null +++ b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py @@ -0,0 +1,85 @@ +# Tensorflow Model +import tensorflow as tf +from flytekit import task, workflow + +@task +def train_model() -> tf.keras.Model: + model = tf.keras.Sequential([ + tf.keras.layers.Dense(128, activation='relu'), + tf.keras.layers.Dense(10, activation='softmax') + ]) + model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) + return model + +@task +def evaluate_model(model: tf.keras.Model, x: tf.Tensor, y: tf.Tensor) -> float: + loss, accuracy = model.evaluate(x, y) + return accuracy + +@workflow +def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float: + model = train_model() + accuracy = evaluate_model(model=model, x=x, y=y) + return accuracy + + +# TensorFlow Record File +from flytekit.types.file import TFRecordFile +from flytekit import task, workflow + +@task +def process_tfrecord(file: TFRecordFile) -> int: + dataset = tf.data.TFRecordDataset(file) + count = 0 + for raw_record in dataset: + example = tf.train.Example() + example.ParseFromString(raw_record.numpy()) + count += 1 + return count + +@workflow +def tfrecord_workflow(file: TFRecordFile) -> int: + return process_tfrecord(file=file) + + +# TensorFlow Records Directory +from flytekit.types.directory import TFRecordsDirectory +from flytekit import task, workflow +import os +import tensorflow as tf + +@task +def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: + files = [f.path for f in os.scandir(dir) if f.is_file()] + dataset = tf.data.TFRecordDataset(files) + count = 0 + for raw_record in dataset: + example = tf.train.Example() + example.ParseFromString(raw_record.numpy()) + count += 1 + return count + +@workflow +def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int: + return process_tfrecords_dir(dir=dir) + +# TFRecordDatasetConfig +from flytekit.types.directory import TFRecordsDirectory +from flytekit import task, workflow +import os +import tensorflow as tf + +@task +def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: + files = [f.path for f in os.scandir(dir) if f.is_file()] + dataset = tf.data.TFRecordDataset(files) + count = 0 + for raw_record in dataset: + example = tf.train.Example() + example.ParseFromString(raw_record.numpy()) + count += 1 + return count + +@workflow +def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int: + return process_tfrecords_dir(dir=dir) From 903aab5ed842ad6c827921eb651bd9c56e328349 Mon Sep 17 00:00:00 2001 From: sumana sree Date: Sat, 5 Oct 2024 09:19:33 +0530 Subject: [PATCH 2/6] Fixed linting errors Signed-off-by: sumana sree --- .../data_types_and_io/tensorflow_type.py | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py index 00d99c905..d04a18541 100644 --- a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py +++ b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py @@ -1,7 +1,11 @@ -# Tensorflow Model +# Import necessary libraries and modules import tensorflow as tf from flytekit import task, workflow +from flytekit.types.file import TFRecordFile +from flytekit.types.directory import TFRecordsDirectory +import os +# TensorFlow Model @task def train_model() -> tf.keras.Model: model = tf.keras.Sequential([ @@ -24,9 +28,6 @@ def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float: # TensorFlow Record File -from flytekit.types.file import TFRecordFile -from flytekit import task, workflow - @task def process_tfrecord(file: TFRecordFile) -> int: dataset = tf.data.TFRecordDataset(file) @@ -43,11 +44,6 @@ def tfrecord_workflow(file: TFRecordFile) -> int: # TensorFlow Records Directory -from flytekit.types.directory import TFRecordsDirectory -from flytekit import task, workflow -import os -import tensorflow as tf - @task def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: files = [f.path for f in os.scandir(dir) if f.is_file()] @@ -63,12 +59,8 @@ def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int: return process_tfrecords_dir(dir=dir) -# TFRecordDatasetConfig -from flytekit.types.directory import TFRecordsDirectory -from flytekit import task, workflow -import os -import tensorflow as tf +# TFRecordDatasetConfig @task def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: files = [f.path for f in os.scandir(dir) if f.is_file()] From 0958ad2cb643077039a050878ef84a8cbaba3990 Mon Sep 17 00:00:00 2001 From: sumana sree Date: Sun, 6 Oct 2024 10:47:45 +0530 Subject: [PATCH 3/6] updated tensorflow_type.py file to avoid linting errors Signed-off-by: sumana sree --- .../data_types_and_io/tensorflow_type.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py index d04a18541..95c9b2529 100644 --- a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py +++ b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py @@ -59,19 +59,3 @@ def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int: return process_tfrecords_dir(dir=dir) - -# TFRecordDatasetConfig -@task -def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: - files = [f.path for f in os.scandir(dir) if f.is_file()] - dataset = tf.data.TFRecordDataset(files) - count = 0 - for raw_record in dataset: - example = tf.train.Example() - example.ParseFromString(raw_record.numpy()) - count += 1 - return count - -@workflow -def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int: - return process_tfrecords_dir(dir=dir) From 44f079d751e01f90cfca7c1c2dcb3194af7ae60a Mon Sep 17 00:00:00 2001 From: sumana sree Date: Sun, 13 Oct 2024 12:16:16 +0530 Subject: [PATCH 4/6] Apply lint corrections using pre-commit hooks Signed-off-by: sumana sree --- .../data_types_and_io/tensorflow_type.py | 35 ++++++++----------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py index 95c9b2529..114b2e0c2 100644 --- a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py +++ b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py @@ -1,61 +1,54 @@ # Import necessary libraries and modules + import tensorflow as tf from flytekit import task, workflow -from flytekit.types.file import TFRecordFile from flytekit.types.directory import TFRecordsDirectory -import os +from flytekit.types.file import TFRecordFile + # TensorFlow Model @task def train_model() -> tf.keras.Model: - model = tf.keras.Sequential([ - tf.keras.layers.Dense(128, activation='relu'), - tf.keras.layers.Dense(10, activation='softmax') - ]) - model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy']) + model = tf.keras.Sequential( + [tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dense(10, activation="softmax")] + ) + model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) return model + @task def evaluate_model(model: tf.keras.Model, x: tf.Tensor, y: tf.Tensor) -> float: loss, accuracy = model.evaluate(x, y) return accuracy + @workflow def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float: model = train_model() - accuracy = evaluate_model(model=model, x=x, y=y) - return accuracy + return evaluate_model(model=model, x=x, y=y) -# TensorFlow Record File @task def process_tfrecord(file: TFRecordFile) -> int: - dataset = tf.data.TFRecordDataset(file) count = 0 - for raw_record in dataset: - example = tf.train.Example() - example.ParseFromString(raw_record.numpy()) + for record in tf.data.TFRecordDataset(file): count += 1 return count + @workflow def tfrecord_workflow(file: TFRecordFile) -> int: return process_tfrecord(file=file) -# TensorFlow Records Directory @task def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: - files = [f.path for f in os.scandir(dir) if f.is_file()] - dataset = tf.data.TFRecordDataset(files) count = 0 - for raw_record in dataset: - example = tf.train.Example() - example.ParseFromString(raw_record.numpy()) + for record in tf.data.TFRecordDataset(dir.path): count += 1 return count + @workflow def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int: return process_tfrecords_dir(dir=dir) - From 606b78dbf6c4530fab64c2ad9ac0af25583a36a7 Mon Sep 17 00:00:00 2001 From: sumana sree Date: Sun, 13 Oct 2024 12:27:47 +0530 Subject: [PATCH 5/6] added required comments Signed-off-by: sumana sree --- examples/data_types_and_io/data_types_and_io/tensorflow_type.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py index 114b2e0c2..7ebd04424 100644 --- a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py +++ b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py @@ -28,6 +28,7 @@ def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float: return evaluate_model(model=model, x=x, y=y) +# TFRecord Files @task def process_tfrecord(file: TFRecordFile) -> int: count = 0 @@ -41,6 +42,7 @@ def tfrecord_workflow(file: TFRecordFile) -> int: return process_tfrecord(file=file) +# TFRecord Directories @task def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: count = 0 From dde5ed88ed227bfecc4c6d0b713762c3729abe12 Mon Sep 17 00:00:00 2001 From: sumana sree Date: Tue, 15 Oct 2024 18:29:30 +0530 Subject: [PATCH 6/6] fixed error on importing tensorflow Signed-off-by: sumana sree --- .../data_types_and_io/tensorflow_type.py | 100 +++++++++--------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py index 7ebd04424..349f34b67 100644 --- a/examples/data_types_and_io/data_types_and_io/tensorflow_type.py +++ b/examples/data_types_and_io/data_types_and_io/tensorflow_type.py @@ -1,56 +1,56 @@ # Import necessary libraries and modules -import tensorflow as tf from flytekit import task, workflow from flytekit.types.directory import TFRecordsDirectory from flytekit.types.file import TFRecordFile - -# TensorFlow Model -@task -def train_model() -> tf.keras.Model: - model = tf.keras.Sequential( - [tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dense(10, activation="softmax")] - ) - model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) - return model - - -@task -def evaluate_model(model: tf.keras.Model, x: tf.Tensor, y: tf.Tensor) -> float: - loss, accuracy = model.evaluate(x, y) - return accuracy - - -@workflow -def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float: - model = train_model() - return evaluate_model(model=model, x=x, y=y) - - -# TFRecord Files -@task -def process_tfrecord(file: TFRecordFile) -> int: - count = 0 - for record in tf.data.TFRecordDataset(file): - count += 1 - return count - - -@workflow -def tfrecord_workflow(file: TFRecordFile) -> int: - return process_tfrecord(file=file) - - -# TFRecord Directories -@task -def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: - count = 0 - for record in tf.data.TFRecordDataset(dir.path): - count += 1 - return count - - -@workflow -def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int: - return process_tfrecords_dir(dir=dir) +custom_image = ImageSpec( + packages=["tensorflow", "tensorflow-datasets", "flytekitplugins-kftensorflow"], + registry="ghcr.io/flyteorg", +) + +if custom_image.is_container(): + import tensorflow as tf + + # TensorFlow Model + @task + def train_model() -> tf.keras.Model: + model = tf.keras.Sequential( + [tf.keras.layers.Dense(128, activation="relu"), tf.keras.layers.Dense(10, activation="softmax")] + ) + model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]) + return model + + @task + def evaluate_model(model: tf.keras.Model, x: tf.Tensor, y: tf.Tensor) -> float: + loss, accuracy = model.evaluate(x, y) + return accuracy + + @workflow + def training_workflow(x: tf.Tensor, y: tf.Tensor) -> float: + model = train_model() + return evaluate_model(model=model, x=x, y=y) + + # TFRecord Files + @task + def process_tfrecord(file: TFRecordFile) -> int: + count = 0 + for record in tf.data.TFRecordDataset(file): + count += 1 + return count + + @workflow + def tfrecord_workflow(file: TFRecordFile) -> int: + return process_tfrecord(file=file) + + # TFRecord Directories + @task + def process_tfrecords_dir(dir: TFRecordsDirectory) -> int: + count = 0 + for record in tf.data.TFRecordDataset(dir.path): + count += 1 + return count + + @workflow + def tfrecords_dir_workflow(dir: TFRecordsDirectory) -> int: + return process_tfrecords_dir(dir=dir)