    Machine Learning & Research

Train Your Large Model on Multiple GPUs with Pipeline Parallelism

By Oliver Chambers | January 4, 2026 | 6 min read


import dataclasses
import os

import datasets
import tokenizers
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim.lr_scheduler as lr_scheduler
import tqdm
from torch import Tensor
from torch.distributed.checkpoint import load, save
from torch.distributed.checkpoint.state_dict import StateDictOptions, get_state_dict, set_state_dict
from torch.distributed.pipelining import PipelineStage, ScheduleGPipe


# Build the model
@dataclasses.dataclass
class LlamaConfig:
    """Define Llama model hyperparameters."""
    vocab_size: int = 50000  # Size of the tokenizer vocabulary
    max_position_embeddings: int = 2048  # Maximum sequence length
    hidden_size: int = 768  # Dimension of hidden layers
    intermediate_size: int = 4 * 768  # Dimension of the MLP's hidden layer
    num_hidden_layers: int = 12  # Number of transformer layers
    num_attention_heads: int = 12  # Number of attention heads
    num_key_value_heads: int = 3  # Number of key-value heads for GQA


class RotaryPositionEncoding(nn.Module):
    """Rotary position encoding."""

    def __init__(self, dim: int, max_position_embeddings: int) -> None:
        """Initialize the RotaryPositionEncoding module.

        Args:
            dim: The hidden dimension of the input tensor to which RoPE is applied
            max_position_embeddings: The maximum sequence length of the input tensor
        """
        super().__init__()
        self.dim = dim
        self.max_position_embeddings = max_position_embeddings
        # compute a matrix of n * theta_i values
        N = 10_000.0
        inv_freq = 1.0 / (N ** (torch.arange(0, dim, 2) / dim))
        inv_freq = torch.cat((inv_freq, inv_freq), dim=-1)
        position = torch.arange(max_position_embeddings)
        sinusoid_inp = torch.outer(position, inv_freq)
        # save cosine and sine matrices as buffers, not parameters
        self.register_buffer("cos", sinusoid_inp.cos())
        self.register_buffer("sin", sinusoid_inp.sin())

    def forward(self, x: Tensor) -> Tensor:
        """Apply RoPE to tensor x.

        Args:
            x: Input tensor of shape (batch_size, seq_length, num_heads, head_dim)

        Returns:
            Output tensor of shape (batch_size, seq_length, num_heads, head_dim)
        """
        batch_size, seq_len, num_heads, head_dim = x.shape
        dtype = x.dtype
        # reshape the cosine and sine matrices to 4D tensors with the same dtype as x
        cos = self.cos.to(dtype)[:seq_len].view(1, seq_len, 1, -1)
        sin = self.sin.to(dtype)[:seq_len].view(1, seq_len, 1, -1)
        # apply RoPE to x using the rotate-half convention
        x1, x2 = x.chunk(2, dim=-1)
        rotated = torch.cat((-x2, x1), dim=-1)
        output = (x * cos) + (rotated * sin)
        return output

     

     

class LlamaAttention(nn.Module):
    """Grouped-query attention with rotary embeddings."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        self.num_kv_heads = config.num_key_value_heads  # GQA: H_kv < H_q

        # hidden_size must be divisible by num_heads
        assert (self.head_dim * self.num_heads) == self.hidden_size

        # Linear layers for Q, K, V projections
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
        self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding) -> Tensor:
        bs, seq_len, dim = hidden_states.size()

        # Project inputs to Q, K, V
        query_states = self.q_proj(hidden_states).view(bs, seq_len, self.num_heads, self.head_dim)
        key_states = self.k_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)
        value_states = self.v_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)

        # Apply rotary position embeddings
        query_states = rope(query_states)
        key_states = rope(key_states)

        # Transpose tensors from BSHD to BHSD layout for scaled_dot_product_attention
        query_states = query_states.transpose(1, 2)
        key_states = key_states.transpose(1, 2)
        value_states = value_states.transpose(1, 2)

        # Use PyTorch's optimized attention implementation
        # setting is_causal=True is incompatible with passing an explicit attention mask
        attn_output = F.scaled_dot_product_attention(
            query_states,
            key_states,
            value_states,
            is_causal=True,
            dropout_p=0.0,
            enable_gqa=True,
        )

        # Transpose the output from BHSD back to BSHD layout, reshape to 3D, and project it
        attn_output = attn_output.transpose(1, 2).reshape(bs, seq_len, self.hidden_size)
        attn_output = self.o_proj(attn_output)
        return attn_output

     

     

class LlamaMLP(nn.Module):
    """Feed-forward network with SwiGLU activation."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        # Two parallel projections for SwiGLU
        self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.act_fn = F.silu  # SwiGLU activation function
        # Project back to the hidden dimension
        self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)

    def forward(self, x: Tensor) -> Tensor:
        # SwiGLU activation: multiply gate and up-projected inputs
        gate = self.act_fn(self.gate_proj(x))
        up = self.up_proj(x)
        return self.down_proj(gate * up)


class LlamaDecoderLayer(nn.Module):
    """Single transformer layer for a Llama model."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.input_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.self_attn = LlamaAttention(config)
        self.post_attention_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.mlp = LlamaMLP(config)

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding) -> Tensor:
        # First residual block: self-attention
        residual = hidden_states
        hidden_states = self.input_layernorm(hidden_states)
        attn_outputs = self.self_attn(hidden_states, rope=rope)
        hidden_states = attn_outputs + residual

        # Second residual block: MLP
        residual = hidden_states
        hidden_states = self.post_attention_layernorm(hidden_states)
        hidden_states = self.mlp(hidden_states) + residual
        return hidden_states

     

     

class LlamaModel(nn.Module):
    """The full Llama model without any pretraining heads."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.rope = RotaryPositionEncoding(
            config.hidden_size // config.num_attention_heads,
            config.max_position_embeddings,
        )

        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
        self.layers = nn.ModuleDict({
            str(i): LlamaDecoderLayer(config) for i in range(config.num_hidden_layers)
        })
        self.norm = nn.RMSNorm(config.hidden_size, eps=1e-5)

    def forward(self, input_ids: Tensor) -> Tensor:
        # Convert input token IDs to embeddings
        if self.embed_tokens is not None:
            hidden_states = self.embed_tokens(input_ids)
        else:
            hidden_states = input_ids
        # Process through all transformer layers, then the final norm layer
        # (layers removed for other pipeline stages are set to None and skipped)
        for n in range(len(self.layers)):
            if self.layers[str(n)] is not None:
                hidden_states = self.layers[str(n)](hidden_states, self.rope)
        if self.norm is not None:
            hidden_states = self.norm(hidden_states)
        # Return the final hidden states
        return hidden_states


class LlamaForPretraining(nn.Module):
    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.base_model = LlamaModel(config)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

    def forward(self, input_ids: Tensor) -> Tensor:
        hidden_states = self.base_model(input_ids)
        if self.lm_head is not None:
            hidden_states = self.lm_head(hidden_states)
        return hidden_states

     

     

# Dataset class that yields padded sequences of fixed length
class PretrainingDataset(torch.utils.data.Dataset):
    def __init__(self, dataset: datasets.Dataset, tokenizer: tokenizers.Tokenizer,
                 seq_length: int, device: torch.device = None):
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.device = device
        self.seq_length = seq_length
        self.bot = tokenizer.token_to_id("[BOT]")
        self.eot = tokenizer.token_to_id("[EOT]")
        self.pad = tokenizer.token_to_id("[PAD]")

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        """Get a sequence of token ids from the dataset. [BOT] and [EOT] tokens
        are added. Clipped and padded to the sequence length.
        """
        seq = self.dataset[index]["text"]
        tokens: list[int] = [self.bot] + self.tokenizer.encode(seq).ids + [self.eot]
        # pad to the target sequence length
        toklen = len(tokens)
        if toklen < self.seq_length + 1:
            pad_length = self.seq_length + 1 - toklen
            tokens += [self.pad] * pad_length
        # return input and next-token target, each of length seq_length
        x = torch.tensor(tokens[:self.seq_length], dtype=torch.int64, device=self.device)
        y = torch.tensor(tokens[1:self.seq_length + 1], dtype=torch.int64, device=self.device)
        return x, y

     

     

def load_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer) -> None:
    dist.barrier()
    model_state, optimizer_state = get_state_dict(
        model, optimizer, options=StateDictOptions(full_state_dict=True),
    )
    load(
        {"model": model_state, "optimizer": optimizer_state},
        checkpoint_id="checkpoint-dist",
    )
    set_state_dict(
        model, optimizer,
        model_state_dict=model_state, optim_state_dict=optimizer_state,
        options=StateDictOptions(broadcast_from_rank0=True, full_state_dict=True),
    )
    dist.barrier()


def save_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer) -> None:
    dist.barrier()
    model_state, optimizer_state = get_state_dict(
        model, optimizer, options=StateDictOptions(full_state_dict=True),
    )
    save(
        {"model": model_state, "optimizer": optimizer_state},
        checkpoint_id="checkpoint-dist",
    )
    dist.barrier()

     

     

# Load the tokenizer and dataset
tokenizer = tokenizers.Tokenizer.from_file("bpe_50K.json")
dataset = datasets.load_dataset("HuggingFaceFW/fineweb", "sample-10BT", split="train")

# Initialize the distributed environment
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
local_rank = int(os.environ["LOCAL_RANK"])
world_size = dist.get_world_size()
device = torch.device(f"cuda:{local_rank}")
print(f"World size {world_size}, rank {rank}, local rank {local_rank}. Using {device}")
assert world_size == 3, f"This script is designed for 3 GPUs, got {world_size}"
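# Note (assumption, not shown in the original post): RANK, LOCAL_RANK, and WORLD_SIZE are
# expected to be set by a distributed launcher, e.g. for a single node with three GPUs:
#   torchrun --nproc_per_node=3 train_pipeline.py   # assumed script name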

     

# Create the pretraining model with the default config on the meta device to prevent OOM
with torch.device("meta"):
    model_config = LlamaConfig()
    model = LlamaForPretraining(model_config)
    # Partition the model by removing some layers
    num_layers = model_config.num_hidden_layers
    partition = [num_layers // 3, 2 * num_layers // 3, num_layers]
    if rank == 0:
        # from the embedding to 1/3 of the decoder layers
        for n in range(partition[0], partition[2]):
            model.base_model.layers[str(n)] = None
        model.base_model.norm = None
        model.lm_head = None
    elif rank == 1:
        # from 1/3 to 2/3 of the decoder layers
        model.base_model.embed_tokens = None
        for n in range(0, partition[0]):
            model.base_model.layers[str(n)] = None
        for n in range(partition[1], partition[2]):
            model.base_model.layers[str(n)] = None
        model.base_model.norm = None
        model.lm_head = None
    elif rank == 2:
        # from 2/3 to the end of the decoder layers, plus the final norm layer and LM head
        model.base_model.embed_tokens = None
        for n in range(partition[1]):
            model.base_model.layers[str(n)] = None
    else:
        raise ValueError(f"Invalid rank: {rank}")

     

     

# Move the model from the meta device to the CUDA device, then initialize the weights
def reset_all_weights(model: nn.Module) -> None:
    @torch.no_grad()
    def weight_reset(m: nn.Module):
        reset_parameters = getattr(m, "reset_parameters", None)
        if callable(reset_parameters):
            m.reset_parameters()

    # Applies fn recursively to the model itself and all of model.children()
    model.apply(fn=weight_reset)


model.to_empty(device=device)
reset_all_weights(model)
model.train()
stage = PipelineStage(model, stage_index=rank, num_stages=world_size, device=device)
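# Each rank now wraps its partition as one stage of a 3-stage pipeline: stage 0 holds the
# embedding plus the first third of the decoder layers, stage 1 the middle third, and
# stage 2 the final third plus the last norm layer and the LM head. Activations flow
# 0 -> 1 -> 2 in the forward pass and gradients flow back 2 -> 1 -> 0.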

     

# Training parameters
epochs = 3
learning_rate = 1e-3
batch_size = 64
seq_length = 512
num_warmup_steps = 1000
PAD_TOKEN_ID = tokenizer.token_to_id("[PAD]")

     

# DataLoader, optimizer, scheduler, and loss function
dataset = PretrainingDataset(dataset, tokenizer, seq_length, device)
dataloader = torch.utils.data.DataLoader(
    dataset,
    batch_size=batch_size,
)
num_training_steps = len(dataloader) * epochs
print(f"Number of training steps: {num_training_steps} = {len(dataloader)} * {epochs}")

optimizer = torch.optim.AdamW(
    model.parameters(), lr=learning_rate, betas=(0.9, 0.99), eps=1e-8, weight_decay=0.1,
)
warmup_scheduler = lr_scheduler.LinearLR(
    optimizer,
    start_factor=0.1, end_factor=1.0, total_iters=num_warmup_steps,
)
cosine_scheduler = lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=num_training_steps - num_warmup_steps,
    eta_min=0,
)
scheduler = lr_scheduler.SequentialLR(
    optimizer,
    schedulers=[warmup_scheduler, cosine_scheduler],
    milestones=[num_warmup_steps],
)

     

# If the checkpoint-dist dir exists, load the checkpoint into the model and optimizer
# Note: you should also persist the epoch and step so that training resumes at the right point
if os.path.exists("checkpoint-dist"):
    load_checkpoint(model, optimizer)

# Create the pipeline schedule
def loss_fn(logits: Tensor, target_ids: Tensor) -> Tensor:
    logits = logits.view(-1, logits.size(-1))
    target_ids = target_ids.view(-1)
    return F.cross_entropy(logits, target_ids, ignore_index=PAD_TOKEN_ID)

n_microbatches = 4  # number of splits per batch
schedule = ScheduleGPipe(stage, n_microbatches=n_microbatches, loss_fn=loss_fn)
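# ScheduleGPipe splits each batch along the first dimension into n_microbatches chunks and
# pipelines them through the stages, so batch_size should be divisible by n_microbatches
# (here 64 / 4 = 16 sequences per microbatch).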

     

# Start training
for epoch in range(epochs):
    pbar = tqdm.tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}", disable=(rank != world_size - 1))
    for batch_id, batch in enumerate(pbar):
        if batch_id % 1000 == 0:
            save_checkpoint(model, optimizer)
        # zero grad before the forward pass, since no explicit backward pass is called
        optimizer.zero_grad(set_to_none=True)
        # get the batched data
        input_ids, target_ids = batch
        if rank == 0:
            # first stage consumes the input token IDs
            schedule.step(input_ids)
        elif rank == world_size - 1:
            # last stage receives the targets and collects one loss per microbatch
            losses = []
            logits = schedule.step(target=target_ids, losses=losses)
            with torch.no_grad():
                pbar.set_postfix(loss=sum(losses).item() / len(losses))
        else:
            # middle stages just relay activations and gradients
            schedule.step()

        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        pbar.update(1)
    pbar.close()

     

# Save the model
save_checkpoint(model, optimizer)

# Clean up the distributed environment
dist.destroy_process_group()
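
The script loads a ready-made tokenizer from bpe_50K.json but does not show how that file is produced. Below is a minimal sketch, not from the original post, of how a compatible byte-level BPE tokenizer could be trained with the tokenizers library; the 50K vocabulary size and the [BOT], [EOT], and [PAD] special tokens are taken from the code above, while the choice of training corpus and pre-tokenizer are assumptions.

import datasets
import tokenizers
from tokenizers import models, pre_tokenizers, trainers

# Assumed: train a byte-level BPE tokenizer on the same FineWeb sample used for pretraining
corpus = datasets.load_dataset("HuggingFaceFW/fineweb", "sample-10BT", split="train")

tokenizer = tokenizers.Tokenizer(models.BPE())
tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel()
trainer = trainers.BpeTrainer(
    vocab_size=50000,                            # matches LlamaConfig.vocab_size
    special_tokens=["[PAD]", "[BOT]", "[EOT]"],  # special tokens used by PretrainingDataset
)

# Stream text from the dataset into the trainer, then save the tokenizer file the script expects
tokenizer.train_from_iterator((row["text"] for row in corpus), trainer=trainer)
tokenizer.save("bpe_50K.json")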
