Close Menu
    Main Menu
    • Home
    • News
    • Tech
    • Robotics
    • ML & Research
    • AI
    • Digital Transformation
    • AI Ethics & Regulation
    • Thought Leadership in AI

    Subscribe to Updates

    Get the latest creative news from FooBar about art, design and business.

    What's Hot

    A Guide to Kedro: Your Production-Ready Data Science Toolbox

    March 4, 2026

    Simbe Becomes First Retail Robotics Company to Obtain UL 3300 Certification for Autonomous Robotic Tally

    March 4, 2026

    The Machine Learning Practitioner’s Guide to Speculative Decoding

    March 4, 2026
    Facebook X (Twitter) Instagram
    UK Tech InsiderUK Tech Insider
    Facebook X (Twitter) Instagram
    UK Tech InsiderUK Tech Insider
    Home»Machine Learning & Research»Train Your Large Model on Multiple GPUs with Tensor Parallelism
    Machine Learning & Research

    Train Your Large Model on Multiple GPUs with Tensor Parallelism

    Oliver ChambersBy Oliver ChambersJanuary 1, 2026No Comments7 Mins Read
    Facebook Twitter Pinterest Telegram LinkedIn Tumblr Email Reddit
    Train Your Large Model on Multiple GPUs with Tensor Parallelism
    Share
    Facebook Twitter LinkedIn Pinterest Email Copy Link


    import dataclasses
    import datetime
    import os

    import datasets
    import tokenizers
    import torch
    import torch.distributed as dist
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim.lr_scheduler as lr_scheduler
    import tqdm
    from torch import Tensor
    from torch.distributed.checkpoint import load, save
    from torch.distributed.checkpoint.default_planner import DefaultLoadPlanner
    from torch.distributed.fsdp import FSDPModule, fully_shard
    from torch.distributed.tensor import Replicate, Shard
    from torch.distributed.tensor.parallel import (
        ColwiseParallel,
        PrepareModuleInput,
        RowwiseParallel,
        SequenceParallel,
        loss_parallel,
        parallelize_module,
    )
    from torch.utils.data.distributed import DistributedSampler

    # Set default to bfloat16 so that floating-point tensors created below
    # (including the model parameters) use it unless a dtype is given explicitly
    torch.set_default_dtype(torch.bfloat16)
    print("NCCL version:", torch.cuda.nccl.version())

     

    # Build the model
    @dataclasses.dataclass
    class LlamaConfig:
        """Define Llama model hyperparameters."""

        vocab_size: int = 50000  # Size of the tokenizer vocabulary
        max_position_embeddings: int = 2048  # Maximum sequence length
        hidden_size: int = 768  # Dimension of hidden layers
        intermediate_size: int = 4 * 768  # Dimension of the MLP's hidden layer
        num_hidden_layers: int = 12  # Number of transformer layers
        num_attention_heads: int = 12  # Number of attention heads
        num_key_value_heads: int = 3  # Number of key-value heads for GQA

     

     

    class RotaryPositionEncoding(nn.Module):
        """Rotary position encoding (RoPE).

        The cosine and sine tables are precomputed once for every position up to
        ``max_position_embeddings`` and stored as buffers; ``forward`` slices
        them to the actual sequence length.
        """

        def __init__(self, dim: int, max_position_embeddings: int) -> None:
            """Initialize the RotaryPositionEncoding module.

            Args:
                dim: The hidden dimension of the input tensor to which RoPE is applied
                max_position_embeddings: The maximum sequence length of the input tensor
            """
            super().__init__()
            self.dim = dim
            self.max_position_embeddings = max_position_embeddings
            # compute a matrix of n * theta_i (position times inverse frequency)
            N = 10_000.0
            inv_freq = 1.0 / (N ** (torch.arange(0, dim, 2) / dim))
            # duplicate the frequencies so the table spans the full `dim` width,
            # matching the (x1, x2) half-split used in forward()
            inv_freq = torch.cat((inv_freq, inv_freq), dim=-1)
            position = torch.arange(max_position_embeddings)
            sinusoid_inp = torch.outer(position, inv_freq)
            # save cosine and sine matrices as buffers, not parameters
            self.register_buffer("cos", sinusoid_inp.cos())
            self.register_buffer("sin", sinusoid_inp.sin())

        def forward(self, x: Tensor) -> Tensor:
            """Apply RoPE to tensor x.

            Args:
                x: Input tensor of shape (batch_size, seq_length, num_heads, head_dim)

            Returns:
                Output tensor of shape (batch_size, seq_length, num_heads, head_dim)
            """
            batch_size, seq_len, num_heads, head_dim = x.shape
            device = x.device
            dtype = x.dtype
            # transform the cosine and sine matrices to 4D tensors of the same dtype as x
            cos = self.cos.to(device, dtype)[:seq_len].view(1, seq_len, 1, -1)
            sin = self.sin.to(device, dtype)[:seq_len].view(1, seq_len, 1, -1)
            # apply RoPE to x: rotate each (x1, x2) pair taken from the two halves
            x1, x2 = x.chunk(2, dim=-1)
            rotated = torch.cat((-x2, x1), dim=-1)
            output = (x * cos) + (rotated * sin)
            return output

     

     

    class LlamaAttention(nn.Module):
        """Grouped-query attention with rotary embeddings."""

        def __init__(self, config: LlamaConfig) -> None:
            """Create the Q, K, V and output projections.

            Args:
                config: Model hyperparameters. ``hidden_size`` must be divisible
                    by ``num_attention_heads``.
            """
            super().__init__()
            self.hidden_size = config.hidden_size
            self.num_heads = config.num_attention_heads
            self.head_dim = self.hidden_size // self.num_heads
            self.num_kv_heads = config.num_key_value_heads  # GQA: H_kv < H_q

            # hidden_size must be divisible by num_heads
            assert (self.head_dim * self.num_heads) == self.hidden_size

            # Linear layers for Q, K, V projections
            self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
            self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
            self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
            self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

        def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:
            """Run self-attention over ``hidden_states``.

            Args:
                hidden_states: Input tensor of shape (batch_size, seq_len, hidden_size)
                rope: Callable applied to the 4D query and key tensors
                attn_mask: Mask forwarded to ``scaled_dot_product_attention``

            Returns:
                Tensor of shape (batch_size, seq_len, hidden_size)
            """
            bs, seq_len, dim = hidden_states.size()

            # Project inputs to Q, K, V
            query_states = self.q_proj(hidden_states).view(bs, seq_len, self.num_heads, self.head_dim)
            key_states = self.k_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)
            value_states = self.v_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)

            # Apply rotary position embeddings
            query_states = rope(query_states)
            key_states = rope(key_states)

            # Transpose tensors from BSHD to BHSD dimension for scaled_dot_product_attention
            query_states = query_states.transpose(1, 2)
            key_states = key_states.transpose(1, 2)
            value_states = value_states.transpose(1, 2)

            # Use PyTorch's optimized attention implementation
            # setting is_causal=True is incompatible with setting explicit attention masks
            attn_output = F.scaled_dot_product_attention(
                query_states,
                key_states,
                value_states,
                attn_mask=attn_mask,
                dropout_p=0.0,
                enable_gqa=True,
            )

            # Transpose output tensor from BHSD to BSHD dimension, reshape to 3D, and then project output
            attn_output = attn_output.transpose(1, 2).reshape(bs, seq_len, self.hidden_size)
            attn_output = self.o_proj(attn_output)
            return attn_output

     

     

    class LlamaMLP(nn.Module):
        """Feed-forward network with SwiGLU activation."""

        def __init__(self, config: LlamaConfig) -> None:
            """Create the gate, up and down projections (all without bias).

            Args:
                config: Model hyperparameters; ``hidden_size`` and
                    ``intermediate_size`` are read.
            """
            super().__init__()
            # Two parallel projections for SwiGLU
            self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
            self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
            self.act_fn = F.silu  # SwiGLU activation function
            # Project back to hidden size
            self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)

        def forward(self, x: Tensor) -> Tensor:
            """Return ``down_proj(silu(gate_proj(x)) * up_proj(x))``."""
            # SwiGLU activation: multiply gate and up-projected inputs
            gate = self.act_fn(self.gate_proj(x))
            up = self.up_proj(x)
            return self.down_proj(gate * up)

     

     

    class LlamaDecoderLayer(nn.Module):
        """Single transformer layer for a Llama model."""

        def __init__(self, config: LlamaConfig) -> None:
            """Create the two RMSNorm layers, the attention block and the MLP.

            Args:
                config: Model hyperparameters shared by all sub-modules.
            """
            super().__init__()
            self.input_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
            self.self_attn = LlamaAttention(config)
            self.post_attention_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
            self.mlp = LlamaMLP(config)

        def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:
            """Apply pre-norm self-attention and MLP, each with a residual connection.

            Args:
                hidden_states: Input tensor for this layer
                rope: Rotary position encoding passed through to the attention block
                attn_mask: Attention mask passed through to the attention block

            Returns:
                The transformed hidden states.
            """
            # First residual block: Self-attention
            residual = hidden_states
            hidden_states = self.input_layernorm(hidden_states)
            attn_outputs = self.self_attn(hidden_states, rope=rope, attn_mask=attn_mask)
            hidden_states = attn_outputs + residual

            # Second residual block: MLP
            residual = hidden_states
            hidden_states = self.post_attention_layernorm(hidden_states)
            hidden_states = self.mlp(hidden_states) + residual
            return hidden_states

     

     

    class LlamaModel(nn.Module):
        """The full Llama model without any pretraining heads."""

        def __init__(self, config: LlamaConfig) -> None:
            """Create the embedding, the decoder layers, the final norm and the shared RoPE.

            Args:
                config: Model hyperparameters.
            """
            super().__init__()
            # One RoPE module, sized per attention head, is shared by all layers
            self.rotary_emb = RotaryPositionEncoding(
                config.hidden_size // config.num_attention_heads,
                config.max_position_embeddings,
            )

            self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
            self.layers = nn.ModuleList([
                LlamaDecoderLayer(config) for _ in range(config.num_hidden_layers)
            ])
            self.norm = nn.RMSNorm(config.hidden_size, eps=1e-5)

        def forward(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:
            """Embed ``input_ids`` and run them through every decoder layer.

            Args:
                input_ids: Token IDs to embed
                attn_mask: Attention mask passed to each decoder layer

            Returns:
                The final, normalized hidden states.
            """
            # Convert input token IDs to embeddings
            hidden_states = self.embed_tokens(input_ids)
            # Process through all transformer layers, then the final norm layer
            for layer in self.layers:
                hidden_states = layer(hidden_states, rope=self.rotary_emb, attn_mask=attn_mask)
            hidden_states = self.norm(hidden_states)
            # Return the final hidden states
            return hidden_states

     

     

    class LlamaForPretraining(nn.Module):
        """Llama base model followed by a linear language-modeling head."""

        def __init__(self, config: LlamaConfig) -> None:
            """Create the base model and the bias-free projection to the vocabulary.

            Args:
                config: Model hyperparameters.
            """
            super().__init__()
            self.base_model = LlamaModel(config)
            self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

        def forward(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:
            """Return the logits over the vocabulary for every input position.

            Args:
                input_ids: Token IDs passed to the base model
                attn_mask: Attention mask passed to the base model
            """
            hidden_states = self.base_model(input_ids, attn_mask)
            return self.lm_head(hidden_states)

     

     

    def create_causal_mask(batch: Tensor, dtype: torch.dtype = torch.float32) -> Tensor:
        """Create a causal mask for self-attention.

        Args:
            batch: Batch of sequences, shape (batch_size, seq_len)
            dtype: Data type of the mask

        Returns:
            Causal mask of shape (seq_len, seq_len): ``-inf`` strictly above the
            diagonal and 0 elsewhere, on the same device as ``batch``.
        """
        batch_size, seq_len = batch.shape
        # Fill with -inf, then keep only the strict upper triangle; triu() zeroes the rest
        mask = torch.full(
            (seq_len, seq_len), float("-inf"), device=batch.device, dtype=dtype
        ).triu(diagonal=1)
        return mask

     

     

    def create_padding_mask(batch: Tensor, padding_token_id: int, dtype: torch.dtype = torch.float32) -> Tensor:
        """Create a padding mask for a batch of sequences for self-attention.

        Args:
            batch: Batch of sequences, shape (batch_size, seq_len)
            padding_token_id: ID of the padding token
            dtype: Data type of the mask

        Returns:
            Padding mask of shape (batch_size, 1, seq_len, seq_len); an entry is
            ``-inf`` when its row or its column is a padding position, else 0.
        """
        # Per-token vector: -inf at padding positions, 0 elsewhere
        padded = torch.zeros_like(
            batch, device=batch.device, dtype=dtype
        ).masked_fill(batch == padding_token_id, float("-inf"))
        # Broadcast-add the row and column views to get a (seq_len, seq_len) mask per sequence
        mask = padded[:, :, None] + padded[:, None, :]
        # Insert a singleton dimension at position 1
        return mask[:, None, :, :]

     

     

    # Dataset that creates padded sequences of fixed length
    class PretrainingDataset(torch.utils.data.Dataset):
        """Map-style dataset producing (input, target) token-ID pairs of fixed length."""

        def __init__(self, dataset: datasets.Dataset, tokenizer: tokenizers.Tokenizer,
                     seq_length: int):
            """Store the raw dataset and tokenizer and look up the special token IDs.

            Args:
                dataset: Indexable dataset whose items have a "text" field
                tokenizer: Tokenizer that knows the [BOT], [EOT] and [PAD] tokens
                seq_length: Length of each returned sequence
            """
            self.dataset = dataset
            self.tokenizer = tokenizer
            self.seq_length = seq_length
            self.bot = tokenizer.token_to_id("[BOT]")
            self.eot = tokenizer.token_to_id("[EOT]")
            self.pad = tokenizer.token_to_id("[PAD]")

        def __len__(self):
            """Return the number of items in the underlying dataset."""
            return len(self.dataset)

        def __getitem__(self, index: int) -> tuple[Tensor, Tensor]:
            """Get a sequence of token ids from the dataset. [BOT] and [EOT] tokens
            are added. Clipped and padded to the sequence length.

            Returns:
                ``(x, y)``, two int64 tensors of length ``seq_length``, where
                ``y`` is ``x`` shifted left by one token.
            """
            seq = self.dataset[index]["text"]
            tokens: list[int] = [self.bot] + self.tokenizer.encode(seq).ids + [self.eot]
            # pad to target sequence length; one extra token is needed for the shifted target
            toklen = len(tokens)
            if toklen < self.seq_length + 1:
                pad_length = self.seq_length + 1 - toklen
                tokens += [self.pad] * pad_length
            # return the sequence
            x = torch.tensor(tokens[:self.seq_length], dtype=torch.int64)
            y = torch.tensor(tokens[1:self.seq_length + 1], dtype=torch.int64)
            return x, y

     

     

    def load_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:
        """Restore model, optimizer and scheduler state from ``checkpoint-dist``.

        Model and optimizer are loaded in place through the distributed
        checkpoint API; the scheduler state is read from a separate
        ``lrscheduler.pt`` file in the same directory. Barriers before and after
        keep all ranks in step.

        Args:
            model: Model whose state is loaded in place
            optimizer: Optimizer whose state is loaded in place
            scheduler: LR scheduler whose state is loaded in place
        """
        dist.barrier()
        load(
            {"model": model, "optimizer": optimizer},
            checkpoint_id="checkpoint-dist",
            planner=DefaultLoadPlanner(allow_partial_load=True),  # ignore keys for RoPE buffer
        )
        # Map onto the device that holds the model instead of relying on a
        # module-level global being defined before this function is called
        map_location = next(model.parameters()).device
        scheduler.load_state_dict(
            torch.load("checkpoint-dist/lrscheduler.pt", map_location=map_location),
        )
        dist.barrier()


    def save_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:
        """Save model, optimizer and scheduler state to ``checkpoint-dist``.

        Every rank takes part in the distributed save of model and optimizer;
        only rank 0 writes the scheduler state. The keys used here must match
        the ones used by ``load_checkpoint``.

        Args:
            model: Model to save
            optimizer: Optimizer to save
            scheduler: LR scheduler to save
        """
        dist.barrier()
        save(
            {"model": model, "optimizer": optimizer},
            checkpoint_id="checkpoint-dist",
        )
        if dist.get_rank() == 0:
            torch.save(scheduler.state_dict(), "checkpoint-dist/lrscheduler.pt")
        dist.barrier()

     

     

    # Load the tokenizer and dataset
    tokenizer = tokenizers.Tokenizer.from_file("bpe_50K.json")
    dataset = datasets.load_dataset("HuggingFaceFW/fineweb", "sample-10BT", split="train")

    # Initialize the distributed environment
    dist.init_process_group(backend="nccl", timeout=datetime.timedelta(seconds=60))
    local_rank = int(os.environ["LOCAL_RANK"])
    device = torch.device(f"cuda:{local_rank}")
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    print(f"World size {world_size}, rank {rank}, local rank {local_rank}. Using {device}")

    # Initialize the mesh for tensor parallelism: a 2D mesh with data-parallel
    # groups along "dp" and tensor-parallel groups along "tp"
    n_tensor_parallel = 2
    assert world_size % n_tensor_parallel == 0, "Expect world size to be divisible by number of tensor parallel GPUs"
    mesh = dist.device_mesh.init_device_mesh(
        "cuda",
        (world_size // n_tensor_parallel, n_tensor_parallel),
        mesh_dim_names=("dp", "tp"),
    )
    print(f"({rank}) Mesh: {mesh}, DP size: {mesh['dp'].size()}, TP size: {mesh['tp'].size()}, DP local rank: {mesh['dp'].get_local_rank()}, TP local rank: {mesh['tp'].get_local_rank()}")

    # Create pretraining model on meta device, on all ranks
    with torch.device("meta"):
        model_config = LlamaConfig()
        model = LlamaForPretraining(model_config)

    # Set up tensor parallelism on each transformer block in the base model
    tp_plan = {
        "input_layernorm": SequenceParallel(),
        "self_attn": PrepareModuleInput(
            input_layouts=Shard(dim=1),  # only one positional arg will be used
            desired_input_layouts=Replicate(),
        ),
        # Q/K projections output will be used with RoPE, need to be replicated
        # Q/K/V output will be used with GQA, also need to be replicated
        "self_attn.q_proj": ColwiseParallel(output_layouts=Replicate()),
        "self_attn.k_proj": ColwiseParallel(output_layouts=Replicate()),
        "self_attn.v_proj": ColwiseParallel(output_layouts=Replicate()),
        "self_attn.o_proj": RowwiseParallel(input_layouts=Replicate(), output_layouts=Shard(1)),
        "post_attention_layernorm": SequenceParallel(),
        "mlp": PrepareModuleInput(
            input_layouts=Shard(dim=1),
            desired_input_layouts=Replicate(),
        ),
        "mlp.gate_proj": ColwiseParallel(),
        "mlp.up_proj": ColwiseParallel(),
        "mlp.down_proj": RowwiseParallel(output_layouts=Shard(1)),
    }
    for layer in model.base_model.layers:
        parallelize_module(layer, mesh["tp"], tp_plan)

    # Set up tensor parallelism on the embedding and output norm layers in the base model
    # and the prediction head in the top-level model
    tp_plan = {
        "base_model.embed_tokens": RowwiseParallel(
            input_layouts=Replicate(),
            output_layouts=Shard(1),
        ),
        "base_model.norm": SequenceParallel(),
        "lm_head": ColwiseParallel(
            input_layouts=Shard(1),
            # output_layouts=Replicate(), # only if not using loss parallel
            use_local_output=False,  # Keep DTensor output for loss parallel
        ),
    }
    parallelize_module(model, mesh["tp"], tp_plan)

    # Convert tensor-parallelized model to FSDP2, must shard every component
    # shard across the "dp" dimension of the mesh
    for layer in model.base_model.layers:
        fully_shard(layer, mesh=mesh["dp"])
    fully_shard(model.base_model, mesh=mesh["dp"])
    fully_shard(model, mesh=mesh["dp"])


    def reset_all_weights(model: nn.Module) -> None:
        """Initialize all weights of the model after moving it away from meta device."""

        @torch.no_grad()
        def weight_reset(m: nn.Module):
            # Only modules that define reset_parameters() are re-initialized
            reset_parameters = getattr(m, "reset_parameters", None)
            if callable(reset_parameters):
                m.reset_parameters()

        # Applies fn recursively to model itself and all of model.children()
        model.apply(fn=weight_reset)


    torch.manual_seed(42)
    # Allocate uninitialized storage on the target device, then initialize it
    model.to_empty(device=device)
    reset_all_weights(model)
    assert isinstance(model, FSDPModule), f"Expected FSDPModule, got {type(model)}"

    # Training parameters
    epochs = 3
    learning_rate = 1e-3
    batch_size = 64 // mesh["dp"].size()
    seq_length = 512
    num_warmup_steps = 1000
    PAD_TOKEN_ID = tokenizer.token_to_id("[PAD]")
    model.train()

    # DataLoader, optimizer, scheduler, and loss function
    # Sampler is needed to shard the dataset across the data-parallel ranks;
    # ranks in the same tensor-parallel group share a "dp" rank and so get the same data
    dataset = PretrainingDataset(dataset, tokenizer, seq_length)
    sampler = DistributedSampler(
        dataset, shuffle=False, drop_last=True,
        num_replicas=mesh["dp"].size(),
        rank=mesh["dp"].get_local_rank(),
    )
    dataloader = torch.utils.data.DataLoader(
        dataset,
        sampler=sampler,
        batch_size=batch_size,
        pin_memory=True,  # optional
        shuffle=False,
        num_workers=2,
        prefetch_factor=2,
    )
    num_training_steps = len(dataloader) * epochs

    optimizer = torch.optim.AdamW(
        model.parameters(), lr=learning_rate, betas=(0.9, 0.99), eps=1e-8, weight_decay=0.1,
    )
    # Linear warmup for num_warmup_steps, then cosine decay for the remaining steps
    warmup_scheduler = lr_scheduler.LinearLR(
        optimizer,
        start_factor=0.1, end_factor=1.0, total_iters=num_warmup_steps,
    )
    cosine_scheduler = lr_scheduler.CosineAnnealingLR(
        optimizer,
        T_max=num_training_steps - num_warmup_steps,
        eta_min=0,
    )
    scheduler = lr_scheduler.SequentialLR(
        optimizer,
        schedulers=[warmup_scheduler, cosine_scheduler],
        milestones=[num_warmup_steps],
    )
    loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN_ID)

    # if checkpoint-dist dir exists, load the checkpoint to model and optimizer
    if os.path.exists("checkpoint-dist"):
        load_checkpoint(model, optimizer, scheduler)

    # start training
    print(f"({rank}) Starting training")
    for epoch in range(epochs):
        pbar = tqdm.tqdm(dataloader, desc=f"({rank}) Epoch {epoch+1}/{epochs}")
        for batch_id, batch in enumerate(pbar):
            if batch_id % 1000 == 0:
                save_checkpoint(model, optimizer, scheduler)
            # Explicit prefetching before sending any data to model
            model.unshard()
            # Get batched data, move from CPU to GPU
            input_ids, target_ids = batch
            input_ids = input_ids.to(device)
            target_ids = target_ids.to(device)
            # create attention mask: causal mask + padding mask
            attn_mask = (
                create_causal_mask(input_ids)
                + create_padding_mask(input_ids, PAD_TOKEN_ID)
            )
            # Extract output from model
            logits = model(input_ids, attn_mask)
            optimizer.zero_grad()
            with loss_parallel():
                # Compute loss: cross-entropy between logits and target, ignoring padding tokens
                loss = loss_fn(logits.view(-1, logits.size(-1)), target_ids.view(-1))
                # Backward with loss on DTensor
                loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            scheduler.step()
            # Iterating over `pbar` already advances the bar once per batch, so
            # no manual update is issued here (it would double-count progress)
            pbar.set_postfix(loss=loss.item())
        pbar.close()

    # Save the model
    save_checkpoint(model, optimizer, scheduler)

    # Clean up the distributed environment
    dist.destroy_process_group()

    Share. Facebook Twitter Pinterest LinkedIn Tumblr Email
    Oliver Chambers
    • Website

    Related Posts

    A Guide to Kedro: Your Production-Ready Data Science Toolbox

    March 4, 2026

    Deploying AI Agents to Production: Architecture, Infrastructure, and Implementation Roadmap

    March 4, 2026

    On the Impossibility of Separating Intelligence from Judgment: The Computational Intractability of Filtering for AI Alignment

    March 4, 2026
    Top Posts

    Comparing the Best AI Video Generators for Social Media

    April 18, 2025

    Using AI To Fix The Innovation Problem: The Three Step Solution

    April 18, 2025

    Midjourney V7: Faster, smarter, more realistic

    April 18, 2025

    Meta resumes AI training using EU user data

    April 18, 2025
    Don't Miss

    A Guide to Kedro: Your Production-Ready Data Science Toolbox

    By Oliver ChambersMarch 4, 2026

    Image by Editor   # Introduction  Data science projects usually begin as exploratory Python notebooks but…

    Simbe Becomes First Retail Robotics Company to Obtain UL 3300 Certification for Autonomous Robotic Tally

    March 4, 2026

    The Machine Learning Practitioner’s Guide to Speculative Decoding

    March 4, 2026

    Iranian cyberattacks fail to materialize but threat remains acute

    March 4, 2026
    Stay In Touch
    • Facebook
    • Twitter
    • Pinterest
    • Instagram
    • YouTube
    • Vimeo

    Subscribe to Updates

    Get the latest creative news from SmartMag about art & design.

    UK Tech Insider
    Facebook X (Twitter) Instagram
    • About Us
    • Contact Us
    • Privacy Policy
    • Terms Of Service
    • Our Authors
    © 2026 UK Tech Insider. All rights reserved by UK Tech Insider.

    Type above and press Enter to search. Press Esc to cancel.