diff --git a/csrc/deepep/ops/op_host/notify_dispatch_tiling.cc b/csrc/deepep/ops/op_host/notify_dispatch_tiling.cc index 4be4394c..cbcd507d 100644 --- a/csrc/deepep/ops/op_host/notify_dispatch_tiling.cc +++ b/csrc/deepep/ops/op_host/notify_dispatch_tiling.cc @@ -200,9 +200,9 @@ static bool CheckTensorDataType(gert::TilingContext *context, const char *nodeNa // Verify the size of the win area NotifyDispatchTilingData *tilingData = context->GetTilingData(); uint64_t maxWindowSize = Mc2TilingUtils::GetMaxWindowSize(); - uint64_t actualSize = dataSize * tilingData->notifyDispatchInfo.sendCount; + uint64_t actualSize = dataSize * tilingData->notifyDispatchInfo.sendCount + 2 * 1024 * 1024; // 2MB flag位 if (actualSize > maxWindowSize) { - OP_LOGE(nodeName, "HCCL_BUFFSIZE is too SMALL, should larger than %lu", actualSize); + OP_LOGE(nodeName, "HCCL_BUFFSIZE is too SMALL, should larger than %luMB.", actualSize / MB_SIZE); return false; } return true; diff --git a/csrc/deepep/ops/op_kernel/notify_dispatch.cpp b/csrc/deepep/ops/op_kernel/notify_dispatch.cpp index 5a0a3c4b..bb5cebe2 100644 --- a/csrc/deepep/ops/op_kernel/notify_dispatch.cpp +++ b/csrc/deepep/ops/op_kernel/notify_dispatch.cpp @@ -17,10 +17,6 @@ extern "C" __global__ __aicore__ void notify_dispatch(GM_ADDR sendData, GM_ADDR REGISTER_TILING_DEFAULT(NotifyDispatchTilingData); GET_TILING_DATA_WITH_STRUCT(NotifyDispatchTilingData, tilingData, tiling); - // hcomm will set magic later in init - uint32_t magic = 1; - GM_ADDR commArgs = nullptr; - int localRank = tilingData.notifyDispatchInfo.localRankId; int localRankSize = tilingData.notifyDispatchInfo.localRankSize; int rank = tilingData.notifyDispatchInfo.rankId; diff --git a/csrc/deepep/ops/op_kernel/notify_dispatch.h b/csrc/deepep/ops/op_kernel/notify_dispatch.h index 96a04513..bbe87f98 100644 --- a/csrc/deepep/ops/op_kernel/notify_dispatch.h +++ b/csrc/deepep/ops/op_kernel/notify_dispatch.h @@ -6,7 +6,6 @@ #include "comm_args.h" #include "data_copy.h" -#include 
"sync_collectives.h" #include "moe_distribute_base.h" using namespace AscendC; @@ -24,34 +23,23 @@ __aicore__ inline void SyncFunc() GM_ADDR sendDataInput, GM_ADDR tokenPerExpertDataInput, GM_ADDR sendDataOffsetOutput, GM_ADDR recvDataOutput, \ GM_ADDR totalRecvTokens, GM_ADDR recvCount, GM_ADDR recvOffset, GM_ADDR maxBs, GM_ADDR recvTokensPerExpert, \ int64_t len, int64_t numTokens, int op, int root, int cycleCount, GM_ADDR scale, int64_t scaleCount, \ - GM_ADDR offset, int localRank, int localRankSize, GM_ADDR commArgs, int magic + GM_ADDR offset, int localRank, int localRankSize #define KERNELS_ARGS_CALL_ALL2ALL() \ sendDataInput, tokenPerExpertDataInput, sendDataOffsetOutput, recvDataOutput, totalRecvTokens, recvCount, \ recvOffset, maxBs, recvTokensPerExpert, len, numTokens, op, root, cycleCount, scale, scaleCount, offset, \ - localRank, localRankSize, commArgs, magic + localRank, localRankSize template class NotifyDispatch { - constexpr static int INVALID_RANK_NUM = 0xFFFFFFFF; // Invalid rank - constexpr static int64_t CORE_NUMS_PER_STAGE_X = 24; // Maximum number of cores provided by the producer stage - constexpr static int64_t CORE_NUMS_PER_STAGE_Y = 16; // Maximum number of cores provided by the consumer stage - constexpr static int64_t CORE_NUMS_PER_STAGE_Z = 16; // Maximum number of cores provided by the consumer stage 2 - constexpr static int64_t SHARE_QUE_DEPTH = 1; // Depth of a single shared queue - constexpr static int64_t RANK_NUM_PER_NODE = 16; - constexpr static int64_t SIO_NUM = 2; // Depth of a single shared queue - constexpr static int64_t MAX_CORE_NUM = 48; constexpr static int64_t MAX_RANK_PER_CORE = 8; constexpr static int64_t MULTI_RANK_SIZE = 48; constexpr static int64_t MAX_BUFFER_NUMBER = 10; constexpr static uint32_t UB_FLAG_SIZE = 8U * 1024U; - constexpr static int64_t IDLER_CORE = 0; // Idle core - constexpr static int64_t PRODUCER_CORE = - 1; // Producer group, responsible for writing data to shared memory, input->share, or 
share->share - constexpr static int64_t CONSUMER_CORE = - 2; // Consumer group, responsible for reading data from shared memory, share->output - constexpr static int64_t CONSUMER_CORE2 = 3; + // Synchronization flag occupies length + constexpr static int64_t FLAG_UNIT_INT_NUM = 4; + constexpr static int64_t MAGIC_MASK = ~((1LL << 32) - 1); public: __aicore__ inline NotifyDispatch(int rank, int rankSize, uint32_t extraFlag) @@ -119,25 +107,20 @@ class NotifyDispatch private: __aicore__ inline void InitCoreGroup() { - coreNumPerStageY = MAX_CORE_NUM; - coreNumPerStageX = MAX_CORE_NUM; - rankNumPerCore = (rankSize + MAX_CORE_NUM - 1) / MAX_CORE_NUM; + coreNumPerStageY = blockNum; + coreNumPerStageX = blockNum; + rankNumPerCore = (rankSize + blockNum - 1) / blockNum; } __aicore__ inline void InitDataSlice() { // The producer is responsible for moving the input data of this rank to shared memory, input-->share if (blockIdx < coreNumPerStageX) { - ProducerDataSlice(); + // The ipcQue responsible for the current core + writeGt.SetGlobalBuffer((__gm__ T *)(shareAddrs[rank] + IPC_DATA_OFFSET)); } } - __aicore__ inline void ProducerDataSlice() - { - // The ipcQue responsible for the current core - writeGt.SetGlobalBuffer((__gm__ T *)(shareAddrs[rank] + IPC_DATA_OFFSET)); - } - __aicore__ inline void AssembleSendData() { pipe.InitBuffer(tokenPerExpertDataBuf, tokenPerExpertDataAlignLen); @@ -174,28 +157,69 @@ class NotifyDispatch // copy input to other rank share __aicore__ inline void InputToShareSlice() { - __ubuf__ int64_t *inputUB = (__ubuf__ int64_t *)get_imm(0); + __ubuf__ uint64_t *inputUB = (__ubuf__ uint64_t *)get_imm(0); int64_t copyOffset = blockIdx * rankNumPerCore; copyLen = rankSize - copyOffset < rankNumPerCore ? 
rankSize - copyOffset : rankNumPerCore; if (copyLen > 0) { readGt = sendDataInputGt[copyOffset * perRankDataNum]; CpGM2GMPingPong(copyLen * perRankDataNum * sizeof(T), readGt, writeGt[copyOffset * perRankDataNum], COPYONLY); - int64_t v = MergeMagicWithValue(magic, 1); + uint64_t v = MergeMagicWithValue(magic, 1); *inputUB = v; AscendC::SetFlag(EVENT_ID0); AscendC::WaitFlag(EVENT_ID0); for (int i = copyOffset; i < copyOffset + copyLen; ++i) { - CpUB2GM((__gm__ int64_t *)(shareAddrs[i]) + rank * FLAG_UNIT_INT_NUM, inputUB, sizeof(int64_t)); + CpUB2GM((__gm__ uint64_t *)(shareAddrs[i]) + rank * FLAG_UNIT_INT_NUM, inputUB, sizeof(uint64_t)); } pipe_barrier(PIPE_ALL); } } - __aicore__ inline int64_t MergeMagicWithValue(int32_t magic, int32_t value) + __aicore__ inline uint64_t MergeMagicWithValue(uint64_t magic, uint64_t value) { // magic as the high part, eventID as the low part, combined into a value for comparison - return (static_cast(static_cast(magic)) << MAGIC_OFFSET) | static_cast(value); + return (magic * 2ULL + value); + } + + // Wait for a part of synchronization flags within a rank + __aicore__ inline void WaitOneRankPartFlag(__gm__ uint64_t *waitAddr, int64_t flagNum, uint64_t checkValue) + { + GlobalTensor globalWait; + globalWait.SetGlobalBuffer(waitAddr, flagNum * FLAG_UNIT_INT_NUM); + LocalTensor localWait = tBuf.GetWithOffset(flagNum * FLAG_UNIT_INT_NUM, 0); + bool isSync = true; + uint64_t checkedFlagNum = 0; + do { + // Copy global synchronization flags to local + DataCopy(localWait, globalWait[checkedFlagNum * FLAG_UNIT_INT_NUM], + (flagNum - checkedFlagNum) * FLAG_UNIT_INT_NUM); + AscendC::SetFlag(EVENT_ID0); + AscendC::WaitFlag(EVENT_ID0); // Wait for GM->UB + + // Check if the synchronization flags are equal to checkValue + isSync = true; + uint64_t remainToCheck = flagNum - checkedFlagNum; + for (auto i = 0; i < remainToCheck; ++i) { + // Continue waiting if any core has not reached the checkValue phase + uint64_t v = localWait.GetValue(i * 
FLAG_UNIT_INT_NUM); + if ((v & MAGIC_MASK) != (checkValue & MAGIC_MASK) || v < checkValue) { + isSync = false; + checkedFlagNum += i; + break; + } + } + } while (!isSync); + } + + /** + * @brief Wait for the flags starting from the specified eventID on the specified card to become + * a value composed of the combination of magic and value.
+ * Note: [eventID, eventID + flagNum) + */ + __aicore__ inline void WaitSyncFlag(uint64_t magic, uint64_t value, uint64_t eventID, int32_t rank, int64_t flagNum) + { + uint64_t v = MergeMagicWithValue(magic, value); + WaitOneRankPartFlag((__gm__ uint64_t *)(shareAddrs[rank]) + eventID * FLAG_UNIT_INT_NUM, flagNum, v); } __aicore__ inline void ShareToShareSlice() @@ -214,7 +238,9 @@ class NotifyDispatch for (int i = 0; i < copyLen; i++) { readGt1[i].SetGlobalBuffer((__gm__ T *)(shareAddrs[checkRank[i]] + IPC_DATA_OFFSET)); } - sync.WaitSyncFlag(magic, 1, copyOffset, rank, copyLen); + + WaitSyncFlag(magic, 1, copyOffset, rank, copyLen); + for (int i = 0; i < copyLen; i++) { CpGM2GMPingPong(perRankDataNum * sizeof(T), readGt1[i][rank * perRankDataNum], recvDataOutputGt[checkRank[i] * perRankDataNum], COPYONLY); @@ -380,18 +406,18 @@ class NotifyDispatch DataCopyPad(recvTokenPerExpGt, tmpTensor, copyParams); } - FORCE_INLINE_AICORE int64_t GetDataCount(const int64_t dataLen, const int64_t useBlockNum); + __aicore__ inline int64_t GetDataCount(const int64_t dataLen, const int64_t useBlockNum); __aicore__ inline GM_ADDR GetWindAddrByRankId(const int32_t rankId, uint8_t ctxIdx); - __aicore__ inline int32_t GetMagicValue(void); - FORCE_INLINE_AICORE void InitSmallFullMesh(KERNELS_ARGS_FUN_ALL2ALL()); + __aicore__ inline uint64_t GetMagicValue(void); + __aicore__ inline void InitSmallFullMesh(KERNELS_ARGS_FUN_ALL2ALL()); template - FORCE_INLINE_AICORE void SetAtomic(int op); - FORCE_INLINE_AICORE void UnsetAtomic(int op); + __aicore__ inline void SetAtomic(int op); + __aicore__ inline void UnsetAtomic(int op); template - FORCE_INLINE_AICORE void SetWaitEvent(event_t eventId); + __aicore__ inline void SetWaitEvent(event_t eventId); template - FORCE_INLINE_AICORE void CpGM2GMPingPong(int64_t dataSizeRemain, const GlobalTensor &sendDataInputGt, - const GlobalTensor &recvDataOutputGT, int op); + __aicore__ inline void CpGM2GMPingPong(int64_t dataSizeRemain, const GlobalTensor 
&sendDataInputGt, + const GlobalTensor &recvDataOutputGT, int op); GlobalTensor sendDataInputGt; GlobalTensor tokenPerExpertDataInputGt; @@ -422,25 +448,12 @@ class NotifyDispatch int64_t nodeNum; int64_t localRankId; int64_t localNodeId; - int64_t targetNode; - int64_t targetLocalRankIds[2]; - int64_t queLen; - int64_t queSize; - int64_t coreNumPerStageX; // Number of cores used per stage - int64_t coreNumPerStageY; // Number of cores used per stage - int64_t coreNumPerStageZ; // Number of cores used per stage - int64_t flagNumPerStage; // Number of synchronization flags used per stage - int64_t coreNumPerNode; // Number of cores allocated per node - int64_t coreNumPerRank; // Number of cores allocated per rank - int64_t rankNumPerCore; // Number of ranks responsible per core - int64_t coreGroup; // Functional group of the current core - int64_t targetRank[MULTI_RANK_SIZE]; // Ranks responsible by the current core - int64_t targetRankX; - int64_t targetRankY; - - int64_t queElemLen; // Size of each element in the shared memory queue (in terms of T) - - int64_t copyLen; // Length of the current data slice being copied (in terms of T) + int64_t coreNumPerStageX; // Number of cores used per stage + int64_t coreNumPerStageY; // Number of cores used per stage + int64_t coreNumPerStageZ; // Number of cores used per stage + int64_t coreNumPerRank; // Number of cores allocated per rank + int64_t rankNumPerCore; // Number of ranks responsible per core + int64_t copyLen; // Length of the current data slice being copied (in terms of T) // for coll int rank; @@ -457,12 +470,9 @@ class NotifyDispatch int root; int64_t len; int64_t numExperts; - int64_t magic; + uint64_t magic{0}; int64_t blockIdx; // Index of the current aicore int64_t blockNum; // Total number of aicores for the current rank - int32_t numRanks; - int64_t timeout; - uint16_t *rootRanks; uint32_t maxBsNum{0}; GM_ADDR scale; GM_ADDR shareAddrs[CAM_MAX_RANK_SIZE]; // List of shared memory addresses @@ -473,8 
+483,6 @@ class NotifyDispatch GM_ADDR recvTokensPerExpert_; __gm__ HcclOpResParam *winContext_[COMM_NUM]{nullptr, nullptr}; Hccl hccl_; - GlobalTensor peerMemsAddrGm_; - GlobalTensor dfx; TPipe pipe; TBuf tBuf; TBuf<> tokenPerExpertDataBuf; @@ -495,12 +503,10 @@ class NotifyDispatch TBuf<> tmpBuf2_; TBuf<> tmpBuf3_; TBuf<> tmpBuf4_; - - SyncCollectives sync; }; template -FORCE_INLINE_AICORE int64_t NotifyDispatch::GetDataCount(const int64_t dataLen, const int64_t useBlockNum) +__aicore__ inline int64_t NotifyDispatch::GetDataCount(const int64_t dataLen, const int64_t useBlockNum) { return dataLen / useBlockNum; } @@ -526,13 +532,13 @@ __aicore__ inline GM_ADDR NotifyDispatch::GetWindAddrByRankId(const int32_t r // Assign values to winContext_[COMM_EP_IDX] and blockIdx before calling template -__aicore__ inline int32_t NotifyDispatch::GetMagicValue(void) +__aicore__ inline uint64_t NotifyDispatch::GetMagicValue(void) { - int32_t magic = 0; - GlobalTensor selfDataStatusTensor; + uint64_t magic = 0; + GlobalTensor selfDataStatusTensor; GM_ADDR statusDataSpaceGm = (GM_ADDR)(winContext_[COMM_EP_IDX]->localWindowsExp); - selfDataStatusTensor.SetGlobalBuffer((__gm__ int32_t *)(statusDataSpaceGm + STATE_WIN_OFFSET)); - DataCacheCleanAndInvalid( + selfDataStatusTensor.SetGlobalBuffer((__gm__ uint64_t *)(statusDataSpaceGm + STATE_WIN_OFFSET)); + DataCacheCleanAndInvalid( selfDataStatusTensor[blockIdx * UB_ALIGN_SIZE]); magic = selfDataStatusTensor(blockIdx * UB_ALIGN_SIZE); if (magic <= 0) { @@ -543,7 +549,7 @@ __aicore__ inline int32_t NotifyDispatch::GetMagicValue(void) } template -FORCE_INLINE_AICORE void NotifyDispatch::InitSmallFullMesh(KERNELS_ARGS_FUN_ALL2ALL()) +__aicore__ inline void NotifyDispatch::InitSmallFullMesh(KERNELS_ARGS_FUN_ALL2ALL()) { this->root = root; this->len = len; @@ -567,7 +573,7 @@ FORCE_INLINE_AICORE void NotifyDispatch::InitSmallFullMesh(KERNELS_ARGS_FUN_A shareAddrs[rank] = GetWindAddrByRankId(rank, ctxIdx) + (this->magic % PING_PONG_SIZE) * 
(IPC_BUFF_MAX_SIZE + IPC_DATA_OFFSET); - int64_t rankNumPerCore = (rankSize + MAX_CORE_NUM - 1) / MAX_CORE_NUM; + int64_t rankNumPerCore = (rankSize + blockNum - 1) / blockNum; int64_t copyOffset = blockIdx * rankNumPerCore; int64_t copyLen = rankSize - copyOffset < rankNumPerCore ? rankSize - copyOffset : rankNumPerCore; if (copyLen > 0) { @@ -592,8 +598,6 @@ FORCE_INLINE_AICORE void NotifyDispatch::InitSmallFullMesh(KERNELS_ARGS_FUN_A } pipe.InitBuffer(tBuf, UB_FLAG_SIZE); - - sync.Init(rank, rankSize, shareAddrs, tBuf); } /** @@ -611,9 +615,9 @@ FORCE_INLINE_AICORE void NotifyDispatch::InitSmallFullMesh(KERNELS_ARGS_FUN_A */ template template -FORCE_INLINE_AICORE void NotifyDispatch::CpGM2GMPingPong(int64_t dataSizeRemain, - const GlobalTensor &sendDataInputGt, - const GlobalTensor &recvDataOutputGT, int op) +__aicore__ inline void NotifyDispatch::CpGM2GMPingPong(int64_t dataSizeRemain, + const GlobalTensor &sendDataInputGt, + const GlobalTensor &recvDataOutputGT, int op) { // General case (U = K), input/output are the same, share one UB // Only when conversion is needed (U->K), UB will be divided into two parts according to the ratio of @@ -674,7 +678,7 @@ FORCE_INLINE_AICORE void NotifyDispatch::CpGM2GMPingPong(int64_t dataSizeRema template template -FORCE_INLINE_AICORE void NotifyDispatch::SetAtomic(int op) +__aicore__ inline void NotifyDispatch::SetAtomic(int op) { PipeBarrier(); if (op != -1) { @@ -686,7 +690,7 @@ FORCE_INLINE_AICORE void NotifyDispatch::SetAtomic(int op) } template -FORCE_INLINE_AICORE void NotifyDispatch::UnsetAtomic(int op) +__aicore__ inline void NotifyDispatch::UnsetAtomic(int op) { if (op != -1) { AscendC::SetAtomicNone(); @@ -696,7 +700,7 @@ FORCE_INLINE_AICORE void NotifyDispatch::UnsetAtomic(int op) template template -FORCE_INLINE_AICORE void NotifyDispatch::SetWaitEvent(event_t eventId) +__aicore__ inline void NotifyDispatch::SetWaitEvent(event_t eventId) { AscendC::SetFlag(eventId); AscendC::WaitFlag(eventId); diff --git 
a/csrc/deepep/ops2/op_host/notify_dispatch_tiling.cc b/csrc/deepep/ops2/op_host/notify_dispatch_tiling.cc index 153db4ad..61f2fc2f 100644 --- a/csrc/deepep/ops2/op_host/notify_dispatch_tiling.cc +++ b/csrc/deepep/ops2/op_host/notify_dispatch_tiling.cc @@ -223,9 +223,9 @@ static bool CheckTensorDataType(gert::TilingContext *context, const char *nodeNa // Verify the size of the win area NotifyDispatchTilingData *tilingData = context->GetTilingData(); uint64_t maxWindowSize = Mc2TilingUtils::GetMaxWindowSize(); - uint64_t actualSize = dataSize * tilingData->notifyDispatchInfo.sendCount; + uint64_t actualSize = dataSize * tilingData->notifyDispatchInfo.sendCount + 2 * 1024 * 1024; // 2MB flag位 if (actualSize > maxWindowSize) { - OP_LOGE(nodeName, "HCCL_BUFFSIZE is too SMALL, should larger than %lu", actualSize); + OP_LOGE(nodeName, "HCCL_BUFFSIZE is too SMALL, should larger than %luMB.", actualSize / MB_SIZE); return false; } return true; diff --git a/csrc/deepep/ops2/op_host/notify_dispatch_tiling_a2.cc b/csrc/deepep/ops2/op_host/notify_dispatch_tiling_a2.cc index 1e989530..c29913ae 100644 --- a/csrc/deepep/ops2/op_host/notify_dispatch_tiling_a2.cc +++ b/csrc/deepep/ops2/op_host/notify_dispatch_tiling_a2.cc @@ -79,8 +79,11 @@ constexpr uint32_t ATTR_RANK_ID_INDEX = 6; constexpr uint32_t ATTR_LOCAL_RANK_SIZE_INDEX = 7; constexpr uint32_t ATTR_LOCAL_RANK_ID_INDEX = 8; -const size_t MAX_GROUP_NAME_LENGTH = 128UL; -const int64_t MAX_COMM_WORLD_SIZE = 384; +constexpr size_t MAX_GROUP_NAME_LENGTH = 128UL; +constexpr int64_t MAX_COMM_WORLD_SIZE = 384; +constexpr int64_t MAX_A2_WORLD_SIZE = 64; +constexpr int64_t MAX_COMM_LOCAL_SIZE = 16; +constexpr int64_t MAX_A2_LOCAL_SIZE = 8; constexpr uint32_t SYSTEM_NEED_WORKSPACE = 16 * 1024 * 1024; constexpr uint32_t KERNEL_USE_WORKSPACE = 1 * 1024 * 1024; @@ -93,8 +96,6 @@ constexpr static int TILING_KEY_BFLOAT16 = 21; constexpr static int TILING_KEY_FLOAT = 22; constexpr static int TILING_KEY_INT = 23; constexpr static int 
TILING_KEY_A2_TYPE = 100; - -constexpr static int ALL_TO_ALL_CORE_NUM = 32; } // namespace namespace optiling { @@ -141,14 +142,23 @@ static ge::graphStatus GetAttrAndSetTilingData(gert::TilingContext *context, con return ge::GRAPH_FAILED); OP_TILING_CHECK(localRankIdPtr == nullptr, OP_LOGE(nodeName, "localRankIdPtr is null."), return ge::GRAPH_FAILED); - OP_TILING_CHECK((*rankSizePtr <= 0) || (*rankSizePtr > MAX_COMM_WORLD_SIZE), + OP_TILING_CHECK((*rankSizePtr <= 0) || (*rankSizePtr > MAX_A2_WORLD_SIZE), OP_LOGE(nodeName, "rankSize is invalid, only support (0, %ld], but got rankSize=%ld.", - MAX_COMM_WORLD_SIZE, *rankSizePtr), + MAX_A2_WORLD_SIZE, *rankSizePtr), return ge::GRAPH_FAILED); OP_TILING_CHECK( (*rankIdPtr < 0) || (*rankIdPtr >= *rankSizePtr), OP_LOGE(nodeName, "rankId is invalid, only support [0, %ld), but got rankId=%ld.", *rankSizePtr, *rankIdPtr), return ge::GRAPH_FAILED); + OP_TILING_CHECK((*localRankSizePtr <= 0) || (*localRankSizePtr > MAX_A2_LOCAL_SIZE), + OP_LOGE(nodeName, "localRankSize is invalid, A2 only support (0, %ld], but got localRankSize=%ld.", + MAX_A2_LOCAL_SIZE, *localRankSizePtr), + return ge::GRAPH_FAILED); + OP_TILING_CHECK((*localRankIdPtr < 0) || (*localRankIdPtr >= *localRankSizePtr), + OP_LOGE(nodeName, "localRankId is invalid, only support [0, %ld), but got localRankId=%ld.", + *localRankSizePtr, *localRankIdPtr), + return ge::GRAPH_FAILED); + OP_TILING_CHECK((*sendCountPtr <= 0), OP_LOGE(nodeName, "sendCount is invalid, only support > 0, but got sendCount=%ld.", *sendCountPtr), return ge::GRAPH_FAILED); @@ -187,8 +197,7 @@ static ge::graphStatus SetWorkSpace(gert::TilingContext *context, const char *no { size_t *workSpaces = context->GetWorkspaceSizes(1); OP_TILING_CHECK(workSpaces == nullptr, OP_LOGE(nodeName, "workSpaces is nullptr."), return ge::GRAPH_FAILED); - workSpaces[0] = SYSTEM_NEED_WORKSPACE + KERNEL_USE_WORKSPACE + - KERNEL_A2_ARG_SIZE; // TODO: 多预留空间,dispatch和combine同步要改? 
+ workSpaces[0] = SYSTEM_NEED_WORKSPACE + KERNEL_USE_WORKSPACE + KERNEL_A2_ARG_SIZE; return ge::GRAPH_SUCCESS; } @@ -353,9 +362,9 @@ static bool CheckTensorDataType(gert::TilingContext *context, const char *nodeNa // Verify the size of the win area NotifyDispatchA2TilingData *tilingData = context->GetTilingData(); uint64_t maxWindowSize = Mc2TilingUtils::GetMaxWindowSize(); - uint64_t actualSize = dataSize * tilingData->notifyDispatchInfoA2.sendCount; + uint64_t actualSize = 2 * dataSize * tilingData->notifyDispatchInfoA2.sendCount + 2 * 1024 * 1024; // 2MB flag位 if (actualSize > maxWindowSize) { - OP_LOGE(nodeName, "HCCL_BUFFSIZE is too SMALL, should larger than %lu", actualSize); + OP_LOGE(nodeName, "HCCL_BUFFSIZE is too SMALL, should larger than %luMB", actualSize / MB_SIZE); return false; } return true; diff --git a/csrc/deepep/ops2/op_kernel/notify_dispatch.cpp b/csrc/deepep/ops2/op_kernel/notify_dispatch.cpp index faeca409..3d444809 100644 --- a/csrc/deepep/ops2/op_kernel/notify_dispatch.cpp +++ b/csrc/deepep/ops2/op_kernel/notify_dispatch.cpp @@ -15,10 +15,6 @@ extern "C" __global__ __aicore__ void notify_dispatch(GM_ADDR sendData, GM_ADDR REGISTER_TILING_DEFAULT(NotifyDispatchTilingData); GET_TILING_DATA_WITH_STRUCT(NotifyDispatchTilingData, tilingData, tilingGM); - // hcomm will set magic later in init - uint32_t magic = 1; - GM_ADDR commArgs = nullptr; - int localRank = tilingData.notifyDispatchInfo.localRankId; int localRankSize = tilingData.notifyDispatchInfo.localRankSize; int rank = tilingData.notifyDispatchInfo.rankId; diff --git a/csrc/deepep/ops2/op_kernel/notify_dispatch.h b/csrc/deepep/ops2/op_kernel/notify_dispatch.h index a7bdab61..0f35c1e8 100644 --- a/csrc/deepep/ops2/op_kernel/notify_dispatch.h +++ b/csrc/deepep/ops2/op_kernel/notify_dispatch.h @@ -16,33 +16,23 @@ using namespace Moe; #define KERNELS_ARGS_FUN_ALL2ALL() \ GM_ADDR sendDataInput, GM_ADDR tokenPerExpertDataInput, GM_ADDR sendDataOffsetOutput, GM_ADDR recvDataOutput, \ int64_t 
len, int64_t numTokens, int op, int root, int cycleCount, GM_ADDR scale, int64_t scaleCount, \ - GM_ADDR offset, int localRank, int localRankSize, GM_ADDR commArgs, int magic, GM_ADDR tilingGM + GM_ADDR offset, int localRank, int localRankSize, GM_ADDR tilingGM #define KERNELS_ARGS_CALL_ALL2ALL() \ sendDataInput, tokenPerExpertDataInput, sendDataOffsetOutput, recvDataOutput, len, numTokens, op, root, \ - cycleCount, scale, scaleCount, offset, localRank, localRankSize, commArgs, magic, tilingGM + cycleCount, scale, scaleCount, offset, localRank, localRankSize, tilingGM template class NotifyDispatch { - constexpr static int INVALID_RANK_NUM = 0xFFFFFFFF; // Invalid rank - constexpr static int64_t CORE_NUMS_PER_STAGE_X = 24; // Maximum number of cores provided by the producer stage - constexpr static int64_t CORE_NUMS_PER_STAGE_Y = 16; // Maximum number of cores provided by the consumer stage - constexpr static int64_t CORE_NUMS_PER_STAGE_Z = 16; // Maximum number of cores provided by the consumer stage 2 - constexpr static int64_t SHARE_QUE_DEPTH = 1; // Depth of a single shared queue - constexpr static int64_t RANK_NUM_PER_NODE = 16; - constexpr static int64_t SIO_NUM = 2; // Depth of a single shared queue constexpr static int64_t MAX_CORE_NUM = 48; constexpr static int64_t MAX_RANK_PER_CORE = 8; constexpr static int64_t MULTI_RANK_SIZE = 48; constexpr static int64_t MAX_BUFFER_NUMBER = 10; - constexpr static int64_t IDLER_CORE = 0; // Idle core - constexpr static int64_t PRODUCER_CORE = - 1; // Producer group, responsible for writing data to shared memory, input->share, or share->share - constexpr static int64_t CONSUMER_CORE = - 2; // Consumer group, responsible for reading data from shared memory, share->output - constexpr static int64_t CONSUMER_CORE2 = 3; + // Synchronization flag occupies length + constexpr static int64_t FLAG_UNIT_INT_NUM = 4; + constexpr static int64_t MAGIC_MASK = ~((1LL << 32) - 1); public: __aicore__ inline NotifyDispatch(int rank, int 
rankSize, uint32_t extraFlag) @@ -95,25 +85,20 @@ class NotifyDispatch private: __aicore__ inline void InitCoreGroup() { - coreNumPerStageY = MAX_CORE_NUM; - coreNumPerStageX = MAX_CORE_NUM; - rankNumPerCore = (rankSize + MAX_CORE_NUM - 1) / MAX_CORE_NUM; + coreNumPerStageY = blockNum; + coreNumPerStageX = blockNum; + rankNumPerCore = (rankSize + blockNum - 1) / blockNum; } __aicore__ inline void InitDataSlice() { // The producer is responsible for moving the input data of this rank to shared memory, input-->share if (blockIdx < coreNumPerStageX) { - ProducerDataSlice(); + // The ipcQue responsible for the current core + writeGt.SetGlobalBuffer((__gm__ T *)(shareAddrs[rank] + IPC_DATA_OFFSET)); } } - __aicore__ inline void ProducerDataSlice() - { - // The ipcQue responsible for the current core - writeGt.SetGlobalBuffer((__gm__ T *)(shareAddrs[rank] + IPC_DATA_OFFSET)); - } - __aicore__ inline void AssembleSendData() { pipe.InitBuffer(tokenPerExpertDataBuf, tokenPerExpertDataAlignLen); @@ -150,28 +135,69 @@ class NotifyDispatch // copy input to other rank share __aicore__ inline void InputToShareSlice() { - __ubuf__ int64_t *inputUB = (__ubuf__ int64_t *)get_imm(0); + __ubuf__ uint64_t *inputUB = (__ubuf__ uint64_t *)get_imm(0); int64_t copyOffset = blockIdx * rankNumPerCore; copyLen = rankSize - copyOffset < rankNumPerCore ? 
rankSize - copyOffset : rankNumPerCore; if (copyLen > 0) { readGt = sendDataInputGt[copyOffset * perRankDataNum]; CpGM2GMPingPong(copyLen * perRankDataNum * sizeof(T), readGt, writeGt[copyOffset * perRankDataNum], COPYONLY); - int64_t v = MergeMagicWithValue(magic, 1); + uint64_t v = MergeMagicWithValue(magic, 1); *inputUB = v; AscendC::SetFlag(EVENT_ID0); AscendC::WaitFlag(EVENT_ID0); for (int i = copyOffset; i < copyOffset + copyLen; ++i) { - CpUB2GM((__gm__ int64_t *)(shareAddrs[i]) + rank * FLAG_UNIT_INT_NUM, inputUB, sizeof(int64_t)); + CpUB2GM((__gm__ uint64_t *)(shareAddrs[i]) + rank * FLAG_UNIT_INT_NUM, inputUB, sizeof(uint64_t)); } pipe_barrier(PIPE_ALL); } } - __aicore__ inline int64_t MergeMagicWithValue(int32_t magic, int32_t value) + __aicore__ inline uint64_t MergeMagicWithValue(uint64_t magic, uint64_t value) { // magic as the high part, eventID as the low part, combined into a value for comparison - return (static_cast(static_cast(magic)) << MAGIC_OFFSET) | static_cast(value); + return (magic * 2ULL + value); + } + + // Wait for a part of synchronization flags within a rank + __aicore__ inline void WaitOneRankPartFlag(__gm__ uint64_t *waitAddr, int64_t flagNum, uint64_t checkValue) + { + GlobalTensor globalWait; + globalWait.SetGlobalBuffer(waitAddr, flagNum * FLAG_UNIT_INT_NUM); + LocalTensor localWait = tBuf.GetWithOffset(flagNum * FLAG_UNIT_INT_NUM, 0); + bool isSync = true; + uint64_t checkedFlagNum = 0; + do { + // Copy global synchronization flags to local + DataCopy(localWait, globalWait[checkedFlagNum * FLAG_UNIT_INT_NUM], + (flagNum - checkedFlagNum) * FLAG_UNIT_INT_NUM); + AscendC::SetFlag(EVENT_ID0); + AscendC::WaitFlag(EVENT_ID0); // Wait for GM->UB + + // Check if the synchronization flags are equal to checkValue + isSync = true; + uint64_t remainToCheck = flagNum - checkedFlagNum; + for (auto i = 0; i < remainToCheck; ++i) { + // Continue waiting if any core has not reached the checkValue phase + uint64_t v = localWait.GetValue(i * 
FLAG_UNIT_INT_NUM); + if ((v & MAGIC_MASK) != (checkValue & MAGIC_MASK) || v < checkValue) { + isSync = false; + checkedFlagNum += i; + break; + } + } + } while (!isSync); + } + + /** + * @brief Wait for the flags starting from the specified eventID on the specified card to become + * a value composed of the combination of magic and value.
+ * Note: [eventID, eventID + flagNum) + */ + __aicore__ inline void WaitSyncFlag(uint64_t magic, uint64_t value, uint64_t eventID, int32_t rank, int64_t flagNum) + { + uint64_t v = MergeMagicWithValue(magic, value); + WaitOneRankPartFlag((__gm__ uint64_t *)(shareAddrs[rank]) + eventID * FLAG_UNIT_INT_NUM, flagNum, v); } __aicore__ inline void ShareToShareSlice() @@ -190,7 +216,7 @@ class NotifyDispatch for (int i = 0; i < copyLen; i++) { readGt1[i].SetGlobalBuffer((__gm__ T *)(shareAddrs[checkRank[i]] + IPC_DATA_OFFSET)); } - sync.WaitSyncFlag(magic, 1, copyOffset, rank, copyLen); + WaitSyncFlag(magic, 1, copyOffset, rank, copyLen); for (int i = 0; i < copyLen; i++) { CpGM2GMPingPong(perRankDataNum * sizeof(T), readGt1[i][rank * perRankDataNum], recvDataOutputGt[checkRank[i] * perRankDataNum], COPYONLY); @@ -198,18 +224,18 @@ class NotifyDispatch } } - FORCE_INLINE_AICORE int64_t GetDataCount(const int64_t dataLen, const int64_t useBlockNum); + __aicore__ inline int64_t GetDataCount(const int64_t dataLen, const int64_t useBlockNum); __aicore__ inline GM_ADDR GetWindAddrByRankId(const int32_t rankId, uint8_t ctxIdx); - __aicore__ inline int32_t GetMagicValue(void); - FORCE_INLINE_AICORE void InitSmallFullMesh(KERNELS_ARGS_FUN_ALL2ALL()); + __aicore__ inline uint64_t GetMagicValue(void); + __aicore__ inline void InitSmallFullMesh(KERNELS_ARGS_FUN_ALL2ALL()); template - FORCE_INLINE_AICORE void SetAtomic(int op); - FORCE_INLINE_AICORE void UnsetAtomic(int op); + __aicore__ inline void SetAtomic(int op); + __aicore__ inline void UnsetAtomic(int op); template - FORCE_INLINE_AICORE void SetWaitEvent(event_t eventId); + __aicore__ inline void SetWaitEvent(event_t eventId); template - FORCE_INLINE_AICORE void CpGM2GMPingPong(int64_t dataSizeRemain, const GlobalTensor &sendDataInputGt, - const GlobalTensor &recvDataOutputGT, int op); + __aicore__ inline void CpGM2GMPingPong(int64_t dataSizeRemain, const GlobalTensor &sendDataInputGt, + const GlobalTensor &recvDataOutputGT, 
int op); GlobalTensor sendDataInputGt; GlobalTensor tokenPerExpertDataInputGt; @@ -239,25 +265,12 @@ class NotifyDispatch int64_t nodeNum; int64_t localRankId; int64_t localNodeId; - int64_t targetNode; - int64_t targetLocalRankIds[2]; - int64_t queLen; - int64_t queSize; - int64_t coreNumPerStageX; // Number of cores used per stage - int64_t coreNumPerStageY; // Number of cores used per stage - int64_t coreNumPerStageZ; // Number of cores used per stage - int64_t flagNumPerStage; // Number of synchronization flags used per stage - int64_t coreNumPerNode; // Number of cores allocated per node - int64_t coreNumPerRank; // Number of cores allocated per rank - int64_t rankNumPerCore; // Number of ranks responsible per core - int64_t coreGroup; // Functional group of the current core - int64_t targetRank[MULTI_RANK_SIZE]; // Ranks responsible by the current core - int64_t targetRankX; - int64_t targetRankY; - - int64_t queElemLen; // Size of each element in the shared memory queue (in terms of T) - - int64_t copyLen; // Length of the current data slice being copied (in terms of T) + int64_t coreNumPerStageX; // Number of cores used per stage + int64_t coreNumPerStageY; // Number of cores used per stage + int64_t coreNumPerStageZ; // Number of cores used per stage + int64_t coreNumPerRank; // Number of cores allocated per rank + int64_t rankNumPerCore; // Number of ranks responsible per core + int64_t copyLen; // Length of the current data slice being copied (in terms of T) // for coll int rank; @@ -274,18 +287,13 @@ class NotifyDispatch int root; int64_t len; int64_t numExperts; - int64_t magic; + uint64_t magic{0}; int64_t blockIdx; // Index of the current aicore int64_t blockNum; // Total number of aicores for the current rank - int32_t numRanks; - int64_t timeout; - uint16_t *rootRanks; GM_ADDR scale; GM_ADDR shareAddrs[CAM_MAX_RANK_SIZE]; // List of shared memory addresses __gm__ HcclOpResParam *winContext_[COMM_NUM]{nullptr, nullptr}; Hccl hccl_; - GlobalTensor 
peerMemsAddrGm_; - GlobalTensor dfx; TPipe pipe; TBuf tBuf; TBuf<> tokenPerExpertDataBuf; @@ -295,12 +303,10 @@ class NotifyDispatch uint32_t sendDataAlignLen{0}; uint32_t tokenPerExpertDataAlignLen{0}; uint32_t sendDataOffsetAlignLen{0}; - - SyncCollectives sync; }; template -FORCE_INLINE_AICORE int64_t NotifyDispatch::GetDataCount(const int64_t dataLen, const int64_t useBlockNum) +__aicore__ inline int64_t NotifyDispatch::GetDataCount(const int64_t dataLen, const int64_t useBlockNum) { return dataLen / useBlockNum; } @@ -319,13 +325,13 @@ __aicore__ inline GM_ADDR NotifyDispatch::GetWindAddrByRankId(const int32_t r // Assign values to winContext_[COMM_EP_IDX] and blockIdx before calling template -__aicore__ inline int32_t NotifyDispatch::GetMagicValue(void) +__aicore__ inline uint64_t NotifyDispatch::GetMagicValue(void) { - int32_t magic = 0; - GlobalTensor selfDataStatusTensor; + uint64_t magic = 0; + GlobalTensor selfDataStatusTensor; GM_ADDR statusDataSpaceGm = hccl_.GetWindowsInAddr(rank) + winContext_[COMM_EP_IDX]->winSize - Moe::STATE_SIZE * 3; - selfDataStatusTensor.SetGlobalBuffer((__gm__ int32_t *)(statusDataSpaceGm + STATE_WIN_OFFSET)); - DataCacheCleanAndInvalid( + selfDataStatusTensor.SetGlobalBuffer((__gm__ uint64_t *)(statusDataSpaceGm + STATE_WIN_OFFSET)); + DataCacheCleanAndInvalid( selfDataStatusTensor[blockIdx * UB_ALIGN_SIZE]); magic = selfDataStatusTensor(blockIdx * UB_ALIGN_SIZE); if (magic <= 0) { @@ -336,7 +342,7 @@ __aicore__ inline int32_t NotifyDispatch::GetMagicValue(void) } template -FORCE_INLINE_AICORE void NotifyDispatch::InitSmallFullMesh(KERNELS_ARGS_FUN_ALL2ALL()) +__aicore__ inline void NotifyDispatch::InitSmallFullMesh(KERNELS_ARGS_FUN_ALL2ALL()) { this->root = root; this->len = len; @@ -369,7 +375,7 @@ FORCE_INLINE_AICORE void NotifyDispatch::InitSmallFullMesh(KERNELS_ARGS_FUN_A shareAddrs[rank] = GetWindAddrByRankId(rank, ctxIdx) + (this->magic % PING_PONG_SIZE) * (IPC_BUFF_MAX_SIZE + IPC_DATA_OFFSET); - int64_t rankNumPerCore 
= (rankSize + MAX_CORE_NUM - 1) / MAX_CORE_NUM; + int64_t rankNumPerCore = (rankSize + blockNum - 1) / blockNum; int64_t copyOffset = blockIdx * rankNumPerCore; int64_t copyLen = rankSize - copyOffset < rankNumPerCore ? rankSize - copyOffset : rankNumPerCore; if (copyLen > 0) { @@ -394,8 +400,6 @@ FORCE_INLINE_AICORE void NotifyDispatch::InitSmallFullMesh(KERNELS_ARGS_FUN_A } pipe.InitBuffer(tBuf, UB_SINGLE_TOTAL_SIZE_MAX); - - sync.Init(rank, rankSize, shareAddrs, tBuf); } /** @@ -413,9 +417,9 @@ FORCE_INLINE_AICORE void NotifyDispatch::InitSmallFullMesh(KERNELS_ARGS_FUN_A */ template template -FORCE_INLINE_AICORE void NotifyDispatch::CpGM2GMPingPong(int64_t dataSizeRemain, - const GlobalTensor &sendDataInputGt, - const GlobalTensor &recvDataOutputGT, int op) +__aicore__ inline void NotifyDispatch::CpGM2GMPingPong(int64_t dataSizeRemain, + const GlobalTensor &sendDataInputGt, + const GlobalTensor &recvDataOutputGT, int op) { // General case (U = K), input/output are the same, share one UB // Only when conversion is needed (U->K), UB will be divided into two parts according to the ratio of @@ -476,7 +480,7 @@ FORCE_INLINE_AICORE void NotifyDispatch::CpGM2GMPingPong(int64_t dataSizeRema template template -FORCE_INLINE_AICORE void NotifyDispatch::SetAtomic(int op) +__aicore__ inline void NotifyDispatch::SetAtomic(int op) { PipeBarrier(); if (op != -1) { @@ -488,7 +492,7 @@ FORCE_INLINE_AICORE void NotifyDispatch::SetAtomic(int op) } template -FORCE_INLINE_AICORE void NotifyDispatch::UnsetAtomic(int op) +__aicore__ inline void NotifyDispatch::UnsetAtomic(int op) { if (op != -1) { AscendC::SetAtomicNone(); @@ -498,7 +502,7 @@ FORCE_INLINE_AICORE void NotifyDispatch::UnsetAtomic(int op) template template -FORCE_INLINE_AICORE void NotifyDispatch::SetWaitEvent(event_t eventId) +__aicore__ inline void NotifyDispatch::SetWaitEvent(event_t eventId) { AscendC::SetFlag(eventId); AscendC::WaitFlag(eventId); diff --git a/csrc/deepep/ops2/op_kernel/notify_dispatch_a2.cpp 
b/csrc/deepep/ops2/op_kernel/notify_dispatch_a2.cpp index 064d4de9..9c40e78d 100644 --- a/csrc/deepep/ops2/op_kernel/notify_dispatch_a2.cpp +++ b/csrc/deepep/ops2/op_kernel/notify_dispatch_a2.cpp @@ -24,10 +24,6 @@ extern "C" __global__ __aicore__ void notify_dispatch_a2(GM_ADDR sendData, GM_AD REGISTER_TILING_DEFAULT(NotifyDispatchA2TilingData); GET_TILING_DATA_WITH_STRUCT(NotifyDispatchA2TilingData, tilingData, tiling); - // hcomm will set magic later in init - uint32_t magic = 1; - GM_ADDR commArgs = nullptr; - int localRank = tilingData.notifyDispatchInfoA2.localRankId; int localRankSize = tilingData.notifyDispatchInfoA2.localRankSize; int rank = tilingData.notifyDispatchInfoA2.rankId; diff --git a/csrc/deepep/ops2/op_kernel/notify_dispatch_a2.h b/csrc/deepep/ops2/op_kernel/notify_dispatch_a2.h index 3dafc181..8dd554b1 100644 --- a/csrc/deepep/ops2/op_kernel/notify_dispatch_a2.h +++ b/csrc/deepep/ops2/op_kernel/notify_dispatch_a2.h @@ -6,7 +6,6 @@ #include "comm_args.h" #include "data_copy.h" -#include "sync_collectives.h" #include "moe_distribute_base.h" #include "notify_dispatch_tiling_a2.h" @@ -17,16 +16,16 @@ using namespace Moe; GM_ADDR sendDataInput, GM_ADDR tokenPerExpertDataInput, GM_ADDR tmpDataInput, GM_ADDR sendDataOffsetOutput, \ GM_ADDR recvDataOutput, int64_t len, int64_t numTokens, int64_t topkNum, int64_t numExperts, int op, int root, \ int cycleCount, GM_ADDR scale, int64_t scaleCount, GM_ADDR offset, int localRank, int localRankSize, \ - GM_ADDR commArgs, GM_ADDR tokenServerIdxOutput, GM_ADDR tokensUniquePerServerOutput, \ - GM_ADDR epRankTokenCntOutput, GM_ADDR localEpTokenCntOutput, GM_ADDR srcOffsetRankTokenIdxOutput, \ - GM_ADDR dstOffsetRankTokenIdxOutput, GM_ADDR offsetInnerOutput, GM_ADDR countOuterOutput, \ - GM_ADDR expandIdxOutput, GM_ADDR workspace, GM_ADDR tiling - -#define KERNELS_ARGS_CALL_A2_ALL2ALL() \ - sendDataInput, tokenPerExpertDataInput, tmpDataInput, sendDataOffsetOutput, recvDataOutput, len, numTokens, \ - topkNum, 
numExperts, op, root, cycleCount, scale, scaleCount, offset, localRank, localRankSize, commArgs, \ - tokenServerIdxOutput, tokensUniquePerServerOutput, epRankTokenCntOutput, localEpTokenCntOutput, \ - srcOffsetRankTokenIdxOutput, dstOffsetRankTokenIdxOutput, offsetInnerOutput, countOuterOutput, \ + GM_ADDR tokenServerIdxOutput, GM_ADDR tokensUniquePerServerOutput, GM_ADDR epRankTokenCntOutput, \ + GM_ADDR localEpTokenCntOutput, GM_ADDR srcOffsetRankTokenIdxOutput, GM_ADDR dstOffsetRankTokenIdxOutput, \ + GM_ADDR offsetInnerOutput, GM_ADDR countOuterOutput, GM_ADDR expandIdxOutput, GM_ADDR workspace, \ + GM_ADDR tiling + +#define KERNELS_ARGS_CALL_A2_ALL2ALL() \ + sendDataInput, tokenPerExpertDataInput, tmpDataInput, sendDataOffsetOutput, recvDataOutput, len, numTokens, \ + topkNum, numExperts, op, root, cycleCount, scale, scaleCount, offset, localRank, localRankSize, \ + tokenServerIdxOutput, tokensUniquePerServerOutput, epRankTokenCntOutput, localEpTokenCntOutput, \ + srcOffsetRankTokenIdxOutput, dstOffsetRankTokenIdxOutput, offsetInnerOutput, countOuterOutput, \ expandIdxOutput, workspace, tiling // #define ENABLE_PRINT @@ -59,9 +58,7 @@ class NotifyDispatchA2 constexpr static int64_t MULTI_RANK_SIZE = 4; // 每个core最多往4个rank发送数据,64卡场景 constexpr static int64_t MAX_RANK_SIZE = 64; // 910B设备本算子最大支持的rank数,64卡场景 constexpr static int32_t INVALID_RANK = -1; - constexpr static uint32_t TEMP_BUF_LEN = 128 * 1024; // tuf注册长度为128K,剩余部分注册为其他buffer - constexpr static uint32_t SYSTEM_NEED_WORKSPACE = 16 * 1024 * 1024; // 对齐tiling - // constexpr static uint32_t NOTIFY_SHARE_OFFSET = 3696U * 1024 * 1024; // 最大HCCL_BUFFSIZE - 400 + constexpr static uint32_t TEMP_BUF_LEN = 128 * 1024; // tBuf注册长度为128K,剩余部分注册为其他buffer constexpr static uint32_t BW_ITEM_SIZE = 32; // = sizeof(BatchWriteItem) constexpr static uint32_t U64_PER_ITEM = BW_ITEM_SIZE / sizeof(uint64_t); // 每个BatchWriteItem占多少个unit64 @@ -80,10 +77,13 @@ class NotifyDispatchA2 constexpr static uint32_t DEST_RANK_OFFSET = 20; 
// destRankId 在 statusTensor中的offset, bytes constexpr static uint32_t DATALEN_OFFSET = 24; // dataLen 在 statusTensor中的offset, bytes constexpr static uint32_t UB_ALIGN = 32; // UB按32字节对齐 - constexpr static uint32_t EXP_TOKEN_COUNT_FLAG_CNT = UB_ALIGN / sizeof(int32_t); // 8 - constexpr static uint32_t GM_ALIGN = 64; // GM按64字节对齐 + constexpr static uint64_t EXP_TOKEN_COUNT_FLAG_CNT = UB_ALIGN / sizeof(uint64_t); // 4 + constexpr static uint32_t GM_ALIGN = 64; // GM按64字节对齐 constexpr static uint32_t MAX_BS = 4096; // 每卡支持的最大bs + // Synchronization flag occupies length + constexpr static int64_t FLAG_UNIT_INT_NUM = 4; + constexpr static int64_t MAGIC_MASK = ~((1LL << 32) - 1); public: __aicore__ inline NotifyDispatchA2(int rank, int rankSize, uint32_t extraFlag) @@ -170,8 +170,9 @@ class NotifyDispatchA2 // 第二阶段,处理server内通信 ProcessWithinServer(); SyncAll(); + // 交换后的数据拆分和计算输出 - SplitAndCalcData(); // TODO: 先验证recv_data + SplitAndCalcData(); SyncAll(); hccl_.Finalize(); @@ -179,7 +180,7 @@ class NotifyDispatchA2 } private: - FORCE_INLINE_AICORE void InitAll2AllLayeredRdma(KERNELS_ARGS_FUN_A2_ALL2ALL()) + __aicore__ inline void InitAll2AllLayeredRdma(KERNELS_ARGS_FUN_A2_ALL2ALL()) { this->root = 0; this->len = len; @@ -212,15 +213,15 @@ class NotifyDispatchA2 this->winContext_[COMM_EP_IDX] = (__gm__ HcclOpResParam *)contextGM0; notifyMemoryOffset = winContext_[COMM_EP_IDX]->winSize - IPC_BUFF_MAX_SIZE * 2; // 设置并自增magic - magicTensor_.SetGlobalBuffer((__gm__ int32_t *)(hccl_.GetWindowsInAddr(rank) + IPC_DATA_OFFSET - - blockNum * sizeof(int32_t) * EXP_TOKEN_COUNT_FLAG_CNT + - notifyMemoryOffset)); + magicTensor_.SetGlobalBuffer((__gm__ uint64_t *)(hccl_.GetWindowsInAddr(rank) + IPC_DATA_OFFSET - + blockNum * sizeof(uint64_t) * EXP_TOKEN_COUNT_FLAG_CNT + + notifyMemoryOffset)); pipe.InitBuffer(this->tBuf, TEMP_BUF_LEN); - LocalTensor tempLocal = tBuf.Get(); + LocalTensor tempLocal = tBuf.Get(); tempLocal(0) = 1; // 使用atomic方式实现+1 - AscendC::SetAtomicAdd(); + 
AscendC::SetAtomicAdd(); AscendC::SetFlag(EVENT_ID0); AscendC::WaitFlag(EVENT_ID0); // 等待SetValue完成 DataCopy(magicTensor_[blockIdx * EXP_TOKEN_COUNT_FLAG_CNT], tempLocal, EXP_TOKEN_COUNT_FLAG_CNT); @@ -234,18 +235,16 @@ class NotifyDispatchA2 this->shareAddrs[i] = hccl_.GetWindowsInAddr(i) + notifyMemoryOffset + (magic % PING_PONG_SIZE) * IPC_BUFF_MAX_SIZE; } - - sync.Init(this->rank, this->rankSize, this->shareAddrs, tBuf); } template - FORCE_INLINE_AICORE void CpGM2GMPingPong(int64_t dataSizeRemain, const GlobalTensor &sendDataInputGt, - const GlobalTensor &recvDataOutputGT, int op); + __aicore__ inline void CpGM2GMPingPong(int64_t dataSizeRemain, const GlobalTensor &sendDataInputGt, + const GlobalTensor &recvDataOutputGT, int op); template - FORCE_INLINE_AICORE void SetAtomic(int op); - FORCE_INLINE_AICORE void UnsetAtomic(int op); + __aicore__ inline void SetAtomic(int op); + __aicore__ inline void UnsetAtomic(int op); template - FORCE_INLINE_AICORE void SetWaitEvent(event_t eventId); + __aicore__ inline void SetWaitEvent(event_t eventId); __aicore__ inline void InitTensorLen() { @@ -329,6 +328,67 @@ class NotifyDispatchA2 } } + __aicore__ inline uint64_t MergeMagicWithValue(uint64_t magic, uint64_t value) + { + // magic as the high part, eventID as the low part, combined into a value for comparison + return (magic * 2ULL + value); + } + + // Wait for a part of synchronization flags within a rank + __aicore__ inline void WaitOneRankPartFlag(__gm__ uint64_t *waitAddr, int64_t flagNum, uint64_t checkValue) + { + GlobalTensor globalWait; + globalWait.SetGlobalBuffer(waitAddr, flagNum * FLAG_UNIT_INT_NUM); + LocalTensor localWait = tBuf.GetWithOffset(flagNum * FLAG_UNIT_INT_NUM, 0); + bool isSync = true; + uint64_t checkedFlagNum = 0; + do { + // Copy global synchronization flags to local + DataCopy(localWait, globalWait[checkedFlagNum * FLAG_UNIT_INT_NUM], + (flagNum - checkedFlagNum) * FLAG_UNIT_INT_NUM); + SetWaitEvent(EVENT_ID0); // Wait for GM->UB + + // Check 
if the synchronization flags are equal to checkValue + isSync = true; + uint64_t remainToCheck = flagNum - checkedFlagNum; + for (auto i = 0; i < remainToCheck; ++i) { + // Continue waiting if any core has not reached the checkValue phase + uint64_t v = localWait.GetValue(i * FLAG_UNIT_INT_NUM); + if ((v & MAGIC_MASK) != (checkValue & MAGIC_MASK) || v < checkValue) { + isSync = false; + checkedFlagNum += i; + break; + } + } + } while (!isSync); + } + + __aicore__ inline void SetInnerFlag(uint64_t magic, uint64_t eventID, int64_t setRank, int64_t setBlock) + { + uint64_t value = MergeMagicWithValue(magic, eventID); + // SetFlag((__gm__ uint64_t *)(shareAddrs[setRank]) + setBlock * FLAG_UNIT_INT_NUM, value); + __gm__ uint64_t *setAddr = (__gm__ uint64_t *)(shareAddrs[setRank]) + setBlock * FLAG_UNIT_INT_NUM; + + SetWaitEvent(EVENT_ID0); + SetWaitEvent(EVENT_ID0); + GlobalTensor globalSet; + globalSet.SetGlobalBuffer(setAddr, FLAG_UNIT_INT_NUM); + LocalTensor localSet = tBuf.GetWithOffset(1, 0); + localSet.SetValue(0, value); + + // Copy global synchronization flag to local + SetWaitEvent(EVENT_ID0); + DataCopy(globalSet, localSet, FLAG_UNIT_INT_NUM); + SetWaitEvent(EVENT_ID0); + } + + // Wait for a single inner-card synchronization flag + __aicore__ inline void WaitInnerFlag(uint64_t magic, uint64_t eventID, int64_t waitRank, int64_t waitBlock) + { + uint64_t value = MergeMagicWithValue(magic, eventID); + WaitOneRankPartFlag((__gm__ uint64_t *)(shareAddrs[waitRank]) + waitBlock * FLAG_UNIT_INT_NUM, 1, value); + } + __aicore__ inline void InputToShareSlice() { if (blockIdx > 0) { @@ -349,7 +409,7 @@ class NotifyDispatchA2 // 给当前server每卡写入serverNum个标记,位置为 rank + j * localRankSize int32_t offset = rank + j * rankSize; // rank0: 0,16 / rank8: 8,24 // rank0,server0: 0-7,16-23 rank8,server1: 8-15,24-31 - sync.SetInnerFlag(magic, 1, curServerRankId, offset); + SetInnerFlag(magic, 1, curServerRankId, offset); } } } @@ -380,7 +440,7 @@ class NotifyDispatchA2 // server0: 
0-7,16-23 server1: 8-15,24-31 int32_t offset = (i / serverNum + serverId * localRankSize) + (i % serverNum) * rankSize; - sync.WaitInnerFlag(magic, 1, rank, offset); + WaitInnerFlag(magic, 1, rank, offset); remoteGt.SetGlobalBuffer((__gm__ T *)(shareAddrs[targetRankId] + IPC_DATA_OFFSET + serverTarRankId * queSize + @@ -414,7 +474,6 @@ class NotifyDispatchA2 } int32_t targetRankId = 0; if (blockIdx == 0) { - // targetRankId = rank; return; // 同server的不搬运 } else { // blockIdx=1 targetRankId = (1 - serverId) * localRankSize + localRank; // 2个server的计算方式,求对端同号卡rankid @@ -550,13 +609,13 @@ class NotifyDispatchA2 pipe.Reset(); pipe.InitBuffer(tempBuf_, UB_ALIGN); // 存放临时的立即数 pipe.InitBuffer(tempBuf2_, - Ceil(4096 * sizeof(int32_t), UB_ALIGN) * UB_ALIGN); // MAX_BS <= 4096, 要能放下一个bs的数据 + Ceil(MAX_BS * sizeof(int32_t), UB_ALIGN) * UB_ALIGN); // MAX_BS <= 4096, 要能放下一个bs的数据 pipe.InitBuffer(tempBuf3_, Ceil(numExperts * sizeof(int32_t), UB_ALIGN) * UB_ALIGN); // 要能放numExpert个数据 pipe.InitBuffer(tempBuf7_, Ceil(numExperts * sizeof(int32_t), UB_ALIGN) * UB_ALIGN); // 要能放numExpert个数据 pipe.InitBuffer(tempBuf8_, - Ceil(4096 * sizeof(int32_t), UB_ALIGN) * UB_ALIGN); // MAX_BS <= 4096, 要能放下一个bs的数据 + Ceil(MAX_BS * sizeof(int32_t), UB_ALIGN) * UB_ALIGN); // MAX_BS <= 4096, 要能放下一个bs的数据 pipe.InitBuffer(tempBuf9_, - Ceil(4096 * sizeof(int32_t), UB_ALIGN) * UB_ALIGN); // MAX_BS <= 4096, 要能放下一个bs的数据 + Ceil(MAX_BS * sizeof(int32_t), UB_ALIGN) * UB_ALIGN); // MAX_BS <= 4096, 要能放下一个bs的数据 pipe.InitBuffer(tempBuf10_, Ceil(numExperts * sizeof(int32_t), UB_ALIGN) * UB_ALIGN); // 要能放numExpert个数据 pipe.InitBuffer(tempBuf4_, 1000 * sizeof(float)); // 要能放localExp从所有rank接收token的数据 @@ -1157,7 +1216,7 @@ class NotifyDispatchA2 int64_t numExperts; int64_t numTokens; int64_t len; - int64_t magic; + uint64_t magic; int64_t blockIdx; // 当前aicore序号 int64_t blockNum; // 当前rank的总aicore数 int64_t timeout; @@ -1166,12 +1225,11 @@ class NotifyDispatchA2 __gm__ HcclOpResParam *winContext_[COMM_NUM]{nullptr, nullptr}; 
TPipe pipe; // pipe工具类 TBuf tBuf; - SyncCollectives sync; Hccl hccl_; GM_ADDR windowInGM_; GM_ADDR windowOutGM_; - GlobalTensor magicTensor_; // 用于存放magic,位于windowInstatusTensor_之前 + GlobalTensor magicTensor_; // 用于存放magic,位于windowInstatusTensor_之前 GlobalTensor batchWriteInfoTensor_; GlobalTensor windowInstatusTensor_; // 用于rank间状态同步 GlobalTensor windowInTensor_; @@ -1216,7 +1274,6 @@ class NotifyDispatchA2 uint32_t gExpertMaxBsOriOffsetAlignLen{0}; // 全局,包含所有rank的 uint32_t notifyMemoryOffset{0}; - // GM_ADDR dataSpaceGT_; GlobalTensor gRankEpTokenCntGT_; // 临时数据 GlobalTensor gExpertMaxBsSrcGT_; // 临时数据 @@ -1241,7 +1298,7 @@ class NotifyDispatchA2 template template -FORCE_INLINE_AICORE void NotifyDispatchA2::SetAtomic(int op) +__aicore__ inline void NotifyDispatchA2::SetAtomic(int op) { PipeBarrier(); if (op != -1) { @@ -1254,14 +1311,14 @@ FORCE_INLINE_AICORE void NotifyDispatchA2::SetAtomic(int op) template template -FORCE_INLINE_AICORE void NotifyDispatchA2::SetWaitEvent(event_t eventId) +__aicore__ inline void NotifyDispatchA2::SetWaitEvent(event_t eventId) { AscendC::SetFlag(eventId); AscendC::WaitFlag(eventId); } template -FORCE_INLINE_AICORE void NotifyDispatchA2::UnsetAtomic(int op) +__aicore__ inline void NotifyDispatchA2::UnsetAtomic(int op) { if (op != -1) { AscendC::SetAtomicNone(); @@ -1271,9 +1328,9 @@ FORCE_INLINE_AICORE void NotifyDispatchA2::UnsetAtomic(int op) template template -FORCE_INLINE_AICORE void NotifyDispatchA2::CpGM2GMPingPong(int64_t dataSizeRemain, - const GlobalTensor &sendDataInputGt, - const GlobalTensor &recvDataOutputGT, int op) +__aicore__ inline void NotifyDispatchA2::CpGM2GMPingPong(int64_t dataSizeRemain, + const GlobalTensor &sendDataInputGt, + const GlobalTensor &recvDataOutputGT, int op) { // General case (U = K), input/output are the same, share one UB // Only when conversion is needed (U->K), UB will be divided into two parts according to the ratio of @@ -1332,4 +1389,4 @@ FORCE_INLINE_AICORE void 
NotifyDispatchA2::CpGM2GMPingPong(int64_t dataSizeRe return; } -#endif /* ALL2ALL_V_LAYERED_RDMA_H */ +#endif /* NOTIFY_DISPATCH_A2_H */