3. Utils¶
Utility functions for PyrNet
#|export
from numpy.typing import ArrayLike, NDArray
import numpy as np
from scipy.signal.windows import gaussian
import jstyleson as json
from addict import Dict as adict
from operator import itemgetter
from toolz import keyfilter
# python -m pip install git+https://github.com/hdeneke/trosat-base.git#egg=trosat-base
import trosat.sunpos as sp
# extra imports for demonstration
import sys
import pandas as pd
import matplotlib.pyplot as plt
import datetime as dt
import pkg_resources as pkg_res
import pyrnet
3.1. Representation of Time¶
Unifying various representations of time to numpy.datetime64 comes in handy when handling user inputs.
#|export
EPOCH_JD_2000_0 = np.datetime64("2000-01-01T12:00")
def to_datetime64(time, epoch=EPOCH_JD_2000_0):
"""
Convert various representations of time to datetime64.
Parameters
----------
time : list, ndarray, or scalar of type float, datetime or datetime64
A representation of time. If float, interpreted as Julian date.
epoch : np.datetime64, default JD2000.0
The epoch to use for the calculation
Returns
-------
datetime64 or ndarray of datetime64
"""
jd = sp.to_julday(time, epoch=epoch)
jdms = np.int64(86_400_000*jd)
return epoch + jdms.astype('timedelta64[ms]')
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[1], line 2
1 #|export
----> 2 EPOCH_JD_2000_0 = np.datetime64("2000-01-01T12:00")
3 def to_datetime64(time, epoch=EPOCH_JD_2000_0):
4 """
5 Convert various representations of time to datetime64.
6
(...)
16 datetime64 or ndarray of datetime64
17 """
NameError: name 'np' is not defined
# testing to_datetime64
date_jd = 5203.5 # 2014-04-01T00:00
date_dt = dt.datetime(2014,4,1,12,10)
date_pd = pd.date_range("2014-04-01","2014-04-03",freq='1d')
date_list = [dt.date(2014,4,1), dt.date(2014,4,2)]
assert np.datetime64("2014-04-01T00:00")==to_datetime64(date_jd)
assert np.datetime64("2014-04-01T12:10")==to_datetime64(date_dt)
assert np.array_equal(
np.array([np.datetime64("2014-04-01"),
np.datetime64("2014-04-02"),
np.datetime64("2014-04-03")]
).astype("datetime64[ms]"),
to_datetime64(date_pd))
assert np.array_equal(
np.array([np.datetime64("2014-04-01"),
np.datetime64("2014-04-02")]
).astype("datetime64[ms]"),
to_datetime64(date_list))
3.2. Json netcdf config tools¶
The netCDF attributes and encoding variables are stored in CF-Compliance json files. The following utility functions are used to parse the json and return attribute and encoding dictionary’s to be used with xarray.
#|export
def read_json(fpath: str, *, object_hook: type = adict, cls = None) -> dict:
""" Parse json file to python dict.
"""
with open(fpath,"r") as f:
js = json.load(f, object_hook=object_hook, cls=cls)
return js
def pick(whitelist: list[str], d: dict) -> dict:
""" Keep only whitelisted keys from input dict.
"""
return keyfilter(lambda k: k in whitelist, d)
def omit(blacklist: list[str], d: dict) -> dict:
""" Omit blacklisted keys from input dict.
"""
return keyfilter(lambda k: k not in blacklist, d)
def get_var_attrs(d: dict) -> dict:
"""
Parse cf-compliance dictionary.
Parameters
----------
d: dict
Dict parsed from cf-meta json.
Returns
-------
dict
Dict with netcdf attributes for each variable.
"""
get_vars = itemgetter("variables")
get_attrs = itemgetter("attributes")
vattrs = {k: get_attrs(v) for k,v in get_vars(d).items()}
for k,v in get_vars(d).items():
vattrs[k].update({
"dtype": v["type"],
"gzip":True,
"complevel":6
})
return vattrs
def get_attrs_enc(d : dict) -> (dict,dict):
""" Split variable attributes in attributes and encoding-attributes.
"""
_enc_attrs = {
"scale_factor",
"add_offset",
"_FillValue",
"dtype",
"zlib",
"gzip",
"complevel",
"calendar",
}
# extract variable attributes
vattrs = {k: omit(_enc_attrs, v) for k, v in d.items()}
# extract variable encoding
vencode = {k: pick(_enc_attrs, v) for k, v in d.items()}
return vattrs, vencode
3.2.1. Usage:¶
#|dropout
fn = pkg_res.resource_filename("pyrnet", "share/pyrnet_cfmeta.json")
config = dict(
contributor_name = "Jon Doe; Roger Rogers",
contributor_role = "Set-up; Tear-down",
project = "Notebook Example",
creator_name = "Adam Alpha",
dt = np.datetime64("now").item(),
sdate = dt.datetime(2014,4,1,12,0),
edate = dt.datetime(2014,4,2,18,0),
notes = "This is just an example.",
)
# parse the json file
cfdict = read_json(fn)
# get global attributes:
gattrs = cfdict['attributes']
# apply config
gattrs = {k:v.format_map(config) for k,v in gattrs.items()}
gattrs
#|dropout
# get variable attributes
d = get_var_attrs(cfdict)
# split encoding attributes
vattrs, vencode = get_attrs_enc(d)
vattrs, vencode
3.3. Euclidian distances of stations¶
#|export
def pairwise_distance_matrix( x: ArrayLike, y: ArrayLike ) -> NDArray:
"""
Get square matrix with Euclidian distances of stations
Parameters
----------
x: array_like
X coordinates
y: array_like
Y coordinates
Returns
-------
ndarray
A square matrix with Euclidian distances of stations
"""
x = np.array(x)
y = np.array(y)
return np.sqrt( (x[None,:]-x[:,None])**2+(y[None,:]-y[:,None])**2 )
x = [0, 1, 2]
y = [0, 0, 0]
dist = pairwise_distance_matrix(x,y)
print(dist)
assert dist[0,0]==dist[1,1]==dist[2,2]==0
assert dist[0,1]==dist[1,0]==1
assert dist[0,2]==dist[2,0]==2
3.4. Fourier utility¶
3.4.1. Create a gaussian window¶
Convert a scale parameter J to FWHM of a normal distribution
$\mathrm{FWHM} = 2 \sqrt{2 \ln 2} \sigma = f \sigma$
define FWHM with scale parameter J, so that : $\mathrm{FWHM} = 60*2^J$
for J in range(-2,4):
print(f"J = {J:2d} -> FWHM(J) = {60.*2**J}")
J=2
f = 2.0*np.sqrt(2*np.log(2))
sig = (60.0/f)*2**J
# FWHM from scale parameter
FWHM = 60*2**J
# normalized gaussian distribution
g = gaussian(3*FWHM, sig, sym=False)
# distribution density (sums up to 1)
gd = g/(np.sqrt(2*np.pi)*sig)
print(f"sum of distribution density: {np.sum(gd):.3f}")
x0 = np.argmax(gd)
x1, x2 = x0-.5*FWHM, x0+.5*FWHM
plt.figure()
_ = plt.plot(g/np.sqrt(2*np.pi)/sig, label='gaussian distribution density')
_ = plt.hlines(.5*np.max(gd),x1,x2,color='k', label=f'FWHM={FWHM}')
_ = plt.grid()
_ = plt.legend()
plt.show()
# shift array, so that it starts with the maxima
gd_rolled = np.roll(gd, np.floor_divide(3*FWHM, 2))
# calculate the frequency response of the Gaussian window
gd_fft = np.fft.fft(gd_rolled)
fft_freq = np.fft.fftfreq(gd_rolled.shape[0])
fig, axs = plt.subplots(2,1)
_ = axs[0].set_title("Gaussian distribution density")
_ = axs[0].plot(gd_rolled)
_ = axs[0].grid(True)
_ = axs[0].set_xlabel('Sample')
_ = axs[0].set_ylabel('Amplitude')
_ = axs[1].set_title("Frequency response of the Gaussian window")
_ = axs[1].plot(fft_freq,gd_fft,'b.')
_ = axs[1].set_xlabel('normalized frequency (cycles per sample)')
_ = axs[1].set_ylabel('normalized magnitude (dB)')
_ = axs[1].set_xlim([-.05,.05])
fig.tight_layout()
#|export
def gauss_fwin_fwhm(fwhm: float, N: int = 86400) -> NDArray:
"""
Convert scale parameter to FWHM of Normal distribution see
[https://en.wikipedia.org/wiki/Full_width_at_half_maximum#Normal_distribution]
Parameters
----------
fwhm: float
FWHM of the gaussian distribution
N: int
Number of points in the output window. If zero, an empty array is returned. An exception is thrown when it is negative.
The default is 86400 (seconds per day).
Returns
-------
ndarray
Frequency response of the gaussian window
"""
f = 2.0*np.sqrt(2*np.log(2))
sig = fwhm/f
g = gaussian(N, sig, sym=False)/np.sqrt(2*np.pi)/sig
return np.fft.fft(np.roll(g, np.floor_divide(N, 2)))
def gauss_fwin(J: float, N: int=86400) -> NDArray:
"""
Convert scale parameter to FWHM of Normal distribution see
[https://en.wikipedia.org/wiki/Full_width_at_half_maximum#Normal_distribution]
Parameters
----------
J: float
scale parameter for FWHM, with FWHM=60*2**J
N: int
Number of points in the output window. If zero, an empty array is returned. An exception is thrown when it is negative.
The default is 86400 (seconds per day).
Returns
-------
ndarray
Frequency response of the gaussian window
"""
fwhm = 60.*2**J
return gauss_fwin_fwhm(fwhm, N=N)
3.4.2. Smoothing Data by convolution¶
#|export
def smooth_fwhm(y: ArrayLike, fwhm: float, axis: int = 0) -> NDArray:
"""
Smooth data with gaussian window by convolution
Parameters
----------
y: array_like
Input array.
fwhm: float
FWHM of the gaussian window to be convolved with the input array.
axis: int, optional
Axis over which to smoothing is applied.
Returns
-------
ndarray
Smoothed array of the same shape as the input array `y`.
"""
Y = np.fft.fft(y, axis=axis)
W = gauss_fwin_fwhm(fwhm, y.shape[axis])
if y.ndim > 1:
W = np.expand_dims(W, axis=axis).T
return np.fft.ifft(Y * W, axis=axis).real
def smooth(y: ArrayLike, J: float, axis: int = 0) -> NDArray:
"""
Smooth data with gaussian window by convolution
Parameters
----------
y: array_like
Input array.
J: float
Scale parameter for the FWHM (FWHM=60*2**J) of the
gaussian window to be convolved with the input array.
axis: int, optional
Axis over which to smoothing is applied.
Returns
-------
ndarray
Smoothed array of the same shape as the input array `y`.
"""
fwhm = 60.*2**J
return smooth_fwhm(y, fwhm, axis=axis)
axis=0
sig = np.repeat([0., 1., 0.], 100)
sig2 = np.vstack((sig*2,sig*4))
fwhm = 10
filtered = smooth_fwhm(sig, fwhm, axis=0)
filtered2 = smooth_fwhm(sig2, fwhm, axis=1)
fig, ax = plt.subplots(1,1)
_ = ax.plot(sig, label='signal')
_ = ax.plot(sig2.T, label='signal')
_ = ax.plot(filtered, label='smoothed signal')
_ = ax.plot(filtered2.T, label='smoothed signal')
_ = ax.legend()
fig.show()
from pyrnet import pyrnet
date = dt.datetime(2013,7,15)
pyr = pyrnet.read_pyrnet(date, 'hope_juelich')
z0 = pyr.ghi.data[:,0]
z1 = smooth(pyr.ghi.data, 0, axis=0)
z2 = smooth(pyr.ghi.data.T, 5, axis=-1)
fig, ax = plt.subplots(1,1)
_ = ax.plot(z0,'k')
_ = ax.plot(z1[:,0],'g')
_ = ax.plot(z2[0,:],'r')
_ = ax.grid(True)
fig.show()