# DDP-script.py
# Copyright (c) Sebastian Raschka under Apache License 2.0 (see LICENSE.txt).
# Source for "Build a Large Language Model From Scratch"
# - https://www.manning.com/books/build-a-large-language-model-from-scratch
# Code: https://github.com/rasbt/LLMs-from-scratch
# Appendix A: Introduction to PyTorch (Part 3)

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# NEW imports:
import os
import platform
import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

# NEW: function to initialize a distributed process group (1 process / GPU)
# this allows communication among processes
def ddp_setup(rank, world_size):
    """
    Arguments:
        rank: a unique process ID
        world_size: total number of processes in the group
    """
    # rank of machine running rank:0 process
    # here, we assume all GPUs are on the same machine
    os.environ["MASTER_ADDR"] = "localhost"
    # any free port on the machine
    os.environ["MASTER_PORT"] = "12345"

    # initialize process group
    if platform.system() == "Windows":
        # Disable libuv because PyTorch for Windows isn't built with support
        os.environ["USE_LIBUV"] = "0"
        # Windows users may have to use "gloo" instead of "nccl" as backend
        # gloo: Facebook Collective Communication Library
        init_process_group(backend="gloo", rank=rank, world_size=world_size)
    else:
        # nccl: NVIDIA Collective Communication Library
        init_process_group(backend="nccl", rank=rank, world_size=world_size)

    torch.cuda.set_device(rank)
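
# --- Optional sketch, not in the book or the original script -----------------
# If this file were launched with `torchrun` instead of mp.spawn() (see the
# bottom of the script), torchrun would set RANK, LOCAL_RANK, WORLD_SIZE,
# MASTER_ADDR, and MASTER_PORT as environment variables. A setup function for
# that launch style could look roughly like the helper below; it is provided
# for illustration only and is not used elsewhere in this script.
def ddp_setup_torchrun():
    # with the default "env://" initialization, rank and world size are read
    # from the environment variables that torchrun exports
    init_process_group(backend="nccl")
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
# ------------------------------------------------------------------------------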

class ToyDataset(Dataset):
    def __init__(self, X, y):
        self.features = X
        self.labels = y

    def __getitem__(self, index):
        one_x = self.features[index]
        one_y = self.labels[index]
        return one_x, one_y

    def __len__(self):
        return self.labels.shape[0]


class NeuralNetwork(torch.nn.Module):
    def __init__(self, num_inputs, num_outputs):
        super().__init__()

        self.layers = torch.nn.Sequential(
            # 1st hidden layer
            torch.nn.Linear(num_inputs, 30),
            torch.nn.ReLU(),

            # 2nd hidden layer
            torch.nn.Linear(30, 20),
            torch.nn.ReLU(),

            # output layer
            torch.nn.Linear(20, num_outputs),
        )

    def forward(self, x):
        logits = self.layers(x)
        return logits

def prepare_dataset():
    X_train = torch.tensor([
        [-1.2, 3.1],
        [-0.9, 2.9],
        [-0.5, 2.6],
        [2.3, -1.1],
        [2.7, -1.5]
    ])
    y_train = torch.tensor([0, 0, 0, 1, 1])

    X_test = torch.tensor([
        [-0.8, 2.8],
        [2.6, -1.6],
    ])
    y_test = torch.tensor([0, 1])

    # Uncomment these lines to increase the dataset size to run this script on up to 8 GPUs:
    # factor = 4
    # X_train = torch.cat([X_train + torch.randn_like(X_train) * 0.1 for _ in range(factor)])
    # y_train = y_train.repeat(factor)
    # X_test = torch.cat([X_test + torch.randn_like(X_test) * 0.1 for _ in range(factor)])
    # y_test = y_test.repeat(factor)

    train_ds = ToyDataset(X_train, y_train)
    test_ds = ToyDataset(X_test, y_test)

    train_loader = DataLoader(
        dataset=train_ds,
        batch_size=2,
        shuffle=False,  # NEW: False because of DistributedSampler below
        pin_memory=True,
        drop_last=True,
        # NEW: chunk batches across GPUs without overlapping samples:
        sampler=DistributedSampler(train_ds)  # NEW
    )
    test_loader = DataLoader(
        dataset=test_ds,
        batch_size=2,
        shuffle=False,
    )
    return train_loader, test_loader
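
# --- Optional sketch, not in the book or the original script -----------------
# DistributedSampler gives every rank its own slice of the dataset indices and
# pads with repeated indices so that all ranks iterate over the same number of
# samples. The unused helper below prints that split; for the 5 training
# samples above and world_size=2, each rank receives ceil(5/2) = 3 indices.
def show_sampler_split(dataset, world_size=2):
    for rank in range(world_size):
        sampler = DistributedSampler(
            dataset, num_replicas=world_size, rank=rank, shuffle=False
        )
        print(f"rank {rank} indices:", list(iter(sampler)))
# ------------------------------------------------------------------------------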

# NEW: wrapper
def main(rank, world_size, num_epochs):
    ddp_setup(rank, world_size)  # NEW: initialize process groups

    train_loader, test_loader = prepare_dataset()
    model = NeuralNetwork(num_inputs=2, num_outputs=2)
    model.to(rank)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.5)

    model = DDP(model, device_ids=[rank])  # NEW: wrap model with DDP
    # the core model is now accessible as model.module

    for epoch in range(num_epochs):
        # NEW: Set sampler to ensure each epoch has a different shuffle order
        train_loader.sampler.set_epoch(epoch)

        model.train()
        for features, labels in train_loader:
            features, labels = features.to(rank), labels.to(rank)  # NEW: use rank
            logits = model(features)
            loss = F.cross_entropy(logits, labels)  # Loss function

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # LOGGING
            print(f"[GPU{rank}] Epoch: {epoch+1:03d}/{num_epochs:03d}"
                  f" | Batchsize {labels.shape[0]:03d}"
                  f" | Train Loss: {loss:.2f}")

    model.eval()

    try:
        train_acc = compute_accuracy(model, train_loader, device=rank)
        print(f"[GPU{rank}] Training accuracy", train_acc)
        test_acc = compute_accuracy(model, test_loader, device=rank)
        print(f"[GPU{rank}] Test accuracy", test_acc)

    ####################################################
    # NEW (not in the book):
    # a ZeroDivisionError occurs when a rank's train_loader is empty,
    # i.e., when too many GPUs share this tiny dataset
    except ZeroDivisionError as e:
        raise ZeroDivisionError(
            f"{e}\n\nThis script is designed for 2 GPUs. You can run it as:\n"
            "CUDA_VISIBLE_DEVICES=0,1 python DDP-script.py\n"
            f"Or, to run it on {torch.cuda.device_count()} GPUs, uncomment the "
            "dataset-scaling lines in prepare_dataset()."
        )
    ####################################################

    destroy_process_group()  # NEW: cleanly exit distributed mode
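
# --- Optional sketch, not in the book or the original script -----------------
# Because DDP wraps the network, the original parameters live under
# model.module. A common pattern for checkpointing is to unwrap the model and
# save from rank 0 only, so the processes don't write duplicate files. The
# helper below is for illustration only (the "model.pt" filename is a
# placeholder) and is not called anywhere in this script.
def save_checkpoint(model, rank, path="model.pt"):
    if rank == 0:
        # model.module is the plain NeuralNetwork inside the DDP wrapper
        torch.save(model.module.state_dict(), path)
# ------------------------------------------------------------------------------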

def compute_accuracy(model, dataloader, device):
    model = model.eval()
    correct = 0.0
    total_examples = 0

    for idx, (features, labels) in enumerate(dataloader):
        features, labels = features.to(device), labels.to(device)

        with torch.no_grad():
            logits = model(features)
        predictions = torch.argmax(logits, dim=1)
        compare = labels == predictions
        correct += torch.sum(compare)
        total_examples += len(compare)

    return (correct / total_examples).item()
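
# --- Optional sketch, not in the book or the original script -----------------
# compute_accuracy() above returns a per-rank value, because each GPU only sees
# its own shard of the training data. To report a single global number, the
# correct/total counts could be summed across ranks before dividing, roughly
# like the unused helper below (it assumes the process group is initialized).
def global_accuracy(correct, total_examples, device):
    counts = torch.tensor([float(correct), float(total_examples)], device=device)
    # sum the counts over all ranks; every rank ends up with the same totals
    torch.distributed.all_reduce(counts, op=torch.distributed.ReduceOp.SUM)
    return (counts[0] / counts[1]).item()
# ------------------------------------------------------------------------------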

if __name__ == "__main__":
    # This script may not work on more than 2 GPUs due to the small dataset;
    # run `CUDA_VISIBLE_DEVICES=0,1 python DDP-script.py` in that case
    print("PyTorch version:", torch.__version__)
    print("CUDA available:", torch.cuda.is_available())
    print("Number of GPUs available:", torch.cuda.device_count())

    torch.manual_seed(123)

    # NEW: spawn new processes
    # note that spawn will automatically pass the rank
    num_epochs = 3
    world_size = torch.cuda.device_count()
    mp.spawn(main, args=(world_size, num_epochs), nprocs=world_size)
    # nprocs=world_size spawns one process per GPU
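
# Usage note (not in the original script): with world_size=2, each rank's
# DistributedSampler holds ceil(5/2) = 3 training indices, so drop_last=True
# leaves exactly one full batch of 2 per rank per epoch. With many more GPUs
# (e.g., 8) each rank gets only a single index, that incomplete batch is
# dropped, the train loader is empty, and compute_accuracy() raises the
# ZeroDivisionError handled in main(), which is why that handler suggests
# enlarging the dataset via the commented-out lines in prepare_dataset().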