diff --git a/example/user.cc b/example/user.cc index b049a93..6807db4 100644 --- a/example/user.cc +++ b/example/user.cc @@ -10,26 +10,25 @@ start::start(CkArgMsg *msg) { } n = atoi(msg->argv[1]); - k = atoi(msg->argv[2]); + k = atoi(msg->argv[2]);// TODO: remove this x = atoi(msg->argv[3]); y = atoi(msg->argv[4]); + sizeArray = (long int *)malloc((n + 1) * sizeof(long int)); delete msg; - sim = CProxy_simBox::ckNew(thisProxy, k, n, x, y, n); + #ifdef FLOODING + CProxy_AllGather allGatherProxy = CProxy_AllGather::ckNew(n, (int)allGatherType::ALL_GATHER_FLOODING); + #endif + + #ifdef HYPERCUBE + CProxy_AllGather allGatherProxy = CProxy_AllGather::ckNew(n, (int)allGatherType::ALL_GATHER_HYPERCUBE); + #endif + + #ifdef RING + CProxy_AllGather allGatherProxy = CProxy_AllGather::ckNew(n, (int)allGatherType::ALL_GATHER_RING); + #endif -#ifdef FLOODING - AllGather = CProxy_AllGather::ckNew(k, n, (int)allGatherType::ALL_GATHER_FLOODING); -#endif - -#ifdef HYPERCUBE - AllGather = CProxy_AllGather::ckNew(k, n, (int)allGatherType::ALL_GATHER_HYPERCUBE); -#endif - -#ifdef RING - AllGather = CProxy_AllGather::ckNew(k, n, (int)allGatherType::ALL_GATHER_RING); -#endif - - sim.begin(AllGather); + sim = CProxy_simBox::ckNew(thisProxy, allGatherProxy, n, x, y, n); } void start::fini(int numDone) { @@ -38,26 +37,45 @@ void start::fini(int numDone) { CkExit(); } } +void start::gatherSize(int arrayIndex, int dataSize) { + sizeArray[arrayIndex] = dataSize; + numSizeGathered++; + if(numSizeGathered==n){ + long int* dispArray = (long int *)malloc((n + 1) * sizeof(long int)); + // do a prefix sum + dispArray[0] = 0; + for (int i = 1; i < n + 1; i++) { + dispArray[i] = dispArray[i - 1] + sizeArray[i - 1]; + } + // send the displacement arrays to begin + sim.begin(dispArray, n+1); + + } +} -simBox::simBox(CProxy_start startProxy, int k, int n, int x, int y) - : startProxy(startProxy), k(k), n(n), x(x), y(y) { - result = (long int *)malloc(k * n * sizeof(long int)); - data = (long int *)malloc(k * sizeof(long int)); +simBox::simBox(CProxy_start startProxy, CProxy_AllGather allGatherProxy, int n, int x, int y) + : startProxy(startProxy), allGatherProxy(allGatherProxy), n(n), x(x), y(y) { + srand(thisIndex); + dataSize = random()%100+1; + data = (long int *)malloc(dataSize*sizeof(long int)); long int max_serial = (1 << y) - 1; long int base = thisIndex; while (max_serial > 0) { base = base * 10; max_serial = max_serial / 10; } - for (int i = 0; i < k; i++) { + for (int i = 0; i < dataSize; i++) { data[i] = base + i; } + startProxy.gatherSize(thisIndex, dataSize); } -void simBox::begin(CProxy_AllGather AllGatherGroup) { +void simBox::begin(long* dispArray, int _) { + this->dispArray = dispArray; + result = (long int *)malloc(dispArray[n]*sizeof(long int)); CkCallback cb(CkIndex_simBox::done(NULL), CkArrayIndex1D(thisIndex), thisProxy); - AllGather* libptr = AllGatherGroup.ckLocalBranch(); - libptr->init(result, data, cb); + AllGather* libptr = allGatherProxy.ckLocalBranch(); + libptr->init(result, data, dispArray, thisIndex, cb); } void simBox::done(allGatherMsg *msg) { @@ -69,8 +87,10 @@ void simBox::done(allGatherMsg *msg) { base = base * 10; max_serial = max_serial / 10; } - for(int j = 0; j < k; j++) { - if(result[i * k + j] != base + j) { + long int dataSize = dispArray[i+1] - dispArray[i]; + int offset = dispArray[i]; + for(int j = 0; j < dataSize; j++) { + if(result[offset + j] != base + j) { success = false; break; } @@ -81,7 +101,7 @@ void simBox::done(allGatherMsg *msg) { if(success) ckout << "[STATUS] Correct result for Chare " << thisIndex << endl; else { ckout << "[STATUS] Incorrect result for Chare " << thisIndex << endl; - for(int i = 0; i < n * k; i++) { + for(int i = 0; i < dispArray[thisIndex+1] - dispArray[thisIndex]; i++) { ckout << result[i] << " "; } ckout << endl; diff --git a/example/user.ci b/example/user.ci index 7abb604..22712d8 100644 --- a/example/user.ci +++ b/example/user.ci @@ -4,11 +4,12 @@ mainmodule user { mainchare start { entry start(CkArgMsg * m); entry[reductiontarget] void fini(int numDone); + entry void gatherSize(int arrayIndex, int dataSize); }; array[1D] simBox { - entry simBox(CProxy_start startProxy, int k, int n, int x, int y); - entry void begin(CProxy_AllGather allGatherProxy); + entry simBox(CProxy_start startProxy, CProxy_AllGather allGatherProxy, int n, int x, int y); + entry void begin(long dispArray[size], int size); entry void done(allGatherMsg * m); } }; diff --git a/example/user.hh b/example/user.hh index c69c011..19d5de5 100644 --- a/example/user.hh +++ b/example/user.hh @@ -9,28 +9,33 @@ private: int x; int y; CProxy_simBox sim; - CProxy_AllGather AllGather; + int numSizeGathered{}; + long int* sizeArray; public: start(CkArgMsg *msg); void fini(int numDone); + + void gatherSize(int arrayIndex, int dataSize); }; class simBox : public CBase_simBox { private: CProxy_start startProxy; - int k; + long dataSize; int n; int x; int y; long int *data; long int *result; + long int *dispArray; + CProxy_AllGather allGatherProxy; public: - simBox(CProxy_start startProxy, int k, int n, int x, int y); + simBox(CProxy_start startProxy, CProxy_AllGather allGatherProxy, int n, int x, int y); - void begin(CProxy_AllGather AllGather); + void begin(long* dispArray, int size); void done(allGatherMsg *msg); }; diff --git a/src/allGather/allGather.cc b/src/allGather/allGather.cc index 125a832..6dbcb44 100644 --- a/src/allGather/allGather.cc +++ b/src/allGather/allGather.cc @@ -9,7 +9,7 @@ int AllGather::gen_rand() { return dis(gen); } -AllGather::AllGather(int k, int n, int type) : k(k), n(n) { +AllGather::AllGather(int n, int type) : n(n) { this->type = (allGatherType)type; switch (type) { case allGatherType::ALL_GATHER_HYPERCUBE: { @@ -54,8 +54,10 @@ void AllGather::initdone() { } // TODO: remove this broadcast -void AllGather::init(long int* result, long int* data, CkCallback cb) { +void AllGather::init(long int* result, long int* data, long int* dispArray, int idx, CkCallback cb) { this->lib_done_callback = cb; + this->dispArray = dispArray; + this->idx = idx; zero_copy_callback = CkCallback(CkIndex_AllGather::local_buff_done(NULL), thisProxy[CkMyPe()]); dum_dum = CkCallback(CkCallback::ignore); this->store = result; @@ -71,79 +73,50 @@ void AllGather::local_buff_done(CkDataMsg *m) { } void AllGather::startGather() { - int currPE = CkMyPe(); - for (int i = 0; i < k; i++) { - store[k * currPE + i] = data[i]; + int offset = dispArray[idx]; + int dataSize = dispArray[idx + 1] - dispArray[idx]; + for (int i = 0; i < dataSize; i++) { + store[offset + i] = data[i]; } - CkNcpyBuffer src(data, k*sizeof(long int), dum_dum, CK_BUFFER_UNREG); - + CkNcpyBuffer src(data, dataSize * sizeof(long int), dum_dum, CK_BUFFER_UNREG); switch (type) { case allGatherType::ALL_GATHER_RING: { -#ifdef TIMESTAMP - thisProxy[(currPE + 1) % n].recvRing( - currPE, src, (timeStamp + alpha + beta * k * 8)); - timeStamp += alpha; -#else - thisProxy[(currPE + 1) % n].recvRing(currPE, src, 0.0); -#endif + thisProxy[(idx + 1) % n].recvRing(idx, src); } break; case allGatherType::ALL_GATHER_HYPERCUBE: { - hyperCubeIndx.push_back(currPE); + hyperCubeIndx.push_back(idx); hyperCubeStore.push_back(src); - thisProxy[currPE].Hypercube(); + thisProxy[idx].Hypercube(); } break; case allGatherType::ALL_GATHER_FLOODING: { - recvFloodMsg[currPE] = true; + recvFloodMsg[idx] = true; for (int i = 0; i < n; i++) { - if (graph[currPE][i] == 1) { -#ifdef TIMESTAMP - thisProxy[i].Flood(currPE, src, - (timeStamp + alpha + beta * k * 8)); - timeStamp += alpha; -#else - thisProxy[i].Flood(currPE, src, 0.0); -#endif + if (graph[idx][i] == 1) { + thisProxy[i].Flood(idx, src); } } } break; } } -void AllGather::recvRing(int sender, CkNcpyBuffer src, double recvTime) { - CkNcpyBuffer dst(store + sender * k, k * sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG); +void AllGather::recvRing(int sender, CkNcpyBuffer src) { + CkNcpyBuffer dst(store + dispArray[sender], (dispArray[sender+1] - dispArray[sender])*sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG); dst.get(src); -#ifdef TIMESTAMP - timeStamp = std::max(recvTime, timeStamp); -#endif if (((CkMyPe() + 1) % n) != sender) { -#ifdef TIMESTAMP - thisProxy[(CkMyPe() + 1) % n].recvRing( - sender, src, (timeStamp + alpha + beta * k * 8)); - timeStamp += alpha; -#else - thisProxy[(CkMyPe() + 1) % n].recvRing(sender, src, 0.0); -#endif + thisProxy[(CkMyPe() + 1) % n].recvRing(sender, src); } } -void AllGather::Flood(int sender, CkNcpyBuffer src, double recvTime) { +void AllGather::Flood(int sender, CkNcpyBuffer src) { if (recvFloodMsg[sender]) { return; } recvFloodMsg[sender] = true; - CkNcpyBuffer dst(store + sender * k, k * sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG); + CkNcpyBuffer dst(store + dispArray[sender], (dispArray[sender+1] - dispArray[sender])*sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG); dst.get(src); -#ifdef TIMESTAMP - timeStamp = std::max(recvTime, timeStamp); -#endif for (int i = 0; i < n; i++) { if (graph[CkMyPe()][i] == 1 and i != sender) { -#ifdef TIMESTAMP - thisProxy[i].Flood(sender, src, (timeStamp + alpha + beta * k * 8)); - timeStamp += alpha; -#else - thisProxy[i].Flood(sender, src, 0.0); -#endif + thisProxy[i].Flood(sender, src); } } } diff --git a/src/allGather/allGather.ci b/src/allGather/allGather.ci index 7e40c42..5b39a5b 100644 --- a/src/allGather/allGather.ci +++ b/src/allGather/allGather.ci @@ -6,10 +6,10 @@ module allGather { message allGatherMsg; group AllGather { - entry AllGather(int size, int n, int type); + entry AllGather(int n, int type); entry void initdone(); entry void startGather(); - entry void recvRing(int sender, CkNcpyBuffer data, double recvTime); + entry void recvRing(int sender, CkNcpyBuffer data); entry void local_buff_done(CkDataMsg *m); entry void Hypercube() { for(iter = 0; iter < numHypercubeIter; iter++) { @@ -19,45 +19,29 @@ module allGather { } else { HypercubeToSend = CkMyPe() ^ ((int)pow(2, iter)); } - } - serial { -#ifdef TIMESTAMP + serial { if(HypercubeRecursiveDoubling && iter == numHypercubeIter-1) { std::vector hyperCubeStoreCopy(hyperCubeStore.begin(), hyperCubeStore.begin()+(n-(int)pow(2,iter))); std::vector hyperCubeIndxCopy(hyperCubeIndx.begin(), hyperCubeIndx.begin()+(n-(int)pow(2,iter))); - thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStoreCopy, hyperCubeIndxCopy, (timeStamp + alpha + beta * hyperCubeStore.size() * 8)); + thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStoreCopy, hyperCubeIndxCopy); } else { - thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStore, hyperCubeIndx, (timeStamp + alpha + beta * hyperCubeStore.size() * 8)); + thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStore, hyperCubeIndx); } - timeStamp += alpha; -#else - - if(HypercubeRecursiveDoubling && iter == numHypercubeIter-1) { - std::vector hyperCubeStoreCopy(hyperCubeStore.begin(), hyperCubeStore.begin()+(n-(int)pow(2,iter))); - std::vector hyperCubeIndxCopy(hyperCubeIndx.begin(), hyperCubeIndx.begin()+(n-(int)pow(2,iter))); - thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStoreCopy, hyperCubeIndxCopy, 0.0); - } else { - thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStore, hyperCubeIndx, 0.0); - } -#endif } - when recvHypercube[iter](int ref, std::vector data, std::vector dataIndx, double recvTime) { + when recvHypercube[iter](int ref, std::vector data, std::vector dataIndx) { serial { for(int m = 0; m < data.size(); m++) { hyperCubeStore.emplace_back(data[m]); hyperCubeIndx.emplace_back(dataIndx[m]); - CkNcpyBuffer dst(store + dataIndx[m] * k, k * sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG); + CkNcpyBuffer dst(store + dispArray[dataIndx[m]], (dispArray[dataIndx[m]+1] - dispArray[dataIndx[m]])*sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG); dst.get(data[m]); } -#ifdef TIMESTAMP - timeStamp = std::max(recvTime, timeStamp); -#endif } } } }; - entry void recvHypercube(int ref, std::vector data, std::vector dataIndx, double recvTime); - entry void Flood(int sender, CkNcpyBuffer data, double recvTime); + entry void recvHypercube(int ref, std::vector data, std::vector dataIndx); + entry void Flood(int sender, CkNcpyBuffer data); }; }; diff --git a/src/allGather/allGather.hh b/src/allGather/allGather.hh index 7e21185..8d31723 100644 --- a/src/allGather/allGather.hh +++ b/src/allGather/allGather.hh @@ -20,11 +20,11 @@ enum allGatherType { class AllGather : public CBase_AllGather { private: - int k{}; int n{}; + int idx{}; + long int* dispArray; long int *store; int numRecvMsg{}; - double timeStamp{}; CkCallback lib_done_callback; allGatherType type; int numHypercubeIter{}; @@ -44,19 +44,19 @@ private: public: AllGather_SDAG_CODE - AllGather(int k, int n, int type); + AllGather(int n, int type); void startGather(); - void recvRing(int sender, CkNcpyBuffer data, double recvTime); + void recvRing(int sender, CkNcpyBuffer data); void local_buff_done(CkDataMsg *m); int gen_rand(); - void Flood(int sender, CkNcpyBuffer data, double recvTime); + void Flood(int sender, CkNcpyBuffer data); - void init(long int* result, long int* data, CkCallback cb); + void init(long int* result, long int* data, long int* dispArray, int idx, CkCallback cb); void initdone(); };