Allocate Tensor memory based on Tensor storage id (#136)
Summary:
Pull Request resolved: #136

Allocate Tensor memory based on Tensor storage id

Differential Revision: D58474695
shengfukevin authored and facebook-github-bot committed Jul 26, 2024
1 parent c9e2255 commit 59ce971
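
In the execution trace, tensors that share a storage id alias the same underlying buffer, so allocating replay memory per storage id (rather than one fresh tensor per tensor id) preserves that aliasing during replay. A minimal, self-contained illustration of the behavior this change targets (plain PyTorch, not et_replay code; all values are made up):

import torch

# One backing buffer stands in for one trace storage id.
backing = torch.zeros(16)

# Two trace tensors recorded against the same storage id become overlapping
# views of that buffer instead of two independent allocations.
a = backing[0:8].view(2, 4)
b = backing[4:12].view(2, 4)

# Writes through one view are visible through the other, matching the
# aliasing behavior of the original workload.
a.fill_(1.0)
print(b)  # the row covering elements 4..7 is now all 1.0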
Showing 2 changed files with 90 additions and 8 deletions.
1 change: 1 addition & 0 deletions et_replay/execution_trace.py
@@ -510,6 +510,7 @@ def _create_node_v1_1_1_chakra_0_0_4(pid, x: Dict[str, Any]) -> Node:
rf_id,
kernel_backend,
kernel_file,
None,
x["inputs"]["strides"],
x["outputs"]["strides"],
)
97 changes: 89 additions & 8 deletions et_replay/tools/et_replay.py
Expand Up @@ -7,6 +7,7 @@
import time
from collections import defaultdict
from datetime import datetime
from typing import Dict

import numpy as np
import torch
@@ -98,6 +99,9 @@ def __init__(self):
# Dict that stores the shapes of a tensor, for the convenience of quickly determining whether
# to create a unique tensor in replay if the id is the same but the shape is different.
self.tensor_shapes = defaultdict(set)
# Dict that maps a tensor storage id to its storage size and a map of {torch.device: torch.Tensor}.
# Tensors with the same storage id may be located on different devices.
self.tensor_storage_map: Dict[int, list] = defaultdict(set)
# Mark those tensors that occur first as an input in the original et as needing to be instantiated in replay
# at the very beginning.
self.instantiate = set()
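
For reference, each tensor_storage_map entry introduced above ends up holding the storage size in bytes plus a lazily populated per-device cache of the backing tensor. A small standalone sketch of that layout (ids, sizes, and names here are hypothetical):

from collections import defaultdict
from typing import Dict

import torch

# storage_id -> [storage size in bytes, {torch.device: backing tensor}]
tensor_storage_map: Dict[int, list] = defaultdict(lambda: [0, {}])

# A hypothetical storage id 42 holding 4096 bytes of float32 data (4 bytes/element).
tensor_storage_map[42][0] = 4096

# The backing tensor for a given device is created lazily and cached for reuse.
device = torch.device("cpu")
size_bytes, device_cache = tensor_storage_map[42]
if device not in device_cache:
    device_cache[device] = torch.rand(size_bytes // 4, device=device)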
@@ -279,7 +283,7 @@ def reset_registry(self):
None
if v is None
else (
v
v.cpu()
if self.tensor_device[k] == "cpu" or self.cpu
else v.cuda(self.device)
)
@@ -292,7 +296,9 @@ def reset_registry(self):
None
if v is None
else (
v if k in self.cpu_tensor or self.cpu else v.cuda(self.device)
v.cpu()
if k in self.cpu_tensor or self.cpu
else v.cuda(self.device)
)
)
for k, v in self.tensor_registry_permanent.items()
@@ -377,7 +383,27 @@ def has_parallel_parent(node):
assert len(self.parallel_nodes_ids) == len(set(self.parallel_nodes_ids))

def analyze_tensors(self):
def add_storage_tensor(t_id, device):
# t_id is a tuple of (tensor_id, storage_id, offset, number of elements,
# number of bytes per element, device)

# ET does not save the size of the tensor storage, so we iterate over all the
# tensors to find the maximum size of the storage.
storage_id = t_id[1]
if storage_id not in self.tensor_storage_map:
# the storage size for this tensor is the sum of the storage offset and
# number of elements * number of bytes per element.
self.tensor_storage_map[storage_id] = [
t_id[2] + t_id[3] * t_id[4],
{},
]
else:
self.tensor_storage_map[storage_id][0] = max(
self.tensor_storage_map[storage_id][0], t_id[2] + t_id[3] * t_id[4]
)

def add_unique_tensor(node_name, node_id, t_id, shape, input, device=-1):
add_storage_tensor(t_id, device)
# If we did not see this tensor before, add it as a unique tensor.
if t_id not in self.original_unique_tensors:
self.original_unique_tensors.add(t_id)
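
Since the trace does not record storage sizes directly, add_storage_tensor above infers each storage's size as the largest extent any of its tensors reaches, i.e. the maximum of offset + numel * elem_bytes, treating the offset in bytes as the arithmetic above does. A worked example with hypothetical numbers:

# Hypothetical tensors recorded against the same storage id:
# (offset in bytes, number of elements, bytes per element)
tensors_in_storage = [
    (0, 1024, 4),     # float32 tensor at the start of the storage
    (4096, 512, 4),   # float32 tensor starting 4096 bytes in
]

# Storage size = max over tensors of (offset + numel * elem_bytes).
storage_bytes = max(off + numel * eb for off, numel, eb in tensors_in_storage)
assert storage_bytes == 6144  # 4096 + 512 * 4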
@@ -467,6 +493,8 @@ def add_unique_tensor(node_name, node_id, t_id, shape, input, device=-1):
output_set.add(self.tensors_mapping[(node.id, t_id, False)])

def allocate_tensors(self):
start_ns = time.time_ns()

for node in self.sorted_nodes:
if node.name == "record_param_comms" and (
self.compute_only or self.args.separate
@@ -490,7 +518,9 @@ def allocate_tensors(self):
self.args.alpha,
)
for idx, (data_type, t_id, shape) in enumerate(get_input_tensors(node)):
device = self.device
if self.tensor_with_device:
device = t_id[5]
t_id = tuple(list(t_id)[:5])
replay_t_id = self.tensors_mapping[(node.id, t_id, True)]
if (
@@ -511,14 +541,25 @@ def allocate_tensors(self):
self.unchangeable_intermediate_tensors.add(replay_t_id)
else:
if data_type == "Tensor(signed char)":
dtype, rng = TORCH_DTYPES_RNG["signed char"]
dtype, _ = TORCH_DTYPES_RNG["signed char"]
else:
dtype, rng = TORCH_DTYPES_RNG[
dtype, _ = TORCH_DTYPES_RNG[
data_type.lstrip("Tensor(").rstrip(")")
]
self.tensor_registry_permanent[replay_t_id] = rng(shape).to(
dtype

strides = None
if node.input_strides is not None:
strides = node.input_strides[idx]
tensor = self.get_tensor_from_storage(
t_id[1], # storage_id
t_id[2], # offset
t_id[4], # number of bytes per element
device,
shape,
dtype,
strides,
)
self.tensor_registry_permanent[replay_t_id] = tensor
if node.name == "aten::embedding_bag":
self.unchangeable_intermediate_tensors.add(replay_t_id)
if node.name == "aten::pin_memory" and idx == 0:
@@ -548,6 +589,8 @@ def allocate_tensors(self):
][i] = (i * nnz)
######

print(f"Tensor allocation time: {(time.time_ns() - start_ns) / 1000000.0} ms")

def build_func(self, node):
if is_fbgemm_forward(node):
if self.cpu:
@@ -946,6 +989,44 @@ def _generate_run_ops_str(override):
print(code_str, file=f)
exec(code_str)

def get_tensor_from_storage(
self, storage_id, data_offset, elem_bytes, device, shape, data_type, strides
):
assert storage_id in self.tensor_storage_map

tensor_data = self.tensor_storage_map[storage_id]
device = torch.device(device)
if device not in tensor_data[1]:
if data_type in [torch.half, torch.float32, torch.float64, torch.bfloat16]:
storage_tensor = torch.rand(
(tensor_data[0] // elem_bytes), dtype=data_type, device=device
)
else:
storage_tensor = torch.ones(
(tensor_data[0] // elem_bytes), dtype=data_type, device=device
)
tensor_data[1][device] = storage_tensor
else:
storage_tensor = tensor_data[1][device]
x = torch.empty(0, dtype=data_type)
if device != torch.device("cpu"):
x = x.cuda(device)
if strides is None:
x = x.set_(
storage_tensor.untyped_storage(),
storage_offset=data_offset,
size=shape,
)
else:
x = x.set_(
storage_tensor.untyped_storage(),
storage_offset=data_offset,
size=shape,
stride=tuple(strides),
)

return x

def get_inputs(self, node):
try:
if is_fbgemm_forward(node):
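
The get_tensor_from_storage helper added above rebuilds each tensor as a view over the shared backing storage with Tensor.set_. A condensed standalone sketch of the same pattern (shape, strides, offset, and dtype below are made up):

import torch

# Shared backing buffer for one storage id: 32 float32 elements.
backing = torch.rand(32, dtype=torch.float32)

# Recreate a 4x4 tensor at element offset 8 with explicit row-major strides,
# analogous to what get_tensor_from_storage does with the traced metadata.
view = torch.empty(0, dtype=torch.float32).set_(
    backing.untyped_storage(),
    storage_offset=8,
    size=(4, 4),
    stride=(4, 1),
)

# The view shares memory with the backing buffer rather than owning a copy.
assert view.untyped_storage().data_ptr() == backing.untyped_storage().data_ptr()
assert view.shape == (4, 4)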
Expand Down Expand Up @@ -1188,8 +1269,7 @@ def benchTime(self):
self.preprocess_graph()
if self.generator:
return
print("Start to execution: ")
time.sleep(2)
print("Start execution: ")

total_time = 0.0
event_1 = torch.cuda.Event(enable_timing=True)
@@ -1204,6 +1284,7 @@ def benchTime(self):
qps_print_interval = 10

prev_iter = self.numWarmupIters

if self.profile_replay:
try:
from aiplatform.monitoring.unitrace.upload_manifold import (