Source code for spectral_indices.pipeline.pipeline

from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Dict, List

from dask.diagnostics.progress import ProgressBar
from xarray import DataArray

from spectral_indices.transformations.base import Transformation


class Pipeline:
    """Pipeline wraps a succession of transformations into a single object.

    A pipeline can be registered in ``PipelinesRegistry`` for CLI commands.
    All transformations are applied in the order of the transformation list.
    A pipeline can be serialized to a dict / JSON file and rebuilt from one.

    Args:
        transformations (List[Transformation]): All the transformations used
            in the pipeline, in application order.
        name (str, optional): Name of the pipeline (the key under which a
            registered pipeline is accessed). Defaults to "".
        description (str, optional): Description of the pipeline.
            Defaults to "".
    """

    def __init__(
        self,
        transformations: List[Transformation],
        name: str = "",
        description: str = "",
    ):
        self.transformations = transformations
        self.description = description
        self.name = name

    def run(self, array: DataArray) -> DataArray:
        """Apply the pipeline to an array.

        This is a lazy application: each transformation's ``planify`` is
        called in order, and the real outputs will be computed with dask.

        Args:
            array (DataArray): Array to apply the pipeline on.

        Returns:
            DataArray: Lazily processed array.
        """
        with ProgressBar():
            for transformation in self.transformations:
                array = transformation.planify(array)
        return array

    def to_dict(self) -> Dict[str, Any]:
        """Return the pipeline as a dictionary, including all transformations.

        Each transformation is stored as ``{"name": ..., "params": ...}``,
        where ``params`` is that transformation's own ``to_dict()`` output.

        Returns:
            Dict[str, Any]: Pipeline as a dict with ``transformations``,
            ``name`` and ``description`` keys.
        """
        # Key order is kept stable so serialized JSON output is unchanged.
        return {
            "transformations": [
                {"name": transformation.name, "params": transformation.to_dict()}
                for transformation in self.transformations
            ],
            "name": self.name,
            "description": self.description,
        }

    def to_json(self, json_path: str) -> None:
        """Write the pipeline, as produced by ``to_dict``, to a JSON file.

        Args:
            json_path (str): Destination file path.
        """
        Path(json_path).write_text(json.dumps(self.to_dict()), encoding="utf-8")

    @classmethod
    def from_dict(cls, pipeline_dict: Dict[str, Any]) -> Pipeline:
        """Build a pipeline from a dict shaped like the output of ``to_dict``.

        Each entry of ``pipeline_dict["transformations"]`` is rebuilt with
        ``Transformation.from_dict``. Missing ``name`` / ``description`` keys
        fall back to the same defaults as ``__init__``.

        Args:
            pipeline_dict (Dict[str, Any]): Serialized pipeline.

        Returns:
            Pipeline: An instance of ``cls``, so subclasses round-trip as
            themselves.
        """
        transformations = [
            Transformation.from_dict(transform)
            for transform in pipeline_dict["transformations"]
        ]
        return cls(
            transformations=transformations,
            name=pipeline_dict.get("name", ""),
            description=pipeline_dict.get("description", ""),
        )

    @classmethod
    def from_json(cls, json_path: str) -> Pipeline:
        """Build a pipeline from a JSON file such as one written by ``to_json``.

        Args:
            json_path (str): Path of the JSON file to read.

        Returns:
            Pipeline: An instance of ``cls`` built via ``from_dict``.
        """
        # read_text opens and closes the file; the old bare .open() leaked it.
        pipeline_dict = json.loads(Path(json_path).read_text(encoding="utf-8"))
        return cls.from_dict(pipeline_dict)

    def __repr__(self) -> str:
        """Return a bullet list: name, description, then numbered transformations."""
        lines = [f"* {self.name}", f"* {self.description}"]
        lines.extend(
            f"* {index} - {transformation}"
            for index, transformation in enumerate(self.transformations, start=1)
        )
        return "\n".join(lines)
class PipelinesRegistry:
    """Pipeline registry to store and access existing pipelines by name."""

    # Single class-level mapping of pipeline name -> pipeline, shared by all
    # callers of the classmethods below.
    _pipelines: Dict[str, Pipeline] = {}

    @classmethod
    def register(cls, pipeline: Pipeline) -> None:
        """Register an existing pipeline into the registry.

        The pipeline is stored under ``pipeline.name``; registering another
        pipeline with the same name replaces the previous entry.

        Args:
            pipeline (``Pipeline``): Existing pipeline to register.
        """
        cls._pipelines[pipeline.name] = pipeline

    @classmethod
    def get(cls, name: str) -> Pipeline:
        """Retrieve a pipeline from the registry.

        Args:
            name (``str``): Pipeline name.

        Returns:
            ``Pipeline``: The pipeline registered under ``name``.

        Raises:
            KeyError: If no pipeline is registered under ``name``. The message
                lists the names that are available.
        """
        try:
            return cls._pipelines[name]
        except KeyError:
            available = ", ".join(sorted(str(key) for key in cls._pipelines)) or "none"
            # Still a KeyError, so existing callers catching it keep working.
            raise KeyError(
                f"No pipeline registered under {name!r}. "
                f"Available pipelines: {available}"
            ) from None