Skip to content

All gather v #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: group
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 46 additions & 26 deletions example/user.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,25 @@ start::start(CkArgMsg *msg) {
}

n = atoi(msg->argv[1]);
k = atoi(msg->argv[2]);
k = atoi(msg->argv[2]);// TODO: remove this
x = atoi(msg->argv[3]);
y = atoi(msg->argv[4]);
sizeArray = (long int *)malloc((n + 1) * sizeof(long int));
delete msg;

sim = CProxy_simBox::ckNew(thisProxy, k, n, x, y, n);
#ifdef FLOODING
CProxy_AllGather allGatherProxy = CProxy_AllGather::ckNew(n, (int)allGatherType::ALL_GATHER_FLOODING);
#endif

#ifdef HYPERCUBE
CProxy_AllGather allGatherProxy = CProxy_AllGather::ckNew(n, (int)allGatherType::ALL_GATHER_HYPERCUBE);
#endif

#ifdef RING
CProxy_AllGather allGatherProxy = CProxy_AllGather::ckNew(n, (int)allGatherType::ALL_GATHER_RING);
#endif

#ifdef FLOODING
AllGather = CProxy_AllGather::ckNew(k, n, (int)allGatherType::ALL_GATHER_FLOODING);
#endif

#ifdef HYPERCUBE
AllGather = CProxy_AllGather::ckNew(k, n, (int)allGatherType::ALL_GATHER_HYPERCUBE);
#endif

#ifdef RING
AllGather = CProxy_AllGather::ckNew(k, n, (int)allGatherType::ALL_GATHER_RING);
#endif

sim.begin(AllGather);
sim = CProxy_simBox::ckNew(thisProxy, allGatherProxy, n, x, y, n);
}

void start::fini(int numDone) {
Expand All @@ -38,26 +37,45 @@ void start::fini(int numDone) {
CkExit();
}
}
void start::gatherSize(int arrayIndex, int dataSize) {
sizeArray[arrayIndex] = dataSize;
numSizeGathered++;
if(numSizeGathered==n){
long int* dispArray = (long int *)malloc((n + 1) * sizeof(long int));
// do a prefix sum
dispArray[0] = 0;
for (int i = 1; i < n + 1; i++) {
dispArray[i] = dispArray[i - 1] + sizeArray[i - 1];
}
// send the displacement arrays to begin
sim.begin(dispArray, n+1);

}
}

