3. Utils

Utility functions for PyrNet

#|export
from numpy.typing import ArrayLike, NDArray
import numpy as np
from scipy.signal.windows import gaussian
import jstyleson as json
from addict import Dict as adict
from operator import itemgetter
from toolz import keyfilter

# python -m pip install git+https://github.com/hdeneke/trosat-base.git#egg=trosat-base
import trosat.sunpos as sp
# extra imports for demonstration
import sys
import pandas as pd
import matplotlib.pyplot as plt
import datetime as dt
import pkg_resources as pkg_res

import pyrnet

3.1. Representation of Time

Unifying various representations of time to numpy.datetime64 comes in handy when handling user inputs.

#|export
EPOCH_JD_2000_0 = np.datetime64("2000-01-01T12:00")
def to_datetime64(time, epoch=EPOCH_JD_2000_0):
    """
    Convert various representations of time to datetime64.

    Parameters
    ----------
    time : list, ndarray, or scalar of type float, datetime or datetime64
        A representation of time. If float, interpreted as Julian date.
    epoch : np.datetime64, default JD2000.0
        The epoch to use for the calculation

    Returns
    -------
    datetime64 or ndarray of datetime64
    """
    jd = sp.to_julday(time, epoch=epoch)
    jdms = np.int64(86_400_000*jd)
    return epoch + jdms.astype('timedelta64[ms]')
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
Cell In[1], line 2
      1 #|export
----> 2 EPOCH_JD_2000_0 = np.datetime64("2000-01-01T12:00")
      3 def to_datetime64(time, epoch=EPOCH_JD_2000_0):
      4     """
      5     Convert various representations of time to datetime64.
      6 
   (...)
     16     datetime64 or ndarray of datetime64
     17     """

NameError: name 'np' is not defined
# testing to_datetime64
date_jd = 5203.5 # 2014-04-01T00:00
date_dt = dt.datetime(2014,4,1,12,10)
date_pd = pd.date_range("2014-04-01","2014-04-03",freq='1d')
date_list = [dt.date(2014,4,1), dt.date(2014,4,2)]
assert np.datetime64("2014-04-01T00:00")==to_datetime64(date_jd)
assert np.datetime64("2014-04-01T12:10")==to_datetime64(date_dt)
assert np.array_equal(
    np.array([np.datetime64("2014-04-01"),
                 np.datetime64("2014-04-02"),
                 np.datetime64("2014-04-03")]
                ).astype("datetime64[ms]"),
    to_datetime64(date_pd))
assert np.array_equal(
    np.array([np.datetime64("2014-04-01"),
                 np.datetime64("2014-04-02")]
                ).astype("datetime64[ms]"),
    to_datetime64(date_list))

3.2. Json netcdf config tools

The netCDF attributes and encoding variables are stored in CF-Compliance json files. The following utility functions are used to parse the json and return attribute and encoding dictionary’s to be used with xarray.

#|export
def read_json(fpath: str, *, object_hook: type = adict, cls = None) -> dict:
    """ Parse json file to python dict.
    """
    with open(fpath,"r") as f:
        js = json.load(f, object_hook=object_hook, cls=cls)
        return js

def pick(whitelist: list[str], d: dict) -> dict:
    """ Keep only whitelisted keys from input dict.
    """
    return keyfilter(lambda k: k in whitelist, d)

def omit(blacklist: list[str], d: dict) -> dict:
    """ Omit blacklisted keys from input dict.
    """
    return keyfilter(lambda k: k not in blacklist, d)

def get_var_attrs(d: dict) -> dict:
    """
    Parse cf-compliance dictionary.

    Parameters
    ----------
    d: dict
        Dict parsed from cf-meta json.

    Returns
    -------
    dict
        Dict with netcdf attributes for each variable.
    """
    get_vars = itemgetter("variables")
    get_attrs = itemgetter("attributes")
    vattrs = {k: get_attrs(v) for k,v in get_vars(d).items()}
    for k,v in get_vars(d).items():
        vattrs[k].update({
            "dtype": v["type"],
            "gzip":True,
            "complevel":6
        })
    return vattrs

