hydraflow
Integrate Hydra and MLflow to manage and track machine learning experiments.
Classes
-
Run — Represent an MLflow Run in HydraFlow.
-
RunCollection — A collection of Run instances that implements the Sequence protocol.
Functions
-
chdir_artifact — Change the current working directory to the artifact directory of the given run.
-
get_artifact_dir — Retrieve the artifact directory for the given run.
-
iter_artifact_paths — Iterate over the artifact paths in the tracking directory.
-
iter_artifacts_dirs — Iterate over the artifacts directories in the tracking directory.
-
iter_experiment_dirs — Iterate over the experiment directories in the tracking directory.
-
iter_run_dirs — Iterate over the run directories in the tracking directory.
-
log_run — Log the parameters from the given configuration instance.
-
main — Decorator for configuring and running MLflow experiments with Hydra.
-
start_run — Start an MLflow run and log parameters using the provided configuration instance.
source class Run[C, I = None](run_dir: Path, impl_factory: Callable[[Path], I] | Callable[[Path, C], I] = lambda _: None)
Represent an MLflow Run in HydraFlow.
A Run contains information about the run, configuration, and implementation. The configuration type C and implementation type I are specified as type parameters.
Attributes
-
info : RunInfo — Information about the run, such as run directory, run ID, and job name.
-
impl_factory : Callable[[Path], I] | Callable[[Path, C], I] — Factory function to create the implementation instance.
-
cfg : C — The configuration instance loaded from the Hydra configuration file.
-
impl : I — The implementation instance created by the factory function.
Methods
cfg — The configuration instance loaded from the Hydra configuration file.
impl — The implementation instance created by the factory function. This property inspects the signature of impl_factory with the inspect module and calls it with the appropriate arguments:
- If the factory accepts one parameter: called with just the artifacts directory
- If the factory accepts two parameters: called with the artifacts directory and the configuration instance
This allows implementation classes to be configuration-aware and utilize both the file system and configuration information.
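The dispatch described above can be sketched in a few lines. This is a minimal illustration of arity-based dispatch with the inspect module, not HydraFlow's actual code; the helper name call_impl_factory and the two example classes are hypothetical, and a plain dict stands in for the configuration instance.

```python
import inspect
from pathlib import Path
from typing import Any, Callable


def call_impl_factory(factory: Callable[..., Any], artifacts_dir: Path, cfg: Any) -> Any:
    """Call the factory with (path) or (path, cfg) depending on its parameter count."""
    n_params = len(inspect.signature(factory).parameters)
    if n_params == 1:
        return factory(artifacts_dir)
    if n_params == 2:
        return factory(artifacts_dir, cfg)
    msg = f"factory must accept 1 or 2 parameters, got {n_params}"
    raise TypeError(msg)


class PathOnly:
    """Hypothetical implementation that only needs the artifacts directory."""

    def __init__(self, path: Path) -> None:
        self.path = path


class PathAndConfig:
    """Hypothetical implementation that also reads the configuration."""

    def __init__(self, path: Path, cfg: dict) -> None:
        self.path = path
        self.width = cfg["width"]


a = call_impl_factory(PathOnly, Path("artifacts"), {"width": 3})
b = call_impl_factory(PathAndConfig, Path("artifacts"), {"width": 3})
```

For a class, inspect.signature reports the parameters of its constructor without self, so a class can be passed directly as the factory.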
source classmethod Run.load(run_dir: str | Path | Iterable[str | Path], impl_factory: Callable[[Path], I] | Callable[[Path, C], I] = lambda _: None, *, n_jobs: int = 0) → Self | RunCollection[Self]
Load a Run from a run directory.
Parameters
-
run_dir : str | Path | Iterable[str | Path] — The directory where the MLflow runs are stored, either as a string, a Path instance, or an iterable of them.
-
impl_factory : Callable[[Path], I] | Callable[[Path, C], I] — A factory function that creates the implementation instance. It can accept either just the artifacts directory path, or both the path and the configuration instance. Defaults to a function that returns None.
-
n_jobs : int — The number of parallel jobs. If 0 (default), runs sequentially. If -1, uses all available CPU cores.
Returns
-
Self | RunCollection[Self] — A single Run instance or a RunCollection of Run instances.
source method Run.update(key: str | tuple[str, ...], value: Any | Callable[[Self], Any], *, force: bool = False) → None
Set default value(s) in the configuration if they don't already exist.
This method adds a value or multiple values to the configuration, but only if the corresponding keys don't already have values. Existing values are left unchanged unless force is True.
Parameters
-
key : str | tuple[str, ...] — Either a string representing a single configuration path (can use dot notation like "section.subsection.param"), or a tuple of strings to set multiple related configuration values at once.
-
value : Any | Callable[[Self], Any] — The value to set. This can be:
- For string keys: any value, or a callable that returns a value
- For tuple keys: an iterable with the same length as the key tuple, or a callable that returns such an iterable
- For callable values: the callable must accept a single argument of type Run (self) and return the appropriate value type
-
force : bool — Whether to force the update even if the key already exists.
Raises
-
TypeError — If a tuple key is provided but the value is not an iterable, or if the callable doesn't return an iterable.
source method Run.get(key: str, default: Any = MISSING) → Any
Get a value from the information or configuration.
Parameters
-
key : str — The key to look for. Can use dot notation for nested keys in configuration.
-
default : Any — Value to return if the key is not found. If not provided, AttributeError will be raised.
Returns
-
Any — The value associated with the key, or the default value if the key is not found and a default is provided.
Raises
-
AttributeError — If the key is not found and no default is provided.
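A sentinel default is what lets the method distinguish "no default given" from "default is None". The following sketch models that with two plain dicts standing in for the run information and the configuration; the function name get_value and the lookup order (information first, then configuration) are assumptions made for illustration.

```python
from typing import Any

MISSING = object()  # sentinel: distinguishes "no default" from default=None


def get_value(info: dict, cfg: dict, key: str, default: Any = MISSING) -> Any:
    """Look up key in info, then follow dot notation through cfg."""
    if key in info:
        return info[key]
    node: Any = cfg
    for part in key.split("."):
        if isinstance(node, dict) and part in node:
            node = node[part]
        elif default is MISSING:
            raise AttributeError(f"No such key: {key}")
        else:
            return default
    return node


info = {"run_id": "abc"}
cfg = {"model": {"width": 64}}
run_id = get_value(info, cfg, "run_id")
width = get_value(info, cfg, "model.width")
depth = get_value(info, cfg, "model.depth", None)
```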
source method Run.predicate(key: str, value: Any) → bool
Check if a value satisfies a condition for filtering.
This method retrieves the attribute specified by the key using the get method, and then compares it with the given value according to the following rules:
- If value is callable: Call it with the attribute and return the boolean result
- If value is a list or set: Check if the attribute is in the list/set
- If value is a tuple of length 2: Check if the attribute is in the range [value[0], value[1]]. Both sides are inclusive
- Otherwise: Check if the attribute equals the value
Parameters
-
key : str — The key to get the attribute from.
-
value : Any — The value to compare with, or a callable that takes the attribute and returns a boolean.
Returns
-
bool — True if the attribute satisfies the condition, False otherwise.
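The four comparison rules can be written as a small standalone function. This is a sketch of the rules as listed above, applied to an already-retrieved attribute; the name matches is hypothetical and the real method may handle further cases.

```python
from typing import Any


def matches(attr: Any, value: Any) -> bool:
    """Apply the comparison rules in the order given in the description."""
    if callable(value):
        return bool(value(attr))
    if isinstance(value, (list, set)):
        return attr in value
    if isinstance(value, tuple) and len(value) == 2:
        low, high = value
        return low <= attr <= high  # both ends inclusive
    return attr == value


checks = [
    matches(5, lambda x: x > 3),  # callable
    matches(5, [1, 5]),           # membership
    matches(5, (5, 10)),          # inclusive range, lower bound
    matches(11, (5, 10)),         # outside the range
    matches("adam", "adam"),      # equality
]
```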
source method Run.to_dict() → dict[str, Any]
Convert the Run to a dictionary.
source class RunCollection[R: Run[Any, Any]](runs: Iterable[R])
Bases : Sequence[R]
A collection of Run instances that implements the Sequence protocol.
RunCollection provides methods for filtering, sorting, grouping, and analyzing runs, as well as converting run data to various formats such as DataFrames.
Parameters
-
runs : Iterable[Run] — An iterable of Run instances to include in the collection.
Attributes
-
runs : list[R] — A list containing the Run instances in this collection.
Methods
-
update — Update configuration values for all runs in the collection.
-
filter — Filter runs based on predicates or key-value conditions.
-
try_get — Try to get a single run matching the specified criteria.
-
get — Get a single run matching the specified criteria.
-
first — Get the first run matching the specified criteria.
-
last — Get the last run matching the specified criteria.
-
to_list — Extract a list of values for a specific key from all runs.
-
to_numpy — Extract values for a specific key from all runs as a NumPy array.
-
unique — Get the unique values for a specific key across all runs.
-
n_unique — Count the number of unique values for a specific key across all runs.
-
sort — Sort runs based on one or more keys.
-
to_frame — Convert the collection to a Polars DataFrame.
-
group_by — Group runs by one or more keys.
source method RunCollection.update(key: str | tuple[str, ...], value: Any | Callable[[R], Any], *, force: bool = False) → None
Update configuration values for all runs in the collection.
This method calls the update method on each run in the collection.
Parameters
-
key : str | tuple[str, ...] — Either a string representing a single configuration path or a tuple of strings to set multiple configuration values.
-
value : Any | Callable[[R], Any] — The value(s) to set or a callable that returns such values.
-
force : bool — Whether to force updates even if the keys already exist.
source method RunCollection.filter(*predicates: Callable[[R], bool] | tuple[str, Any], **kwargs: Any) → Self
Filter runs based on predicates or key-value conditions.
This method allows filtering runs using various criteria:
- Callable predicates that take a Run and return a boolean
- Key-value tuples where the key is a string and the value is compared using the Run.predicate method
- Keyword arguments, where the key is a string and the value is compared using the Run.predicate method
Parameters
-
*predicates : Callable[[R], bool] | tuple[str, Any] — Callable predicates or (key, value) tuples for filtering.
-
**kwargs : Any — Additional key-value pairs for filtering.
Returns
-
Self — A new RunCollection containing only the runs that match all criteria.
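The way the three kinds of criteria combine can be illustrated without the library. In this sketch, dicts stand in for Run instances, filter_runs is a hypothetical name, and plain equality stands in for Run.predicate; a run is kept only if every criterion holds.

```python
from typing import Any, Callable, Union

Criterion = Union[Callable[[dict], bool], tuple]


def filter_runs(runs: list, *predicates: Criterion, **kwargs: Any) -> list:
    """Keep the runs that satisfy every predicate, tuple, and keyword condition."""
    conditions = list(predicates) + list(kwargs.items())

    def keep(run: dict) -> bool:
        for cond in conditions:
            if callable(cond):
                if not cond(run):
                    return False
            else:
                key, value = cond
                # Equality only; the real method delegates to Run.predicate.
                if run.get(key) != value:
                    return False
        return True

    return [run for run in runs if keep(run)]


runs = [
    {"lr": 0.1, "seed": 0},
    {"lr": 0.1, "seed": 1},
    {"lr": 0.01, "seed": 0},
]
selected = filter_runs(runs, lambda r: r["seed"] == 0, ("lr", 0.1))
by_kwargs = filter_runs(runs, lr=0.1)
```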
source method RunCollection.try_get(*predicates: Callable[[R], bool] | tuple[str, Any], **kwargs: Any) → R | None
Try to get a single run matching the specified criteria.
This method applies filters and returns a single matching run if exactly one is found, None if no runs are found, or raises ValueError if multiple runs match.
Parameters
-
*predicates : Callable[[R], bool] | tuple[str, Any] — Callable predicates or (key, value) tuples for filtering.
-
**kwargs : Any — Additional key-value pairs for filtering.
Returns
-
R | None — A single Run that matches the criteria, or None if no matches are found.
Raises
-
ValueError — If multiple runs match the criteria.
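The zero, one, or many outcome can be sketched as follows. Dicts stand in for Run instances and the name try_get_one is hypothetical; the point is only the three-way result.

```python
from typing import Callable, Optional


def try_get_one(runs: list, condition: Callable[[dict], bool]) -> Optional[dict]:
    """Return the single match, None for no match, and raise for several."""
    found = [run for run in runs if condition(run)]
    if not found:
        return None
    if len(found) > 1:
        raise ValueError(f"{len(found)} runs match the criteria")
    return found[0]


runs = [{"seed": 0}, {"seed": 1}, {"seed": 1}]
unique_match = try_get_one(runs, lambda r: r["seed"] == 0)
no_match = try_get_one(runs, lambda r: r["seed"] == 2)
```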
source method RunCollection.get(*predicates: Callable[[R], bool] | tuple[str, Any], **kwargs: Any) → R
Get a single run matching the specified criteria.
This method applies filters and returns a single matching run, or raises ValueError if no runs or multiple runs match.
Parameters
-
*predicates : Callable[[R], bool] | tuple[str, Any] — Callable predicates or (key, value) tuples for filtering.
-
**kwargs : Any — Additional key-value pairs for filtering.
Returns
-
R — A single Run that matches the criteria.
Raises
-
ValueError — If no runs match or if multiple runs match the criteria.
source method RunCollection.first(*predicates: Callable[[R], bool] | tuple[str, Any], **kwargs: Any) → R
Get the first run matching the specified criteria.
This method applies filters and returns the first matching run, or raises ValueError if no runs match.
Parameters
-
*predicates : Callable[[R], bool] | tuple[str, Any] — Callable predicates or (key, value) tuples for filtering.
-
**kwargs : Any — Additional key-value pairs for filtering.
Returns
-
R — The first Run that matches the criteria.
Raises
-
ValueError — If no runs match the criteria.
source method RunCollection.last(*predicates: Callable[[R], bool] | tuple[str, Any], **kwargs: Any) → R
Get the last run matching the specified criteria.
This method applies filters and returns the last matching run, or raises ValueError if no runs match.
Parameters
-
*predicates : Callable[[R], bool] | tuple[str, Any] — Callable predicates or (key, value) tuples for filtering.
-
**kwargs : Any — Additional key-value pairs for filtering.
Returns
-
R — The last Run that matches the criteria.
Raises
-
ValueError — If no runs match the criteria.
source method RunCollection.to_list(key: str) → list[Any]
Extract a list of values for a specific key from all runs.
Parameters
-
key : str — The key to extract from each run.
Returns
-
list[Any] — A list containing the values for the specified key from each run.
source method RunCollection.to_numpy(key: str) → NDArray
Extract values for a specific key from all runs as a NumPy array.
Parameters
-
key : str — The key to extract from each run.
Returns
-
NDArray — A NumPy array containing the values for the specified key from each run.
source method RunCollection.unique(key: str) → NDArray
Get the unique values for a specific key across all runs.
Parameters
-
key : str — The key to extract unique values for.
Returns
-
NDArray — A NumPy array containing the unique values for the specified key.
source method RunCollection.n_unique(key: str) → int
Count the number of unique values for a specific key across all runs.
Parameters
-
key : str — The key to count unique values for.
Returns
-
int — The number of unique values for the specified key.
source method RunCollection.sort(*keys: str, reverse: bool = False) → Self
Sort runs based on one or more keys.
Parameters
-
*keys : str — The keys to sort by, in order of priority.
-
reverse : bool — Whether to sort in descending order (default is ascending).
Returns
-
Self — A new RunCollection with the runs sorted according to the specified keys.
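Sorting by several keys in priority order is commonly done with a tuple sort key. The sketch below shows that technique on dicts standing in for runs; sort_runs is a hypothetical name and this is not necessarily how the library implements it.

```python
def sort_runs(runs: list, *keys: str, reverse: bool = False) -> list:
    """Return a new list ordered by the given keys, earlier keys taking priority."""
    return sorted(runs, key=lambda run: tuple(run[k] for k in keys), reverse=reverse)


runs = [
    {"lr": 0.1, "seed": 1},
    {"lr": 0.01, "seed": 0},
    {"lr": 0.1, "seed": 0},
]
ascending = sort_runs(runs, "lr", "seed")
descending = sort_runs(runs, "lr", "seed", reverse=True)
```

The input list is left untouched, which mirrors the description of sort as returning a new collection.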
source method RunCollection.to_frame(*keys: str, **kwargs: Callable[[R], Any]) → DataFrame
Convert the collection to a Polars DataFrame.
Parameters
-
*keys : str — The keys to include as columns in the DataFrame. If not provided, all keys from each run's to_dict() method will be used.
-
**kwargs : Callable[[R], Any] — Additional columns to compute using callables that take a Run and return a value.
Returns
-
DataFrame — A Polars DataFrame containing the specified data from the runs.
source method RunCollection.group_by(*keys: str, **kwargs: Callable[[Self | Sequence[R]], Any]) → dict[Any, Self] | DataFrame
Group runs by one or more keys.
This method can return either:
- A dictionary mapping group keys to RunCollections (no kwargs provided)
- A Polars DataFrame with group keys and aggregated values (kwargs provided)
Parameters
-
*keys : str — The keys to group by.
-
**kwargs : Callable[[Self | Sequence[R]], Any] — Aggregation functions to apply to each group. Each function should accept a RunCollection or Sequence[Run] and return a value.
Returns
-
dict[Any, Self] | DataFrame — Either a dictionary mapping group keys to RunCollections, or a Polars DataFrame with group keys and aggregated values.
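The two return shapes can be modeled with the standard library. In this sketch, dicts stand in for runs, lists stand in for RunCollections, and a list of row dicts stands in for the Polars DataFrame; the name group_runs is hypothetical, and using a scalar group key for a single grouping key is an assumption.

```python
from typing import Any, Callable


def group_runs(runs: list, *keys: str, **aggs: Callable[[list], Any]):
    """Group runs by keys; with aggregations, return one row per group instead."""
    groups: dict = {}
    for run in runs:
        values = tuple(run[k] for k in keys)
        group_key = values[0] if len(keys) == 1 else values
        groups.setdefault(group_key, []).append(run)
    if not aggs:
        return groups
    rows = []
    for group_key, members in groups.items():
        values = (group_key,) if len(keys) == 1 else group_key
        row = dict(zip(keys, values))
        row.update({name: fn(members) for name, fn in aggs.items()})
        rows.append(row)
    return rows


runs = [
    {"lr": 0.1, "loss": 0.5},
    {"lr": 0.1, "loss": 0.3},
    {"lr": 0.01, "loss": 0.9},
]
groups = group_runs(runs, "lr")
rows = group_runs(runs, "lr", best=lambda g: min(r["loss"] for r in g), n=len)
```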
source chdir_artifact(run: Run) → Iterator[Path]
Change the current working directory to the artifact directory of the given run.
This context manager changes the current working directory to the artifact directory of the given run. It ensures that the directory is changed back to the original directory after the context is exited.
Parameters
-
run : Run — The run to get the artifact directory from.
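The change-and-restore pattern behind such a context manager can be sketched with contextlib. This illustration takes a plain directory path instead of a run and uses a temporary directory as a stand-in for the artifact directory; the name chdir_to is hypothetical.

```python
import os
import tempfile
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def chdir_to(path: Path) -> Iterator[Path]:
    """Change into path and always restore the previous working directory."""
    previous = Path.cwd()
    os.chdir(path)
    try:
        yield path
    finally:
        # Runs even if the body raises, so the caller's cwd is never lost.
        os.chdir(previous)


with tempfile.TemporaryDirectory() as tmp:
    before = Path.cwd()
    with chdir_to(Path(tmp)):
        during = Path.cwd()
    after = Path.cwd()
```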
source get_artifact_dir(run: Run) → Path
Retrieve the artifact directory for the given run.
This function uses MLflow to get the artifact directory for the given run.
Parameters
-
run : Run — The run instance.
Returns
-
Path — The local path to the directory where the artifacts are downloaded.
Raises
-
NotImplementedError
source iter_artifact_paths(tracking_dir: str | Path, artifact_path: str | Path, experiment_names: str | list[str] | Callable[[str], bool] | None = None) → Iterator[Path]
Iterate over the artifact paths in the tracking directory.
source iter_artifacts_dirs(tracking_dir: str | Path, experiment_names: str | list[str] | Callable[[str], bool] | None = None) → Iterator[Path]
Iterate over the artifacts directories in the tracking directory.
source iter_experiment_dirs(tracking_dir: str | Path, experiment_names: str | list[str] | Callable[[str], bool] | None = None) → Iterator[Path]
Iterate over the experiment directories in the tracking directory.
source iter_run_dirs(tracking_dir: str | Path, experiment_names: str | list[str] | Callable[[str], bool] | None = None) → Iterator[Path]
Iterate over the run directories in the tracking directory.
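The four iterators share an experiment_names parameter that accepts a string, a list of strings, a callable, or None. One plausible reading of that type, sketched below, is that each form reduces to a predicate on the experiment name. The helper to_name_filter is hypothetical and the semantics (None selects everything, a string matches exactly, a list matches by membership) are assumptions rather than documented behavior.

```python
from typing import Callable, Union

NameSpec = Union[str, list, Callable[[str], bool], None]


def to_name_filter(experiment_names: NameSpec) -> Callable[[str], bool]:
    """Normalize the accepted forms into a single predicate on the name."""
    if experiment_names is None:
        return lambda name: True  # assumed: no filter selects everything
    if isinstance(experiment_names, str):
        return lambda name: name == experiment_names
    if callable(experiment_names):
        return experiment_names
    allowed = set(experiment_names)
    return lambda name: name in allowed


names = ["baseline", "sweep-a", "sweep-b"]
everything = [n for n in names if to_name_filter(None)(n)]
single = [n for n in names if to_name_filter("baseline")(n)]
listed = [n for n in names if to_name_filter(["sweep-a", "sweep-b"])(n)]
by_prefix = [n for n in names if to_name_filter(lambda n: n.startswith("sweep"))(n)]
```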
source log_run(run: Run) → Iterator[None]
Log the parameters from the given configuration instance.
This context manager logs the parameters from the provided configuration instance using MLflow. It also manages the MLflow run context, ensuring that artifacts are logged and the run is properly closed.
Parameters
-
run : Run — The run instance.
Yields
-
None — None
source main(node: T | type[T], config_name: str = 'config', *, chdir: bool = False, force_new_run: bool = False, match_overrides: bool = False, rerun_finished: bool = False)
Decorator for configuring and running MLflow experiments with Hydra.
This decorator combines Hydra configuration management with MLflow experiment tracking. It automatically handles run deduplication and configuration storage.
Parameters
-
node : T | type[T] — Configuration node class or instance defining the structure of the configuration.
-
config_name : str — Name of the configuration. Defaults to "config".
-
chdir : bool — If True, changes working directory to the artifact directory of the run. Defaults to False.
-
force_new_run : bool — If True, always creates a new MLflow run instead of reusing existing ones. Defaults to False.
-
match_overrides : bool — If True, matches runs based on Hydra CLI overrides instead of full config. Defaults to False.
-
rerun_finished : bool — If True, allows rerunning completed runs. Defaults to False.
source start_run(*, chdir: bool = False, run_id: str | None = None, experiment_id: str | None = None, run_name: str | None = None, nested: bool = False, parent_run_id: str | None = None, tags: dict[str, str] | None = None, description: str | None = None, log_system_metrics: bool | None = None) → Iterator[Run]
Start an MLflow run and log parameters using the provided configuration instance.
This context manager starts an MLflow run and logs parameters using the specified configuration instance. It ensures that the run is properly closed after completion.
Parameters
-
config : object — The configuration instance to log parameters from.
-
chdir : bool — Whether to change the current working directory to the artifact directory of the current run. Defaults to False.
-
run_id : str | None — The existing run ID. Defaults to None.
-
experiment_id : str | None — The experiment ID. Defaults to None.
-
run_name : str | None — The name of the run. Defaults to None.
-
nested : bool — Whether to allow nested runs. Defaults to False.
-
parent_run_id : str | None — The parent run ID. Defaults to None.
-
tags : dict[str, str] | None — Tags to associate with the run. Defaults to None.
-
description : str | None — A description of the run. Defaults to None.
-
log_system_metrics : bool | None — Whether to log system metrics. Defaults to None.
-
synchronous : bool | None — Whether to log parameters synchronously. Defaults to None.
Yields
-
Run — An MLflow Run instance representing the started run.