-
Notifications
You must be signed in to change notification settings - Fork 52
CkIO Example #3649
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
CkIO Example #3649
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
-include ./home/ec2-user/charm-project/charm/examples/common.mk | ||
CHARMC=/home/ec2-user/charm-project/charm/bin/charmc $(OPTS) | ||
|
||
all: iotest | ||
|
||
iotest: iotest.ci iotest.C | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The files and executable should probably be called something other than |
||
$(CHARMC) iotest.ci | ||
$(CHARMC) iotest.C -o $@ -module CkIO | ||
|
||
test: iotest | ||
$(call run, ./iotest +p4 4 ) | ||
|
||
testp: iotest | ||
$(call run, ./iotest +p$(P) $(P) ) | ||
|
||
smptest: iotest | ||
$(call run, ./iotest 4 +p2 ++ppn 2) | ||
$(call run, ./iotest 4 +p4 ++ppn 2) | ||
|
||
clean: | ||
rm -f *.o *.decl.h *.def.h iotest test* | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,57 @@ | ||||||
#include "iotest.decl.h" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Run this and the |
||||||
#include <vector> | ||||||
|
||||||
class Main : public CBase_Main{ | ||||||
Main_SDAG_CODE; | ||||||
std::vector<Ck::IO::File> _files; // holds all of the CkIO File objects | ||||||
int _num_writers; | ||||||
int _num_iterations; | ||||||
CProxy_Writer writers; | ||||||
public: | ||||||
Main(CkArgMsg* msg){ | ||||||
// make sure the example program is being used correctly | ||||||
if(msg -> argc != 3){ | ||||||
ckout << "Usage: ./<program_name> <number_writers> <number_of_files>" << endl; | ||||||
CkExit(); | ||||||
} | ||||||
_num_writers = atoi(msg -> argv[1]); // assign the number of writers | ||||||
_num_iterations = atoi(msg -> argv[2]); // assign the number of files to write | ||||||
int num_files = _num_iterations; // save the number of files | ||||||
_files.resize(_num_iterations); | ||||||
for(int i = 0; i < num_files; ++i){ | ||||||
Comment on lines
+18
to
+21
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||
thisProxy.startWritingCycle(i); // start writing to the file numbered i | ||||||
} | ||||||
delete msg; | ||||||
|
||||||
} | ||||||
// standard bookkeeping for how many iterations we need to go through | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
void decrementRemaining(){ | ||||||
_num_iterations--; | ||||||
if(!_num_iterations){ | ||||||
ckout << "Successfully completed parallel output!" << endl; | ||||||
CkExit(); | ||||||
} | ||||||
} | ||||||
|
||||||
}; | ||||||
|
||||||
|
||||||
class Writer : public CBase_Writer { | ||||||
|
||||||
public: | ||||||
/** | ||||||
* Takes in a Session object to the current writing session. The constructor | ||||||
* will actually write data to the file in the incoming_session object. | ||||||
*/ | ||||||
Writer(Ck::IO::Session incoming_session){ | ||||||
char out[11]; // 10 bytes for the message, 1 for the nullbyte | ||||||
sprintf(out, "Writer[%d]\n", thisIndex); | ||||||
Ck::IO::write(incoming_session, out, 10, 10*thisIndex); // writing 10 bytes starting at 10*thisIndex from the beginning of the file | ||||||
} | ||||||
|
||||||
Writer(CkMigrateMessage* m){ | ||||||
|
||||||
} | ||||||
}; | ||||||
|
||||||
#include "iotest.def.h" |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,89 @@ | ||||||||||||||||||||||
mainmodule iotest { | ||||||||||||||||||||||
include "ckio.h"; // includes the necessary functions for CkIO | ||||||||||||||||||||||
mainchare Main { | ||||||||||||||||||||||
entry Main(CkArgMsg* m); | ||||||||||||||||||||||
entry void ready(Ck::IO::FileReadyMsg* msg); | ||||||||||||||||||||||
entry void startWrite(Ck::IO::SessionReadyMsg* msg); | ||||||||||||||||||||||
entry void postWrite(CkReductionMsg* msg); | ||||||||||||||||||||||
entry void close(CkReductionMsg* msg); | ||||||||||||||||||||||
entry void decrementRemaining(); | ||||||||||||||||||||||
|
||||||||||||||||||||||
entry void startWritingCycle(int file_number){ | ||||||||||||||||||||||
serial { | ||||||||||||||||||||||
/** | ||||||||||||||||||||||
Set the Options struct so that CkIO opens the file and sets the correct configuration for writing. | ||||||||||||||||||||||
The writeStripe parameter determines the amount of contiguous bytes each writer chare will write to file. | ||||||||||||||||||||||
The peStripe parameters determines how much actual data each writer chare will aggregate at a given time. This is used so that | ||||||||||||||||||||||
a bunch of tiny data gets distributed across many writing chares when it would be better to all go to a single chare. | ||||||||||||||||||||||
It is required that peStripe >= writeStripe. | ||||||||||||||||||||||
*/ | ||||||||||||||||||||||
Comment on lines
+13
to
+19
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Try to be a bit more concise in these descriptions of what the code is doing so the key points don't get lost too much in text.
Suggested change
|
||||||||||||||||||||||
ckout << "starting the writing cycle for " << file_number << endl; | ||||||||||||||||||||||
Ck::IO::Options opts; // struct containing the options for the writer | ||||||||||||||||||||||
opts.writeStripe = 1024; // collect up to 1kB of data before writing; use the specific number you'd like or it defaults to 4MB | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know this PR is old and probably just collecting dust on a shelf, but in terms of exemplifying CkIO usage, the values should probably be in the range of megabytes, to be comparable in scale to typical Lustre stripe sizes. The original design was that they should be equal to (a multiple of) the FS stripe size, to limit the contention on storage servers and the individual stripes. |
||||||||||||||||||||||
opts.peStripe = 4 * opts.writeStripe; // the amount of data that is aggregated by each "write chare" | ||||||||||||||||||||||
Comment on lines
+21
to
+23
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||
CkCallback open_cb(CkIndex_Main::ready(NULL), thisProxy); // index the function for the callback to use | ||||||||||||||||||||||
char name[20]; // buffer for the file name | ||||||||||||||||||||||
sprintf(name ,"file_%d", file_number); | ||||||||||||||||||||||
open_cb.setRefNum(file_number); // set the reference number of callback and function to the file_number | ||||||||||||||||||||||
ckout << "about to enter the open function of CkIO in " << file_number << endl; | ||||||||||||||||||||||
|
||||||||||||||||||||||
// open the file file_<file_number>, pass a FileReadyMsg* to the open_cb calback function ready, and also pass the options struct for the IO | ||||||||||||||||||||||
Ck::IO::open(name, open_cb, opts); | ||||||||||||||||||||||
} | ||||||||||||||||||||||
/** | ||||||||||||||||||||||
ready is the function that is called after opening the file by the open_cb callback in line 24. | ||||||||||||||||||||||
Uses the FileReadyMsg* msg passed to it by Ck::IO::open in order to get the file represented by (msg -> file). | ||||||||||||||||||||||
ready will create a callback start_session that will be invoked by the startSession function in the beginning. | ||||||||||||||||||||||
After the start_session callback is invoked and the write is done, the end_session callback will be invoked. | ||||||||||||||||||||||
Note that the commit_message variable will be written only after the start_session callback is completed. Having a | ||||||||||||||||||||||
commit message is optional. | ||||||||||||||||||||||
*/ | ||||||||||||||||||||||
when ready[file_number](Ck::IO::FileReadyMsg* msg) serial{ | ||||||||||||||||||||||
ckout << "ready function for file[" << file_number << "]." << endl; | ||||||||||||||||||||||
_files[file_number] = msg -> file; // set the file opened by the Ck::IO to the index of file_number | ||||||||||||||||||||||
CkCallback start_session(CkIndex_Main::startWrite(0), thisProxy); // create the callback to be used when you start session | ||||||||||||||||||||||
start_session.setRefNum(file_number); | ||||||||||||||||||||||
|
||||||||||||||||||||||
CkCallback end_session(CkIndex_Main::postWrite(0), thisProxy); // callback to be used when you close the session | ||||||||||||||||||||||
end_session.setRefNum(file_number); // invoked at the end of the session, or on completion of the data being written | ||||||||||||||||||||||
|
||||||||||||||||||||||
std::string commit_message = "Commit message\n"; // the message that gets committed at the end of the batched write; commits are an optional argument | ||||||||||||||||||||||
Ck::IO::startSession(_files[file_number], 10*_num_writers, 0, start_session, commit_message.c_str(), commit_message.size(), 10 * _num_writers, end_session); // start the writing session | ||||||||||||||||||||||
} | ||||||||||||||||||||||
/** | ||||||||||||||||||||||
This function actually does the writing to the files. It creates a chare array of Writer chares, whose constructor | ||||||||||||||||||||||
will do the writing, as defined in iotest.C. | ||||||||||||||||||||||
*/ | ||||||||||||||||||||||
when startWrite[file_number](Ck::IO::SessionReadyMsg* msg) serial{ | ||||||||||||||||||||||
writers = CProxy_Writer::ckNew(msg -> session, _num_writers); // create n writers, and pass all of them the the session | ||||||||||||||||||||||
ckout << "Finished writing\n"; | ||||||||||||||||||||||
delete msg; // it's the user's responsibility to free the SessionMsg* | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
// this function is called after the session has written the amount of bytes specified | ||||||||||||||||||||||
// by the end_session callback. Will also create a close callback, which will be invoked | ||||||||||||||||||||||
// after the call to Ck::IO::close and CkIO closes the specified file | ||||||||||||||||||||||
when postWrite[file_number](CkReductionMsg* msg) serial{ | ||||||||||||||||||||||
ckout << "This session has written the amount of bytes\n"; | ||||||||||||||||||||||
delete msg; | ||||||||||||||||||||||
// Time to close the file | ||||||||||||||||||||||
CkCallback close_cb(CkIndex_Main::close(0), thisProxy); // create the callback after the file closed | ||||||||||||||||||||||
close_cb.setRefNum(file_number); // tag the callback | ||||||||||||||||||||||
Ck::IO::close(_files[file_number], close_cb); // close the file | ||||||||||||||||||||||
} | ||||||||||||||||||||||
// executed after CkIO closes the file via the close_cb on line 70 | ||||||||||||||||||||||
when close[file_number](CkReductionMsg* msg) serial{ // only called after the file has been closed | ||||||||||||||||||||||
ckout << "File " << file_number << " has succesfully been closed!" << endl; | ||||||||||||||||||||||
delete msg; | ||||||||||||||||||||||
thisProxy.decrementRemaining(); // called to tell the Mainchare another file has been opened and closed successfully | ||||||||||||||||||||||
} | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
array [1D] Writer { | ||||||||||||||||||||||
entry Writer(Ck::IO::Session incoming_session); // constructor for the writer; stores the Ck::IO::Session token | ||||||||||||||||||||||
|
||||||||||||||||||||||
|
||||||||||||||||||||||
} | ||||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should be relative paths (see the Makefiles of other examples).