def get_attrs_enc(d : dict) -> (dict,dict):
    """ Split variable attributes in attributes and encoding-attributes.
    """
    _enc_attrs = {
        "scale_factor",
        "add_offset",
        "_FillValue",
        "dtype",
        "zlib",
        "gzip",
        "complevel",
        "calendar",
    }
    # extract variable attributes
    vattrs = {k: omit(_enc_attrs, v) for k, v in d.items()}
    # extract variable encoding
    vencode = {k: pick(_enc_attrs, v) for k, v in d.items()}
    return vattrs, vencode

3.2.1. Usage:

#|dropout
fn = pkg_res.resource_filename("pyrnet", "share/pyrnet_cfmeta.json")

config =  dict(
    contributor_name = "Jon Doe; Roger Rogers",
    contributor_role = "Set-up; Tear-down",
    project = "Notebook Example",
    creator_name = "Adam Alpha",
    dt = np.datetime64("now").item(),
    sdate = dt.datetime(2014,4,1,12,0),
    edate = dt.datetime(2014,4,2,18,0),
    notes = "This is just an example.",
)

# parse the json file
cfdict = read_json(fn)

# get global attributes:
gattrs = cfdict['attributes']

# apply config
gattrs = {k:v.format_map(config) for k,v in gattrs.items()}

gattrs
#|dropout
# get variable attributes
d = get_var_attrs(cfdict)

# split encoding attributes
vattrs, vencode = get_attrs_enc(d)

vattrs, vencode

3.3. Euclidian distances of stations

#|export
def pairwise_distance_matrix( x: ArrayLike, y: ArrayLike ) -> NDArray:
    """
    Get square matrix with Euclidian distances of stations

    Parameters
    ----------
    x: array_like
        X coordinates
    y: array_like
        Y coordinates

    Returns
    -------
    ndarray
        A square matrix with Euclidian distances of stations
    """
    x = np.array(x)
    y = np.array(y)
    return np.sqrt( (x[None,:]-x[:,None])**2+(y[None,:]-y[:,None])**2 )
x = [0, 1, 2]
y = [0, 0, 0]
dist = pairwise_distance_matrix(x,y)
print(dist)
assert dist[0,0]==dist[1,1]==dist[2,2]==0
assert dist[0,1]==dist[1,0]==1
assert dist[0,2]==dist[2,0]==2

3.4. Fourier utility

3.4.1. Create a gaussian window

Convert a scale parameter J to FWHM of a normal distribution

$\mathrm{FWHM} = 2 \sqrt{2 \ln 2} \sigma = f \sigma$

define FWHM with scale parameter J, so that : $\mathrm{FWHM} = 60*2^J$

for J in range(-2,4):
    print(f"J = {J:2d} -> FWHM(J) = {60.*2**J}")
J=2
f = 2.0*np.sqrt(2*np.log(2))
sig = (60.0/f)*2**J

# FWHM from scale parameter
FWHM = 60*2**J

# normalized gaussian distribution
g  = gaussian(3*FWHM, sig, sym=False)
# distribution density (sums up to 1)
gd = g/(np.sqrt(2*np.pi)*sig)
print(f"sum of distribution density: {np.sum(gd):.3f}")

x0 = np.argmax(gd)
x1, x2 = x0-.5*FWHM, x0+.5*FWHM
plt.figure()
_ = plt.plot(g/np.sqrt(2*np.pi)/sig, label='gaussian distribution density')
_ = plt.hlines(.5*np.max(gd),x1,x2,color='k', label=f'FWHM={FWHM}')
_ = plt.grid()
_ = plt.legend()
plt.show()
# shift array, so that it starts with the maxima
gd_rolled = np.roll(gd, np.floor_divide(3*FWHM, 2))

# calculate the frequency response of the Gaussian window
gd_fft = np.fft.fft(gd_rolled)
fft_freq = np.fft.fftfreq(gd_rolled.shape[0])

fig, axs = plt.subplots(2,1)
_ = axs[0].set_title("Gaussian distribution density")
_ = axs[0].plot(gd_rolled)
_ = axs[0].grid(True)
_ = axs[0].set_xlabel('Sample')
_ = axs[0].set_ylabel('Amplitude')

