Source code for spyndex.spyndex

import re
from typing import Any, List, Optional, Union

import dask
import dask.array as da
import dask.dataframe as dd
import ee
import eemont
import numpy as np
import pandas as pd
import xarray as xr

from .utils import _check_params, _get_indices


[docs]def computeIndex( index: Union[str, List[str]], params: Optional[dict] = None, online: bool = False, returnOrigin: bool = True, coordinate: str = "index", **kwargs, ) -> Any: """Computes one or more Spectral Indices from the Awesome Spectral Indices list. Parameters ---------- index : str | list[str] Index or list of indices to compute. Check all available indices from the `Awesome Spectral Indices Repository <https://github.com/davemlz/awesome-spectral-indices>`_. params: dict, default = None Parameters used as inputs for the computation. The input data must be compatible with Overloaded Operators. Some inputs' types supported are pandas series, numpy arrays, xarray objects and numeric objects. Earth Engine objects are also compatible when using eemont. online : bool, default = False Whether to retrieve the most recent list of indices directly from the GitHub repository and not from the local copy. returnOrigin : bool, default = True Whether to return multiple indices as an object with the same type as the inputs. - :code:`pandas.Series`: Returns a :code:`pandas.DataFrame`. - :code:`numpy.ndarray`: Returns a :code:`numpy.ndarray`. - :code:`xarray.DataArray`: Returns a :code:`xarray.DataArray`. - :code:`ee.Image`: Returns a :code:`ee.Image`. - :code:`ee.Number`: Returns a :code:`ee.List`. - :code:`dask.Array`: Returns a :code:`dask.Array`. - :code:`dask.Series`: Returns a :code:`dask.DataFrame`. When numeric objects are used in combination with other objects, the type of the other object is returned. If false, a list is returned. coordinate : str, default = "index" Name of the coordinate used to concatenate :code:`xarray.DataArray` objects when :code:`returnOrigin = True`. kwargs: Parameters used as inputs for the computation as keyword pairs. Ignored when params is defined. .. versionadded:: 0.0.5 Returns ------- Any Computed Spectral Indices according to the inputs' type. See Also -------- computeKernel : Computes a kernel :code:`k(a,b)`. Examples -------- Compute a Spectral Index by passing the required :code:`params` dictionary: >>> import spyndex >>> spyndex.computeIndex( ... index = "NDVI", ... params = { ... "N": 0.643, ... "R": 0.175 ... } ... ) 0.5721271393643031 Compute a Spectral Index by passing the required :code:`params` as keyword pairs: >>> spyndex.computeIndex("NDVI",N = 0.643,R = 0.175) 0.5721271393643031 Two or more Spectral Indices can also be computed: >>> spyndex.computeIndex( ... index = ["NDVI","SAVI"], ... params = { ... "N": 0.643, ... "R": 0.175, ... "L": 0.5 ... } ... ) [0.5721271393643031, 0.5326251896813354] Spyndex is versatile! Let's compute Spectral Indices from a :code:`numpy.ndarray`: >>> import numpy as np >>> R = np.random.normal(0.12,0.05,10000) >>> G = np.random.normal(0.34,0.07,10000) >>> N = np.random.normal(0.67,0.12,10000) >>> spyndex.computeIndex( ... index = ["NDVI","SAVI","GNDVI"], ... params = { ... "N": N, ... "R": R, ... "G": G, ... "L": 0.5 ... } ... ) array([[0.57190873, 0.63776266, 0.52554653, ..., 0.692647 , 0.72013087, 0.57576994], [0.5494994 , 0.60604837, 0.47157809, ..., 0.60647869, 0.65887439, 0.52585032], [0.33304486, 0.46408771, 0.28007567, ..., 0.35734698, 0.28536337, 0.50212151]]) Now, let's try a :code:`pandas.DataFrame`: >>> import pandas as pd >>> df = pd.DataFrame({"Red":R,"Green":G,"NIR":N}) >>> spyndex.computeIndex( ... index = ["NDVI","SAVI","GNDVI"], ... params = { ... "N": df["NIR"], ... "R": df["Red"], ... "G": df["Green"], ... "L": 0.5 ... } ... ) NDVI SAVI GNDVI 0 0.571909 0.549499 0.333045 1 0.637763 0.606048 0.464088 2 0.525547 0.471578 0.280076 3 0.498328 0.443842 0.514775 4 0.625445 0.512757 0.227829 ... ... ... ... 9995 0.706123 0.604131 0.233519 9996 0.731205 0.630090 0.389462 9997 0.692647 0.606479 0.357347 9998 0.720131 0.658874 0.285363 9999 0.575770 0.525850 0.502122 What about a :code:`xarray.DataArray`? >>> import xarray as xr >>> da = xr.DataArray(np.array([G,R,N]).reshape(3,100,100), ... dims = ("band","x","y"), ... coords = {"band": ["Green","Red","NIR"]}) >>> spyndex.computeIndex( ... index = ["NDVI","SAVI","GNDVI"], ... params = { ... "N": da.sel(band = "NIR"), ... "R": da.sel(band = "Red"), ... "G": da.sel(band = "Green"), ... "L": 0.5 ... } ... ) <xarray.DataArray (index: 3, x: 100, y: 100)> Coordinates: * index (index) <U5 'NDVI' 'SAVI' 'GNDVI' Dimensions without coordinates: x, y Now let's try :code:`dask`! >>> import dask.array as da >>> array_shape = (10000,10000) >>> chunk_size = (1000,1000) >>> dask_array = da.array([ ... da.random.normal(0.6,0.10,array_shape,chunks = chunk_size), ... da.random.normal(0.1,0.05,array_shape,chunks = chunk_size), ... da.random.normal(0.3,0.02,array_shape,chunks = chunk_size) ... ]) >>> spyndex.computeIndex( ... index = ["NDVI","SAVI","GNDVI"], ... params = { ... "N": dask_array[0], ... "R": dask_array[1], ... "G": dask_array[2], ... "L": 0.5 ... } ... ).compute() And a :code:`dask.DataFrame`? >>> import dask.dataframe as dd >>> df = pd.DataFrame({ ... "NIR": np.random.normal(0.6,0.10,1000), ... "RED": np.random.normal(0.1,0.05,1000), ... "GREEN": np.random.normal(0.3,0.02,1000), ... }) >>> df = dd.from_pandas(df,npartitions = 10) >>> spyndex.computeIndex( ... index = ["NDVI","SAVI","GNDVI"], ... params = { ... "N": df["NIR"], ... "R": df["RED"], ... "G": df["GREEN"], ... "L": 0.5 ... } ... ).compute() """ if params is None: params = kwargs if not isinstance(index, list): index = [index] indices = _get_indices(online) names = list(indices.keys()) result = [] for idx in index: if idx not in names: raise Exception(f"{idx} is not a valid Spectral Index!") else: _check_params(idx, params, indices) result.append(eval(indices[idx]["formula"], {}, params)) if len(result) == 1: result = result[0] else: if returnOrigin: if isinstance(result[0], np.ndarray): result = np.array(result) elif isinstance(result[0], pd.core.series.Series): result = pd.DataFrame(dict(zip(index, result))) elif isinstance(result[0], xr.core.dataarray.DataArray): result = [x.reset_coords(drop=True) for x in result] result = xr.concat(result, dim=coordinate).assign_coords( {coordinate: index} ) elif isinstance(result[0], ee.image.Image): result = ee.Image(result).rename(index) elif isinstance(result[0], ee.ee_number.Number): result = ee.List(result) elif isinstance(result[0], dask.array.core.Array): result = da.array(result) elif isinstance(result[0], dask.dataframe.core.Series): result = dd.concat(result, axis="columns") result.columns = index return result
[docs]def computeKernel(kernel: str, params: Optional[dict] = None, **kwargs) -> Any: """Computes a kernel :code:`k(a,b)`. Kernel parameters are used for kernel indices like the kNDVI that requires the :code:`kNN` (:code:`k(N,N)`) and :code:`kNR` (:code:`k(N,R)`) parameters. Parameters ---------- kernel : str Kernel to use. One of 'linear', 'poly' or 'RBF'. params : dict Parameters to use for the kernel computation. For :code:`kernel = 'linear'`, the parameters 'a' (band A) and 'b' (band B) must be declared. For :code:`kernel = 'RBF'`, the parameters 'a' (band A), 'b' (band B) and 'sigma' (length-scale) must be declared. For :code:`kernel = 'poly'`, the parameters 'a' (band A), 'b' (band B), 'p' (kernel degree) and 'c' (trade-off) must be declared. kwargs: Parameters used as inputs for the computation as keyword pairs. Ignored when params is defined. .. versionadded:: 0.0.5 Returns ------- Any Computed kernel. See Also -------- computeIndex : Computes one or more Spectral Indices from the Awesome Spectral Indices list. Examples -------- Compute a kernel index with the help of :code:`spyndex.computeKernel()`: >>> import spyndex >>> spyndex.computeIndex( ... index = "kNDVI", ... params = { ... "kNN": 1.0, ... "kNR": spyndex.computeKernel( ... kernel = "RBF", ... params = { ... "a" : 0.68, "b": 0.13, "sigma": (0.68 + 0.13) / 2 ... } ... ) ... } ... ) 0.4309459271768674 Parameters can also be passed as keyword pairs: >>> import spyndex >>> spyndex.computeIndex( ... index = "kNDVI", ... kNN = 1.0, ... kNR = spyndex.computeKernel("RBF",a = 0.68,b = 0.13,sigma = (0.68 + 0.13) / 2) ... ) 0.4309459271768674 Use the polynomial kernel: >>> import spyndex >>> spyndex.computeIndex( ... index = "kNDVI", ... params = { ... "kNN": spyndex.computeKernel( ... kernel = "poly", ... params = { ... "a" : 0.68, ... "b": 0.68, ... "p": 2.0, ... "c": spyndex.constants.c.default ... } ... ), ... "kNR": spyndex.computeKernel( ... kernel = "poly", ... params = { ... "a" : 0.68, ... "b": 0.13, ... "p": 2.0, ... "c": spyndex.constants.c.default ... } ... ) ... } ... ) 0.2870700138954041 Now let's try a :code:`numpy.ndarray`: >>> import numpy as np >>> R = np.random.normal(0.12,0.05,10000) >>> N = np.random.normal(0.67,0.12,10000) >>> spyndex.computeIndex( ... index = "kNDVI", ... params = { ... "kNN": 1.0, ... "kNR": spyndex.computeKernel( ... kernel = "RBF", ... params = { ... "a" : N, ... "b" : R, ... "sigma" : np.mean([N,R],0) ... } ... ) ... } ... ) array([0.36776416, 0.57727362, 0.5252302 , ..., 0.5209451 , 0.53162097, 0.67689597]) It's time for a :code:`pandas.DataFrame`! >>> import pandas as pd >>> R = np.random.normal(0.12,0.05,10000) >>> N = np.random.normal(0.67,0.12,10000) >>> df = pd.DataFrame({"Red":R,"NIR":N}) >>> spyndex.computeIndex( ... index = "kNDVI", ... params = { ... "kNN": 1.0, ... "kNR": spyndex.computeKernel( ... kernel = "RBF", ... params = { ... "a" : df["NIR"], ... "b" : df["Red"], ... "sigma" : df.mean(1) ... } ... ) ... } ... ) 0 0.468294 1 0.535752 2 0.745249 3 0.402761 4 0.432528 ... 9995 0.475168 9996 0.482034 9997 0.403363 9998 0.489537 9999 0.508163 Length: 10000, dtype: float64 """ if params is None: params = kwargs kernels = { "linear": "a * b", "poly": "((a * b) + c) ** p", } if ( isinstance(params["a"], ee.image.Image) or isinstance(params["b"], ee.image.Image) or isinstance(params["a"], ee.ee_number.Number) or isinstance(params["b"], ee.ee_number.Number) ): kernels["RBF"] = "exp((-1.0 * (a - b) ** 2.0)/(2.0 * sigma ** 2.0))" result = params["a"].expression(kernels[kernel], params) else: kernels["RBF"] = "np.exp((-1.0 * (a - b) ** 2.0)/(2.0 * sigma ** 2.0))" params["np"] = np result = eval(kernels[kernel], params) return result