from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, List
from dask.diagnostics.progress import ProgressBar
from xarray import DataArray
from spectral_indices.transformations.base import Transformation
class Pipeline:
    """Wrap a succession of transformations into a single, serializable object.

    Transformations are applied in the order of the ``transformations`` list.
    A pipeline can be serialized to a dict / JSON file and rebuilt from one,
    and it can be registered in ``PipelinesRegistry`` so that CLI commands can
    look it up by name.

    Args:
        transformations (List[Transformation]): Transformations applied by the
            pipeline, in order.
        name (str, optional): Name of the pipeline; it is the key under which
            ``PipelinesRegistry`` stores and retrieves it. Defaults to "".
        description (str, optional): Human-readable description of the
            pipeline. Defaults to "".
    """

    def __init__(
        self,
        transformations: List[Transformation],
        name: str = "",
        description: str = "",
    ):
        self.transformations = transformations
        self.description = description
        self.name = name

    def run(self, array: DataArray) -> DataArray:
        """Apply the pipeline to an array.

        Each transformation's ``planify`` is called in order, the output of one
        feeding the next. The application is lazy: actual values are computed
        later by dask.

        Args:
            array (DataArray): Array to apply the pipeline on.

        Returns:
            DataArray: Lazily processed array.
        """
        # NOTE(review): if ``planify`` only builds the dask graph, this progress
        # bar has nothing to report here (progress shows when the result is
        # computed). Kept in case a transformation computes eagerly.
        with ProgressBar():
            for transformation in self.transformations:
                array = transformation.planify(array)
        return array

    def to_dict(self) -> Dict[str, Any]:
        """Return the pipeline as a dictionary, including all transformations.

        Each transformation is stored as ``{"name": ..., "params": ...}`` where
        ``params`` is that transformation's own ``to_dict()`` output.

        Returns:
            Dict[str, Any]: Pipeline as a dict with the keys
            ``"transformations"``, ``"name"`` and ``"description"``.
        """
        return {
            "transformations": [
                {"name": transformation.name, "params": transformation.to_dict()}
                for transformation in self.transformations
            ],
            "name": self.name,
            "description": self.description,
        }

    def to_json(self, json_path: str) -> None:
        """Write the pipeline (as produced by ``to_dict``) to a JSON file.

        Args:
            json_path (str): Destination file path (a ``pathlib.Path`` is also
                accepted). An existing file is overwritten.
        """
        Path(json_path).write_text(json.dumps(self.to_dict()), encoding="utf-8")

    @classmethod
    def from_dict(cls, pipeline_dict: Dict[str, Any]) -> Pipeline:
        """Build a pipeline from a dictionary shaped like ``to_dict`` output.

        Each entry of ``pipeline_dict["transformations"]`` is rebuilt with
        ``Transformation.from_dict``. ``"name"`` and ``"description"`` are
        optional and default to "" exactly like in the constructor.

        Args:
            pipeline_dict (Dict[str, Any]): Serialized pipeline.

        Returns:
            Pipeline: New pipeline. It is an instance of ``cls``, so subclasses
            are rebuilt as themselves.

        Raises:
            KeyError: If ``"transformations"`` is missing from ``pipeline_dict``.
        """
        transformations = [
            Transformation.from_dict(transform)
            for transform in pipeline_dict["transformations"]
        ]
        return cls(
            transformations=transformations,
            name=pipeline_dict.get("name", ""),
            description=pipeline_dict.get("description", ""),
        )

    @classmethod
    def from_json(cls, json_path: str) -> Pipeline:
        """Build a pipeline from a JSON file such as one written by ``to_json``.

        Args:
            json_path (str): Path of the JSON file (a ``pathlib.Path`` is also
                accepted).

        Returns:
            Pipeline: New pipeline, an instance of ``cls``.
        """
        # read_text opens *and closes* the file; a bare ``open()`` handed to
        # json.load would leave the handle open until garbage collection.
        pipeline_dict = json.loads(Path(json_path).read_text(encoding="utf-8"))
        return cls.from_dict(pipeline_dict)

    def __repr__(self) -> str:
        """Return a bullet list: name, description, then numbered transformations."""
        lines = [f"* {self.name}", f"* {self.description}"]
        lines.extend(
            f"* {index} - {transformation}"
            for index, transformation in enumerate(self.transformations, start=1)
        )
        return "\n".join(lines)
class PipelinesRegistry:
    """Pipeline registry to store existing pipelines and access them by name."""

    # Class-level dict: every ``register``/``get`` call (through ``cls``)
    # shares this single name -> pipeline mapping.
    _pipelines: Dict[str, Pipeline] = {}

    @classmethod
    def register(cls, pipeline: Pipeline):
        """Register an existing pipeline into the registry.

        The pipeline is stored under ``pipeline.name``; registering another
        pipeline with the same name replaces the previous one.

        Args:
            pipeline (``Pipeline``): Existing pipeline to register.
        """
        cls._pipelines[pipeline.name] = pipeline

    @classmethod
    def get(cls, name: str) -> Pipeline:
        """Retrieve a pipeline from the registry.

        Args:
            name (``str``): Pipeline name.

        Returns:
            ``Pipeline``: The pipeline registered under ``name``.

        Raises:
            KeyError: If no pipeline is registered under ``name``; the message
                lists the names that are registered.
        """
        try:
            return cls._pipelines[name]
        except KeyError:
            available = ", ".join(repr(key) for key in sorted(cls._pipelines))
            # Still a KeyError so existing callers keep working, but with a
            # message that tells the user which names are valid.
            raise KeyError(
                f"No pipeline registered under name {name!r}. "
                f"Available pipelines: {available or 'none'}."
            ) from None