_ = axs[1].set_title("Frequency response of the Gaussian window")
_ = axs[1].plot(fft_freq,gd_fft,'b.')
_ = axs[1].set_xlabel('normalized frequency (cycles per sample)')
_ = axs[1].set_ylabel('normalized magnitude (dB)')
_ = axs[1].set_xlim([-.05,.05])
fig.tight_layout()
#|export
def gauss_fwin_fwhm(fwhm: float, N: int = 86400) -> NDArray:
    """
    Convert scale parameter to FWHM of Normal distribution see
    [https://en.wikipedia.org/wiki/Full_width_at_half_maximum#Normal_distribution]

    Parameters
    ----------
    fwhm: float
        FWHM of the gaussian distribution
    N: int
        Number of points in the output window. If zero, an empty array is returned. An exception is thrown when it is negative.
        The default is 86400 (seconds per day).

    Returns
    -------
    ndarray
        Frequency response of the gaussian window
    """
    f = 2.0*np.sqrt(2*np.log(2))
    sig = fwhm/f
    g  = gaussian(N, sig, sym=False)/np.sqrt(2*np.pi)/sig
    return np.fft.fft(np.roll(g, np.floor_divide(N, 2)))

def gauss_fwin(J: float, N: int=86400) -> NDArray:
    """
    Convert scale parameter to FWHM of Normal distribution see
    [https://en.wikipedia.org/wiki/Full_width_at_half_maximum#Normal_distribution]

    Parameters
    ----------
    J: float
        scale parameter for FWHM, with FWHM=60*2**J
    N: int
        Number of points in the output window. If zero, an empty array is returned. An exception is thrown when it is negative.
        The default is 86400 (seconds per day).

    Returns
    -------
    ndarray
        Frequency response of the gaussian window
    """
    fwhm = 60.*2**J
    return gauss_fwin_fwhm(fwhm, N=N)

3.4.2. Smoothing Data by convolution

#|export
def smooth_fwhm(y: ArrayLike, fwhm: float, axis: int = 0) -> NDArray:
    """
    Smooth data with gaussian window by convolution

    Parameters
    ----------
    y: array_like
        Input array.
    fwhm: float
        FWHM of the gaussian window to be convolved with the input array.
    axis: int, optional
        Axis over which to smoothing is applied.

    Returns
    -------
    ndarray
        Smoothed array of the same shape as the input array `y`.
    """
    Y = np.fft.fft(y, axis=axis)
    W = gauss_fwin_fwhm(fwhm, y.shape[axis])
    if y.ndim > 1:
        W = np.expand_dims(W, axis=axis).T
    return np.fft.ifft(Y * W, axis=axis).real

def smooth(y: ArrayLike, J: float, axis: int = 0) -> NDArray:
    """
    Smooth data with gaussian window by convolution

    Parameters
    ----------
    y: array_like
        Input array.
    J: float
        Scale parameter for the FWHM (FWHM=60*2**J) of the
        gaussian window to be convolved with the input array.
    axis: int, optional
        Axis over which to smoothing is applied.

    Returns
    -------
    ndarray
        Smoothed array of the same shape as the input array `y`.
    """
    fwhm = 60.*2**J
    return smooth_fwhm(y, fwhm, axis=axis)
axis=0
sig = np.repeat([0., 1., 0.], 100)
sig2 = np.vstack((sig*2,sig*4))

fwhm = 10
filtered = smooth_fwhm(sig, fwhm, axis=0)
filtered2 = smooth_fwhm(sig2, fwhm, axis=1)


fig, ax = plt.subplots(1,1)
_ = ax.plot(sig, label='signal')
_ = ax.plot(sig2.T, label='signal')
_ = ax.plot(filtered, label='smoothed signal')
_ = ax.plot(filtered2.T, label='smoothed signal')
_ = ax.legend()
fig.show()
from pyrnet import pyrnet
date = dt.datetime(2013,7,15)
pyr  = pyrnet.read_pyrnet(date, 'hope_juelich')

z0 = pyr.ghi.data[:,0]
z1 = smooth(pyr.ghi.data, 0, axis=0)
z2 = smooth(pyr.ghi.data.T, 5, axis=-1)

fig, ax = plt.subplots(1,1)
_ = ax.plot(z0,'k')
_ = ax.plot(z1[:,0],'g')
_ = ax.plot(z2[0,:],'r')
_ = ax.grid(True)
fig.show()