AMS Monitoring and Benchmark #94
Signed-off-by: Loic Pottier <[email protected]>
Changed several JSON fields in RMQ config:
- rabbitmq-outbound-queue -> rabbitmq-queue-physics
- rabbitmq-exchange -> rabbitmq-exchange-training
- rabbitmq-routing-key -> rabbitmq-key-training
Signed-off-by: Loic Pottier <[email protected]>
…age ID from AMSlib Signed-off-by: Loic Pottier <[email protected]>
@@ -56,9 +60,9 @@ def header_format(self) -> str:
- 4 bytes are the number of elements in the message. Limit max: 2^32 - 1
- 2 bytes are the input dimension. Limit max: 65535
- 2 bytes are the output dimension. Limit max: 65535
- 2 bytes are for aligning memory to 8
- 2 bytes are the message ID given by AMSlib (local to each MPI rank). Limit max: 65535
What happens if the application sends more than the max messages?
Valid question. I should add a failsafe for that case.
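One possible failsafe, sketched here only as an illustration: let the 2-byte message ID wrap back to 0 after 65535 and reject out-of-range values before packing. The names `PARTIAL_HEADER`, `next_message_id` and `pack_partial_header` are hypothetical, the layout covers only the four fields quoted in the diff above (not the full AMS header), and wraparound is an assumption rather than the behavior chosen in this PR.

```python
import struct

# Partial layout: number of elements (4 bytes), input dim (2 bytes),
# output dim (2 bytes), message ID (2 bytes). "=" selects native byte
# order without implicit padding; this is an assumption of the sketch.
PARTIAL_HEADER = struct.Struct("=IHHH")
MAX_MESSAGE_ID = 0xFFFF  # 65535, the limit of a 2-byte unsigned field


def next_message_id(current: int) -> int:
    """Return the ID following `current`, wrapping to 0 after 65535."""
    return (current + 1) & MAX_MESSAGE_ID


def pack_partial_header(num_elements: int, input_dim: int,
                        output_dim: int, message_id: int) -> bytes:
    """Pack the four fields, refusing IDs that do not fit in 2 bytes."""
    if not 0 <= message_id <= MAX_MESSAGE_ID:
        raise ValueError(f"message ID {message_id} does not fit in 2 bytes")
    return PARTIAL_HEADER.pack(num_elements, input_dim, output_dim, message_id)
```

With wraparound, IDs are only unique within a window of 65536 messages per MPI rank, so any consumer matching on the ID would have to tolerate reuse.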
current_offset += sizeof(uint16_t);
// Message ID (should be 2 bytes)
uint16_t new_message_id;
std::memcpy(&new_message_id, data_blob + current_offset, sizeof(uint16_t));
We should avoid memcpy and use it only if we need to (e.g. we get a blob of data). A couple of lines above we do this:
uint16_t new_domain_size =
(reinterpret_cast<uint16_t*>(data_blob + current_offset))[0];
Good catch. I will fix that.
@lpottier It is hard for me to understand what exactly we are trying to do here. As far as I understood the code we are trying to:
- Have a performance benchmark. This follows the design of the example code. Correct?
- You extend rmq to monitor performance. You monitor performance by attaching a timestamp and outputting it to a JSON file?
- You extend rmq with additional caliper capabilities. Are these matched with the JSON monitoring?
- There is some locking/unlocking of mutexes. What is the reasoning behind this?
- Some extensions modify the msg header. Is this necessary?
@koparasy Yes, the PR is a bit all over the place. You understood correctly.
# FIXME: temporary solution to kill properly the stager when using srun
self.write_pid()

def write_pid(self):
    """
    Write the PID of the current process in
    a file. Append it to a file if the file
    exists (multiple stagers could be running).

    This is useful to kill the stager.

    FIXME: this solution is not very clean or elegant
    and can be improved. The simulations side could send a message
    on a specific queue to signify that no more data will arrive for example.
    """

    with open("ams-stagers.pid", 'a') as f:
        f.write(f" {os.getpid()}")
I don't understand this code. Do you need the pid to do kill -SIG <signal> PID?
Unfortunately, yes. I need the PID of the stager to send a signal when scheduling tasks with Slurm. That "fix" is only needed when using srun to start the stager because srun wraps its target process and creates another process with another PID. If you kill the srun PID the signal is not being captured properly. I have to write the internal PID somewhere if we want to exit cleanly.
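For illustration, the consumer side of that PID file could look roughly like the sketch below: read every space-separated PID appended by write_pid() and signal each one. The helper names `read_stager_pids` and `signal_stagers` are hypothetical, and the choice of SIGINT is an assumption, since the thread does not say which signal the stager handles.

```python
import os
import signal
from pathlib import Path


def read_stager_pids(pid_file: str = "ams-stagers.pid") -> list[int]:
    """Parse the space-separated PIDs that the stagers appended."""
    return [int(token) for token in Path(pid_file).read_text().split()]


def signal_stagers(pid_file: str = "ams-stagers.pid",
                   sig: int = signal.SIGINT) -> None:
    """Send `sig` to every recorded stager; SIGINT is only an assumption."""
    for pid in read_stager_pids(pid_file):
        try:
            os.kill(pid, sig)
        except ProcessLookupError:
            # The stager already exited; nothing left to signal.
            pass
```

Because the file is opened in append mode, stale PIDs from earlier runs remain in it unless the file is deleted between runs.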
@lpottier I am trying to come up with a plan here to see what we need, what we can merge in, and what needs modification: I think commit 7aef9b279ec1f1c08664859343ab0f89d195c3af is standalone and clean. So we can merge this as is? Do you have any objections?
I would like to split this commit into 2 commits:
Add the changes to pyproject.toml in a separate commit and make a PR, I will merge this asap.
I think there are some additional changes you made in RMQ that are orthogonal to our monitoring. Is this correct? If so, we should probably make a separate PR for them as well.
After these changes we will still need to address the monitoring and the benchmark. For monitoring, after the discussion we had with Timo, we do not need something so complicated. We will likely need to simplify. The benchmark is likely to be correct as is. We may want to move it under tests. But let's leave that as a last step.
@koparasy Okay sounds good, I will make the requested separate PRs.
Yes this commit is standalone.
Make it a separate PR and ping me.
We can close this PR as we merged all the changes we wanted in different PRs.
This PR adds monitoring capabilities to AMS, especially for the RabbitMQ case:
This PR also fixes #83 and harmonizes the name of different RabbitMQ fields across the stack:
- rabbitmq-outbound-queue -> rabbitmq-queue-physics
- rabbitmq-exchange -> rabbitmq-exchange-training
- rabbitmq-routing-key -> rabbitmq-key-training
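An existing RabbitMQ config written with the old field names could be updated with a small helper along these lines. This is a minimal sketch, assuming the three fields sit at the top level of the JSON object; `RENAMED_KEYS` and `migrate_rmq_config` are hypothetical names, not part of AMS.

```python
import json

# Old field name -> new field name, as listed in this PR.
RENAMED_KEYS = {
    "rabbitmq-outbound-queue": "rabbitmq-queue-physics",
    "rabbitmq-exchange": "rabbitmq-exchange-training",
    "rabbitmq-routing-key": "rabbitmq-key-training",
}


def migrate_rmq_config(config: dict) -> dict:
    """Return a copy of `config` with the renamed fields; others untouched."""
    return {RENAMED_KEYS.get(key, key): value for key, value in config.items()}


def migrate_rmq_config_text(text: str) -> str:
    """Same migration applied to a JSON document given as a string."""
    return json.dumps(migrate_rmq_config(json.loads(text)), indent=2)
```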
Various optimizations:
- std::vector to std::list when keeping a buffer of AMSMessage; std::list incurs fewer move operations when deleting messages