
Got runtime error "0 active drivers ([]). There should only be one." when using PipelineModule through Ray and DeepSpeed #8007

consciousgaze opened this issue Feb 12, 2025 · 2 comments


consciousgaze commented Feb 12, 2025

Description
When I use a PipelineModule in a Ray trainer after deepspeed.initialize, I always hit RuntimeError: 0 active drivers ([]). There should only be one. But when I check the driver and GPU status in the same process, the driver is there:

(RayTrainWorker pid=1822, ip=10.34.4.3) triton backends: {'amd': Backend(compiler=<class 'amd.HIPBackend'>, driver=<class 'amd.HIPDriver'>), 'nvidia': Backend(compiler=<class 'nvidia.CUDABackend'>, driver=<class 'nvidia.CudaDriver'>)}
(RayTrainWorker pid=1822, ip=10.34.4.3)  drivers[(Backend(compiler=<class 'amd.HIPBackend'>, driver=<class 'amd.HIPDriver'>), False), (Backend(compiler=<class 'nvidia.CUDABackend'>, driver=<class 'nvidia.CudaDriver'>), True)]
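For reference, that printout comes from a small check along these lines; note that triton.backends.backends is an internal registry, not a public API, so its layout may differ between Triton releases:

# Hypothetical reproduction of the debug printout above.
# triton.backends.backends maps backend names to Backend(compiler, driver) entries.
from triton.backends import backends

print(f"triton backends: {backends}")
print(f"drivers {[(b, b.driver.is_active()) for b in backends.values()]}")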

Triton Information
What version of Triton are you using?
3.2.0
The full wheel name I can see is triton-3.2.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata.

Are you using the Triton container or did you build it yourself?
Triton was installed as a dependency of DeepSpeed 0.14.4.
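The installed versions can be double-checked from inside a Ray worker, for example:

# Quick version check run inside the worker process.
import triton
import deepspeed

print(triton.__version__, deepspeed.__version__)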

To Reproduce
Steps to reproduce the behavior: start a Ray cluster and run the forward pass in the training step (roughly as sketched below); the error shows up.
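Roughly, the training function is launched through Ray Train along these lines (the worker count and function body here are illustrative, not my exact script):

# Illustrative Ray Train wrapper; the real train_loop_per_worker builds the model,
# wraps it in PipelineModule, calls deepspeed.initialize, and then runs the
# forward pass that triggers the error.
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_loop_per_worker(config):
    ...  # model construction, PipelineModule, deepspeed.initialize, forward pass

trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()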

Describe the models (framework, inputs, outputs), ideally include the model configuration file (if using an ensemble include the model configuration file for that as well).
The original model I am using is the PyTorch version of the Gemma 2 27B model from https://github.com/google/gemma_pytorch/tree/main.
I wrapped it with my GemmaCasualLM definition and a "piped" version of it:

import gc
import json
import os
from typing import List, Tuple

import torch
from torch import Tensor, nn

# Gemma components (GemmaConfig, Tokenizer, Embedding, GemmaModel, Sampler,
# precompute_freqs_cis, Gemma2DecoderLayer, RMSNorm) come from the
# google/gemma_pytorch repo linked above; exact import paths may differ.


class GemmaCasualLM(nn.Module):
    def __init__(
            self,
            config: GemmaConfig,
    ):
        super().__init__()
        self.config = config
        assert config.hidden_size % config.num_attention_heads == 0

        max_seq_len = config.max_position_embeddings
        head_dim = config.head_dim
        vocab_size = config.vocab_size

        self.tokenizer = Tokenizer(config.tokenizer)
        self.embedder = Embedding(vocab_size, config.hidden_size, config.quant)
        self.model = GemmaModel(config)
        self.sampler = Sampler(vocab_size, config)

        # Pre-compute rotary embedding table.
        rope_theta = getattr(config, 'rope_theta', 10000)
        freqs_cis = precompute_freqs_cis(head_dim,
                                         max_seq_len * 2,
                                         theta=rope_theta)
        self.register_buffer('freqs_cis', freqs_cis)

    # TODO: update forward to a more training friendly form:
    #   * Skip rotary position calculation
    #   * Contain loss calculation
    def forward(
            self,
            input_token_ids: torch.Tensor,
            input_positions: torch.Tensor,
            kv_caches: List[Tuple[torch.Tensor, torch.Tensor]],
            mask: torch.Tensor,
    ) -> torch.Tensor:
        freqs_cis = self.freqs_cis.index_select(0, input_positions)
        kv_write_indices = input_positions

        # [batch_size, input_len, hidden_size]
        hidden_states = self.embedder(input_token_ids)
        # Gemma normalizes the embedding by sqrt(hidden_size).
        # Gemma2 downcasts the below to float16, causing sqrt(3072)=55.4256 to become 55.5
        # See https://github.com/huggingface/transformers/pull/29402
        normalizer = torch.tensor(self.config.hidden_size**0.5, dtype=hidden_states.dtype)
        hidden_states = hidden_states * normalizer

        hidden_states = self.model(
            hidden_states=hidden_states,
            freqs_cis=freqs_cis,
            kv_write_indices=kv_write_indices,
            kv_caches=kv_caches,
            mask=mask,
        )
        print(f"hidden state size {hidden_states.shape}, embedder shape {self.embedder.weight.shape}")
        probs = torch.matmul(hidden_states, self.embedder.weight.t())
        return probs

    def load_weights(self, model_path: str):
        if os.path.isfile(model_path):
            self.load_state_dict(
                torch.load(
                    model_path, mmap=True, weights_only=True,
                )['model_state_dict'],
                strict=False,
            )
        else:
            index_path = os.path.join(model_path, 'pytorch_model.bin.index.json')
            with open(index_path, "r", encoding="utf-8") as f:
                index = json.load(f)
            shard_files = list(set(index["weight_map"].values()))
            for shard_file in shard_files:
                shard_path = os.path.join(model_path, shard_file)
                state_dict = torch.load(shard_path, map_location="cpu", weights_only=True)
                self.load_state_dict(state_dict, strict=False)
                del state_dict  # Save memory.
                gc.collect()


class PipedEmbedder(nn.Module):
    def __init__(self, embedder: Embedding, freqs_cis: Tensor, config: GemmaConfig):
        super().__init__()
        self.embedder = embedder
        self.freqs_cis = freqs_cis
        self.hidden_size = config.hidden_size

    def forward(self, x):
        input_token_ids, input_positions, mask = x
        freqs_cis = self.freqs_cis.index_select(0, input_positions)
        embedded = self.embedder(input_token_ids)
        normalizer = torch.tensor(self.hidden_size**0.5, dtype=embedded.dtype)
        hidden_states = embedded * normalizer
        return hidden_states, freqs_cis, input_positions, mask


class PipedDecoder(nn.Module):
    def __init__(self, decoder_layer: Gemma2DecoderLayer, k_cache: Tensor, v_cache: Tensor):
        super().__init__()
        self.decoder_layer = decoder_layer
        self.k_cache = k_cache
        self.v_cache = v_cache

    def forward(self, x):
        hidden_states, freqs_cis, kv_write_indices, mask = x
        hidden_states = self.decoder_layer(
            hidden_states=hidden_states,
            freqs_cis=freqs_cis,
            kv_write_indices=kv_write_indices,
            kv_cache=(self.k_cache, self.v_cache),
            mask=mask
        )
        return hidden_states, freqs_cis, kv_write_indices, mask


class PipedNorm(nn.Module):
    def __init__(self, norm: RMSNorm):
        super().__init__()
        self.norm = norm

    def forward(self, x):
        hidden_states, _, _, _ = x
        return self.norm(hidden_states)


class PipedLogits(nn.Module):
    def __init__(self, embedder: Embedding):
        super().__init__()
        self.embedder = embedder

    def forward(self, hidden_states):
        return torch.matmul(hidden_states, self.embedder.weight.t())


class PipedGemmaCasualLM(nn.Module):
    def __init__(self, model: GemmaCasualLM, kv_caches: List[Tuple[Tensor, Tensor]]):
        super().__init__()
        self.embedder = PipedEmbedder(model.embedder, model.freqs_cis, model.config)
        self.decoders = nn.ModuleList()
        for layer, (k_cache, v_cache) in zip(model.model.layers, kv_caches):
            self.decoders.append(PipedDecoder(layer, k_cache, v_cache))
        self.final_norm = PipedNorm(model.model.norm)
        self.logits = PipedLogits(model.embedder)

    def forward(self, x):
        # input_token_ids, input_positions, mask = x
        x = self.embedder(x)
        for layer in self.decoders:
            x = layer(x)
        x = self.final_norm(x)
        x = self.logits(x)
        return x

    def to_layers(self):
        return [self.embedder, *self.decoders, self.final_norm, self.logits]

Then I created a piped module with:

# Initialization
from deepspeed import PipelineModule

model = PipedGemmaCasualLM(cpu_model, kv_caches)
pipeline_parallelized_model = PipelineModule(model.to_layers(), num_stages=4)
# Infer after loading data, creating mask etc.
datapoint = torch.randint(10, 100, [test_seq_len])
positions = torch.tensor(list(range(test_seq_len)))
positions = positions[0].to(device)
datapoint = datapoint.to(device)
mask = torch.full((1, 1, test_seq_len, test_seq_len), -2.3819763e38).to(torch.float)
mask_tensor = torch.triu(mask, diagonal=1).to(device)
curr_mask_tensor = mask_tensor.index_select(2, positions)
predicted = model((datapoint, positions, curr_mask_tensor))
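For completeness, the PipelineModule side is then handed to DeepSpeed roughly like this; ds_config and train_dataloader below are placeholders, not my actual setup:

# Sketch of the DeepSpeed side; ds_config and train_dataloader are placeholders.
import deepspeed

engine, _, _, _ = deepspeed.initialize(
    model=pipeline_parallelized_model,
    model_parameters=[p for p in pipeline_parallelized_model.parameters() if p.requires_grad],
    config=ds_config,
)
# With a PipelineModule the engine is normally driven with train_batch(),
# which pulls micro-batches from an iterator and runs the pipeline schedule.
loss = engine.train_batch(data_iter=iter(train_dataloader))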

Expected behavior
The model should complete the forward pass.


shins777 commented Feb 17, 2025

Hi, any update so far? I also have the same issue when I use DeepSpeed.

consciousgaze (Author) commented

Hi, is there any update? I am also waiting for this.
