GitHub this note show getting started with distributed training on SageMaker
- Basic MPI
- Pytorch DataParallel and DistributedDataParallel
- SageMaker DistributedDataParallel Lib
- Reference papers
Basic MPI#
- Run on 3 nodes x 2 process per node
- Broadcass node, and receive node
- Agregrate data on nodes
Let create a simple MPI (Message Passing Interface)
from mpi4py import MPIimport numpy as npimport timecomm = MPI.COMM_WORLDsize = comm.Get_size()rank = comm.Get_rank()if rank == 0:print("Number of MPI processes that will talk to each other:", size)def point_to_point():"""Point to point communicationSend a numpy array (buffer like object) from rank 0 to rank 1"""if rank == 0:print("point to point")data = np.array([0, 1, 2], dtype=np.intc) # int in C# remember the difference between# Upper case API and lower case API# Basically uppper case API directly calls C API# so it is fast# checkout[data, MPI.INT], dest=1)elif rank == 1:print(f"Hello I am rank {rank}")data = np.empty(3, dtype=np.intc)comm.Recv([data, MPI.INT], source=0)print("I received some data:", data)if rank == 0:time.sleep(1) # give some buffer time for execution to completeprint("=" * 50)returndef broadcast():"""Broadcast a numpy array from rank 0 to others"""if rank == 0:print(f"Broadcasting from rank {rank}")data = np.arange(10, dtype=np.intc)else:data = np.empty(10, dtype=np.intc)comm.Bcast([data, MPI.INT], root=0)print(f"Data at rank {rank}", data)if rank == 0:time.sleep(1)print("=" * 50)returndef gather_reduce_broadcast():"""Gather numpy arrays from all ranks to rank 0then take average and broadcast result to other ranksIt is a useful operation in distributed training:train a model in a few MPI workers with differentinput data, then take average weights on rank 0 andsynchroinze weights on other ranks"""# stuff to gather at each ranksendbuf = np.zeros(10, dtype=np.intc) + rankrecvbuf = Noneif rank == 0:print("Gather and reduce")recvbuf = np.empty([size, 10], dtype=np.intc)comm.Gather(sendbuf, recvbuf, root=0)if rank == 0:print(f"I am rank {rank}, data I gathered is: {recvbuf}")# take average# think of it as a prototype of# average weights, average gradients etcavg = np.mean(recvbuf, axis=0, dtype=np.float)else:# get averaged array from rank 0# think of it as a prototype of# synchronizing weights across different MPI procsavg = np.empty(10, dtype=np.float)# Note that the data type is float here# because we took averagecomm.Bcast([avg, MPI.FLOAT], root=0)print(f"I am rank {rank}, my avg is: {avg}")returnif __name__ == "__main__":point_to_point()broadcast()gather_reduce_broadcast()
Then create a SageMaker job to run it
import sagemakerfrom sagemaker import get_execution_rolefrom sagemaker.tensorflow import TensorFlowrole = get_execution_role()# Running 2 processes per host# if we use 3 instances,# then we should see 6 MPI processesdistribution = {"mpi": {"enabled": True, "processes_per_host": 2}}tfest = TensorFlow(entry_point="",role=role,framework_version="2.3.0",distribution=distribution,py_version="py37",instance_count=3,instance_type="ml.c5.2xlarge", # 8 coresoutput_path="s3://" + sagemaker.Session().default_bucket() + "/" + "mpi",)
Pytorch Multiple CPU#
- Average gradient and DistributedSampler
- Setup dist.init_process_group
- Wrap model in DataParallel or DistributedDataParallel
Follow this sm example and distributed training workshop example
if is_distributed and use_cuda:# multi-machine multi-gpu casemodel = torch.nn.parallel.DistributedDataParallel(model)else:# single-machine multi-gpu case or single-machine or multi-machine cpu casemodel = torch.nn.DataParallel(model)
Let create model and train script
import jsonimport argparseimport osimport torchimport torch.utils.dataimport torch.nn as nnfrom import Dataset, DataLoader#import torch.distributed as dist# Parameters and DataLoadersinput_size = 5output_size = 2batch_size = 30data_size = 100class RandomDataset(Dataset):def __init__(self, size, length):self.len = = torch.randn(length, size)def __getitem__(self, index):return[index]def __len__(self):return self.lenclass Model(nn.Module):# Our modeldef __init__(self, input_size, output_size):super(Model, self).__init__()self.fc = nn.Linear(input_size, output_size)def forward(self, input):output = self.fc(input)print("\tIn Model: input size", input.size(), "output size", output.size())return outputdef _get_train_data_loader(is_distributed:bool):# generate datatrain_set = RandomDataset(input_size, data_size)#train_sampler = ( if is_distributed else None)#return,batch_size=batch_size,shuffle=train_sampler is None,sampler=train_sampler)def _get_test_data_loader():return Nonedef _average_gradients(model):# Gradient averaging.size = float(dist.get_world_size())for param in model.parameters():dist.all_reduce(, op=dist.reduce_op.SUM) /= sizedef parse_args():parser = argparse.ArgumentParser()parser.add_argument("--backend",type=str,default="gloo")parser.add_argument("--model-type",type=str,default="custom",)parser.add_argument("--hosts",type=list,default=json.loads(os.environ["SM_HOSTS"]))parser.add_argument("--current-host",type=str,default=os.environ["SM_CURRENT_HOST"])return parser.parse_args()def train(args):# devicedevice = torch.device("cpu")#world_size = len(args.hosts)#host_rank = args.hosts.index(args.current_host)#print(f'world size {world_size} and host rank {host_rank}')dist.init_process_group(backend=args.backend,rank=host_rank,world_size=world_size)# modelmodel = Model(input_size, output_size).to(device)model = torch.nn.DataParallel(model)# model = torch.nn.parallel.DistributedDataParallel(model)# datatrain_loader = _get_train_data_loader(is_distributed=True)print(f'train_loader_len {len(train_loader.sampler)} data_set_len {len(train_loader.dataset)}')# trainfor data in train_loader:input = = model(input)print("Outside: input size", input.size(), "output_size", output.size())if __name__=="__main__":args = parse_args()train(args)
Let create a training job in SageMaker
estimator_1 = PyTorch(role="arn:aws:iam::392194582387:role/RoleForDataScientistUserProfile",entry_point="",framework_version="1.8.0",py_version="py3",instance_count=2,instance_type="ml.c5.2xlarge",hyperparameters={'backend': 'gloo','model-type': 'custom'}# distribution={# "smdistributed": {"dataparallel": {"enabled": True}}# mpirun backend# "pytorchddp": {"enable": True}# },)
Check the output on host algo-1
#011In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])#011In Model: input size torch.Size([20, 5]) output size torch.Size([20, 2])Outside: input size torch.Size([20, 5]) output_size torch.Size([20, 2])
and check output on host algo-2
#011In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])#011In Model: input size torch.Size([20, 5]) output size torch.Size([20, 2])Outside: input size torch.Size([20, 5]) output_size torch.Size([20, 2])
Pytorch DataParallel#
- Pin the model to multiple GPUs
- DataParallel automatically split the batch into smaller batches running on GPUs
Follow this tutorial, first create a simple model
import torchimport torch.nn as nnfrom import Dataset, DataLoader# Parameters and DataLoadersinput_size = 5output_size = 2batch_size = 30data_size = 100#device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")device = "cpu"class RandomDataset(Dataset):def __init__(self, size, length):self.len = = torch.randn(length, size)def __getitem__(self, index):return[index]def __len__(self):return self.lenclass Model(nn.Module):# Our modeldef __init__(self, input_size, output_size):super(Model, self).__init__()self.fc = nn.Linear(input_size, output_size)def forward(self, input):output = self.fc(input)print("\tIn Model: input size", input.size(), "output size", output.size())try:id = int(torch.cuda.current_device())print(torch.cuda.device(id))print(torch.cuda.get_device_name(torch.cuda.current_device()))except:print("not able to print device")return outputrand_loader = DataLoader(dataset=RandomDataset(input_size, data_size), batch_size=batch_size, shuffle=True)model = Model(input_size, output_size)if torch.cuda.device_count() > 1:print("Let's use", torch.cuda.device_count(), "GPUs!")# dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUsmodel = nn.DataParallel(model) data in rand_loader:input = = model(input)print("Outside: input size", input.size(), "output_size", output.size())
Then create SM training job
from sagemaker.pytorch import PyTorchestimator = PyTorch(role="arn:aws:iam::$ACCOUNT_ID:role/RoleForDataScientistUserProfile",entry_point="",framework_version="2.0.1",py_version="py310",instance_count=1,instance_type="ml.g5.12xlarge",distribution={# mpirun backend"pytorchddp": {"enable": True}},)
In case of 4 GPUs, the output look like this
Let's use 4 GPUs!NCCL version 2.17.1+cuda11.8algo-1:46:58 [0] configure_nvls_option:287 NCCL WARN NET/OFI Could not find ncclGetVersion symbolIn Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])<torch.cuda.device object at 0x7efe8dc55cf0>NVIDIA A10GIn Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])<torch.cuda.device object at 0x7efe8dc54e50>NVIDIA A10GIn Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])<torch.cuda.device object at 0x7efe8dc55990>NVIDIA A10GIn Model: input size torch.Size([6, 5]) output size torch.Size([6, 2])<torch.cuda.device object at 0x7efe8dc55ab0>NVIDIA A10GOutside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input sizetorch.Size([8, 5])output size torch.Size([8, 2])In Model: input sizeIn Model: input size torch.Size([8, 5])torch.Size([8, 5])output size torch.Size([8, 2])<torch.cuda.device object at 0x7efe8dc55d80>In Model: input size torch.Size([6, 5])output size torch.Size([8, 2])<torch.cuda.device object at 0x7efe8dc55de0>NVIDIA A10G<torch.cuda.device object at 0x7efe8dc55d80>NVIDIA A10GNVIDIA A10Goutput sizetorch.Size([6, 2])<torch.cuda.device object at 0x7efe8dc55a20>NVIDIA A10GOutside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input sizeIn Model: input size torch.Size([8, 5])output size torch.Size([8, 2])<torch.cuda.device object at 0x7efe8dc558d0>NVIDIA A10Gtorch.Size([8, 5])output sizetorch.Size([8, 2])In Model: input size#011In Model: input sizetorch.Size([6, 5])torch.Size([8, 5])output sizeoutput size <torch.cuda.device object at 0x7efe8dc55de0>torch.Size([8, 2])torch.Size([6, 2])<torch.cuda.device object at 0x7efe8dc55c60>NVIDIA A10G<torch.cuda.device object at 0x7efe8dc55d80>NVIDIA A10GNVIDIA A10GOutside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input sizetorch.Size([3, 5])output size#011In Model: input sizetorch.Size([3, 5]) torch.Size([3, 2])output sizetorch.Size([3, 2])<torch.cuda.device object at 0x7efe8dc55cc0>NVIDIA A10G<torch.cuda.device object at 0x7efe8dc55750>NVIDIA A10GIn Model: input sizetorch.Size([3, 5]) output size torch.Size([3, 2])<torch.cuda.device object at 0x7efe8dc559f0>NVIDIA A10GIn Model: input size torch.Size([1, 5]) output size torch.Size([1, 2])<torch.cuda.device object at 0x7efe8dc54e50>NVIDIA A10GOutside: input size torch.Size([10, 5]) output_size torch.Size([10, 2])
If there is no GPU or single GPU, output should look like
In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])<torch.cuda.device object at 0x7f0707748f10>NVIDIA A10GOutside: input size torch.Size([30, 5])output_size torch.Size([30, 2])In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])<torch.cuda.device object at 0x7f0707748f10>NVIDIA A10GOutside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input size torch.Size([30, 5]) output size torch.Size([30, 2])<torch.cuda.device object at 0x7f0707748f10>NVIDIA A10GOutside: input size torch.Size([30, 5]) output_sizetorch.Size([30, 2])In Model: input size torch.Size([10, 5]) output size torch.Size([10, 2])<torch.cuda.device object at 0x7f0707748f10>NVIDIA A10GOutside: input size torch.Size([10, 5]) output_sizetorch.Size([10, 2])2023-12-15 01:51:15,880 sagemaker-training-toolkit INFO Waiting for the process to finish and give a return code.2023-12-15 01:51:15,881 sagemaker-training-toolkit INFO Done waiting for a return code. Received 0 from exiting process.2023-12-15 01:51:15,881 sagemaker-training-toolkit INFO Reporting training SUCCESS
Implements data parallelism at the module level. This container parallelizes the application of the given module by splitting the input across the specified devices by chunking in the batch dimension (other objects will be copied once per device). In the forward pass, the module is replicated on each device, and each replica handles a portion of the input. During the backwards pass, gradients from each replica are summed into the original module.
[!WARNING] It is recommended to use DistributedDataParallel, instead of this class, to do multi-GPU training, even if there is only a single node. See: Use nn.parallel.DistributedDataParallel instead of multiprocessing or nn.DataParallel and Distributed Data Parallel.
Pytorch Distributed Data Parallel#
- Introduction explains basics of DDP and compare it with previous Data Parallel.
- DDP Overview
Compared to DataParallel, DistributedDataParallel requires one more step to set up, i.e., calling init_process_group. DDP uses multi-process parallelism, and hence there is no GIL contention across model replicas. Moreover, the model is broadcast at DDP construction time instead of in every forward pass, which also helps to speed up training. DDP is shipped with several performance optimization technologies. For a more in-depth explanation, refer to this paper (VLDB’20).
Let create a similar model as before. Wrap model in DistributedDataParallel
device = "cuda"model = Model(input_size, output_size)# for single machine gpusmodel = torch.nn.DataParallel(model)# multiple machine gpus# model = torch.nn.parallel.DistributedDataParallel(model)
Init the processing group
world_size = len(args.hosts)host_rank = args.hosts.index(args.current_host)print(f'host rank is {host_rank}')dist.init_process_group(backend=args.backend,rank=host_rank,world_size=world_size)
import argparseimport jsonimport osimport torchimport torch.nn as nnfrom import Dataset, DataLoader# pytorch ddpimport torch.distributed as dist# Parameters and DataLoadersinput_size = 5output_size = 2batch_size = 30data_size = 100#device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")class RandomDataset(Dataset):def __init__(self, size, length):self.len = = torch.randn(length, size)def __getitem__(self, index):return[index]def __len__(self):return self.lenclass Model(nn.Module):# Our modeldef __init__(self, input_size, output_size):super(Model, self).__init__()self.fc = nn.Linear(input_size, output_size)def forward(self, input):output = self.fc(input)print("\tIn Model: input size", input.size(), "output size", output.size())try:id = int(torch.cuda.current_device())print(torch.cuda.device(id))print(torch.cuda.get_device_name(torch.cuda.current_device()))except:print("not able to print device")return outputdef parse_args():parser = argparse.ArgumentParser()parser.add_argument("--backend",type=str,default="gloo")parser.add_argument("--model-type",type=str,default="custom",)parser.add_argument("--hosts",type=list,default=json.loads(os.environ["SM_HOSTS"]))parser.add_argument("--current-host",type=str,default=os.environ["SM_CURRENT_HOST"])return parser.parse_args()if __name__=="__main__":args = parse_args()#world_size = len(args.hosts)host_rank = args.hosts.index(args.current_host)print(f'host rank is {host_rank}')dist.init_process_group(backend=args.backend,rank=host_rank,world_size=world_size)# devicedevice = "cuda"# modelmodel = Model(input_size, output_size)# single machine gpusmodel = torch.nn.DataParallel(model)# multiple machine gpus# model = torch.nn.parallel.DistributedDataParallel(model) gen datarand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),batch_size=batch_size,shuffle=True)# trainfor data in rand_loader:input = = model(input)print("Outside: input size", input.size(), "output_size", output.size())
Then create a SM training job
estimator_3 = PyTorch(role="arn:aws:iam::392194582387:role/RoleForDataScientistUserProfile",entry_point="",framework_version="2.0.1",py_version="py310",instance_count=1,instance_type="ml.g5.12xlarge",hyperparameters={'backend': 'gloo','model-type': 'custom'},distribution={# mpirun backend"pytorchddp": {"enable": True}},)
SageMaker SDP and SMP Library#
- Apply to p4 instance only HERE
- DataParallel and
Only support some large instances like ml.p4d.24xlarge and ml.p4de.24xlarge. Stoped supporte for P3 instances already at this moment.
First we need to modify code
import torch.distributed as distimport smdistributed.dataparallel.torch.torch_smddpdist.init_process_group(backend="smddp")
Then create a SM training job
from sagemaker.pytorch import PyTorchpt_estimator = PyTorch(base_job_name="training_job_name_prefix",source_dir="subdirectory-to-your-code",entry_point="",role="SageMakerRole",py_version="py310",framework_version="2.0.1",# For running a multi-node distributed training job, specify a value greater than 1# Example: 2,3,4,..8instance_count=2,# Instance types supported by the SageMaker data parallel library:# ml.p4d.24xlarge, ml.p4de.24xlargeinstance_type="ml.p4d.24xlarge",# Activate distributed training with SMDDPdistribution={ "pytorchddp": { "enabled": True } } # mpirun, activates SMDDP AllReduce OR AllGather# distribution={ "torch_distributed": { "enabled": True } } # torchrun, activates SMDDP AllGather# distribution={ "smdistributed": { "dataparallel": { "enabled": True } } } # mpirun, activates SMDDP AllReduce OR AllGather)"s3://bucket/path/to/training/data")
Output should look like this
host rank is 0NCCL version 2.17.1+cuda11.8algo-1:46:63 [0] configure_nvls_option:287 NCCL WARN NET/OFI Could not find ncclGetVersion symbolIn Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])<torch.cuda.device object at 0x7f51331f6ad0>NVIDIA A10GIn Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])<torch.cuda.device object at 0x7f51331f5c00>NVIDIA A10GIn Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])<torch.cuda.device object at 0x7f51331f6740>NVIDIA A10GIn Model: input size torch.Size([6, 5]) output size torch.Size([6, 2])<torch.cuda.device object at 0x7f51331f6860>NVIDIA A10GOutside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input sizetorch.Size([8, 5]) output size torch.Size([8, 2])<torch.cuda.device object at 0x7f51331f6800>NVIDIA A10GIn Model: input sizetorch.Size([8, 5])In Model: input sizeIn Model: input size torch.Size([6, 5])torch.Size([8, 5])output sizeoutput size torch.Size([8, 2])torch.Size([8, 2])<torch.cuda.device object at 0x7f51331f5f30>NVIDIA A10Goutput size torch.Size([6, 2])<torch.cuda.device object at 0x7f51331f68f0>NVIDIA A10G<torch.cuda.device object at 0x7f51331f67d0>NVIDIA A10GOutside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input size torch.Size([8, 5]) output size torch.Size([8, 2])In Model: input size<torch.cuda.device object at 0x7f51331f66e0>NVIDIA A10Gtorch.Size([8, 5])output sizeIn Model: input sizetorch.Size([8, 2])torch.Size([8, 5])<torch.cuda.device object at 0x7f51331f6a40>#011In Model: input size output sizetorch.Size([6, 5])NVIDIA A10Gtorch.Size([8, 2])output size torch.Size([6, 2])<torch.cuda.device object at 0x7f51331f6680>NVIDIA A10G<torch.cuda.device object at 0x7f51331f6a10>NVIDIA A10GOutside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])In Model: input sizetorch.Size([3, 5])#011In Model: input sizeoutput size torch.Size([3, 2])<torch.cuda.device object at 0x7f51331f68f0>NVIDIA A10Gtorch.Size([3, 5])output size torch.Size([3, 2])<torch.cuda.device object at 0x7f51331f67a0>In Model: input sizeNVIDIA A10Gtorch.Size([3, 5])output size torch.Size([3, 2])<torch.cuda.device object at 0x7f51331f5f30>NVIDIA A10GIn Model: input size torch.Size([1, 5]) output size torch.Size([1, 2])<torch.cuda.device object at 0x7f51331f66e0>NVIDIA A10G