Cleanup and update scripts for running benchmarks
IsaevIlya committed Aug 13, 2024
1 parent 76aaa19 commit 1f843de
Showing 6 changed files with 84 additions and 107 deletions.
2 changes: 1 addition & 1 deletion s3torchbenchmarking/conf/lightning_checkpointing.yaml
@@ -9,6 +9,6 @@ hydra:
mode: MULTIRUN
sweeper:
params:
training.model: vit, whisper, clip, t0_3b
training.model: vit, whisper, clip, t0_3b, t0pp
checkpoint: disk, s3
checkpoint.save_one_in: 1
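
Assuming Hydra's default basic sweeper, the comma-separated values above expand into their cross product, so this change grows each MULTIRUN sweep from 4 models × 2 checkpoint destinations (8 runs) to 5 × 2 (10 runs), with checkpoint.save_one_in fixed at 1.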
150 changes: 60 additions & 90 deletions s3torchbenchmarking/utils/download_and_transform_results.py
@@ -1,55 +1,54 @@
import boto3
import json
from collections import defaultdict
import csv
import sys
from enum import Enum
from typing import List, Dict, Any
import json
import boto3

# Using type aliases for better code readability
JsonData = List[Dict[str, Any]]
ExtractedData = List[Dict[str, Any]]


class GenerateForMode(Enum):
"""
Enum class to represent the mode for data generation.
"""

DATALOAD = "dataload"
CHECKPOINT = "checkpoint"


s3_client = boto3.client("s3")


def get_object_keys(bucket_name, prefix):
"""
Get a list of all object keys (file names) in the S3 bucket.
"""
response = s3_client.list_objects_v2(Bucket=bucket_name)
# Get a list of all object keys (file names) in the S3 bucket.
def get_object_keys(bucket_name: str, prefix: str) -> List[str]:
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
object_keys = [content["Key"] for content in response.get("Contents", [])]
while response["IsTruncated"]:
while response.get("IsTruncated", False):
response = s3_client.list_objects_v2(
Bucket=bucket_name, ContinuationToken=response["NextContinuationToken"]
Bucket=bucket_name,
Prefix=prefix,
ContinuationToken=response["NextContinuationToken"],
)
object_keys.extend([content["Key"] for content in response.get("Contents", [])])

# filter out the files that are not json files
object_keys = [key for key in object_keys if key.endswith(".json")]
# filter out the files that are not in the root folder
object_keys = [key for key in object_keys if prefix in key]
print(object_keys)
return object_keys
return [key for key in object_keys if key.endswith(".json")]
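
# Illustrative sketch, not part of this commit: the manual IsTruncated /
# ContinuationToken loop above can also be written with boto3's built-in
# paginator, assuming the same module-level s3_client. The helper name below
# is hypothetical.
def get_object_keys_via_paginator(bucket_name: str, prefix: str) -> List[str]:
    paginator = s3_client.get_paginator("list_objects_v2")
    keys: List[str] = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # Each page mirrors a single list_objects_v2 response.
        keys.extend(content["Key"] for content in page.get("Contents", []))
    return [key for key in keys if key.endswith(".json")]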


def read_json_from_s3(bucket_name, object_key):
"""
Read the content of a JSON file from the S3 bucket.
"""
response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
json_data = response["Body"].read().decode("utf-8")
# Read the content of a JSON file from the S3 bucket.
def read_json_from_s3(bucket_name: str, object_key: str) -> JsonData:
try:
json_data = json.loads(json_data)
return json_data
except json.JSONDecodeError as e:
print(f"Error decoding JSON for {object_key}: {e}")
return ""
response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
json_data = response["Body"].read().decode("utf-8")
return json.loads(json_data)
except (json.JSONDecodeError, s3_client.exceptions.ClientError) as exc:
print(f"Error reading JSON for {object_key}: {exc}")
return []
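
# Illustrative usage, not part of this commit (bucket and key are hypothetical):
# the function returns the parsed list of benchmark entries, or an empty list if
# the object is missing or is not valid JSON, so downstream aggregation can
# treat failed reads as "no data" rather than crashing.
# entries = read_json_from_s3("my-benchmark-bucket", "results/run1_collated_results.json")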


# Function to extract relevant fields from a JSON file
def extract_fields_chekpoint(json_data):
# Extract relevant fields from a JSON file for checkpointing.
def extract_fields_checkpoint(json_data: JsonData) -> ExtractedData:
extracted_data = []
for entry in json_data:
cfg = entry["cfg"]
@@ -79,8 +78,8 @@ def extract_fields_chekpoint(json_data):
return extracted_data


# Function to extract relevant fields from a JSON file
def extract_fields_dataloading(json_data):
# Extract relevant fields from a JSON file for data loading.
def extract_fields_dataloading(json_data: JsonData) -> ExtractedData:
extracted_data = []
for entry in json_data:
cfg = entry["cfg"]
@@ -114,7 +113,7 @@ def extract_fields_dataloading(json_data):


# Helper function to count rows for rowspan
def count_rows(data):
def count_rows(data: Dict[str, Any]) -> int:
if "result" in data:
return 1
return sum(
@@ -123,14 +122,16 @@ def count_rows(data):


# Helper function to generate merged table cells
def generate_table_rows(data, indent=0, col_spans=None):
def generate_table_rows(
data: Dict[str, Any], indent: int = 0, col_spans: List[int] = None
) -> str:
if col_spans is None:
col_spans = [1] * 7

html = ""
if "result" in data:
html += "<tr>"
for i in range(indent):
for _ in range(indent):
html += "<td></td>"
for key, value in data["result"].items():
html += f"<td>{value}</td>"
@@ -148,19 +149,12 @@ def save_data_to_html(
return html


# Function to recursively set nested dictionaries with explicit field names
def set_nested_dict(d, keys, value, field_names):
current_level = d
for key, name in zip(keys, field_names):
if name not in current_level:
current_level[name] = {}
if key not in current_level[name]:
current_level[name][key] = {}
current_level = current_level[name][key]
current_level["result"] = value


def load_data_from_s3(extract_fields_function, bucket_name, prefix):
# Function to read all json files from S3
def load_data_from_s3(
extract_fields_function: callable,
bucket_name: str,
prefix: str,
) -> ExtractedData:
object_keys = get_object_keys(bucket_name, prefix)
all_data = []

@@ -170,47 +164,21 @@ def load_data_from_s3(extract_fields_function, bucket_name, prefix):
return all_data


def save_data_to_csv(all_data, file_name):
# Get the keys from the first dictionary in all_data
fieldnames = list(all_data[0].keys())

# Open the CSV file for writing
with open(file_name, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
# Write the header row
writer.writeheader()

# Write each dictionary as a row in the CSV file
for data in all_data:
writer.writerow(data)


def save_data_to_simple_json(all_data, file_name):
with open(file_name, "w") as f:
json.dump(all_data, f, indent=4)


def save_data_to_json(all_data, file_name, field_names):
# Create a nested dictionary for the hierarchical structure
nested_dict = defaultdict(dict)

# Populate the nested dictionary with the extracted data
for data in all_data:
# create list of keys from data values for fields in field_names
keys = [data[field] for field in field_names]
set_nested_dict(nested_dict, keys, data["result"], field_names)

# Convert defaultdict to dict for JSON serialization
hierarchical_data = json.loads(json.dumps(nested_dict))

# Save the hierarchical structure to a JSON file
with open(file_name, "w") as f:
json.dump(hierarchical_data, f, indent=4)
# Helper function to save results to json
def save_data_to_simple_json(all_data: ExtractedData, file_name: str) -> None:
with open(file_name, "w", encoding="utf-8") as file:
json.dump(all_data, file, indent=4)


# Function to generate an HTML page that represents the output data as a table
def save_data_to_html(
all_data, file_name, sort_keys, field_names, result_fields, all_fields_captions
):
all_data: ExtractedData,
file_name: str,
sort_keys: List[str],
field_names: List[str],
result_fields: List[str],
all_fields_captions: List[str],
) -> None:
# Generate the HTML content
html_content = (
"""
@@ -257,7 +225,7 @@ def save_data_to_html(
key=lambda x: ([x[field] for field in sort_keys]),
)

# initialise array of span indexes with 1 at start for every rows in sorted_data for first 6 columns
# initialise array of span indexes with 1 at the start for every row in sorted_data, for the first X columns
column_count = len(field_names)
span_indexes = [1] * len(sorted_data) * column_count

@@ -312,7 +280,8 @@ def get_cell(all_data, index, field_names, field, span_indexes):
f.write(html_content)


def get_dataloading_fields():
# Get field names and captions for data loading.
def get_dataloading_fields() -> tuple[List[str], List[str], List[str]]:
sort_key_names = [
"sharding",
"model",
@@ -348,7 +317,8 @@ def get_dataloading_fields():
return sort_key_names, result_fields, all_fields_captions


def get_checkpointing_fields():
# Get field names and captions for checkpointing.
def get_checkpointing_fields() -> tuple[List[str], List[str], List[str]]:
sort_key_names = ["kind", "model", "max_epochs", "save_one_in", "destination"]
result_fields = [
"model_size",
@@ -405,7 +375,7 @@ def get_checkpointing_fields():
sort_key_names, result_fields, all_fields_captions = get_checkpointing_fields()
grouped_by_key_names = sort_key_names[:-1]
data = load_data_from_s3(
lambda json_data: extract_fields_chekpoint(json_data), bucket_name, prefix
lambda json_data: extract_fields_checkpoint(json_data), bucket_name, prefix
)

save_data_to_html(
5 changes: 3 additions & 2 deletions s3torchbenchmarking/utils/run_dataloading_benchmarks.sh
@@ -27,6 +27,7 @@ for dataset in "${datasets[@]}"; do
s3torch-benchmark -cd conf -m -cn dataloading_sharded_ent "dataset=$dataset" "dataloader=$DATALOADER"
else
s3torch-benchmark -cd conf -m -cn dataloading_unsharded_1epochs "dataset=$dataset" "dataloader=$DATALOADER"
s3torch-benchmark -cd conf -m -cn dataloading_unsharded_10epochs "dataset=$dataset" "dataloader=$DATALOADER"
s3torch-benchmark -cd conf -m -cn dataloading_unsharded_vit_10epochs "dataset=$dataset" "dataloader=$DATALOADER"
s3torch-benchmark -cd conf -m -cn dataloading_unsharded_ent_10epochs "dataset=$dataset" "dataloader=$DATALOADER"
fi
done
done
34 changes: 20 additions & 14 deletions s3torchbenchmarking/utils/upload_colated_results_to_s3.py
@@ -3,42 +3,48 @@
from botocore.exceptions import ClientError
import sys

BUCKET_NAME = 'iisaev-pytorch-benchmarks-results'
ROOT_FOLDER = './multirun'
DS_PREFIX = 's3iterabledataset'
FOLDER_PREFIX = 'results'
s3_client = boto3.client("s3")

s3_client = boto3.client('s3')

def upload_file_to_s3(local_file_path, bucket_name, s3_file_key):
def upload_file_to_s3(local_file_path: str, bucket_name: str, s3_file_key: str) -> None:
try:
s3_client.upload_file(local_file_path, bucket_name, s3_file_key)
print(f"Uploaded {local_file_path} to {bucket_name}/{s3_file_key}")
except ClientError as e:
print(f"Error uploading {local_file_path} to {bucket_name}/{s3_file_key}: {e}")

def traverse_folders(folder_path, bucket_name, prefix, dataloader):
for root, dirs, files in os.walk(folder_path):

def traverse_folders(
folder_path: str, bucket_name: str, prefix: str, dataloader: str
) -> None:
for root, _, files in os.walk(folder_path):
for file in files:
if file == 'collated_results.json':
if file == "collated_results.json":
local_file_path = os.path.join(root, file)
parent_folder = os.path.basename(os.path.dirname(local_file_path))
s3_file_key = f"{prefix}/{dataloader}_{parent_folder}_{file}"
print(f"Uploading {local_file_path} to {bucket_name}/{s3_file_key}")
upload_file_to_s3(local_file_path, bucket_name, s3_file_key)
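
# Worked example, not part of this commit (paths are hypothetical): for a file at
#   ./multirun/2024-08-13/12-34-56/collated_results.json
# with FOLDER_PREFIX="20240810" and DS_PREFIX="s3iterabledataset", parent_folder
# resolves to "12-34-56" and the uploaded object key becomes
#   20240810/s3iterabledataset_12-34-56_collated_results.json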

if __name__ == '__main__':

if __name__ == "__main__":
if len(sys.argv) != 5:
print("Usage: python script.py ROOT_FOLDER BUCKET_NAME FOLDER_PREFIX DS_PREFIX")
print("Example: python script.py ./multirun pytorch-benchmarks-results 20240810 s3iterabledataset")
print(
"Example: python script.py ./multirun pytorch-benchmarks-results 20240810 s3iterabledataset"
)
print("Note: ROOT_FOLDER is the root folder where the results are stored")
print("Note: BUCKET_NAME is the S3 bucket name where the results will be uploaded")
print("Note: FOLDER_PREFIX is the prefix for the folder where the results are stored")
print(
"Note: BUCKET_NAME is the S3 bucket name where the results will be uploaded"
)
print(
"Note: FOLDER_PREFIX is the prefix for the folder where the results are stored"
)
print("Note: DS_PREFIX is the prefix for the dataset loader")
sys.exit(1)

ROOT_FOLDER = sys.argv[1]
BUCKET_NAME = sys.argv[2]
FOLDER_PREFIX = sys.argv[3]
DS_PREFIX = sys.argv[4]
traverse_folders(ROOT_FOLDER, BUCKET_NAME, FOLDER_PREFIX, DS_PREFIX)
traverse_folders(ROOT_FOLDER, BUCKET_NAME, FOLDER_PREFIX, DS_PREFIX)
