diff --git a/examples/charm++/ckio/Makefile b/examples/charm++/ckio/Makefile new file mode 100644 index 0000000000..08fa901352 --- /dev/null +++ b/examples/charm++/ckio/Makefile @@ -0,0 +1,22 @@ +-include ./home/ec2-user/charm-project/charm/examples/common.mk +CHARMC=/home/ec2-user/charm-project/charm/bin/charmc $(OPTS) + +all: iotest + +iotest: iotest.ci iotest.C + $(CHARMC) iotest.ci + $(CHARMC) iotest.C -o $@ -module CkIO + +test: iotest + $(call run, ./iotest +p4 4 ) + +testp: iotest + $(call run, ./iotest +p$(P) $(P) ) + +smptest: iotest + $(call run, ./iotest 4 +p2 ++ppn 2) + $(call run, ./iotest 4 +p4 ++ppn 2) + +clean: + rm -f *.o *.decl.h *.def.h iotest test* + diff --git a/examples/charm++/ckio/iotest.C b/examples/charm++/ckio/iotest.C new file mode 100644 index 0000000000..c6753bef05 --- /dev/null +++ b/examples/charm++/ckio/iotest.C @@ -0,0 +1,57 @@ +#include "iotest.decl.h" +#include + +class Main : public CBase_Main{ + Main_SDAG_CODE; + std::vector _files; // holds all of the CkIO File objects + int _num_writers; + int _num_iterations; + CProxy_Writer writers; +public: + Main(CkArgMsg* msg){ + // make sure the example program is being used correctly + if(msg -> argc != 3){ + ckout << "Usage: ./ " << endl; + CkExit(); + } + _num_writers = atoi(msg -> argv[1]); // assign the number of writers + _num_iterations = atoi(msg -> argv[2]); // assign the number of files to write + int num_files = _num_iterations; // save the number of files + _files.resize(_num_iterations); + for(int i = 0; i < num_files; ++i){ + thisProxy.startWritingCycle(i); // start writing to the file numbered i + } + delete msg; + + } + // standard bookkeeping for how many iterations we need to go through + void decrementRemaining(){ + _num_iterations--; + if(!_num_iterations){ + ckout << "Successfully completed parallel output!" << endl; + CkExit(); + } + } + +}; + + +class Writer : public CBase_Writer { + +public: + /** + * Takes in a Session object to the current writing session. The constructor + * will actually write data to the file in the incoming_session object. + */ + Writer(Ck::IO::Session incoming_session){ + char out[11]; // 10 bytes for the message, 1 for the nullbyte + sprintf(out, "Writer[%d]\n", thisIndex); + Ck::IO::write(incoming_session, out, 10, 10*thisIndex); // writing 10 bytes starting at 10*thisIndex from the beginning of the file + } + + Writer(CkMigrateMessage* m){ + + } +}; + +#include "iotest.def.h" diff --git a/examples/charm++/ckio/iotest.ci b/examples/charm++/ckio/iotest.ci new file mode 100644 index 0000000000..ea08a9509c --- /dev/null +++ b/examples/charm++/ckio/iotest.ci @@ -0,0 +1,89 @@ +mainmodule iotest { + include "ckio.h"; // includes the necessary functions for CkIO + mainchare Main { + entry Main(CkArgMsg* m); + entry void ready(Ck::IO::FileReadyMsg* msg); + entry void startWrite(Ck::IO::SessionReadyMsg* msg); + entry void postWrite(CkReductionMsg* msg); + entry void close(CkReductionMsg* msg); + entry void decrementRemaining(); + + entry void startWritingCycle(int file_number){ + serial { + /** + Set the Options struct so that CkIO opens the file and sets the correct configuration for writing. + The writeStripe parameter determines the amount of contiguous bytes each writer chare will write to file. + The peStripe parameters determines how much actual data each writer chare will aggregate at a given time. This is used so that + a bunch of tiny data gets distributed across many writing chares when it would be better to all go to a single chare. + It is required that peStripe >= writeStripe. + */ + ckout << "starting the writing cycle for " << file_number << endl; + Ck::IO::Options opts; // struct containing the options for the writer + opts.writeStripe = 1024; // collect up to 1kB of data before writing; use the specific number you'd like or it defaults to 4MB + opts.peStripe = 4 * opts.writeStripe; // the amount of data that is aggregated by each "write chare" + CkCallback open_cb(CkIndex_Main::ready(NULL), thisProxy); // index the function for the callback to use + char name[20]; // buffer for the file name + sprintf(name ,"file_%d", file_number); + open_cb.setRefNum(file_number); // set the reference number of callback and function to the file_number + ckout << "about to enter the open function of CkIO in " << file_number << endl; + + // open the file file_, pass a FileReadyMsg* to the open_cb calback function ready, and also pass the options struct for the IO + Ck::IO::open(name, open_cb, opts); + } + /** + ready is the function that is called after opening the file by the open_cb callback in line 24. + Uses the FileReadyMsg* msg passed to it by Ck::IO::open in order to get the file represented by (msg -> file). + ready will create a callback start_session that will be invoked by the startSession function in the beginning. + After the start_session callback is invoked and the write is done, the end_session callback will be invoked. + Note that the commit_message variable will be written only after the start_session callback is completed. Having a + commit message is optional. + */ + when ready[file_number](Ck::IO::FileReadyMsg* msg) serial{ + ckout << "ready function for file[" << file_number << "]." << endl; + _files[file_number] = msg -> file; // set the file opened by the Ck::IO to the index of file_number + CkCallback start_session(CkIndex_Main::startWrite(0), thisProxy); // create the callback to be used when you start session + start_session.setRefNum(file_number); + + CkCallback end_session(CkIndex_Main::postWrite(0), thisProxy); // callback to be used when you close the session + end_session.setRefNum(file_number); // invoked at the end of the session, or on completion of the data being written + + std::string commit_message = "Commit message\n"; // the message that gets committed at the end of the batched write; commits are an optional argument + Ck::IO::startSession(_files[file_number], 10*_num_writers, 0, start_session, commit_message.c_str(), commit_message.size(), 10 * _num_writers, end_session); // start the writing session + } + /** + This function actually does the writing to the files. It creates a chare array of Writer chares, whose constructor + will do the writing, as defined in iotest.C. + */ + when startWrite[file_number](Ck::IO::SessionReadyMsg* msg) serial{ + writers = CProxy_Writer::ckNew(msg -> session, _num_writers); // create n writers, and pass all of them the the session + ckout << "Finished writing\n"; + delete msg; // it's the user's responsibility to free the SessionMsg* + } + + // this function is called after the session has written the amount of bytes specified + // by the end_session callback. Will also create a close callback, which will be invoked + // after the call to Ck::IO::close and CkIO closes the specified file + when postWrite[file_number](CkReductionMsg* msg) serial{ + ckout << "This session has written the amount of bytes\n"; + delete msg; + // Time to close the file + CkCallback close_cb(CkIndex_Main::close(0), thisProxy); // create the callback after the file closed + close_cb.setRefNum(file_number); // tag the callback + Ck::IO::close(_files[file_number], close_cb); // close the file + } + // executed after CkIO closes the file via the close_cb on line 70 + when close[file_number](CkReductionMsg* msg) serial{ // only called after the file has been closed + ckout << "File " << file_number << " has succesfully been closed!" << endl; + delete msg; + thisProxy.decrementRemaining(); // called to tell the Mainchare another file has been opened and closed successfully + } + } + + } + + array [1D] Writer { + entry Writer(Ck::IO::Session incoming_session); // constructor for the writer; stores the Ck::IO::Session token + + + } +}