hydraflow

source package hydraflow

Integrate Hydra and MLflow to manage and track machine learning experiments.

Classes

  • Run Represent an MLflow Run in HydraFlow.

  • RunCollection A collection of Run instances that implements the Sequence protocol.

Functions

  • chdir_artifact Change the current working directory to the artifact directory of the given run.

  • get_artifact_dir Retrieve the artifact directory for the given run.

  • iter_artifact_paths Iterate over the artifact paths in the tracking directory.

  • iter_artifacts_dirs Iterate over the artifacts directories in the tracking directory.

  • iter_experiment_dirs Iterate over the experiment directories in the tracking directory.

  • iter_run_dirs Iterate over the run directories in the tracking directory.

  • log_run Log the parameters from the given configuration instance.

  • main Decorator for configuring and running MLflow experiments with Hydra.

  • start_run Start an MLflow run and log parameters using the provided configuration instance.

source class Run[C, I = None](run_dir: Path, impl_factory: Callable[[Path], I] | Callable[[Path, C], I] = lambda _: None)

Represent an MLflow Run in HydraFlow.

A Run contains information about the run, configuration, and implementation. The configuration type C and implementation type I are specified as type parameters.

Attributes

  • info : RunInfo Information about the run, such as run directory, run ID, and job name.

  • impl_factory : Callable[[Path], I] | Callable[[Path, C], I] Factory function to create the implementation instance.

  • cfg : C The configuration instance loaded from the Hydra configuration file.

  • impl : I The implementation instance created by the factory function.

Methods

  • load Load a Run from a run directory.

  • update Set default value(s) in the configuration if they don't already exist.

  • get Get a value from the information or configuration.

  • predicate Check if a value satisfies a condition for filtering.

  • to_dict Convert the Run to a dictionary.

source property Run.cfg: C

The configuration instance loaded from the Hydra configuration file.

source property Run.impl: I

The implementation instance created by the factory function.

This property dynamically examines the signature of the impl_factory using the inspect module and calls it with the appropriate arguments:

  • If the factory accepts one parameter: called with just the artifacts directory
  • If the factory accepts two parameters: called with the artifacts directory and the configuration instance

This allows implementation classes to be configuration-aware and utilize both the file system and configuration information.
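
The signature-based dispatch described above can be sketched in plain Python, independent of HydraFlow. This is a minimal illustration and not the library's implementation: `create_impl`, `Model`, the example path, and the dictionary-based configuration are hypothetical names chosen for the example.

```python
import inspect
from pathlib import Path
from typing import Any, Callable


def create_impl(factory: Callable[..., Any], artifacts_dir: Path, cfg: Any) -> Any:
    """Call `factory` with one or two arguments, depending on its signature."""
    n_params = len(inspect.signature(factory).parameters)
    if n_params == 1:
        # Factory only wants the artifacts directory.
        return factory(artifacts_dir)
    # Factory wants the artifacts directory and the configuration.
    return factory(artifacts_dir, cfg)


class Model:
    """Hypothetical configuration-aware implementation class."""

    def __init__(self, artifacts_dir: Path, cfg: dict) -> None:
        self.artifacts_dir = artifacts_dir
        self.width = cfg["width"]


artifacts = Path("mlruns/0/abc/artifacts")  # hypothetical location
path_only = create_impl(lambda p: p.name, artifacts, {"width": 8})
with_cfg = create_impl(Model, artifacts, {"width": 8})
```

Passing a class as the factory works because `inspect.signature` reports the parameters of its constructor without `self`.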

source classmethod Run.load(run_dir: str | Path | Iterable[str | Path], impl_factory: Callable[[Path], I] | Callable[[Path, C], I] = lambda _: None, *, n_jobs: int = 0) -> Self | RunCollection[Self]

Load a Run from a run directory.

Parameters

  • run_dir : str | Path | Iterable[str | Path] The directory where the MLflow runs are stored, either as a string, a Path instance, or an iterable of them.

  • impl_factory : Callable[[Path], I] | Callable[[Path, C], I] A factory function that creates the implementation instance. It can accept either just the artifacts directory path, or both the path and the configuration instance. Defaults to a function that returns None.

  • n_jobs : int The number of parallel jobs. If 0 (default), runs sequentially. If -1, uses all available CPU cores.

Returns

  • Self | RunCollection[Self] A single Run instance or a RunCollection of Run instances.

source method Run.update(key: str | tuple[str, ...], value: Any | Callable[[Self], Any], *, force: bool = False) -> None

Set default value(s) in the configuration if they don't already exist.

This method adds a value or multiple values to the configuration, but only if the corresponding keys don't already have values. Existing values are left unchanged unless force is True.

Parameters

  • key : str | tuple[str, ...] Either a string representing a single configuration path (can use dot notation like "section.subsection.param"), or a tuple of strings to set multiple related configuration values at once.

  • value : Any | Callable[[Self], Any] The value to set. This can be:
      - For string keys: any value, or a callable that returns a value
      - For tuple keys: an iterable with the same length as the key tuple, or a callable that returns such an iterable
      - For callable values: the callable must accept a single argument of type Run (self) and return the appropriate value type

  • force : bool Whether to force the update even if the key already exists.

Raises

  • TypeError If a tuple key is provided but the value is not an iterable, or if the callable doesn't return an iterable.
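
The set-if-missing behaviour can be sketched on a plain nested dictionary. This is a minimal sketch under stated assumptions, not HydraFlow's code: `set_default`, `_lookup`, and `_assign` are hypothetical helpers, a `dict` stands in for the configuration object, and the callable receives that dictionary rather than a Run.

```python
from typing import Any

_MISSING = object()  # sentinel meaning "no value stored at this path"


def _lookup(cfg: dict, path: str) -> Any:
    """Return the value at a dotted path, or _MISSING if any part is absent."""
    node: Any = cfg
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return _MISSING
        node = node[part]
    return node


def _assign(cfg: dict, path: str, value: Any) -> None:
    """Store a value at a dotted path, creating intermediate dicts."""
    *parents, leaf = path.split(".")
    node = cfg
    for part in parents:
        node = node.setdefault(part, {})
    node[leaf] = value


def set_default(cfg: dict, key, value: Any, *, force: bool = False) -> None:
    keys = (key,) if isinstance(key, str) else tuple(key)
    if not force and all(_lookup(cfg, k) is not _MISSING for k in keys):
        return  # every key already has a value; leave them untouched
    if callable(value):
        value = value(cfg)  # compute the value(s) lazily
    if isinstance(key, str):
        values = [value]
    else:
        try:
            values = list(value)
        except TypeError as exc:
            raise TypeError("a tuple key requires an iterable value") from exc
    for k, v in zip(keys, values):
        if force or _lookup(cfg, k) is _MISSING:
            _assign(cfg, k, v)


cfg = {"model": {"width": 8}}
set_default(cfg, "model.width", 16)   # ignored: the key already has a value
set_default(cfg, "model.depth", 2)    # added: the key was missing
set_default(cfg, ("data.rows", "data.cols"), lambda c: (c["model"]["width"], 4))
set_default(cfg, "model.width", 16, force=True)  # overwritten because of force
```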

source method Run.get(key: str, default: Any = MISSING) -> Any

Get a value from the information or configuration.

Parameters

  • key : str The key to look for. Can use dot notation for nested keys in configuration.

  • default : Any Value to return if the key is not found. If not provided, AttributeError will be raised.

Returns

  • Any The value associated with the key, or the default value if the key is not found and a default is provided.

Raises

  • AttributeError If the key is not found and no default is provided.
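
A sentinel default such as MISSING lets callers pass None as a legitimate default value. The following sketch shows the pattern for dot-notation lookup on a nested dictionary. It is an illustration only: `get_value` is a hypothetical function, the sentinel is a local stand-in, and the lookup in the run information is not modelled.

```python
from typing import Any

MISSING = object()  # local stand-in sentinel meaning "no default was given"


def get_value(cfg: dict, key: str, default: Any = MISSING) -> Any:
    """Look up a dotted key; fall back to `default` or raise AttributeError."""
    node: Any = cfg
    for part in key.split("."):
        if isinstance(node, dict) and part in node:
            node = node[part]
        elif default is MISSING:
            raise AttributeError(f"No such key: {key}")
        else:
            return default
    return node


cfg = {"optimizer": {"name": "adam", "lr": 0.001}}
lr = get_value(cfg, "optimizer.lr")
momentum = get_value(cfg, "optimizer.momentum", None)  # None is a valid default
```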

source method Run.predicate(key: str, value: Any) -> bool

Check if a value satisfies a condition for filtering.

This method retrieves the attribute specified by the key using the get method, and then compares it with the given value according to the following rules:

  • If value is callable: Call it with the attribute and return the boolean result
  • If value is a list or set: Check if the attribute is in the list/set
  • If value is a tuple of length 2: Check if the attribute is in the range [value[0], value[1]]. Both sides are inclusive
  • Otherwise: Check if the attribute equals the value

Parameters

  • key : str The key to get the attribute from.

  • value : Any The value to compare with, or a callable that takes the attribute and returns a boolean.

Returns

  • bool True if the attribute satisfies the condition, False otherwise.
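
The four comparison rules listed above can be written as a small standalone function. This is a sketch of the documented rules, not the library's source; `matches` is a hypothetical name and takes the already-retrieved attribute instead of a key.

```python
from typing import Any


def matches(attr: Any, value: Any) -> bool:
    """Apply the comparison rules in the documented order."""
    if callable(value):
        return bool(value(attr))           # custom condition
    if isinstance(value, (list, set)):
        return attr in value               # membership
    if isinstance(value, tuple) and len(value) == 2:
        return value[0] <= attr <= value[1]  # inclusive range
    return attr == value                   # plain equality
```

Note that a two-element tuple is read as a range, so a list or set is the way to test membership in exactly two candidates.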

source method Run.to_dict() -> dict[str, Any]

Convert the Run to a dictionary.

source class RunCollection[R: Run[Any, Any]](runs: Iterable[R])

Bases : Sequence[R]

A collection of Run instances that implements the Sequence protocol.

RunCollection provides methods for filtering, sorting, grouping, and analyzing runs, as well as converting run data to various formats such as DataFrames.

Parameters

  • runs : Iterable[Run] An iterable of Run instances to include in the collection.

Attributes

  • runs : list[R] A list containing the Run instances in this collection.

Methods

  • update Update configuration values for all runs in the collection.

  • filter Filter runs based on predicates or key-value conditions.

  • try_get Try to get a single run matching the specified criteria.

  • get Get a single run matching the specified criteria.

  • first Get the first run matching the specified criteria.

  • last Get the last run matching the specified criteria.

  • to_list Extract a list of values for a specific key from all runs.

  • to_numpy Extract values for a specific key from all runs as a NumPy array.

  • unique Get the unique values for a specific key across all runs.

  • n_unique Count the number of unique values for a specific key across all runs.

  • sort Sort runs based on one or more keys.

  • to_frame Convert the collection to a Polars DataFrame.

  • group_by Group runs by one or more keys.

source method RunCollection.update(key: str | tuple[str, ...], value: Any | Callable[[R], Any], *, force: bool = False) -> None

Update configuration values for all runs in the collection.

This method calls the update method on each run in the collection.

Parameters

  • key : str | tuple[str, ...] Either a string representing a single configuration path or a tuple of strings to set multiple configuration values.

  • value : Any | Callable[[R], Any] The value(s) to set or a callable that returns such values.

  • force : bool Whether to force updates even if the keys already exist.

source method RunCollection.filter(*predicates: Callable[[R], bool] | tuple[str, Any], **kwargs: Any) -> Self

Filter runs based on predicates or key-value conditions.

This method allows filtering runs using any combination of the following criteria:

  • Callable predicates that take a Run and return a boolean
  • Key-value tuples where the key is a string and the value is compared using the Run.predicate method
  • Keyword arguments, where the key is a string and the value is compared using the Run.predicate method

Parameters

  • *predicates : Callable[[R], bool] | tuple[str, Any] Callable predicates or (key, value) tuples for filtering.

  • **kwargs : Any Additional key-value pairs for filtering.

Returns

  • Self A new RunCollection containing only the runs that match all criteria.
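
How the three kinds of criteria combine can be sketched with dictionaries standing in for runs. This is an illustrative sketch, not HydraFlow's implementation: `filter_runs` and `_matches` are hypothetical names, and each run is assumed to be a flat `dict`.

```python
from typing import Any


def _matches(attr: Any, value: Any) -> bool:
    """Compact version of the comparison rules documented for Run.predicate."""
    if callable(value):
        return bool(value(attr))
    if isinstance(value, (list, set)):
        return attr in value
    if isinstance(value, tuple) and len(value) == 2:
        return value[0] <= attr <= value[1]
    return attr == value


def filter_runs(runs: list, *predicates: Any, **kwargs: Any) -> list:
    """Keep only the runs that satisfy every criterion."""
    selected = list(runs)
    for pred in predicates:
        if callable(pred):
            selected = [r for r in selected if pred(r)]
        else:
            key, value = pred  # (key, value) tuple
            selected = [r for r in selected if _matches(r[key], value)]
    for key, value in kwargs.items():
        selected = [r for r in selected if _matches(r[key], value)]
    return selected


runs = [
    {"lr": 0.1, "epochs": 10, "opt": "adam"},
    {"lr": 0.01, "epochs": 20, "opt": "sgd"},
    {"lr": 0.001, "epochs": 30, "opt": "adam"},
]
long_runs = filter_runs(runs, lambda r: r["epochs"] >= 20, ("lr", (0.001, 0.01)))
long_adam = filter_runs(runs, lambda r: r["epochs"] >= 20, opt="adam")
```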

source method RunCollection.try_get(*predicates: Callable[[R], bool] | tuple[str, Any], **kwargs: Any) -> R | None

Try to get a single run matching the specified criteria.

This method applies filters and returns a single matching run if exactly one is found, None if no runs are found, or raises ValueError if multiple runs match.

Parameters

  • *predicates : Callable[[R], bool] | tuple[str, Any] Callable predicates or (key, value) tuples for filtering.

  • **kwargs : Any Additional key-value pairs for filtering.

Returns

  • R | None A single Run that matches the criteria, or None if no matches are found.

Raises

  • ValueError If multiple runs match the criteria.

source method RunCollection.get(*predicates: Callable[[R], bool] | tuple[str, Any], **kwargs: Any) -> R

Get a single run matching the specified criteria.

This method applies filters and returns a single matching run, or raises ValueError if no runs or multiple runs match.

Parameters

  • *predicates : Callable[[R], bool] | tuple[str, Any] Callable predicates or (key, value) tuples for filtering.

  • **kwargs : Any Additional key-value pairs for filtering.

Returns

  • R A single Run that matches the criteria.

Raises

  • ValueError If no runs match or if multiple runs match the criteria.

source method RunCollection.first(*predicates: Callable[[R], bool] | tuple[str, Any], **kwargs: Any) -> R

Get the first run matching the specified criteria.

This method applies filters and returns the first matching run, or raises ValueError if no runs match.

Parameters

  • *predicates : Callable[[R], bool] | tuple[str, Any] Callable predicates or (key, value) tuples for filtering.

  • **kwargs : Any Additional key-value pairs for filtering.

Returns

  • R The first Run that matches the criteria.

Raises

  • ValueError If no runs match the criteria.

source method RunCollection.last(*predicates: Callable[[R], bool] | tuple[str, Any], **kwargs: Any) -> R

Get the last run matching the specified criteria.

This method applies filters and returns the last matching run, or raises ValueError if no runs match.

Parameters

  • *predicates : Callable[[R], bool] | tuple[str, Any] Callable predicates or (key, value) tuples for filtering.

  • **kwargs : Any Additional key-value pairs for filtering.

Returns

  • R The last Run that matches the criteria.

Raises

  • ValueError If no runs match the criteria.
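
The four selection methods (try_get, get, first, last) differ only in how they treat zero or multiple matches. The sketch below makes the difference explicit. It operates on an already-filtered list, and the function names merely mirror the documented methods; none of this is the library's code.

```python
from typing import Any, Optional, Sequence


def try_get(matched: Sequence[Any]) -> Optional[Any]:
    """Exactly one match: return it. None: return None. Several: error."""
    if len(matched) > 1:
        raise ValueError("multiple runs match the criteria")
    return matched[0] if matched else None


def get(matched: Sequence[Any]) -> Any:
    """Like try_get, but an empty result is also an error."""
    if not matched:
        raise ValueError("no runs match the criteria")
    return try_get(matched)


def first(matched: Sequence[Any]) -> Any:
    """Any number of matches is fine, as long as there is at least one."""
    if not matched:
        raise ValueError("no runs match the criteria")
    return matched[0]


def last(matched: Sequence[Any]) -> Any:
    if not matched:
        raise ValueError("no runs match the criteria")
    return matched[-1]
```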

source method RunCollection.to_list(key: str) -> list[Any]

Extract a list of values for a specific key from all runs.

Parameters

  • key : str The key to extract from each run.

Returns

  • list[Any] A list containing the values for the specified key from each run.

source method RunCollection.to_numpy(key: str) -> NDArray

Extract values for a specific key from all runs as a NumPy array.

Parameters

  • key : str The key to extract from each run.

Returns

  • NDArray A NumPy array containing the values for the specified key from each run.

source method RunCollection.unique(key: str) -> NDArray

Get the unique values for a specific key across all runs.

Parameters

  • key : str The key to extract unique values for.

Returns

  • NDArray A NumPy array containing the unique values for the specified key.

source method RunCollection.n_unique(key: str) -> int

Count the number of unique values for a specific key across all runs.

Parameters

  • key : str The key to count unique values for.

Returns

  • int The number of unique values for the specified key.

source method RunCollection.sort(*keys: str, reverse: bool = False) -> Self

Sort runs based on one or more keys.

Parameters

  • *keys : str The keys to sort by, in order of priority.

  • reverse : bool Whether to sort in descending order (default is ascending).

Returns

  • Self A new RunCollection with the runs sorted according to the specified keys.
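
Sorting by several keys in priority order amounts to sorting on a tuple of values. A minimal sketch with dictionaries standing in for runs follows; `sort_runs` is a hypothetical name, and this is not the library's implementation.

```python
from typing import Any


def sort_runs(runs: list, *keys: str, reverse: bool = False) -> list:
    """Return a new list sorted by the given keys, first key first."""
    return sorted(runs, key=lambda r: tuple(r[k] for k in keys), reverse=reverse)


runs = [
    {"opt": "sgd", "lr": 0.1},
    {"opt": "adam", "lr": 0.1},
    {"opt": "adam", "lr": 0.01},
]
ascending = sort_runs(runs, "opt", "lr")
descending = sort_runs(runs, "opt", "lr", reverse=True)
```

Like the documented method, the sketch returns a new sequence and leaves the input order untouched.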

source method RunCollection.to_frame(*keys: str, **kwargs: Callable[[R], Any]) -> DataFrame

Convert the collection to a Polars DataFrame.

Parameters

  • *keys : str The keys to include as columns in the DataFrame. If not provided, all keys from each run's to_dict() method will be used.

  • **kwargs : Callable[[R], Any] Additional columns to compute using callables that take a Run and return a value.

Returns

  • DataFrame A Polars DataFrame containing the specified data from the runs.

source method RunCollection.group_by(*keys: str, **kwargs: Callable[[Self | Sequence[R]], Any]) -> dict[Any, Self] | DataFrame

Group runs by one or more keys.

This method returns one of two things, depending on whether aggregation functions are given:

  • A dictionary mapping group keys to RunCollections (no kwargs provided)
  • A Polars DataFrame with group keys and aggregated values (kwargs provided)

Parameters

  • *keys : str The keys to group by.

  • **kwargs : Callable[[Self | Sequence[R]], Any] Aggregation functions to apply to each group. Each function should accept a RunCollection or Sequence[Run] and return a value.

Returns

  • dict[Any, Self] | DataFrame Either a dictionary mapping group keys to RunCollections, or a Polars DataFrame with group keys and aggregated values.
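
The two return shapes can be sketched without Polars. In this illustration, which is not the library's implementation, `group_by` is a hypothetical function over dictionaries, a list of row dictionaries stands in for the DataFrame, and using a scalar group key when only one key is given is an assumption.

```python
from typing import Any, Callable


def group_by(runs: list, *keys: str, **aggs: Callable[[list], Any]):
    """Group runs by key values; aggregate each group if functions are given."""
    groups: dict = {}
    for run in runs:
        values = tuple(run[k] for k in keys)
        group_key = values[0] if len(keys) == 1 else values  # assumption
        groups.setdefault(group_key, []).append(run)
    if not aggs:
        return groups  # mapping: group key -> runs in that group
    rows = []
    for group_key, members in groups.items():
        key_values = (group_key,) if len(keys) == 1 else group_key
        row = dict(zip(keys, key_values))
        for name, func in aggs.items():
            row[name] = func(members)  # one aggregated column per function
        rows.append(row)
    return rows  # stand-in for a DataFrame: one row per group


runs = [
    {"opt": "adam", "loss": 0.2},
    {"opt": "sgd", "loss": 0.5},
    {"opt": "adam", "loss": 0.4},
]
by_opt = group_by(runs, "opt")
summary = group_by(runs, "opt", n=len, best=lambda g: min(r["loss"] for r in g))
```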

source chdir_artifact(run: Run) -> Iterator[Path]

Change the current working directory to the artifact directory of the given run.

This context manager changes the current working directory to the artifact directory of the given run. It ensures that the directory is changed back to the original directory after the context is exited.

Parameters

  • run : Run The run to get the artifact directory from.
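
The change-and-restore pattern behind such a context manager can be sketched with the standard library alone. This is a generic illustration, not HydraFlow's code: `working_directory` is a hypothetical name, and a temporary directory stands in for a run's artifact directory.

```python
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def working_directory(path: Path) -> Iterator[Path]:
    """Temporarily switch the current working directory to `path`."""
    original = Path.cwd()
    os.chdir(path)
    try:
        yield path
    finally:
        # Runs even if the body raises, so the caller's directory is restored.
        os.chdir(original)


start = Path.cwd()
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp).resolve()
    with working_directory(target):
        inside = Path.cwd()  # relative paths now resolve under `target`
restored = Path.cwd()
```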

source get_artifact_dir(run: Run) -> Path

Retrieve the artifact directory for the given run.

This function uses MLflow to get the artifact directory for the given run.

Parameters

  • run : Run The run instance.

Returns

  • Path The local path to the directory where the artifacts are downloaded.

Raises

  • NotImplementedError

source iter_artifact_paths(tracking_dir: str | Path, artifact_path: str | Path, experiment_names: str | list[str] | Callable[[str], bool] | None = None) -> Iterator[Path]

Iterate over the artifact paths in the tracking directory.

source iter_artifacts_dirs(tracking_dir: str | Path, experiment_names: str | list[str] | Callable[[str], bool] | None = None) -> Iterator[Path]

Iterate over the artifacts directories in the tracking directory.

source iter_experiment_dirs(tracking_dir: str | Path, experiment_names: str | list[str] | Callable[[str], bool] | None = None) -> Iterator[Path]

Iterate over the experiment directories in the tracking directory.

source iter_run_dirs(tracking_dir: str | Path, experiment_names: str | list[str] | Callable[[str], bool] | None = None) -> Iterator[Path]

Iterate over the run directories in the tracking directory.
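
The four iterators share the experiment_names parameter, which accepts a string, a list of strings, a callable, or None. The page does not spell out how each form is interpreted, so the sketch below is an assumption: None selects every experiment, a string or list selects by exact name, and a callable acts as a predicate on the name. `name_matcher` is a hypothetical helper, not part of HydraFlow.

```python
from typing import Callable, List, Optional, Union

NameFilter = Union[str, List[str], Callable[[str], bool], None]


def name_matcher(experiment_names: NameFilter = None) -> Callable[[str], bool]:
    """Normalize the accepted forms into a single predicate (assumed semantics)."""
    if experiment_names is None:
        return lambda name: True          # no filter: accept everything
    if callable(experiment_names):
        return experiment_names           # already a predicate
    if isinstance(experiment_names, str):
        names = [experiment_names]        # single exact name
    else:
        names = list(experiment_names)    # several exact names
    return lambda name: name in names


accept_all = name_matcher()
only_mnist = name_matcher("mnist")
several = name_matcher(["mnist", "cifar"])
by_prefix = name_matcher(lambda name: name.startswith("exp_"))
```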

source log_run(run: Run) -> Iterator[None]

Log the parameters from the given configuration instance.

This context manager logs the parameters from the provided configuration instance using MLflow. It also manages the MLflow run context, ensuring that artifacts are logged and the run is properly closed.

Parameters

  • run : Run The run instance.

Yields

  • None None

source main(node: T | type[T], config_name: str = 'config', *, chdir: bool = False, force_new_run: bool = False, match_overrides: bool = False, rerun_finished: bool = False)

Decorator for configuring and running MLflow experiments with Hydra.

This decorator combines Hydra configuration management with MLflow experiment tracking. It automatically handles run deduplication and configuration storage.

Parameters

  • node : T | type[T] Configuration node class or instance defining the structure of the configuration.

  • config_name : str Name of the configuration. Defaults to "config".

  • chdir : bool If True, changes working directory to the artifact directory of the run. Defaults to False.

  • force_new_run : bool If True, always creates a new MLflow run instead of reusing existing ones. Defaults to False.

  • match_overrides : bool If True, matches runs based on Hydra CLI overrides instead of full config. Defaults to False.

  • rerun_finished : bool If True, allows rerunning completed runs. Defaults to False.

source start_run(*, chdir: bool = False, run_id: str | None = None, experiment_id: str | None = None, run_name: str | None = None, nested: bool = False, parent_run_id: str | None = None, tags: dict[str, str] | None = None, description: str | None = None, log_system_metrics: bool | None = None) -> Iterator[Run]

Start an MLflow run and log parameters using the provided configuration instance.

This context manager starts an MLflow run and logs parameters using the specified configuration instance. It ensures that the run is properly closed after completion.

Parameters

  • config : object The configuration instance to log parameters from.

  • chdir : bool Whether to change the current working directory to the artifact directory of the current run. Defaults to False.

  • run_id : str | None The existing run ID. Defaults to None.

  • experiment_id : str | None The experiment ID. Defaults to None.

  • run_name : str | None The name of the run. Defaults to None.

  • nested : bool Whether to allow nested runs. Defaults to False.

  • parent_run_id : str | None The parent run ID. Defaults to None.

  • tags : dict[str, str] | None Tags to associate with the run. Defaults to None.

  • description : str | None A description of the run. Defaults to None.

  • log_system_metrics : bool | None Whether to log system metrics. Defaults to None.

  • synchronous : bool | None Whether to log parameters synchronously. Defaults to None.

Yields

  • Run An MLflow Run instance representing the started run.