|
鎮中和環データホエール データホエールの共有 著者:Zhenzhong Hehuan、出典:Zhihu 志胡:鎮中和歓。編集者: Bao Bao アルゴリズム ノート アドレス: https://zhuanlan.zhihu.com/p/... なぜこの記事を書いたのか?私の仕事はインフラ系ではないものの、学習手法の研究や学習フレームワークの改良に興味があります。最近、OpenRLHFのソリューションであるRayを使ってVLLMを管理する方法に出会い、非常に興味を持ちました。そこで調べてみたところ、VLLMのTP分割とMegatronのロジックは同じであることがわかりました。TorchRPCはRayのリモート呼び出しを置き換えることもできるので、Megatron + TorchRPC + VLLMで同様のフレームワークを実装し、将来的にはVLLMをMegatron推論に直接置き換える予定です。この大きなプロジェクトに着手する前に、この記事を書く機会を得ました。これは一種の起工式と言えるでしょう。 この記事の主な内容この記事では、主にLLMトレーニングフレームワークに必要なプログラミング知識をプログラミングの観点から解説し、現在のLLMトレーニングフレームワークに適用可能な応用テクニックを紹介することで、トレーニングフレームワークのコードロジックの理解を支援します。例えば、以下はマルチカード通信グループを初期化するMegatronコードです。 rank = torch.distributed.get_rank() for ranks in rank_generator.get_ranks('dp'): # 迭代生成所有数据并行ranks的列表 group = torch.distributed.new_group( ranks, timeout=timeout, pg_options=get_nccl_options('dp', nccl_comm_cfgs) ) # 根据数据并行ranks创建通信组 group_gloo = torch.distributed.new_group(ranks, timeout=timeout, backend="gloo") if rank in ranks: # 如果当前rank属于这个数据并行ranks,则保存创建的通信组 _DATA_PARALLEL_GROUP = group _DATA_PARALLEL_GROUP_GLOO = group_gloo _DATA_PARALLEL_GLOBAL_RANKS = ranks この記事を読むと、次のことが理解できるようになります。 - ランクとは何ですか? world_size とは何ですか? コミュニケーション グループとは何ですか?
- このコードはなぜ最初に通信グループを作成し、次にランクの順位に基づいて保存するかどうかを決定するのでしょうか? ランクの順位を最初にチェックしてからグループを作成した方がよいのではないでしょうか?
- 子プロセスと子スレッドで通信グループを作成する際の違いや注意点は何ですか?
- backend="gloo": バックエンドとは何ですか? glooバックエンドは何をしますか? ncclバックエンドを使わないのはなぜですか?
たとえば、DeepSpeed によってパラメータに登録されたコールバック関数は次のとおりです。 def create_reduce_and_remove_grad_hooks(self): self.grad_accs = [] for i, param_group in enumerate(self.bit16_groups): # 遍历混精优化器的参数 for param in param_group: if param.requires_grad: def wrapper(param, i): param_tmp = param.expand_as(param) # 在原始的参数上建立一个视图 grad_acc = param_tmp.grad_fn.next_functions[0][0] # 获得这个这个视图的前一个算子 def reduce_partition_and_remove_grads(*notneeded): self.reduce_ready_partitions_and_remove_grads(param, i) self._grad_acc_hooks.append(grad_acc.register_hook(reduce_partition_and_remove_grads)) # 在视图的前一个算子上注册回调函数 self.grad_accs.append(grad_acc) wrapper(param, i) この記事を読むと、次のことが分かります。 - register_hook は何を実行し、いつトリガーされますか?
- 通常、フックはパラメータに直接登録されます。では、なぜここでは、パラメータのビューの前の演算子でフックが登録されているのでしょうか? param.expand_as(param).grad_fn.next_functions[0][0]
本文は以下から始まります。 トレーニング フレームワークの目的は何ですか?現在、LLMトレーニングにはDeepspeedとMegatron-LMという2つの主要なフレームワークがあります。前者は主にMicrosoftのエンジニアによって提案・保守され、後者はNVIDIAのエンジニアによって開発されました。この2つのフレームワークは、その基本原理と設計言語が大きく異なります。トレーニングフレームワークの主な目標は2つあります。1つ目は、限られたGPUに可能な限り大規模なモデルを詰め込むこと、2つ目は、トレーニングに複数のGPUを効率的に活用することです。最初の目標の達成には、主にモデルのパーティショニング、より広義には単一GPUのメモリ使用量の削減が不可欠です。2つ目の目標の達成には、非同期で高度なオーバーラップと高帯域幅のデータ通信が不可欠です。Deepspeedは、GPUメモリ使用量、逐次並列処理、CPUオフロードを削減するために、主にゼロ1、2、3の手法を採用しています。効率的な通信のために、レジスタ/フックコールバック関数、複数のCUDAイベントストリーム、GPU計算オーバーラップ、連続ページキャッシュを介した非同期通信が主に活用されています。 MegatronのGPUメモリ削減の主な手法には、分散オプティマイザ、テンソルモデル並列、パイプラインモデル並列、逐次並列処理などがあります。効率的な通信のために、P2P通信、オーバーラップパイプライン並列、勾配キャッシングなどが主な手法として採用されています。設計言語の観点から見ると、DeepSpeedは外部フレームワークです。フレームワークはモデルの順方向計算グラフに介入しないため、モデル構造に特別な要件はありません。コアコードは、多数のコールバック関数とtorch派生クラスを通じてカプセル化されており、ユーザーには公開されていません。一方、Megatronは、モデルの計算グラフを直接変更する組み込みフレームワークです。そのため、モデル構造をTransformerのような構造に制限し、すべてのコードを公開し、コールバック関数への依存度を低くしています。外部フレームワークの利点は、互換性が良く、初心者に優しいことです。欠点は、起動が遅く、計算速度が若干低下することです。これらは、データセットが小さく、トレーニングの高速化技術に重点を置かず、モデルとデータの反復処理に集中する人に適しています。組み込みフレームワークの利点は、起動とトレーニングの効率性が高いため、基盤となる構造を大幅に変更したい人にとってはユーザーフレンドリーです。しかし、トレーニングロジックに軽微な変更を加えたい人にとってはユーザーフレンドリーとは言えず、大規模なトレーニングや並列トレーニングコードを理解・修正したい人に適しています。 こんにちは世界まずは「Hello world」から始めましょう。 import torch import os if __name__ == '__main__': rank = int(os.getenv('RANK','0')) world_size = int(os.getenv('WORLD_SIZE','1')) torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl') print(f'Hello World from rank:{torch.distributed.get_rank()} out of {torch.distributed.get_world_size()} worlds.') コードの名前が t1.py だとすると、`torchrun --nproc-per-node 2 t1.py` を使って起動できます。(特に指定がない限り、このパラメータセットが起動時に使用されます。後ほど提供されるデモのほとんどは GPU なしで実行できます。テンソルの作成場所を CPU に変更し、通信関連のすべてのバックエンドに gloo バックエンドを使用するだけです。) 通常の状況では、プログラムは次のコードを出力します。これについては簡単に説明します。 起動コマンドtorchrunはtorchのインストール時に付属するコマンドです。このコマンドは、指定されたスクリプトをマルチプロセスで起動するのに役立ちます。 分散環境では、まず複数のマシンと複数のGPUを区別する必要があります。複数のマシンとは、トレーニングタスクを実行する複数のサーバーを指し、複数のGPUとは、複数のグラフィックカードを搭載した単一のマシンを指します。一般的に、サーバーはノードと呼ばれ、ノードの数はサーバーの数と等しくなります。 `--nproc-per-node` という名前は、各サーバーで起動されるプロセスの数を明確に示しています。通常、サーバーのGPU数と同じ数のプロセスを起動します。例えば、8GPUサーバーの場合、`nproc-per-node` は8に設定されます。もちろん、8GPUサーバーでは2プロセスしか起動できないこともあれば、100プロセスしか起動できないこともあります。これは厳密な制限ではありません。しかし、プロセス数をGPU数と同じにすることでプログラミングロジックが簡素化され、各プロセスが1つのGPUでのみ計算を処理できるようになります。 nproc-per-node 以外にも、起動コマンドにはいくつかの共通パラメータがあります。 - `--master_addr` と `--master_port`: マルチマシン環境を起動する場合、ホストのIPアドレスとポート番号を指定するために、これら2つのパラメータが必要です。これらの2つのパラメータは、マスターマシンとスレーブマシンの両方で同じであり、変更する必要はありません。
- --nnodes: このパラメータは、複数のマシンで分散システムを起動する際に使用され、マシンの総数を指定します。マスターは、このパラメータの設定に基づいてスレーブの接続を待機し、スレーブ数がnnodesに達するまで待機してから起動します。このパラメータは各マシンで同じであり、変更する必要はありません。
- --node_rank: 現在のノードの全ノード中における順位を示します。このパラメータは、起動元のマシンに応じて変更する必要があります。マスターノードの順位は0である必要があります。
上記の起動パラメータは、`torchrun` の後、スクリプトパスの前に配置するようにしてください。スクリプトパスの前に記述されたパラメータはスクリプトの実行環境に渡されないため、スクリプト内で処理されるべきではありません。 ランクと世界規模プログラムを開始するトレーニング スクリプトでは、最初に分散環境を初期化する必要があります。これは次の行で実行されます。 rank = int(os.getenv('RANK','0')) world_size = int(os.getenv('WORLD_SIZE','1')) torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl') `torchrun` で起動したスクリプトを使用している場合、torch は `RANK` 変数と `WORLD_SIZE` 変数を環境変数に自動的に追加し、スクリプトから読み取ることができます。これらはそれぞれ現在のプロセス番号とプロセスの総数を表します。プロセス番号はグローバルです。つまり、各サーバーが 8 個のプロセスを開始し、合計 10 台のサーバーがトレーニングに参加する場合、10 台目のマシン上の 8 個のプロセスのランクは 72、73、74、75、76、77、78、79 になります。`world_size` もグローバルなプロセス数であり、単一サーバー上のプロセス数ではありません。 ランクと world_size を取得したら、torch.distributed.init_process_group を使用して分散環境を初期化できます。 初期化後のどこからでも、torch.distributed.get_rank() と torch.distributed.get_world_size() を使用して、現在のプロセス ID とプロセスの合計数を取得できます。 もう一つ言及しておくべき点は、Torchには「local_rank」という概念がないことです。これは一部の学習フレームワークで定義されているカスタム概念であり、通常はサーバー上の現在のプロセスを示すために使用されます。 バックエンド初期化コマンドでは、`backend` パラメータも指定しました。このパラメータは、分散環境がデフォルトで使用する通信バックエンドを指定します。一般的な選択肢としては、nccl、gloo、mpi などがあります。GPU間通信ではncclバックエンドが使用されます。CPU間通信ではglooバックエンドが使用され、mpiバックエンドは通常使用されません。 nccl通信はglob通信よりも高速なので、可能な限りnccl通信を使用するべきです。ただし、データ読み取りフェーズなど、複数のプロセス間で通信が必要な場合、通常はglob通信が使用されます。これは、順伝播を開始していないテンソルがGPUに早期に出現することを防ぎ、GPUメモリの過剰な消費を回避するためです。後述のグループに関するセクションでは、トレーニングプロセスでncclバックエンドを使用し、データ読み取りプロセスでglobバックエンドを使用する方法について説明します。 トレーニングスクリプトの冒頭にあるいくつかの細かい詳細デバイスを設定する分散学習では、GPUテンソルを作成する際に、`torch.cuda.current_device()` を使用して現在のランクで使用されているCUDAデバイスを取得し、そのデバイス上でテンソルを直接作成することがよくあります。この関数を使用するには、事前に `set_device` を使用してデフォルトのデバイスを明示的に設定しておく必要があります。さらに、一部の演算子では、ユーザーが `set_device` を実行しておく必要があります。 したがって、メインのトレーニング ロジックを実行する前に、できるだけ早くデフォルトの CUDA デバイスを設定することをお勧めします。 devices = torch.cuda.device_count() torch.cuda.set_device(rank % devices) 手動で設定されていない場合、current_device はデフォルトで cuda:0 を返します。 固定ランダムシード分散学習において、ランダムシードは見落とされがちですが、非常に重要な要素です。スクリプトの初期化時に固定のランダムシードを設定すると、実験の再現性が向上し、異なるランクのモデルが同じように初期化されることが保証されます。そうでなければ、モデルを同期させ、初期化状態の一貫性を維持するために通信操作が必要になります。 def set_random_seed(seed): random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) 通信オペレーター通信演算子とは、分散学習において異なるランク間でのデータ交換を可能にする演算子を指します。PyTorchは多くの通信関連の演算子を提供しており、その中で最もよく使用されるものは、リダクション、集約、ブロードキャスト、ピアツーピア通信、その他という5つのカテゴリに分類できます。 規則最も典型的なリダクション演算子は `torch.distributed.all_reduce` です。この演算子は、異なるランクにまたがるデータに対して、合計、平均計算、最大値計算などの演算を実行できます。その主な特徴は、ランク数に関係なく、最終結果が単一のテンソルになることです。例えば、次のコードはすべてのランクのテンソルを合計します。 import torch import os if __name__ == '__main__': rank = int(os.getenv('RANK','0')) world_size = int(os.getenv('WORLD_SIZE','1')) torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl') devices = torch.cuda.device_count() torch.cuda.set_device(rank % devices) tensor = torch.tensor([rank +。1], dtype=torch.long, device='cuda') torch.distributed.all_reduce(tensor) print(f'rank:{rank} {tensor}') ランク0のテンソルは1、ランク1のテンソルは2です。合計は3なので、出力は次のようになります。縮約演算は、最も一般的に使用される通信演算子と考えることができます。例えば、縮約演算はランク間で勾配を同期させ、平均損失を計算するために使用されます。言い換えれば、分散ソフトマックス法では、縮約演算を使用して最大値を計算し、分母を正規化します。 集会最も典型的な集約操作は `torch.distributed.all_gather()` であり、これは異なるランクからデータをまとめて収集できます。この操作の特徴は、取得されるテンソルの数が通信に参加しているランクの数と等しいことです。例えば、次のコードはすべてのランクからテンソルをまとめて収集します。 import torch import os if __name__ == '__main__': rank = int(os.getenv('RANK','0')) world_size = int(os.getenv('WORLD_SIZE','1')) torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl') devices = torch.cuda.device_count() torch.cuda.set_device(rank % devices) tensor = torch.tensor([rank+1], dtype=torch.long, device='cuda') tensors = [torch.empty_like(tensor) for _ in range(world_size)] torch.distributed.all_gather(tensors,tensor) print(f'rank:{rank} {tensors}') 「すべての」ランクは「すべての」テンソルを収集するため、各ランクは他のすべてのランクとそれ自身のテンソルを持ちます。したがって、次のように出力されます。「all_gather」は「all_reduce」をシミュレートできます。例えば、「sum(tensors)」は「all_reduce sum」と等しく、「max(tensors)」は「all_reduce max」と等しくなります。また、リスト乗算を使用してテンソルを作成することはできないことにも注意してください。 tensors = [torch.empty_like(tensor)] * world_size Python変数との通信をサポートする「all」「_gather」「_object」などの操作を除き、ほとんどの操作はインプレース操作であり、リスト乗算は各テンソルが同じオブジェクトを指すテンソルを作成するため、インプレース操作と競合します。ギャザーリングはリダクションよりも柔軟性に優れていますが、リダクションは通常、より効率的でGPUメモリの使用量が少なくなります。 放送ブロードキャストの典型的な操作は `torch.distributed.broadcast()` です。これは、指定されたランクのテンソルを他のすべてのランクに送信します。例えば、次のコードはランク0のテンソルを他のすべてのランクに送信し、すべてのランクのテンソルが最終的に同一になるようにします。 import torch import os if __name__ == '__main__': rank = int(os.getenv('RANK','0')) world_size = int(os.getenv('WORLD_SIZE','1')) torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl') devices = torch.cuda.device_count() torch.cuda.set_device(rank % devices) tensor = torch.tensor([rank+1], dtype=torch.long, device='cuda') torch.distributed.broadcast(tensor,0) print(f'rank:{rank} {tensor}') 実行後、次のような結果が表示されます。ブロードキャストは、データが1つのノードからすべてのノードに流れるという特徴があります。これは通常、特定のランクにのみ表示される信号をすべてのランクに送信するために使用されます。例えば、動的データトレーニングでは、最後のバッチに到達したかどうかは、最後の並列ランクが完全なマイクロバッチを取得できるかどうかによって通知されます。この場合、ブロードキャストを使用して、最後の並列ランクの終了信号を他のすべてのランクにブロードキャストできます。別の例として、Megatronでは、テンソル並列処理を使用する場合、最初のテンソルランクのみがデータを受信し、他のテンソルランクのデータは最初のテンソルランクからブロードキャストされます。 ピアツーピア通信、P2P前述の全ランク間の通信に加えて、2つのランク間のペアワイズ通信が必要になる場合もあります。このような場合にピアツーピア (P2P) 通信が役立ちます。P2P通信では、送信側は `torch.distributed.send()` を使用し、受信側は `torch.distributed.recv()` を使用します。例えば、次のコードは、すべての偶数ランクのユニットから次の奇数ランクのユニットにテンソルを送信します。 import torch import os if __name__ == '__main__': rank = int(os.getenv('RANK','0')) world_size = int(os.getenv('WORLD_SIZE','1')) torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl') devices = torch.cuda.device_count() torch.cuda.set_device(rank % devices) if rank % 2 == 0: tensor = torch.tensor([999], dtype=torch.long, device='cuda') torch.distributed.send(tensor, rank+1) else: tensor = torch.empty(1, dtype=torch.long, device='cuda') torch.distributed.recv(tensor,rank-1) print(f'rank:{rank} {tensor}') 実行すると以下の結果が得られます。上記の演算子のほとんどはインプレース演算であるため、問題が発生します。インプレース演算では、事前に空のテンソルを作成し、通信演算子がデータを入力するのを待つ必要があります。しかし、ランクは通信するテンソルの形状をどのように把握し、この空のテンソルを事前にどのように作成できるのでしょうか?この問題は、ブロードキャスト通信やP2P通信で特によく見られます。一般的な解決策は、まず通信するテンソルの形状を表す固定のndim値を持つテンソルを通信し、次に実際のデータを通信することです。 import torch import os if __name__ == '__main__': rank = int(os.getenv('RANK','0')) world_size = int(os.getenv('WORLD_SIZE','1')) torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl') devices = torch.cuda.device_count() torch.cuda.set_device(rank % devices) if rank % 2 == 0: tensor = torch.randn(1, 4, dtype=torch.float16, device='cuda') shape_tensor = torch.tensor(tensor.size(), dtype=torch.long, device='cuda') torch.distributed.send(shape_tensor, rank+1) torch.distributed.send(tensor, rank+1) else: shape_tensor = torch.empty(2, dtype=torch.long, device='cuda') torch.distributed.recv(shape_tensor, rank-1) tensor = torch.empty(torch.Size(shape_tensor), dtype=torch.float16, device='cuda') torch.distributed.recv(tensor, rank-1) print(f'rank:{rank} {tensor}') このアプローチは、通信プロトコルの定義に似ています。最初のハンドシェイクで形状を伝達し、2番目のハンドシェイクでデータを伝達します。使用中にndim値を固定できない場合、またはテンソルのdtypeも伝達する必要がある場合は、shape=(10,)のような、より長い固定長の形状テンソルを定義できます。最初の9ビットで伝達するデータテンソルの形状を表し、必要に応じて0をパディングし、最後のビットをデータ型を表す数値として使用します。ポイントツーポイント通信の主な用途は、データが1つのパイプから次のパイプに送信されるパイプライン並列処理、確率を生徒に送信する教師モデル、および同様の参照ポリシーモデルです。 その他の通信事業者よく使われるもう一つの方法は、同期バリア `torch.distributed.barrier()` です。この操作はデータを一切通信しません。その目的は、すべてのプロセスが後続のアクションを開始する前に、このポイントに到達していることを確認することです。例えば、チェックポイントを保存する場合、通常は並列実行される最初のランクのみを保存します。他のランクは同期バリアを使用して、最初のランクの保存が完了するまで待機する必要があります。これにより、他のランクが新しい計算を開始したり、途中で終了したりして保存に失敗するのを防ぎます。例えば、次のようになります。 import torch import os import time if __name__ == '__main__': rank = int(os.getenv('RANK','0')) world_size = int(os.getenv('WORLD_SIZE','1')) torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl') devices = torch.cuda.device_count() torch.cuda.set_device(rank % devices) if rank == 0: time.sleep(20) # 用sleep20秒模拟rank0在做一些高耗时行为,比如存储checkpoint else: a = 1 + 2 torch.distributed.barrier() 上記のタイプに加えて、全対全演算子と散布演算子も存在します。LLMシナリオでは、これら2つの演算子は並列シーケンスの順方向伝播と逆方向伝播で頻繁に使用されます。 通信オペレータを使用する場合、関連するすべてのランクがこのコード セグメントを実行するようにする必要があります。そうしないと、すでにこのステップに到達しているランクがハングして無期限に待機することになり、プログラムを続行できなくなります。 通信モードについて上記の通信演算子は、一般的にそれぞれ固有の通信モードに対応しています。例えば、reduce演算子にはtree-reduceとring-reduceの通信モードがあり、broadcastやP2Pなどもそれぞれ独自の通信モードに対応しています。LLMトレーニングフレームワークの開発レベルでは、ring-reduceとtree-reduceの通信モードに注意を払う必要があるかもしれません。理論的には、これら2つの通信モードはall_reduceの基盤実装に属しており、torchレベルではあまり重視する必要はありません。しかし、実際の開発では、次のような関連する問題に何度も遭遇しました。 - all_reduceアルゴリズムは、通信データ量、GPU数、NVLink接続構造に基づいて、リング型かツリー型のどちらを使用するかを決定します。そのため、並列モデル数とGPU数が変化すると、同じall_reduce操作でも異なる通信モードが使用される可能性があります。特にツリー型削減を使用し、通信環境が複雑な場合、メモリオーバーフローのバグが発生することがあります。
- リングリデュースとツリーリデュースの精度低下は、GPUの種類によって異なります。この問題の調査については、Zhihuに優れたデバッグ記事があります。「AIトレーニングと計算:A800プラットフォームでInternLM-7Bをトレーニングする際の収束失敗に関する考察」(https://zhuanlan.zhihu.com/p/...)
- また、Torch バージョン 2.3 では、コミュニケータが間違った数のランクを作成したり、cuBus 再利用エラーが発生したりするなどの問題も発生しましたが、これらはバージョン 2.4 で修正されました。
同様の問題が発生していると思われる場合は、環境変数NCCL_ALGOを使用して、ツリー型とリング型のどちらを使用するかを強制的に指定することを検討してください。または、all_reduceをall_gatherに置き換えるか、Torchのバージョンを更新してみてください。 コミュニケーションに関わることを決意するということは、torch の安定した舞台裏を離れ、苦痛を伴うデバッグの最前線に足を踏み入れることを意味します。 コミュニケーショングループ上記の演算子は、P2P通信を除き、基本的に全ランク間の通信を伴います。これはかなり厳密なものです。最初の5ランクでall-reduce操作を、最後の5ランクでall-gather操作を実現するにはどうすればよいでしょうか?奇数ランクにのみブロードキャストするにはどうすればよいでしょうか?一部の操作をnccl通信を用いてGPUで実行し、他の操作をgloo通信を用いてCPUで実行するような操作を実装するにはどうすればよいでしょうか?2つのランクが同時にP2P通信を使用している場合、異なるテンソルをどのように区別すればよいでしょうか?ここで通信グループの出番となります。 注: 以下の例はやや複雑になります。以降のスクリプトはすべて4ランクから開始されるため、これ以降の`--nproc-per-node`の値はすべて4になります。 単一のコミュニケーショングループを作成する通信グループは次のように作成されます。 ranks = [0,1] group = torch.distributed.new_group(ranks,backend='nccl') 上記のコードは、ncclをバックエンドとして、rank0とrank1を含む通信グループを作成します。通信演算子を使用する場合、`group`パラメータを使用して通信グループを指定することにより、データ交換はグループ内でのみ行われます。例えば、バリアを使用する場合、通信グループを指定することにより、グループ内のすべてのランクがバリアに到達するまで待つことなく、グループ内のすべてのランクがバリアに到達する限りプロセスを続行できます。 import torch import os if __name__ == '__main__': rank = int(os.getenv('RANK','0')) world_size = int(os.getenv('WORLD_SIZE','1')) torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl') devices = torch.cuda.device_count() torch.cuda.set_device(rank % devices) ranks = [0,1] group = torch.distributed.new_group(ranks) if rank in ranks: torch.distributed.barrier(group=group) print(f'rank:{rank} finish') else: torch.distributed.barrier() print(f'rank:{rank} finish') このコードを実行すると、ランク0と1のバリアが通信グループを指定しているため、ランク0と1はバリアに到達して「finish」を出力するとすぐに処理を続行できることがわかります。ランク2と3のバリアは通信グループを指定していないため、すべてのランクが到着するまで待機します。しかし、ランク0と1はこのelse分岐に入らないため、ランク2と3は停止します。 通信関連の演算子はすべて通信グループの指定をサポートしており、その機能は類似しています。つまり、すべてのランク間通信をグループ内のランク内通信に変換します。P2P通信の場合、グループは通信識別子としても機能します。2つのランク間で複数のP2P通信が同時に発生した場合、異なるグループを使用することで、異なる通信を区別することができます。 複数のコミュニケーショングループを作成する複数の通信グループを作成し、それぞれが異なるバックエンドを使用するように指定することで、CUDA または CPU テンソルの混合通信を容易にすることができます。 group1 = torch.distributed.new_group([0,1]) group2 = torch.distributed.new_group([2,3]) group3 = torch.distributed.new_group([1,2,3]) group4 = torch.distributed.new_group([0,3], backend='gloo') 通信グループを使用する場合、ランク自体がそのグループのメンバーである必要があります。例えば、rank3はgroup1を使用できません。では、rank3がgroup1を使用できないのであれば、rank3はgroup1を作成できず、group2、3、4のみを作成できるのでしょうか?答えは「いいえ」です。PyTorchには、通信グループを作成するための2つの要件があります。 - 通信グループに関係するすべてのランクは、通信グループを作成するコードを実行する必要があります。
- 通信グループの作成順序は、すべてのランクで同じである必要があります。
例えば、rank0で最初に作成される通信グループが[0, 1]の場合、rank1で最初に作成される通信グループも[0, 1]でなければなりません。rank2とrank3も同様です(ただし、rank2とrank3はこの通信グループを使用できません)。したがって、複数の通信グループを作成するためのコードは、一般的に次のようになります。 group = None rank = torch.distributed.get_rank() for ranks in [[0,1],[2,3]]: _group = torch.distributed.new_group(ranks) if rank in ranks: group = _group これにより、グループの作成順序が一貫しており、それぞれのランクで使用可能なグループのみが保持されます。この設計は理にかなっています。例えば、互いの変数にアクセスできない2つの並列プロセスを想像してみてください。2つのプロセスは、相手がどの通信グループを使用しているかをどのように知るのでしょうか?まず、通信グループを構成するランクは不適切です。なぜなら、2つのグループが同じランクで構成され、一意性が失われる可能性があるからです。作成時間も不適切です。各ランクは完全に同期されていないからです。各グループに一意のIDを手動で割り当てるか、自動増分IDを使用することもできます。 これでTorchの基本的な分散トレーニング機能の説明は終わりです。それでは、これらの機能をすべて活用した小さなデモを作成しましょう。 分散トレーニングのデモ言語モデルをコンテキストとして用いた蒸留フレームワークを実装します。ここで、教師モデルと生徒モデルは、言語モデルの入力と出力をシミュレートする疑似モデルです。蒸留フレームワークはデータ並列処理をサポートし、トレーニングデータもシミュレートされます。分散オプティマイザーはカスタム実装する必要があります。また、このフレームワークは、教師モデルと生徒モデル間のオーバーラップ計算とデータ通信をサポートする必要があります。(興味がある場合は、Megatronフレームワーク内で蒸留フレームワークを実装することをお勧めします。) 以下、ステップバイステップで実装していきます。上記で既に実装済みの関数の中には、ここでは再度実装しないものもあります。デモを作成する際は、通常、ステップごとに説明し、完全な直接実行可能なコードは提供しません。 シミュレーションデータ 言語モデルの入力をシミュレートし、偽データを生成するイテレータを作成します。言語モデルの入力はトークンのシーケンスで、shape = [batch_size, seq_length]、値は0 - vocab_sizeの整数テンソルです。seq_lengthは毎回ランダムな値です。 import torch class Dataloader: def __init__(self,batch_size, max_length, vocab_size): self.batch_size = batch_size self.max_length = max_length self.vocab_size = vocab_size def __iter__(self): while True: length = torch.randint(2,self.max_length,size=(1,)) input_ids = torch.randint(0,self.vocab_size,size=(self.batch_size,length),device='cpu') yield input_ids 教師と生徒のモデル 教師モデルと学習モデルはどちらも言語モデルです。入力はトークン列であり、出力はそのトークン列の語彙確率です。これは[batch_size, seq_length, vocab_size]の形状を持つ浮動小数点テンソルです。ここでは、パラメータはlm headのみであると仮定します。 class Model(torch.nn.Module): def __init__(self, vocab_size): super().__init__() self.lm_head = torch.nn.Parameter(torch.randn(1, vocab_size, dtype=torch.float16)) def forward(self,input_ids:torch.Tensor): logits = input_ids.unsqueeze(-1).to(self.lm_head.dtype) @ self.lm_head probs = logits.softmax(-1) return probs 分散オプティマイザー 生徒モデルのパラメータを更新するために、分散オプティマイザを実装する必要があります。データ並列処理を使用しているため、勾配は各カードに分散されます。モデルパラメータを更新する前に、すべての勾配を合計するall-reduce演算を実行する必要があります。教師モデルにはオプティマイザがないため、all-reduce演算には参加しないため、オプティマイザには全生徒の順位を含む通信グループパラメータを渡す必要があります。 class DistrubutedAdam(torch.optim.Adam): def __init__(self, *args, group=None, **kwargs): self.group = group super().__init__(*args, **kwargs) def step(self, closure=None): if closure is not None: closure.mean().backward() for group in self.param_groups: for param in group['params']: if param.grad is not None: torch.distributed.all_reduce(param.grad, group=self.group) super().step() コミュニケーショングループを作成する 私たちのミッション目標に基づいて、次の 2 種類のコミュニケーション グループが必要です。 - データ並列通信グループは、モデルの役割 (生徒または教師) を区別するために使用され、オプティマイザーは現在の役割のモデルのみを削減します。
- 教師と生徒のコミュニケーショングループは、データ交換が必要な教師と生徒を識別するために使用されます。このシンプルなコミュニケーション設定ではこのグループは不要であるため、作成せずにそのままにしておくことができます。
ランク分割方法は次のように定義します。 - world_size の合計数は偶数でなければなりません。前半は生徒として使用され、後半は教師として使用されます。
- Student0 と teacher0 はデータを通信するためにペアになっています。また、student1 と teacher1 はデータを通信するためにペアになっています...
通信グループは次のように作成されます。 data_parallel_group = None teacher_student_group = None def create_group(): rank = torch.distributed.get_rank() world_size = torch.distributed.get_world_size() assert world_size % 2 == 0 student_ranks = list(range(world_size // 2)) teacher_ranks = list(range(world_size // 2, world_size)) global data_parallel_group global teacher_student_group for ranks in [student_ranks,teacher_ranks]: group = torch.distributed.new_group(ranks, backend='nccl') if rank in ranks: data_parallel_group = group for ranks in zip(student_ranks,teacher_ranks): group = torch.distributed.new_group(ranks, backend='nccl') if rank in ranks: teacher_student_group = group データの送受信 教師モデルによって計算された確率密度関数(prob)は、生徒モデルに送信する必要があります。各データポイントは1つの教師モデルによってのみ計算され、教師モデルの計算結果は1つの生徒にのみ送信されればよいため、ここではピアツーピア(P2P)通信が使用されます。入力シーケンスの長さはランダムに変化するため、前述の動的に整形されるP2P通信を使用する必要があります。前述のように、P2Pではsend演算子とrecv演算子を使用すると述べましたが、これらは同期演算子です。ここでは非同期演算子を使用します。 テンソル送信 最初にシェイプを送信し、次に実際のテンソルを送信します。 def send_tensor(tensor, dst): shape = torch.tensor(tensor.shape, dtype=torch.int64, device='cuda') ops = [] ops.append(torch.distributed.P2POp( torch.distributed.isend, shape, dst, group=teacher_student_group )) reqs = torch.distributed.batch_isend_irecv(ops) for req in reqs: req.wait() ops = [] ops.append(torch.distributed.P2POp( torch.distributed.isend, tensor, dst, group=teacher_student_group )) reqs = torch.distributed.batch_isend_irecv(ops) for req in reqs: req.wait() テンソルは受け取る この実装は単純です。テンソルの ndim と dtype がパラメータとして渡されます。 def recv_tensor(src, ndim, dtype): shape = torch.empty(ndim, dtype=torch.int64, device='cuda') ops = [] ops.append(torch.distributed.P2POp( torch.distributed.irecv, shape, src, group=teacher_student_group )) reqs = torch.distributed.batch_isend_irecv(ops) for req in reqs: req.wait() tensor = torch.empty(torch.Size(shape), dtype=dtype, device='cuda') ops = [] ops.append(torch.distributed.P2POp( torch.distributed.irecv, tensor, src, group=teacher_student_group )) reqs = torch.distributed.batch_isend_irecv(ops) for req in reqs: req.wait() return tensor 注: 非同期P2P演算は、まず演算子(ops)を作成し、次にそれらをバッチ(batch、_isend、_irecv)で実行することで、並列性を高めることができます。例えば、MegatronのP2P並列処理では、計算結果を次のカードに送信することと、次のカードから勾配を受信することが同時に行われます。 メイン関数 上記の重要なモジュールの機能はすべて実装済みです。残っているのはメイン関数のみです。まず、環境を初期化し、デフォルトのCUDAデバイスを設定します。次に、通信グループを作成し、語彙サイズを20に定義し、教師ランクオフセットを計算します。 if __name__ == '__main__': rank = int(os.getenv('RANK','0')) world_size = int(os.getenv('WORLD_SIZE','1')) torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl') devices = torch.cuda.device_count() torch.cuda.set_device(rank % devices) create_group() vocab_size = 20 teacher_offset = world_size // 2 学生のロジックは次のとおりです。 - オフセットより低いランクを持つすべてのデータは学生です。まずランダムシードを設定し、データセット、モデル、オプティマイザーを初期化します。
- 学習者はinput_idsにロードされた後、まず教師にそれを送信し、その後計算を開始します。probを取得した後、学習者は教師からテンソルを受け取り、学習者と教師のオーバーラップを実現します。
- kL ダイバージェンスを損失として計算し、バックプロパゲートしてから、optimizer.step を呼び出してモデル パラメータを更新します。
- 変数を出力して、各ランクの損失と合計平均損失、および現在のモデルの最初の 2 つのパラメータを表示します。
- 私たちのコードは、損失が減少し、各ランクのモデル パラメータが同じままである場合にのみ正しいものとなります。
if rank < teacher_offset: # student torch.random.manual_seed(1) dataloader = Dataloader(1,200,vocab_size) model = Model(vocab_size).half().cuda() optimizer = DistrubutedAdam(model.parameters(), lr=1e-2, group=data_parallel_group, eps=1e-4) for i,input_ids in enumerate(dataloader): if i % teacher_offset != rank: continue optimizer.zero_grad() input_ids = input_ids.cuda() send_tensor(input_ids, rank + teacher_offset) student_probs = model(input_ids) teacher_probs = recv_tensor(rank + teacher_offset, 3, torch.float16) kl_loss = teacher_probs * ((teacher_probs + 1e-5).log() - (student_probs + 1e-5).log()) kl_loss = kl_loss.sum(-1).mean() / torch.distributed.get_world_size(data_parallel_group) kl_loss.backward() optimizer.step() reporting_kl_loss = kl_loss.clone() torch.distributed.all_reduce(reporting_kl_loss, group=data_parallel_group) print(f'rank:{rank} reporting kl loss:{reporting_kl_loss} kl loss:{kl_loss} weight:{model.lm_head.data[0,:2]}',flush=True) torch.distributed.barrier(group=data_parallel_group) if i >= 10: break else: # teacher 以下は先生です 通常、すべてのランクの乱数シードは同じ値に設定します。ここでは、教師と生徒が同じ確率を計算するのを避けるため、教師の乱数シードを意図的に異なる値に設定し、klが常に0になるようにしています。 if rank < teacher_offset: # student else: # teacher torch.random.manual_seed(2) model = Model(vocab_size).half().cuda() model.eval() while True: input_ids = recv_tensor(rank - teacher_offset, 2, torch.int64) teacher_probs = model(input_ids) send_tensor(teacher_probs, rank - teacher_offset) 実行後、次の出力が観察されました: 10 個のデータ ポイントをトレーニングした後に終了するロジックを削除すると、損失はすぐに 1e-4 未満に低下し、デモが完了します。 このデモは最適な蒸留フレームワークではありません。まず、教師と生徒のパラメータ数が異なり、教師はバックプロパゲーションを行わないため、計算速度が異なります。教師1人生徒1人の設定は必ずしも効率的ではありません。生徒1人に対して複数の教師が必要になる場合もあれば、複数の生徒に対して1人の教師が必要になる場合もありますが、これは完全なフレームワークが目指すものです。次に、現在主流のモデルの語彙サイズは約15万語で、トレーニングデータの長さは通常8kバイトです。つまり、最終的な `teacher_probs` は8k * 150k の浮動小数点テンソルとなり、通信コストが高くなります。最適化戦略の1つは、上位n個の値を取得するなど、サブセットをサンプリングする方法、または置換ありまたは置換なしのサンプリングを行うことです。もう1つの戦略は、教師の `lm_head` レイヤーを生徒のランク内に配置することです。ロジットを計算する前に、教師の隠れ状態が通信され、`lm_head` を乗算して生徒上でローカルにロジットが計算されます。 レジスタフック`register_hook` は分散システム関連の機能ではありませんが、ほぼすべてのフレームワークで使用されています。`register_hook` の目的は、パラメータまたは演算子のコールバック関数を登録することです。この関数は、そのパラメータまたは演算子の勾配が計算されたが、まだ `grad` に代入されていないときに呼び出されます。コールバック関数が戻り値を持つ場合、元の勾配がその値に置き換えられます。 import torch def print_grad(grad): print(grad) return grad / 2 w = torch.nn.Parameter(torch.randn(2, 2)) w.register_hook(print_grad) loss = (w - 1) ** 2 print('before backward') loss.mean().backward() print('after backward') print(w.grad) 上記のコードは次のように出力されます。勾配内のNaN値を0に置き換えることは、register_hookに関するいくつかの記事で例として示されていますが、実際のプログラミングではこれを推奨しません。NaNに遭遇した場合は、安全な値に変更してからモデルを更新するのではなく、例外をスローすることをお勧めします。そうしないと、モデルに問題が発生した場合、問題を特定できなくなります。 `register_hook` 関数は、パラメータだけでなく演算子にも登録できます。これは、様々なフレームワークにおける `register_hook` の主な用途です。例えば、次の演算は加算演算子に登録されます。 import torch def parameter_hook(grad): print('parameter hook') def operator_hook(*grads): print('operator hook' ) w = torch.nn.Parameter(torch.randn(2, 2)) w.register_hook(parameter_hook) print('first') y = w + 1 op1 = y.grad_fn print(op1) op1.register_hook(operator_hook) y.sum().backward() print('second') z = w + 1 op2 = z.grad_fn print(op2) z.sum().backward() 実行時には、次のような結果が表示されます。演算子は通常、1回限りのイベントであり、演算子のコールバックが最初に実行され、次にパラメータのコールバックが実行されます。ただし、勾配累積演算子という特別な演算子があり、この演算子のコールバック関数はパラメータのコールバック関数の後に実行され、この演算子は毎回生成されるわけではありません。 import torch def parameter_hook(grad): print('parameter hook') def operator_hook(*grads): print('operator hook' ) w = torch.nn.Parameter(torch.randn(2, 2)) w.register_hook(parameter_hook) y = w + 1 op = y.grad_fn.next_functions[0][0] print(op) op.register_hook(operator_hook) print('first') y.sum().backward() print('second') z = w + 1 op2 = z.grad_fn.next_functions[0][0] print(op2) z.sum().backward() 很多框架会围绕着梯度累计算子的这个特性展开。为了获得梯度累积算子,需要创建一个计算图。一般用expand\_as,这个计算结果的grad\_fn指向的是expand\_as自己,next\_functions指向的是上一个算子,也就是梯度累积算子: grad_acc_op = w.expand_as(w).grad_fn.next_functions[0][0] 然后可以利用闭包注册一个hook,让hook能够直接访问参数而不仅仅是梯度。 def make_grad_hook(param): def hook(*grads): print(param.grad) return hook grad_acc_op.register_hook(make_grad_hook(w)) 这样就可以做一些骚操作了。 我们可以简单看下megatron和deepspeed的代码,看看他们是怎么用的。下面这段是megatron的用法:megatron的优化器并不使用param.grad,而是自己在参数上注册了一个param.main\_grad,用它来累积梯度。这个main\_grad不会删除,只会累积和清0。然后在最后一个microbatch的时候做allreduce,而不是等优化器来做:deepspeed也有类似的逻辑:deepspeed因为要支持连续内存和cpu offload,逻辑更加复杂,会等积攒的grad数量够了一批一批的操作:会使用不同事件流(stream)来增加计算和梯度累积的重叠度:使用了锁页内存,并且会在cpu和gpu之间来回来去传递张量来进行计算。有兴趣的读者可以自己研究一下,deepspeed的代码我一直没机会仔细读一遍。 多进程在理解torch分布式训练时,多进程这个概念是一直伴随我们左右的。使用torchrun启动脚本,就是以多进程方式启动脚本。这里我们还可以再深入了解一下torch与多进程。 首先来看看python原生的多进程启动方式: import multiprocessing as mp def main(rank, world): print(rank, world) if __name__ == '__main__': world_size = 4 ps = [mp.Process(None, main, args=(rank, world_size)) for rank in range(world_size)] for p in ps: p.start() for p in ps: p.join() 这种多进程能不能拿来作为torch的分布式训练环境呢?当然是可以的,只需要这样操作: import multiprocessing as mp import torch import torch.distributed def main(rank, world_size, master_addr='127.0.0.1', master_port=29500): init_method = f'tcp://{master_addr}:{master_port}' torch.distributed.init_process_group(rank=rank,world_size=world_size,init_method=init_method,backend='nccl') print(f'rank:{torch.distributed.get_rank()} world_size:{torch.distributed.get_world_size()}') if __name__ == '__main__': world_size = 4 ps = [mp.Process(None, main, args=(rank, world_size)) for rank in range(world_size)] for p in ps: p.start() for p in ps: p.join() 这里init\_method就是用来在初始化阶段,实现进程发现的方法,除了tcp,还可以用本地文件发现或者环境变量。除了multiprocessing 的多进程,用subprocess 的多进程也是一样可以的。当然torch也提供了一种启动多进程的方法: import torch def main(rank, world_size, master_addr='127.0.0.1', master_port=29500): init_method = f'tcp://{master_addr}:{master_port}' torch.distributed.init_process_group(rank=rank,world_size=world_size,init_method=init_method,backend='nccl') print(f'rank:{torch.distributed.get_rank()} world_size:{torch.distributed.get_world_size()}') if __name__ == '__main__': world_size = 4 torch.multiprocessing.spawn(main, args=(world_size,), nprocs=world_size) 你也可以在多进程里再创建子进程再初始化环境: import multiprocessing as mp import torch import torch.distributed def sub_process(rank, world_size, master_addr='127.0.0.1', master_port=29500): init_method = f'tcp://{master_addr}:{master_port}' torch.distributed.init_process_group(rank=rank, world_size=world_size, init_method=init_method, backend='nccl') torch.distributed.barrier() print(f'rank:{torch.distributed.get_rank()} world_size:{torch.distributed.get_world_size()}') def main(rank, world_size, master_addr='127.0.0.1', master_port=29500): init_method = f'tcp://{master_addr}:{master_port}' process = mp.Process(None, sub_process, args=(rank + world_size, 2 * world_size,)) process.start() torch.distributed.init_process_group(rank=rank, world_size=2*world_size, init_method=init_method, backend='nccl') torch.distributed.barrier() print(f'rank:{torch.distributed.get_rank()} world_size:{torch.distributed.get_world_size()}') if __name__ == '__main__': world_size = 2 ps = [mp.Process(None, main, args=(rank, world_size)) for rank in range(world_size)] for p in ps: p.start() for p in ps: p.join() 在torch的多进程中,再次启动子进程有一点需要注意的地方。那就是如果在启动子进程之前触发了任何与cuda相关的操作,比如使用了set\_device,或者在cuda上创建了一个张量,那么子进程中就不能再使用cuda。比如下面这段代码: import multiprocessing as mp import torch import torch.distributed def sub_process(): tensor = torch.tensor([2]).cuda(0) if __name__ == '__main__': torch.cuda.set_device(0) process = mp.Process(None,sub_process) process.start() process.join() 运行时会报错:这个报错的意思是,cuda环境只能初始化一次,并且与进程绑定。Linux上创建的子进程默认使用的是fork的方式。fork创建的子进程会继承父进程的内存空间,因此已经绑定了父进程的cuda环境被继承给了子进程,子进程使用cuda就会报错。报错中要求子进程以spawn方式启动,是因为spawn方式启动的子进程使用的是全新的解释器,cuda还处于未初始化的状态。 这里用的时候需要权衡清楚,究竟需不需要子进程继承父进程的内存,以及是否需要在子进程使用cuda。子进程如果用spawn方式启动不继承父进程,可能需要单独初始化分布式环境,父进程的全局变量子进程也用不了。如果用fork方式启动继承父进程内存,意味着继承了父进程创建的各种变量,以及父进程初始化过的分布式环境但是不能用cuda。另外需要注意的是,就算用fork方式启动,子进程也继承不了父进程创建的通信组,但是会继承“通信组的创建顺序”。意思是如果rank0顺序创建了5个group,rank1创建的3个group,然后用fork方式启动了一个子进程,子进程又创建了2个,这2个会去对应rank0的第4、5个group。 说了这么多,多进程好像挺麻烦,那他相比torchrun有啥好处呢? 好处就是可以更加灵活的使用init\_process\_group初始化环境,以区分不同角色。比如上面的我们的demo中的这个蒸馏场景,我们是4个rank,分成2个student2个teacher,通信还不是很复杂。那如果student和teacher不是各占一卡,而是用了3d混合并行占很多卡,相互之间还有tp、pp的通信,通信逻辑就很复杂了。我们可以考虑给teacher和student分别用一套不同的rank、world\_size个init\_method初始化,让他们在这个分布式环境中只能看见自己这个角色的进程,这样就只需要实现自己的3d混合并行就可以了。 再比如如果你想在自己的训练环境中引入一个VLLM模型。VLLM内部是会调用init\_process\_group创建自己的环境,使用自己的rank和world\_size来实现TP并行的,和你训练环境的init\_process\_group是冲突的。这个时候使用自定义的多进程,就可以减小VLLM的干扰。 问题来了,每个角色都使用独立的分布式环境,相互之间怎么通信呢?这就是最后的部分,TorchRPC。 RPC TorchRPC原本的用法是在本地创建远程变量的引用,在本地调用远程函数。但是我觉得这种编程不灵活且抽象,堪比tf的静态图,所以我这里不把rpc当作远程调用,只把他当作对p2p算子的封装,以及除init\_process\_group之外第二种建立rank间通信的方式。 rpc的初始化和分布式环境很像。rpc的初始化和init\_process\_group可以同时存在,且可以使用不同的rank和world\_size: import multiprocessing as mp import torch def main(rank,world_size): torch.distributed.init_process_group(rank=0,world_size=1,backend='nccl',init_method=f'tcp://127.0.0.1:{29500+rank}') options = torch.distributed.rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30001') torch.distributed.rpc.init_rpc(f'worker-{rank}', rank=rank, world_size=world_size, rpc_backend_options=options) print(f'rank: {torch.distributed.get_rank()}', f' world_size: {torch.distributed.get_world_size()}', f' {torch.distributed.rpc.get_worker_info()}') torch.distributed.rpc.shutdown() if __name__ == '__main__': world_size = 4 ps = [mp.Process(None,main,args=(rank,world_size)) for rank in range(world_size)] for p in ps: p.start() for p in ps: p.join() 直接用python运行,打印出如下内容:这里需要注意一点,因为我们每个rank都独立各自初始化分布式环境,互不干扰,因此init\_method的port要换一下。 我们在简单重写一下之前的蒸馏demo,主要为了演示在这种用法下如何使用rpc通信。 首先定义模型和模型的调用函数,再定义一个全局变量 import multiprocessing as mp import torch import torch.distributed from torch.distributed import rpc class Model: def __call__(self, tensor) -> torch.Any: return tensor + 1 def call_model(tensor): return model(tensor) model = None
定义teacher的逻辑: def teacher(rank,world_size): torch.distributed.init_process_group(rank=0,world_size=1,backend='nccl',init_method=f'tcp://127.0.0.1:{29500+rank}') options = rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30000') global model model = Model() rpc.init_rpc('teacher', rank=rank, world_size=world_size, rpc_backend_options=options) rpc.shutdown() 先初始化分布式环境,然后创建模型,赋值给全局变量,然后再初始化rpc,确保rpc初始化后模型一定已经准备好了,最后shutdown等待。这里初始化分布式环境只是模拟一下,RPC本身并不依赖分布式环境。 然后定义student的逻辑: def student(rank,world_size): torch.distributed.init_process_group(rank=0,world_size=1,backend='nccl',init_method=f'tcp://127.0.0.1:{29500+rank}') options = rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30000') rpc.init_rpc('student', rank=rank, world_size=world_size, rpc_backend_options=options) input_ids = torch.randn(4) teacher_probs = rpc.rpc_async('teacher', call_model, args=(input_ids,)) # 这里student计算 student_probs = input_ids loss = teacher_probs.wait() - student_probs print(loss) rpc.shutdown() student也是先初始化分布式环境和rpc,然后模拟一下输入数据input\_ids。然后通过rpc.rpc\_async异步远程调用teacher进程的call\_model函数。此时teacher进程的全局变量应该已经有值了,call\_model可以正常返回。这里使用异步调用,不需要等待结果,直接继续。下面就是假装student在计算,得到student\_probs,然后计算一下差值,使用teacher\_probs.wait() 等待远程调用的结果,不出意外应该等于全1向量。 最后是启动和划分角色的代码: def main(rank, world_size): teacher_offset = world_size // 2 if rank < teacher_offset: student(rank, world_size) else: teacher(rank, world_size) if __name__ == '__main__': world_size = 2 ps = [mp.Process(None,main,args=(rank,world_size)) for rank in range(world_size)] for p in ps: p.start() for p in ps: p.join() RPC后端对NVlink、IB等都是支持的,也支持传递cuda张量,只需要在初始化rpc环境时,指定一下本机cuda和远程cuda的映射。 比如你可以在配置rpc后端时这样设置: rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30000', device_maps={'teacher':{0:1}}) 这个表示把teacher进程的cuda1映射到本地的cuda0,这样本地cuda0张量通信到远端时就会被放到cuda1,不需要移动到cpu。 最后再说一下,RPC的官方用法是远程调用和远程引用,可以去看官网教程。 追記目前已经在Megatron-RPC框架下实现了SFT、DPO、Distillation和on-policy RS,性能持平P2P通信,远超一些非IB、NVlink的通信方案,证明RPC方案确实可行。RPC是torch1.4版本就引入的特性,支持远程引用(本地创建一个模型,但是占用远程机器的显存)、链式异步调用(以异步函数的方式调用远程模型,且调用过程中支持继续调用其他远程模型)和自动求导(远程调用返回的结果可以求导,梯度传递给远程模型)。不得不感慨torch确实有前瞻性,以前都没太关注过这个特性。 いいね! (3件のいいね!)↓ |