simBox::simBox(CProxy_start startProxy, int k, int n, int x, int y)
: startProxy(startProxy), k(k), n(n), x(x), y(y) {
result = (long int *)malloc(k * n * sizeof(long int));
data = (long int *)malloc(k * sizeof(long int));
simBox::simBox(CProxy_start startProxy, CProxy_AllGather allGatherProxy, int n, int x, int y)
: startProxy(startProxy), allGatherProxy(allGatherProxy), n(n), x(x), y(y) {
srand(thisIndex);
dataSize = random()%100+1;
data = (long int *)malloc(dataSize*sizeof(long int));
long int max_serial = (1 << y) - 1;
long int base = thisIndex;
while (max_serial > 0) {
base = base * 10;
max_serial = max_serial / 10;
}
for (int i = 0; i < k; i++) {
for (int i = 0; i < dataSize; i++) {
data[i] = base + i;
}
startProxy.gatherSize(thisIndex, dataSize);
}

void simBox::begin(CProxy_AllGather AllGatherGroup) {
void simBox::begin(long* dispArray, int _) {
this->dispArray = dispArray;
result = (long int *)malloc(dispArray[n]*sizeof(long int));
CkCallback cb(CkIndex_simBox::done(NULL), CkArrayIndex1D(thisIndex), thisProxy);
AllGather* libptr = AllGatherGroup.ckLocalBranch();
libptr->init(result, data, cb);
AllGather* libptr = allGatherProxy.ckLocalBranch();
libptr->init(result, data, dispArray, thisIndex, cb);
}

void simBox::done(allGatherMsg *msg) {
Expand All @@ -69,8 +87,10 @@ void simBox::done(allGatherMsg *msg) {
base = base * 10;
max_serial = max_serial / 10;
}
for(int j = 0; j < k; j++) {
if(result[i * k + j] != base + j) {
long int dataSize = dispArray[i+1] - dispArray[i];
int offset = dispArray[i];
for(int j = 0; j < dataSize; j++) {
if(result[offset + j] != base + j) {
success = false;
break;
}
Expand All @@ -81,7 +101,7 @@ void simBox::done(allGatherMsg *msg) {
if(success) ckout << "[STATUS] Correct result for Chare " << thisIndex << endl;
else {
ckout << "[STATUS] Incorrect result for Chare " << thisIndex << endl;
for(int i = 0; i < n * k; i++) {
for(int i = 0; i < dispArray[thisIndex+1] - dispArray[thisIndex]; i++) {
ckout << result[i] << " ";
}
ckout << endl;
Expand Down
5 changes: 3 additions & 2 deletions example/user.ci
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ mainmodule user {
mainchare start {
entry start(CkArgMsg * m);
entry[reductiontarget] void fini(int numDone);
entry void gatherSize(int arrayIndex, int dataSize);
};

array[1D] simBox {
entry simBox(CProxy_start startProxy, int k, int n, int x, int y);
entry void begin(CProxy_AllGather allGatherProxy);
entry simBox(CProxy_start startProxy, CProxy_AllGather allGatherProxy, int n, int x, int y);
entry void begin(long dispArray[size], int size);
entry void done(allGatherMsg * m);
}
};
13 changes: 9 additions & 4 deletions example/user.hh
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,33 @@ private:
int x;
int y;
CProxy_simBox sim;
CProxy_AllGather AllGather;
int numSizeGathered{};
long int* sizeArray;

public:
start(CkArgMsg *msg);

void fini(int numDone);

void gatherSize(int arrayIndex, int dataSize);
};

class simBox : public CBase_simBox {
private:
CProxy_start startProxy;
int k;
long dataSize;
int n;
int x;
int y;
long int *data;
long int *result;
long int *dispArray;
CProxy_AllGather allGatherProxy;

public:
simBox(CProxy_start startProxy, int k, int n, int x, int y);
simBox(CProxy_start startProxy, CProxy_AllGather allGatherProxy, int n, int x, int y);

void begin(CProxy_AllGather AllGather);
void begin(long* dispArray, int size);

void done(allGatherMsg *msg);
};
69 changes: 21 additions & 48 deletions src/allGather/allGather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ int AllGather::gen_rand() {
return dis(gen);
}

AllGather::AllGather(int k, int n, int type) : k(k), n(n) {
AllGather::AllGather(int n, int type) : n(n) {
this->type = (allGatherType)type;
switch (type) {
case allGatherType::ALL_GATHER_HYPERCUBE: {
Expand Down Expand Up @@ -54,8 +54,10 @@ void AllGather::initdone() {
}

// TODO: remove this broadcast
void AllGather::init(long int* result, long int* data, CkCallback cb) {
void AllGather::init(long int* result, long int* data, long int* dispArray, int idx, CkCallback cb) {
this->lib_done_callback = cb;
this->dispArray = dispArray;
this->idx = idx;
zero_copy_callback = CkCallback(CkIndex_AllGather::local_buff_done(NULL), thisProxy[CkMyPe()]);
dum_dum = CkCallback(CkCallback::ignore);
this->store = result;
Expand All @@ -71,79 +73,50 @@ void AllGather::local_buff_done(CkDataMsg *m) {
}

void AllGather::startGather() {
int currPE = CkMyPe();
for (int i = 0; i < k; i++) {
store[k * currPE + i] = data[i];
int offset = dispArray[idx];
int dataSize = dispArray[idx + 1] - dispArray[idx];
for (int i = 0; i < dataSize; i++) {
store[offset + i] = data[i];
}
CkNcpyBuffer src(data, k*sizeof(long int), dum_dum, CK_BUFFER_UNREG);

CkNcpyBuffer src(data, dataSize * sizeof(long int), dum_dum, CK_BUFFER_UNREG);
switch (type) {
case allGatherType::ALL_GATHER_RING: {
#ifdef TIMESTAMP
thisProxy[(currPE + 1) % n].recvRing(
currPE, src, (timeStamp + alpha + beta * k * 8));
timeStamp += alpha;
#else
thisProxy[(currPE + 1) % n].recvRing(currPE, src, 0.0);
#endif
thisProxy[(idx + 1) % n].recvRing(idx, src);
} break;
case allGatherType::ALL_GATHER_HYPERCUBE: {
hyperCubeIndx.push_back(currPE);
hyperCubeIndx.push_back(idx);
hyperCubeStore.push_back(src);
thisProxy[currPE].Hypercube();
thisProxy[idx].Hypercube();
} break;
case allGatherType::ALL_GATHER_FLOODING: {
recvFloodMsg[currPE] = true;
recvFloodMsg[idx] = true;
for (int i = 0; i < n; i++) {
if (graph[currPE][i] == 1) {
#ifdef TIMESTAMP
thisProxy[i].Flood(currPE, src,
(timeStamp + alpha + beta * k * 8));
timeStamp += alpha;
#else
thisProxy[i].Flood(currPE, src, 0.0);
#endif
if (graph[idx][i] == 1) {
thisProxy[i].Flood(idx, src);
}
}
} break;
}
}

void AllGather::recvRing(int sender, CkNcpyBuffer src, double recvTime) {
CkNcpyBuffer dst(store + sender * k, k * sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG);
void AllGather::recvRing(int sender, CkNcpyBuffer src) {
CkNcpyBuffer dst(store + dispArray[sender], (dispArray[sender+1] - dispArray[sender])*sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG);
dst.get(src);
#ifdef TIMESTAMP
timeStamp = std::max(recvTime, timeStamp);
#endif
if (((CkMyPe() + 1) % n) != sender) {
#ifdef TIMESTAMP
thisProxy[(CkMyPe() + 1) % n].recvRing(
sender, src, (timeStamp + alpha + beta * k * 8));
timeStamp += alpha;
#else
thisProxy[(CkMyPe() + 1) % n].recvRing(sender, src, 0.0);
#endif
thisProxy[(CkMyPe() + 1) % n].recvRing(sender, src);
}
}

void AllGather::Flood(int sender, CkNcpyBuffer src, double recvTime) {
void AllGather::Flood(int sender, CkNcpyBuffer src) {
if (recvFloodMsg[sender]) {
return;
}
recvFloodMsg[sender] = true;
CkNcpyBuffer dst(store + sender * k, k * sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG);
CkNcpyBuffer dst(store + dispArray[sender], (dispArray[sender+1] - dispArray[sender])*sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG);
dst.get(src);
#ifdef TIMESTAMP
timeStamp = std::max(recvTime, timeStamp);
#endif
for (int i = 0; i < n; i++) {
if (graph[CkMyPe()][i] == 1 and i != sender) {
#ifdef TIMESTAMP
thisProxy[i].Flood(sender, src, (timeStamp + alpha + beta * k * 8));
timeStamp += alpha;
#else
thisProxy[i].Flood(sender, src, 0.0);
#endif
thisProxy[i].Flood(sender, src);
}
}
}
Expand Down
34 changes: 9 additions & 25 deletions src/allGather/allGather.ci
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ module allGather {
message allGatherMsg;

group AllGather {
entry AllGather(int size, int n, int type);
entry AllGather(int n, int type);
entry void initdone();
entry void startGather();
entry void recvRing(int sender, CkNcpyBuffer data, double recvTime);
entry void recvRing(int sender, CkNcpyBuffer data);
entry void local_buff_done(CkDataMsg *m);
entry void Hypercube() {
for(iter = 0; iter < numHypercubeIter; iter++) {
Expand All @@ -19,45 +19,29 @@ module allGather {
} else {
HypercubeToSend = CkMyPe() ^ ((int)pow(2, iter));
}

}
serial {
#ifdef TIMESTAMP
serial {
if(HypercubeRecursiveDoubling && iter == numHypercubeIter-1) {
std::vector<CkNcpyBuffer> hyperCubeStoreCopy(hyperCubeStore.begin(), hyperCubeStore.begin()+(n-(int)pow(2,iter)));
std::vector<int> hyperCubeIndxCopy(hyperCubeIndx.begin(), hyperCubeIndx.begin()+(n-(int)pow(2,iter)));
thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStoreCopy, hyperCubeIndxCopy, (timeStamp + alpha + beta * hyperCubeStore.size() * 8));
thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStoreCopy, hyperCubeIndxCopy);
} else {
thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStore, hyperCubeIndx, (timeStamp + alpha + beta * hyperCubeStore.size() * 8));
thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStore, hyperCubeIndx);
}
timeStamp += alpha;
#else

if(HypercubeRecursiveDoubling && iter == numHypercubeIter-1) {
std::vector<CkNcpyBuffer> hyperCubeStoreCopy(hyperCubeStore.begin(), hyperCubeStore.begin()+(n-(int)pow(2,iter)));
std::vector<int> hyperCubeIndxCopy(hyperCubeIndx.begin(), hyperCubeIndx.begin()+(n-(int)pow(2,iter)));
thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStoreCopy, hyperCubeIndxCopy, 0.0);
} else {
thisProxy[HypercubeToSend].recvHypercube(iter, hyperCubeStore, hyperCubeIndx, 0.0);
}
#endif
}
when recvHypercube[iter](int ref, std::vector<CkNcpyBuffer> data, std::vector<int> dataIndx, double recvTime) {
when recvHypercube[iter](int ref, std::vector<CkNcpyBuffer> data, std::vector<int> dataIndx) {
serial {
for(int m = 0; m < data.size(); m++) {
hyperCubeStore.emplace_back(data[m]);
hyperCubeIndx.emplace_back(dataIndx[m]);
CkNcpyBuffer dst(store + dataIndx[m] * k, k * sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG);
CkNcpyBuffer dst(store + dispArray[dataIndx[m]], (dispArray[dataIndx[m]+1] - dispArray[dataIndx[m]])*sizeof(long int), zero_copy_callback, CK_BUFFER_UNREG);
dst.get(data[m]);
}
#ifdef TIMESTAMP
timeStamp = std::max(recvTime, timeStamp);
#endif
}
}
}
};
entry void recvHypercube(int ref, std::vector<CkNcpyBuffer> data, std::vector<int> dataIndx, double recvTime);
entry void Flood(int sender, CkNcpyBuffer data, double recvTime);
entry void recvHypercube(int ref, std::vector<CkNcpyBuffer> data, std::vector<int> dataIndx);
entry void Flood(int sender, CkNcpyBuffer data);
};
};
12 changes: 6 additions & 6 deletions src/allGather/allGather.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ enum allGatherType {

class AllGather : public CBase_AllGather {
private:
int k{};
int n{};
int idx{};
long int* dispArray;
long int *store;
int numRecvMsg{};
double timeStamp{};
CkCallback lib_done_callback;
allGatherType type;
int numHypercubeIter{};
Expand All @@ -44,19 +44,19 @@ private:
public:
AllGather_SDAG_CODE

AllGather(int k, int n, int type);
AllGather(int n, int type);

void startGather();

void recvRing(int sender, CkNcpyBuffer data, double recvTime);
void recvRing(int sender, CkNcpyBuffer data);

void local_buff_done(CkDataMsg *m);

int gen_rand();

void Flood(int sender, CkNcpyBuffer data, double recvTime);
void Flood(int sender, CkNcpyBuffer data);

void init(long int* result, long int* data, CkCallback cb);
void init(long int* result, long int* data, long int* dispArray, int idx, CkCallback cb);

void initdone();
};