feat(trim): introduce trim operator #329
base: main
Conversation
Operation cleanup logic was previously mixed together (a single function was required to clean up everything, and it had to behave differently for different scenarios) and was also mixed with signal handling (a shutdown could mean either a successfully finished operation or a signal). In this commit:
- cleanup code required by different operations/functions is separated
- a single, generic signal handler exists
- operations register their cleanup requirements with the handler so they are cleaned up
- the shutdown global now only indicates whether a signal was received, NOT whether an operation finished
Signed-off-by: Michael Johnston <66301584+michael-johnston@users.noreply.github.com>
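A minimal sketch of the registration pattern this commit describes; the class and method names below are assumptions, not the actual API in this PR:

```python
import signal


class GenericSignalHandler:
    """Single, generic handler: operations register their own cleanup callbacks."""

    def __init__(self):
        self.shutdown_signal_received = False  # only records that a signal arrived
        self._cleanups = []
        signal.signal(signal.SIGINT, self._handle)
        signal.signal(signal.SIGTERM, self._handle)

    def register_cleanup(self, callback):
        # Each operation registers only the cleanup it requires
        self._cleanups.append(callback)

    def _handle(self, signum, frame):
        self.shutdown_signal_received = True
        # Run cleanups in reverse registration order, so inner resources go first
        for cleanup in reversed(self._cleanups):
            cleanup()
```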
…ation It is raised as a new InterruptedOperationError, which contains the operation identifier and is a subclass of KeyboardInterrupt. Previously nothing was raised and the operation exited normally after interruption. However, that pattern is not easy to maintain with multiple nested operations, since each operation would have to check whether the inner operation exited due to KeyboardInterrupt. This way operators do not have to handle KeyboardInterrupt: each outer operation catches the inner interrupt and raises a new exception with its own id. The outermost handler (in operation/create.py) now catches InterruptedOperationError and prints the id of the outermost (parent) interrupted operation.
Co-authored-by: Alessandro Pomponio <10339005+AlessandroPomponio@users.noreply.github.com> Signed-off-by: Michael Johnston <66301584+michael-johnston@users.noreply.github.com>
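A minimal sketch of the chaining pattern described above; the class name and the operation/create.py handler come from the commit message, while the constructor shape and helper functions are assumptions:

```python
class InterruptedOperationError(KeyboardInterrupt):
    """Raised when an operation is interrupted; carries the operation identifier."""

    def __init__(self, operation_id: str):
        super().__init__(f"Operation {operation_id} was interrupted")
        self.operation_id = operation_id


def run_inner_operation():
    # Stand-in for a real inner operation interrupted by the user
    raise KeyboardInterrupt


def run_outer_operation(operation_id: str):
    try:
        run_inner_operation()  # may itself raise InterruptedOperationError
    except KeyboardInterrupt as error:
        # Re-raise with this operation's id; the outermost handler
        # (in operation/create.py) prints the parent operation's id
        raise InterruptedOperationError(operation_id) from error
```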
Signed-off-by: Daniele Lotito <daniele.lotito@ibm.com>
The pattern is: orchestrate() creates/destroys the resource cleaner, which exists in the default namespace. If another function calls an explore operation, that function has the same responsibility.
Signed-off-by: Daniele Lotito <daniele.lotito@ibm.com>
… in the source space. Note that there is WIP on the random_shift strategy for high-D sampling; fix a point in time to replicate comment #281 (comment) Signed-off-by: Daniele Lotito <daniele.lotito@ibm.com>
This is so the actors registered with the cleaner during the operation can be cleaned up BEFORE the operation cleanup deletes their parents (which would cause the actors to be deleted along with them). The other option is to have a single resource cleaner, but then all actors wanting to use it must be created "detached".
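A sketch of the trade-off being described, using Ray's actual detached-actor option; the `Worker` actor below is hypothetical:

```python
import ray

ray.init(namespace="default")


@ray.remote
class Worker:
    def ping(self):
        return "ok"


# Pattern in this PR: the function that starts an operation (e.g. orchestrate)
# creates the resource cleaner and destroys it when the operation ends, so
# actors registered during the operation are cleaned up before their parents.

# The alternative (one long-lived shared cleaner) would require every actor
# that uses it to be created detached, decoupled from its parent's lifetime:
detached_worker = Worker.options(
    name="worker", lifetime="detached", namespace="default"
).remote()
```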
- Remove the get_measurement_queue class method; it does nothing anymore. - Add a ray_namespace parameter. The ray_namespace parameter means the same information is passed along fewer parallel paths (a potential source of errors) and enables more consistency checks. A set of actuators, the discovery space manager, and the queue instance should all be in the same ray namespace.
- Move all actuator validation and initialization code to setup_actuator. - Create actuators first so validation failures are detected early. - setup_actuators obtains the namespace from the queue rather than allowing a potential inconsistency.
Normal shutdown and SIGTERM were handled, but other exceptions were not, leading to operation resources not being cleaned up.
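A minimal sketch of the fix being described, with hypothetical names; the point is that cleanup runs on any exception, not only on SIGTERM or a normal shutdown:

```python
def run_operation(operation, cleanup):
    try:
        operation()
    finally:
        # Runs on normal shutdown, on SIGTERM (surfacing as SystemExit),
        # and on any other exception, so operation resources are always released
        cleanup()
```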
Co-authored-by: Alessandro Pomponio <10339005+AlessandroPomponio@users.noreply.github.com> Signed-off-by: Michael Johnston <66301584+michael-johnston@users.noreply.github.com>
Signed-off-by: Daniele Lotito <daniele.lotito@ibm.com>
Signed-off-by: Daniele Lotito <daniele.lotito@ibm.com>
The batch size passed to the trim random walk is different from what the random walk uses. Signed-off-by: Daniele Lotito <daniele.lotito@ibm.com>
* build: update pre-commit hooks
* style(docs): update feature request template
* style(docs): update creating custom experiments
* style(docs): update run_experiment
* style(docs): update sft-trainer
* style(docs): update vllm-performance
* style(docs): update data-sharing
* style(docs): update vllm-performance-full
* style(docs): update ado
* style(docs): update optimisation-with-ray-tune
* style(docs): update random-walk
* style(docs): update datacontainer
* style(docs): update discovery-spaces
* build: enable python 3.13
* refactor(test): replace test actuatorconfiguration
* refactor(test): replace test discoveryspace
* test: support testing python 3.13
* build(sfttrainer): require python <3.13
* docs(sfttrainer): mention supported python versions
* build: replace workspace construct
* build: return most packages to workspace
* fix(test): test_custom_experiments strikes again
@michael-johnston you can have a look at the doc file I have created with docs(example): present trim to a broad public with an example
…the sampling logic
AlessandroPomponio
left a comment
initial comments
Three outdated review threads on examples/trim/custom_experiments/trim_custom_experiments/experiments.py (resolved).
```toml
[project.entry-points."ado.custom_experiments"]
# This should be the python file with your decorated function(s).
my_experiment = "trim_custom_experiments.experiments"
```
This should probably have a name
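For illustration, with a hypothetical descriptive key in place of the `my_experiment` placeholder:

```toml
[project.entry-points."ado.custom_experiments"]
# This should be the python file with your decorated function(s).
trim_experiments = "trim_custom_experiments.experiments"
```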
```python
from orchestrator.utilities.environment import enable_ray_actor_coverage
from orchestrator.utilities.logging import configure_logging

PropertyFormatType = typing.Literal["observed", "target"]
```
This symbol is already defined in DiscoverySpace, but it's not available in this scope. It could be worth moving it to orchestrator/schema/property.py and importing it from there in both places
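A sketch of the suggested refactor; the target module path comes from the comment above, everything else is assumed:

```python
# orchestrator/schema/property.py
import typing

# Single shared definition, imported by DiscoverySpace and by this module
PropertyFormatType = typing.Literal["observed", "target"]
```

Both call sites would then use `from orchestrator.schema.property import PropertyFormatType`.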
```python
set_this, set_prev = set(this_iteration_source_df.columns), set(
    previous_iteration_source_df.columns
)
```
Better to have these on separate lines
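That is, a sketch of the separated assignments:

```python
set_this = set(this_iteration_source_df.columns)
set_prev = set(previous_iteration_source_df.columns)
```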
```python
common_cols = sorted(set_this & set_prev)
if not common_cols:
    logger.warning(
        "describe_source_spaces: No common columns available to compare rows."
    )
else:
    this_tuples = set(
        map(
            tuple,
            this_iteration_source_df[common_cols].itertuples(
                index=False, name=None
            ),
        )
    )
    prev_tuples = set(
        map(
            tuple,
            previous_iteration_source_df[common_cols].itertuples(
                index=False, name=None
            ),
        )
    )
    logger.info(
        f"describe_source_spaces: Common rows (on shared columns, n_cols={len(common_cols)}): "
        f"{len(this_tuples & prev_tuples)}"
    )

# common rows restricted to filter_cols
if filter_cols is not None:
    try:
        filter_cols = list(filter_cols)
    except Exception:
        logger.warning(
            "describe_source_spaces: filter_cols is not list-like; skipping filtered comparison."
        )
        filter_cols = None

if filter_cols:
    present_in_both = [
        c for c in filter_cols if c in set_this and c in set_prev
    ]
    ignored = sorted(set(filter_cols) - set(present_in_both))
    if ignored:
        logger.warning(
            f"describe_source_spaces: Some filter_cols not present in both DataFrames: {ignored}"
        )
    if not present_in_both:
        logger.warning(
            "describe_source_spaces: No valid filter_cols present in both DataFrames."
        )
    else:
        this_tuples_f = set(
            map(
                tuple,
                this_iteration_source_df[present_in_both].itertuples(
                    index=False, name=None
                ),
            )
        )
        prev_tuples_f = set(
            map(
                tuple,
                previous_iteration_source_df[present_in_both].itertuples(
                    index=False, name=None
                ),
            )
        )
        logger.info(
            f"describe_source_spaces: Common rows (restricted to filter_cols): "
            f"{len(this_tuples_f & prev_tuples_f)} | cols={present_in_both}"
        )
```
This function should definitely be revisited - leaving myself a note
```python
# final note if previous df is missing/empty
if previous_iteration_source_df is None or len(previous_iteration_source_df) == 0:
    logger.warning(
        "describe_source_spaces: previous_iteration_source_df is None or empty; "
        "downstream code may raise errors."
    )
```
I assume this should be the first check and if the df is empty, the function should likely just return
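A sketch of moving the check to the top with an early return; the function signature is inferred from the log messages, not shown in the diff:

```python
import logging

logger = logging.getLogger(__name__)


def describe_source_spaces(
    this_iteration_source_df, previous_iteration_source_df, filter_cols=None
):
    # Guard first: with no previous df there is nothing to compare against
    if previous_iteration_source_df is None or len(previous_iteration_source_df) == 0:
        logger.warning(
            "describe_source_spaces: previous_iteration_source_df is None or empty; "
            "skipping comparison."
        )
        return
    # ... the column/row comparisons above can now assume a usable df ...
```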
These seem debug functions, right?
Signed-off-by: Michael Johnston <66301584+michael-johnston@users.noreply.github.com>
Looks good. I have some comments, but they can wait; let's make it appear in the docs first.
michael-johnston
left a comment
Initial comment: we should see if there is a way to add an "update" or "refresh" method to SQLSampleStore that causes an update of the internal cache. This would remove the need for discovery_space_manager everywhere and simplify the code.
@AlessandroPomponio could you check if this is possible? The issue is that the cache of entities is created once on init, but if a different process adds entities to the sample store, the instance will not see them.
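A sketch of the kind of refresh method being proposed; SQLSampleStore's internals are not shown in this PR, so the cache layout and query helper below are assumptions:

```python
class SQLSampleStore:
    def __init__(self, session):
        self._session = session
        # Cache is built once on init; entities added later by other
        # processes are invisible without a refresh
        self._entity_cache = self._load_entities()

    def _load_entities(self) -> dict:
        # Hypothetical query helper standing in for the real SQL access
        return {e.identifier: e for e in self._session.fetch_all_entities()}

    def refresh(self):
        # Re-read the backing store so externally added entities become visible
        self._entity_cache = self._load_entities()
```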
```python
gen = ExplicitEntitySpaceGridSampleGenerator(WalkModeEnum.SEQUENTIAL)
# VV: We need a list that contains all entities associated with this Discovery Space in its entity source
# regardless of whether an experiment (on this space or others) have measured any properties using the
# experiments that this Discovery Space references.
# To this end, we find all matching entities in the entity source which have at least 1 measurement from at least
# one of the experiments that this discovery space defines. We then fill in any missing entities by iteratively
# generating all the entities that this discovery space contains. These "backfill" entities will not be associated
# with any observed property

if discoverySpaceManager:
    logger.info("Using manager in getting entities no measurement")
    all_entities = reduce(
        # operator.add, gen.entitySpaceIterator(entitySpace=discoverySpaceManager.discoverySpace.remote()), []
        operator.add,
        gen.entitySpaceIterator(
            entitySpace=ray.get(discoverySpaceManager.entitySpace.remote())
        ),
        [],
    )
else:
    all_entities = reduce(
        operator.add, gen.entitySpaceIterator(entitySpace=space.entitySpace), []
    )

# same as:
# all_entities = sum(list(gen.entitySpaceIterator(entitySpace=space.entitySpace)), [])

cp_ids = [cp.identifier for cp in space.entitySpace.constitutiveProperties]

list_of_dicts_to_convert = []
for e in all_entities:
    ed = {}
    ed["identifier"] = e.identifier
    for cp in cp_ids:
        obj = e.valueForConstitutivePropertyIdentifier(identifier=str(cp))
        ed[str(cp)] = obj.value
    list_of_dicts_to_convert.append(ed)

return pd.DataFrame(list_of_dicts_to_convert)
```
This can be simplified avoiding the iterator, entities and discovery space manager
Suggested change (replacing the block quoted above with a direct dataframe construction):

```python
cp_ids = [cp.identifier for cp in space.entitySpace.constitutiveProperties]

# Use sequential_point_iterator to get all points directly and create the df without ids
df = pd.DataFrame(entity_space.sequential_point_iterator(), columns=cp_ids)

# Note: we will create a variant of this function in entity.py so it doesn't have to be done here
def generate_id(row_dict):
    # Create ConstitutivePropertyValue objects for this point
    property_values = [
        ConstitutivePropertyValue(
            value=value,
            property=ConstitutivePropertyDescriptor(identifier=cp_id),
        )
        for cp_id, value in row_dict.items()
    ]
    return Entity.identifier_from_property_values(property_values)

df["identifier"] = [generate_id(row) for row in df.to_dict("records")]
return df
```
This PR introduces in ADO the possibility to set up an exploration of a discovery space that stops when a good predictive model for the rest of the space has been acquired.
In this iterative modeling, a holdout set is made up of the last sampled points and is used to evaluate the performance of the latest acquired model.
As we progress, these performance metrics are compared; if the prediction is stable, iterative modeling stops.
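A sketch of one way such a stability check could look; the PR does not spell out the criterion, so the window size and tolerance below are illustrative assumptions:

```python
def prediction_is_stable(
    holdout_scores: list[float], window: int = 3, tol: float = 0.01
) -> bool:
    """Stop iterative modeling when the last `window` holdout scores barely change."""
    if len(holdout_scores) < window:
        return False
    recent = holdout_scores[-window:]
    return max(recent) - min(recent) <= tol
```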
You can try iterative modeling following the example at examples/trim_custom_experiments/README.md.

My TODOs: