Allocate Tensor memory based on Tensor storage id #136

Closed
1 change: 1 addition & 0 deletions et_replay/execution_trace.py
@@ -510,6 +510,7 @@ def _create_node_v1_1_1_chakra_0_0_4(pid, x: Dict[str, Any]) -> Node:
rf_id,
kernel_backend,
kernel_file,
None,
x["inputs"]["strides"],
x["outputs"]["strides"],
)
97 changes: 89 additions & 8 deletions et_replay/tools/et_replay.py
@@ -7,6 +7,7 @@
import time
from collections import defaultdict
from datetime import datetime
from typing import Dict

import numpy as np
import torch
@@ -98,6 +99,9 @@ def __init__(self):
# Dict that stores the shapes of a tensor, for the convenience of quickly determining whether
# to create a unique tensor in replay if the id is the same but the shape is different.
self.tensor_shapes = defaultdict(set)
# Dict that maps a tensor storage id to [storage size in bytes, {device: torch.Tensor}].
# Tensors with the same storage id may be located on different devices.
self.tensor_storage_map: Dict[int, list] = defaultdict(set)
# Mark those tensors that occur first as an input in the original et as needing to be instantiated in replay
# at the very beginning.
self.instantiate = set()
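For reference, the entry layout this map ends up holding (it is populated in `add_storage_tensor` and consumed in `get_tensor_from_storage` further down in this diff) is a two-element list per storage id. A minimal sketch of that assumed shape, with made-up values:

```python
import torch

# Assumed per-storage-id entry, inferred from how the map is written and read below:
#   storage_id -> [max storage size in bytes, {torch.device: backing torch.Tensor}]
tensor_storage_map = {
    42: [4096, {torch.device("cpu"): torch.rand(1024)}],  # illustrative values only
}
```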
@@ -279,7 +283,7 @@ def reset_registry(self):
None
if v is None
else (
v
v.cpu()
if self.tensor_device[k] == "cpu" or self.cpu
else v.cuda(self.device)
)
@@ -292,7 +296,9 @@ def reset_registry(self):
None
if v is None
else (
v if k in self.cpu_tensor or self.cpu else v.cuda(self.device)
v.cpu()
if k in self.cpu_tensor or self.cpu
else v.cuda(self.device)
)
)
for k, v in self.tensor_registry_permanent.items()
@@ -377,7 +383,27 @@ def has_parallel_parent(node):
assert len(self.parallel_nodes_ids) == len(set(self.parallel_nodes_ids))

def analyze_tensors(self):
def add_storage_tensor(t_id, device):
# t_id is a tuple of (tensor_id, storage_id, offset, number of elements,
# number of bytes per element, device)

# ET does not save the size of the tensor storage, so we iterate over all the
# tensors to find the maximum size of the storage.
storage_id = t_id[1]
if storage_id not in self.tensor_storage_map:
# the storage size for this tensor is the sum of the storage offset and
# number of elements * number of bytes per element.
self.tensor_storage_map[storage_id] = [
t_id[2] + t_id[3] * t_id[4],
{},
]
else:
self.tensor_storage_map[storage_id][0] = max(
self.tensor_storage_map[storage_id][0], t_id[2] + t_id[3] * t_id[4]
)

def add_unique_tensor(node_name, node_id, t_id, shape, input, device=-1):
add_storage_tensor(t_id, device)
# If we did not see this tensor before, add it as a unique tensor.
if t_id not in self.original_unique_tensors:
self.original_unique_tensors.add(t_id)
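The bookkeeping in `add_storage_tensor` above is needed because the execution trace records per-tensor extents but not the size of the underlying storage, so replay takes the maximum of `offset + numel * elem_bytes` over every tensor that references a given storage id. A small worked example of that reduction, using hypothetical tensor records:

```python
# (tensor_id, storage_id, offset, numel, elem_bytes, device) -- made-up records.
records = [
    (1, 7, 0,    1024, 4, "cuda:0"),  # needs 0    + 1024 * 4 = 4096 bytes
    (2, 7, 2048, 1024, 4, "cuda:0"),  # needs 2048 + 1024 * 4 = 6144 bytes
]
storage_size = max(t[2] + t[3] * t[4] for t in records)
assert storage_size == 6144  # the buffer for storage id 7 must cover the largest extent
```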
@@ -467,6 +493,8 @@ def add_unique_tensor(node_name, node_id, t_id, shape, input, device=-1):
output_set.add(self.tensors_mapping[(node.id, t_id, False)])

def allocate_tensors(self):
start_ns = time.time_ns()

for node in self.sorted_nodes:
if node.name == "record_param_comms" and (
self.compute_only or self.args.separate
@@ -490,7 +518,9 @@ def allocate_tensors(self):
self.args.alpha,
)
for idx, (data_type, t_id, shape) in enumerate(get_input_tensors(node)):
device = self.device
if self.tensor_with_device:
device = t_id[5]
t_id = tuple(list(t_id)[:5])
replay_t_id = self.tensors_mapping[(node.id, t_id, True)]
if (
@@ -511,14 +541,25 @@ def allocate_tensors(self):
self.unchangeable_intermediate_tensors.add(replay_t_id)
else:
if data_type == "Tensor(signed char)":
dtype, rng = TORCH_DTYPES_RNG["signed char"]
dtype, _ = TORCH_DTYPES_RNG["signed char"]
else:
dtype, rng = TORCH_DTYPES_RNG[
dtype, _ = TORCH_DTYPES_RNG[
data_type.lstrip("Tensor(").rstrip(")")
]
self.tensor_registry_permanent[replay_t_id] = rng(shape).to(
dtype

strides = None
if node.input_strides is not None:
strides = node.input_strides[idx]
tensor = self.get_tensor_from_storage(
t_id[1], # storage_id
t_id[2], # offset
t_id[4], # number of bytes per element
device,
shape,
dtype,
strides,
)
self.tensor_registry_permanent[replay_t_id] = tensor
if node.name == "aten::embedding_bag":
self.unchangeable_intermediate_tensors.add(replay_t_id)
if node.name == "aten::pin_memory" and idx == 0:
@@ -548,6 +589,8 @@ def allocate_tensors(self):
][i] = (i * nnz)
######

print(f"Tensor allocation time: {(time.time_ns() - start_ns) / 1000000.0} ms")

def build_func(self, node):
if is_fbgemm_forward(node):
if self.cpu:
@@ -946,6 +989,44 @@ def _generate_run_ops_str(override):
print(code_str, file=f)
exec(code_str)

def get_tensor_from_storage(
self, storage_id, data_offset, elem_bytes, device, shape, data_type, strides
):
assert storage_id in self.tensor_storage_map

tensor_data = self.tensor_storage_map[storage_id]
device = torch.device(device)
if device not in tensor_data[1]:
if data_type in [torch.half, torch.float32, torch.float64, torch.bfloat16]:
storage_tensor = torch.rand(
(tensor_data[0] // elem_bytes), dtype=data_type, device=device
)
else:
storage_tensor = torch.ones(
(tensor_data[0] // elem_bytes), dtype=data_type, device=device
)
tensor_data[1][device] = storage_tensor
else:
storage_tensor = tensor_data[1][device]
x = torch.empty(0, dtype=data_type)
if device != torch.device("cpu"):
x = x.cuda(device)
if strides is None:
x = x.set_(
storage_tensor.untyped_storage(),
storage_offset=data_offset,
size=shape,
)
else:
x = x.set_(
storage_tensor.untyped_storage(),
storage_offset=data_offset,
size=shape,
stride=tuple(strides),
)

return x
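For context on the `set_` calls above: carving views out of one untyped storage is what lets tensors that shared a storage id in the original trace alias each other again during replay, and the optional stride tuple reproduces non-contiguous views. A self-contained, CPU-only sketch of both cases in plain PyTorch (names and values are illustrative, not taken from the trace format):

```python
import torch

backing = torch.arange(12, dtype=torch.float32)  # stands in for one shared storage buffer

# Two overlapping 1-D views at different element offsets: a write through one
# is visible through the other, mirroring aliased tensors in the trace.
a = torch.empty(0, dtype=torch.float32).set_(backing.untyped_storage(), storage_offset=0, size=(8,))
b = torch.empty(0, dtype=torch.float32).set_(backing.untyped_storage(), storage_offset=4, size=(8,))
a.fill_(1.0)
assert torch.all(b[:4] == 1.0)

# A strided (transposed) view reconstructed purely from size/stride over the same storage.
base = torch.empty(0, dtype=torch.float32).set_(
    backing.untyped_storage(), storage_offset=0, size=(3, 4), stride=(4, 1)
)
transposed = torch.empty(0, dtype=torch.float32).set_(
    backing.untyped_storage(), storage_offset=0, size=(4, 3), stride=(1, 4)
)
assert torch.equal(transposed, base.t())
```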

def get_inputs(self, node):
try:
if is_fbgemm_forward(node):
@@ -1188,8 +1269,7 @@ def benchTime(self):
self.preprocess_graph()
if self.generator:
return
print("Start to execution: ")
time.sleep(2)
print("Start execution: ")

total_time = 0.0
event_1 = torch.cuda.Event(enable_timing=True)
@@ -1204,6 +1284,7 @@ def benchTime(self):
qps_print_interval = 10

prev_iter = self.numWarmupIters

if self.profile_replay:
try:
from aiplatform.monitoring.unitrace.upload_manifold import (