7. Data level¶
Reading and advance in data processing levels to be stored in netcdf database format.
Note
raw > l1a > l1b > l1c
#|export
import os
import numpy as np
import pandas as pd
import xarray as xr
import logging
from toolz import assoc_in, merge_with
#import pkg_resources as pkg_res
import importlib.resources
import warnings
from trosat import sunpos as sp
from trosat.cfconv import read_cfjson
import pyrnet
pyrnet_version = pyrnet.__version__
import pyrnet.pyrnet
import pyrnet.utils
import pyrnet.logger
import pyrnet.reports
import pyrnet.qcrad
# logging setup
logging.basicConfig(
filename='pyrnet.log',
encoding='utf-8',
level=logging.DEBUG,
format='%(asctime)s %(name)s %(levelname)s:%(message)s'
)
logger = logging.getLogger(__name__)
import matplotlib.pyplot as plt
7.1. netCDF dataset operations¶
7.1.1. Filename¶
Filename from config format string.
#|export
def get_fname(ds, freq, period=None, timevar='time', sfx='nc', kind=None, station=None, config=None):
config = get_config(config)
if period is None:
period = ds[timevar].values[-1] - ds[timevar].values[0]
period = pd.to_timedelta(period).floor("s").isoformat()
if kind is None:
if ds.station.size==1:
kind = 's'
station = ds.station.values[0]
else:
kind = 'n'
station = ds.station.size
format_dict = dict(
dt = pd.to_datetime(ds[timevar].values[0]),
period = period,
campaign = config["campaign"],
kind = kind,
station = int(station),
level = ds.processing_level,
freq = freq,
collection = int(config["collection"]),
sfx = sfx
)
return config["output"].format(**format_dict)
7.1.2. Update netcdf coverage metadata¶
Add and update ACDD covarge metadata to dataset
#|export
def update_coverage_meta(ds, timevar='time'):
"""Update global attributes related to geospatial and time coverage
"""
duration = ds[timevar].values[-1] - ds[timevar].values[0]
resolution = np.mean(np.diff(ds[timevar].values))
now = pd.to_datetime(np.datetime64("now"))
gattrs = {
'date_created': now.isoformat(),
'geospatial_lat_min': np.nanmin(ds.lat.values),
'geospatial_lat_max': np.nanmax(ds.lat.values),
'geospatial_lat_units': 'degN',
'geospatial_lon_min': np.nanmin(ds.lon.values),
'geospatial_lon_max': np.nanmax(ds.lon.values),
'geospatial_lon_units': 'degE',
'time_coverage_start': pd.to_datetime(ds[timevar].values[0]).isoformat(),
'time_coverage_end': pd.to_datetime(ds[timevar].values[-1]).isoformat(),
'time_coverage_duration': pd.to_timedelta(duration).isoformat(),
'time_coverage_resolution': pd.to_timedelta(resolution).isoformat(),
}
ds.attrs.update(gattrs)
return ds
7.1.3. Stretch resolution¶
Update encoding scale_factor and add_offset to use the full dtype resolution.
#|export
def stretch_resolution(ds: xr.Dataset) -> xr.Dataset:
""" Stretch variable resolution to full integer size,
to not lose resolution after averaging ADC count data."""
for var in ds:
if "scale_factor" not in ds[var].encoding:
continue
if "valid_range" not in ds[var].attrs:
continue
dtype = ds[var].encoding['dtype']
valid_range = ds[var].valid_range
int_limit = np.iinfo(dtype).max
scale_factor = ds[var].encoding['scale_factor']
scale_factor_mod = int((int_limit-1)/valid_range[1])
ds[var].encoding.update({
"scale_factor": scale_factor / scale_factor_mod,
"_FillValue": int_limit,
})
ds[var].attrs.update({
"valid_range": valid_range * scale_factor_mod
})
return ds
7.1.4. Write to netcdf¶
Ensure complete metadata and merge if needed before writing to netcdf.
#|export
def to_netcdf(ds, fname, timevar="time"):
"""xarray to netcdf, but merge if exist
"""
# save to netCDF4
ds = update_coverage_meta(ds, timevar=timevar)
ds.to_netcdf(fname,
encoding={timevar:{'dtype':'float64'}}) # for OpenDAP 2 compatibility
#|export
def to_netcdf_l1b(ds, fname, freq='1s', timevar="time"):
"""xarray to netcdf, but merge if exist
"""
# merge if necessary
if isinstance(ds, xr.Dataset):
dslist = [ds]
else:
dslist = ds
if os.path.exists(fname):
ds1 = xr.load_dataset(fname)
dslist.append(ds1)
ds = merge_l1b(dslist, freq=freq, timevar=timevar)
# save to netCDF4
ds = update_coverage_meta(ds, timevar=timevar)
if os.path.exists(fname):
os.remove(fname)
ds.to_netcdf(fname,
encoding={timevar:{'dtype':'float64'}}) # for OpenDAP 2 compatibility
7.1.5. Resample¶
Specialized (fast) resample methods, as xarray resample is sometimes very slow.
#|export
def resample(ds, freq, methods='mean', kwargs={}):
""" Resample xarray dataset using pandas for speed.
https://github.com/pydata/xarray/issues/4498#issuecomment-706688398
"""
if isinstance(methods,str):
methods = [methods]
dsr = ds.to_dataframe().resample(freq)
dsouts = []
for method in methods:
# what we want (quickly), but in Pandas form
with warnings.catch_warnings():
warnings.simplefilter(action='ignore', category=FutureWarning)
df_h = dsr.apply(method)
# rebuild xarray dataset with attributes
vals = []
for c in df_h.columns:
vals.append(
xr.DataArray(data=df_h[c],
dims=['time'],
coords={'time': df_h.index},
attrs=ds[c].attrs)
)
dsouts.append(xr.Dataset(dict(zip(df_h.columns, vals)), attrs=ds.attrs))
if len(dsouts) == 1:
dsouts = dsouts[0]
return dsouts
7.2. Config¶
Parse default config
#|export
def get_config(config: dict|None = None) -> dict:
"""Read default config and merge with input config
"""
fn_config = os.path.join(importlib.resources.files("pyrnet"), "share/pyrnet_config.json")
default_config = pyrnet.utils.read_json(fn_config)
if config is None:
config = default_config
config = {**default_config, **config}
# add default files
cfiles = {
"file_cfmeta": "share/pyrnet_cfmeta.json",
"file_calibration": "share/pyrnet_calibration.json",
"file_mapping": "share/pyrnet_station_map.json",
"file_gti_angles": "share/pyrnet_gti_angles.json",
"file_site": "share/pyrnet_sites.json",
}
for fn in cfiles:
if config[fn] is None:
config[fn] = os.path.join(importlib.resources.files("pyrnet"), cfiles[fn])
return config
def get_sensor_config(sconfig: dict|None = None) -> dict:
""" Read the sensor configuration from the default json file and merge if needed.
"""
fn_config = os.path.join(importlib.resources.files("pyrnet"), "share/pyrnet_sensor_config.json")
default_config = pyrnet.utils.read_json(fn_config)
if sconfig is None:
sconfig = default_config
sconfig = {**default_config, **sconfig}
return sconfig
def get_cfmeta(config: dict|None = None) -> dict:
"""Read global and variable attributes and encoding from cfmeta.json
"""
config= get_config(config)
# parse the json file
cfdict = read_cfjson(config["file_cfmeta"])
# get global attributes:
gattrs = cfdict['attributes']
# apply config
gattrs = {k:v.format_map(config) for k,v in gattrs.items()}
# get variable attributes
d = pyrnet.utils.get_var_attrs(cfdict)
# split encoding attributes
vattrs, vencode = pyrnet.utils.get_attrs_enc(d)
return gattrs, vattrs, vencode
7.3. Encoding¶
The encoding attributes are defined within the file PyrNet/share/pyrnet_cfmeta.json per default.
The netcdf packing of data is realized via the two attributes scale_factor and add_offset:
unpacked_variable = scale_factor * packed_variable + add_offset
#|export
def calc_encoding(sconfig:dict, ADCV=3.3, ADCbits=10) -> dict:
ADCfac = ADCV / (2**ADCbits-1) # Last bit is reserved
sencoding = {}
for k, v in sconfig.items():
sencoding.update(
{k: dict(
units=v['units'],
scale_factor=v['C']*ADCfac/v['gain'],
add_offset=v['offset'],
valid_range= [0, min(((2**ADCbits-1), int(v['Vmax']*v['gain']/ADCfac)))]
)}
)
return sencoding
sconfig = get_sensor_config()
sencoding = calc_encoding(sconfig)
sencoding
{'ta': {'units': 'K',
'scale_factor': 0.12903225806451613,
'add_offset': 253.15,
'valid_range': [0, 775]},
'rh': {'units': '1',
'scale_factor': 0.0012903225806451613,
'add_offset': 0.0,
'valid_range': [0, 775]},
'battery_voltage': {'units': 'V',
'scale_factor': 0.0064516129032258064,
'add_offset': 0.0,
'valid_range': [0, 992]},
'ghi': {'units': 'V',
'scale_factor': 1.075268817204301e-05,
'add_offset': 0.0,
'valid_range': [0, 1023]},
'gti': {'units': 'V',
'scale_factor': 1.075268817204301e-05,
'add_offset': 0.0,
'valid_range': [0, 1023]}}
7.3.1. Irradiance values¶
Irradiance values are stored as Voltage for later calibration. Assigning 1500Wm-2 as maximum measurable irradiance from the irradiance sensor. The maximum counts (valid_max) measured by the logger can be calculated by:
maximum counts = $\mathrm{gain}1500\mathrm{C_{max}}*1023/3.3$
Most calibration factors are in the area of 7.5 uV/Wm-2, assuming $\mathrm{C_{max}}=8$uV/Wm-2 for this estimate seems sufficient. All radiation sensors are amplified with a fixed gain of $\mathrm{gain}=300$.
Later (level 1c+) the scale factor will be instrument specific by adding the calibration from V to W m-2
7.3.2. Temperature and humidity¶
Calibration coefficients for Temperature and Humidity DKRF-4000 series (discontinued) https://www.driesen-kern.de/downloads/produktlinie_feuchte.pdf are:
Temperature (T) range :-20-80 degC from 0-5V
relative Humidity (rH) range: 0-100% from 0-5V
As the logger ADC range is 0-3.3V with a 10bit resolution, the sensors are measured through a voltage splitting circuit. Therefore, the ADC counts have to be doubled.
Voltage U [V] = $2* 3.3/1023$ [counts]
T = $-20 + 100*(U/5)$ [degC] = $253.15 + 100*(U/5)$ [K]
rH = $(U/5)$ [%] = $(U/5)$ [%]
7.3.3. Ancillary data¶
solar zenith angle (szen)
valid range unpacked: (0,180) (deg)
packing in u2 integer (unsigned 16bit)
fill value = $2^{16} - 1$
scale_factor = $180 /(2^{16}-2)$
solar azimuth angle (sazi)
valid range unpacked: (0,360)
packing in u2 integer (unsigned 16bit)
fill_value = $2^{16} -1$
scale_factor= $360 / (2^{16}-2)$
earth sun distance (esd)
valid range unpacked: (0.98,1.02)
packing in u2 integer (unsigned 16bit)
fill_value = $2^{16} -1$
scale_factor= $(1.02-0.98)/(2^{16}-2)$
add_offset = 0.98
7.3.4. Add encoding to netcdf¶
#|export
def add_encoding(ds, vencode=None):
"""
Set valid_range attribute and encoding to every variable of the dataset.
Parameters
----------
ds: xr.Dataset
Dataset of any processing level. The processing level will be
determined by the global attribute 'processing_level'.
vencode: dict or None
Dictionary of encoding attributes by variable name, will be merged with pyrnet default cfmeta. The default is None.
Returns
-------
xr.Dataset
The input dataset but with encoding and valid_range attribute.
"""
# prepare netcdf encoding
_, vattrs_default, vencode_default = get_cfmeta()
# Add valid range temporary to encoding dict.
# As valid_range is not implemented in xarray encoding,
# it has to be stored as a variable attribute later.
for k in vencode_default:
if "valid_range" in vencode_default[k]:
continue
if "valid_range" not in vattrs_default[k]:
continue
vencode_default = assoc_in(vencode_default,
[k,'valid_range'],
vattrs_default[k]['valid_range'])
# merge input and default with preference on input
if vencode is None:
vencode = vencode_default
else:
a = vencode_default.copy()
b = vencode.copy()
vencode = {}
for k in set(a)-set(b):
vencode.update({k:a[k]})
for k in set(a)&set(b):
vencode.update({k: {**a[k],**b[k]}})
for k in set(b)-set(a):
vencode.update({k:b[k]})
# add encoding to Dataset
for k, v in vencode.items():
for ki in [key for key in ds if key.startswith(k)]:
ds[ki].encoding.update(v)
if "valid_range" not in vencode[k]:
continue
# add valid_range to variable attributes
for ki in [key for key in ds if key.startswith(k)]:
ds[ki].attrs.update({
'valid_range': vencode[k]['valid_range']
})
# add encoding to coords
if ds.processing_level=='l1a':
ds["gpstime"].encoding.update({
**vencode["time"],
"units": f"seconds since {np.datetime_as_string(ds.gpstime.data[0], unit='D')}T00:00Z",
})
ds["maintenancetime"].encoding.update({
**vencode["time"],
"units": f"seconds since {np.datetime_as_string(ds.maintenancetime.data[0], unit='D')}T00:00Z",
})
ds["adctime"].encoding.update({
**vencode["adctime"],
"units": "milliseconds"
})
ds["station"].encoding.update({
**vencode["station"]
})
elif ds.processing_level == 'l1b':
ds["time"].encoding.update({
**vencode["time"],
"units": f"seconds since {np.datetime_as_string(ds.time.data[0], unit='D')}T00:00Z",
})
ds["maintenancetime"].encoding.update({
**vencode["time"],
"units": f"seconds since {np.datetime_as_string(ds.maintenancetime.data[0], unit='D')}T00:00Z",
})
else:
raise ValueError("Dataset has no 'processing_level' attribute.")
return ds
7.4. l1a¶
Full resolution unprocessed, uncalibrated data. One data file corresponds to one maintenance period of one PyrNet station.
Level l1a will be processed from the logger raw data with the following workflow:
Parse raw logger file
pyrnet.logger.read_records
Get maintenance logbook quality flags
pyrnet.reports
Get metadata and encoding
pyrnet_cfmeta_l1b.json
Make xarray Dataset
Add variable and global attributes and encoding
config = get_config()
gattrs, vattrs, vencode = get_cfmeta(config)
# update encoding with sensor config for l1a
sconfig = get_sensor_config()
sencoding = calc_encoding(sconfig, ADCV=3.3, ADCbits=10)
# from toolz import update_in
for var,enc in sencoding.items():
for k,v in enc.items():
if k=="valid_range":
vattrs = assoc_in(vattrs, [var,k], v)
else:
vencode = assoc_in(vencode, [var,k], v)
vencode
{'ta': {'scale_factor': 0.12903225806451613,
'add_offset': 253.15,
'_FillValue': np.uint16(65535),
'zlib': True,
'dtype': 'u2',
'gzip': True,
'complevel': 6,
'units': 'K'},
'rh': {'scale_factor': 0.0012903225806451613,
'add_offset': 0.0,
'_FillValue': np.uint16(65535),
'zlib': True,
'dtype': 'u2',
'gzip': True,
'complevel': 6,
'units': '1'},
'battery_voltage': {'scale_factor': 0.0064516129032258064,
'add_offset': 0.0,
'_FillValue': np.uint16(65535),
'zlib': True,
'dtype': 'u2',
'gzip': True,
'complevel': 6,
'units': 'V'},
'ghi': {'scale_factor': 1.075268817204301e-05,
'add_offset': 0.0,
'_FillValue': np.uint16(65535),
'zlib': True,
'dtype': 'u2',
'gzip': True,
'complevel': 6,
'units': 'V'},
'gti': {'scale_factor': 1.075268817204301e-05,
'add_offset': 0.0,
'_FillValue': np.uint16(65535),
'zlib': True,
'dtype': 'u2',
'gzip': True,
'complevel': 6,
'units': 'V'},
'station': {'_FillValue': np.uint8(255),
'zlib': True,
'dtype': 'u1',
'gzip': True,
'complevel': 6},
'szen': {'scale_factor': 0.005,
'add_offset': 0.0,
'_FillValue': np.uint16(65535),
'zlib': True,
'dtype': 'u2',
'gzip': True,
'complevel': 6},
'sazi': {'scale_factor': 0.01,
'add_offset': 0.0,
'_FillValue': np.uint16(65535),
'zlib': True,
'dtype': 'u2',
'gzip': True,
'complevel': 6},
'esd': {'dtype': 'f8', 'gzip': True, 'complevel': 6},
'maintenance_flag': {'_FillValue': np.uint8(255),
'zlib': True,
'dtype': 'u1',
'gzip': True,
'complevel': 6},
'qc_flag': {'_FillValue': np.uint8(255),
'zlib': True,
'dtype': 'u1',
'gzip': True,
'complevel': 6},
'lat': {'_FillValue': np.float64(-9999.0),
'zlib': True,
'dtype': 'f8',
'gzip': True,
'complevel': 6},
'lon': {'_FillValue': np.float64(-99999.0),
'zlib': True,
'dtype': 'f8',
'gzip': True,
'complevel': 6},
'iadc': {'_FillValue': np.uint32(4294967295),
'zlib': True,
'dtype': 'u4',
'gzip': True,
'complevel': 6},
'adctime': {'_FillValue': np.uint32(4294967295),
'zlib': True,
'dtype': 'u4',
'gzip': True,
'complevel': 6},
'time': {'zlib': True, 'dtype': 'f8', 'gzip': True, 'complevel': 6}}
7.5. to l1a_function¶
#|export
def to_l1a(
fname : str,
*,
station: int,
report: dict|pd.DataFrame|None,
date_of_measure : np.datetime64 = np.datetime64("now"),
config: dict|None = None,
sconfig: dict|None = None,
global_attrs: dict|None = None
) -> xr.Dataset|None:
"""
Read logger raw file and parse it to xarray Dataset. Thereby, attributes and names are defined via cfmeta.json file and sun position values are calculated and added.
Parameters
----------
fname: str
Path and filename of the raw logger file.
station: int
PyrNet station box number.
report: dict
Parsed maintenance report, see reports.ipynb
bins: int
Number of desired bins per day. The default is 86400, which result in
mean values of 1 second steps per day. Maximum resolution is 86400000.
date_of_measure: float, datetime or datetime64
A rough date of measure to account for GPS week rollover. If measured in 2019, day resolution is recommended, before 2019 annual resolution, 2020 onwards not required. If float, interpreted as Julian day from 2000-01-01T12:00. the default is np.datetime64("now").
config: dict
Stores processing specific configuration.
* cfjson -> path to cfmeta.json, the default is "../share/pyrnet_cfmeta.json"
* stripminutes -> number of minutes to be stripped from the data at start and end,
the default is 5.
sconfig: dict
Config for ADC and amplifier for each sensor. The default is "../share/pyrnet_sensor_config.json"
global_attrs: dict
Additional global attributes for the Dataset. (Overrides cfmeta.json attributes)
Returns
-------
xarray.Dataset
Raw Logger data for one measurement periode.
"""
ADCV = 3.3
ADCbits = 10
# load and merge default config
config = get_config(config)
gattrs, vattrs, vencode = get_cfmeta(config)
# update encoding with sensor config for l1a
sconfig = get_sensor_config(sconfig)
sencoding = calc_encoding(sconfig, ADCV=ADCV, ADCbits=ADCbits)
# update encoding with sensor config for l1a
for var, enc in sencoding.items():
for k, v in enc.items():
if k in ["units"]:
vattrs = assoc_in(vattrs, [var, k], v)
else:
vencode = assoc_in(vencode, [var, k], v)
# update additional global attributes
if global_attrs is not None:
gattrs.update(global_attrs)
date_of_measure = pyrnet.utils.to_datetime64(date_of_measure)
# 1. Parse raw file
rec_adc, rec_gprmc = pyrnet.logger.read_records(fname=fname, date_of_measure=date_of_measure)
if type(rec_adc)==bool or len(rec_gprmc.time)<3:
logger.debug("Failed to load the data from the file, because of not enough stable GPS data, or file is empty.")
return None
# Get ADC time
adctime = pyrnet.logger.get_adc_time(rec_adc)
# ADC to Volts
# Drop time and internal battery sensor output (columns 0 and 1)
adc_volts = ADCV * rec_adc[:,2:] / float(2**ADCbits - 1)
# 2. Get Logbook maintenance quality flags
key = f"{station:03d}"
if report is None:
logger.warning("No report available!")
report = {}
if isinstance(report, pd.DataFrame):
logger.info(f"Parsing report at date {rec_gprmc.time[-1]}")
report = pyrnet.reports.parse_report(
report,
date_of_maintenance=rec_gprmc.time[-1],
stations=station
)
if key not in report:
logger.warning(f"No report for station {station} available.")
warnings.warn(f"No report for station {station} available.")
qc_main = pyrnet.reports.get_qcflag(4,3)
qc_extra = pyrnet.reports.get_qcflag(4,3)
vattrs = assoc_in(vattrs, ["maintenance_flag_ghi","note_general"], "No maintenance report!")
vattrs = assoc_in(vattrs, ["maintenance_flag_gti","note_general"], "No maintenance report!")
vattrs = assoc_in(vattrs, ["maintenance_flag_ghi","note_clean"], "")
vattrs = assoc_in(vattrs, ["maintenance_flag_gti","note_clean"], "")
vattrs = assoc_in(vattrs, ["maintenance_flag_ghi","note_level"], "")
vattrs = assoc_in(vattrs, ["maintenance_flag_gti","note_level"], "")
maintenancetime = np.array([rec_gprmc.time.astype('datetime64[ns]')[-1]])
else:
qc_main = pyrnet.reports.get_qcflag(
qc_clean=report[key]['clean'],
qc_level=report[key]['align']
)
qc_extra = pyrnet.reports.get_qcflag(
qc_clean=report[key]['clean2'],
qc_level=report[key]['align2']
)
maintenancetime = np.array([pyrnet.utils.to_datetime64(report[key]["maintenancetime"])])
# add qc notes
vattrs = assoc_in(vattrs, ["maintenance_flag_ghi","note_general"], report[key]["note_general"])
vattrs = assoc_in(vattrs, ["maintenance_flag_gti","note_general"], report[key]["note_general"])
vattrs = assoc_in(vattrs, ["maintenance_flag_ghi","note_clean"], report[key]["note_clean"])
vattrs = assoc_in(vattrs, ["maintenance_flag_gti","note_clean"], report[key]["note_clean2"])
vattrs = assoc_in(vattrs, ["maintenance_flag_ghi","note_level"], report[key]["note_align"])
vattrs = assoc_in(vattrs, ["maintenance_flag_gti","note_level"], report[key]["note_align2"])
qc_main = np.ubyte(qc_main)
qc_extra = np.ubyte(qc_extra)
vattrs = assoc_in(vattrs, ["ghi","ancillary_variables"], "maintenance_flag_ghi")
vattrs = assoc_in(vattrs, ["gti","ancillary_variables"], "maintenance_flag_gti")
# 3. Add global meta data
now = pd.to_datetime(np.datetime64("now"))
gattrs.update({
'processing_level': 'l1a',
'product_version': pyrnet_version,
'history': f'{now.isoformat()}: Generated level l1a by pyrnet version {pyrnet_version}; ',
})
# add site information
if config['sites'] is not None:
sites = pyrnet.utils.read_json(config['file_site'])[config['sites']]
if key in sites:
gattrs.update({ "site" : sites[key]})
# add gti angles
# default horizontal
vattrs = assoc_in(vattrs, ["gti","hangle"], 0.)
vattrs = assoc_in(vattrs, ["gti","vangle"], 0.)
# update with angles from mapping file
if config['gti_angles'] is not None:
gti_angles = pyrnet.utils.read_json(config['file_gti_angles'])[config['gti_angles']]
if key in gti_angles:
hangle = np.nan if gti_angles[key][0] is None else gti_angles[key][0]
vangle = np.nan if gti_angles[key][1] is None else gti_angles[key][1]
vattrs = assoc_in(vattrs, ["gti","hangle"], hangle)
vattrs = assoc_in(vattrs, ["gti","vangle"], vangle)
if adc_volts.shape[1]<5: # gti data is not available
adc_volts = np.concatenate((adc_volts,-1*np.ones(adc_volts.shape[0])[:,None]),axis=1)
# 8. Make xarray Dataset
values = {}
for k, v in sconfig.items():
offset = sconfig[k]["offset"]
gain = sconfig[k]["gain"]
C = sconfig[k]["C"]
iadc = sconfig[k]["iadc"]
volts = adc_volts[:,iadc][:,None]
values.update(
{k: offset + C*volts/gain}
)
# Remove rh and temperature error if input voltage to low
# Allow only sensor values of voltage V < (V(battery)-2),
# as input voltage needs to be at least 2V larger then sensor output
for k in ["rh","ta"]:
iadc = sconfig[k]["iadc"]
volts_sensor = (values[k][:,0]-sconfig[k]["offset"])/sconfig[k]["C"] # no gain, as we need voltage at sensor
mask = volts_sensor > (values["battery_voltage"][:,0] - 2.)
values[k][mask,0] = np.nan
ds = xr.Dataset(
data_vars={
"ghi": (("adctime","station"), values["ghi"]), # [V]
"gti": (("adctime","station"), values["gti"]), # [V]
"ta": (("adctime","station"), values["ta"]), # [K]
"rh": (("adctime","station"), values["rh"]), # [-]
"battery_voltage": (("adctime","station"), values["battery_voltage"]), # [V]
"lat": (("gpstime","station"), rec_gprmc.lat[:,None]), # [degN]
"lon": (("gpstime","station"), rec_gprmc.lon[:,None]), # [degE]
"maintenance_flag_ghi": (("maintenancetime","station"), [[qc_main]]),
"maintenance_flag_gti": (("maintenancetime","station"), [[qc_extra]]),
"iadc": (("gpstime", "station"), rec_gprmc.iadc[:,None])
},
coords={
"adctime": ("adctime", adctime.astype('timedelta64[ns]')),
"gpstime": ("gpstime", rec_gprmc.time.astype('datetime64[ns]')),
"maintenancetime": ("maintenancetime", maintenancetime),
"station": ("station", [np.ubyte(station)]),
},
attrs=gattrs
)
# drop ocurance of douplicate gps values
ds = ds.drop_duplicates("gpstime")
# add global coverage attributes
ds = update_coverage_meta(ds, timevar="gpstime")
# add attributes to Dataset
for k,v in vattrs.items():
for key in [key for key in ds if key.startswith(k)]:
ds[key].attrs.update(v)
# add encoding to Dataset
ds = add_encoding(ds, vencode)
return ds
7.6. Test to_l1a function¶
#|dropout
fn_report = "../../example_data/results-survey224783.csv"
fn_data = "../../example_data/Pyr9_000.bin"
fn_cfmeta = os.path.join(importlib.resources.files("pyrnet"), "share/pyrnet_cfmeta.c01.json")
# parse report
df_report = pyrnet.reports.get_responses(fn="../../example_data/results-survey224783.csv")
report = pyrnet.reports.parse_report(df_report,
date_of_maintenance=np.datetime64("2023-05-08T12:00"))
# read logger file to xarray
ds = to_l1a(
fname=fn_data,
station=1, # actually test data is from station 9, but test reports are for station 1 and 2 only
# bins=86400, # seconds resolution
report=report,
config={"file_cfmeta": fn_cfmeta, "stripminutes": 0},
global_attrs={"TESTNOTE": "This is a test note."}
)
print("Processing fname:")
print(get_fname(ds, freq="10Hz", timevar="gpstime", config=None))
ds.to_netcdf("../../example_data/to_l1a_output.nc")
ds
# import cfchecker.cfchecks
# init = cfchecker.cfchecks.CFChecker()
# res = init.checker("../../example_data/to_l1a_output.nc")
#|dropout
import netCDF4
netCDF4.Dataset("../../example_data/to_l1a_output.nc",'r')
7.7. l1b¶
Resampled, calibrated data as daily files per station. Resampling is per default to one second, but can be configured.
fname = "../../example_data/to_l1a_output.nc"
config = {
"l1bfreq": "1s",
"stripminutes": 5,
"average_latlon": True,
"radflux_varname": ["ghi", "gti"]
}
config = get_config(config)
gattrs, vattrs, vencode = get_cfmeta(config)
The l1b data is processed from the l1b data with the following workflow:
7.7.1. Read l1a netcdf¶
Ensure its l1a by checking processing_level attribute.
7.7.2. Sync GPS to ADC time¶
Use sync_adc_time to fit logger ADC timedelta since operation start to GPS timestamps. This is crucial to overcome time drifts and ensure synchronized measurements.
7.7.3. Make dataset with new time¶
Initialize new l1b dataset with the synchronized time as coordinate
7.7.4. Drop first and last X minutes of data¶
To avoid bad data due to maintenance during start or end of the maintenance interval, a certain amound of minutes is dropped (defined in config).
Note
config.json -> “stripminutes”
7.7.5. Resample¶
Resample the data to desired time interval
* pyrnet.data.resample
7.7.6. Interpolate GPS coordinates¶
Use xarray.interp to interpolate lat and lon to the resampled time dimension.
Note
At this point the descision to whether store geocoordinates in full time resolution or as mean over the time interval is made.
This is configured in config.json -> “average_latlon”
7.7.7. Add sun position¶
Use trosat.sunpos to calculate sun position from time and lat,lon coordinates.
7.7.8. (Not implemented) Alignment detection¶
Detect the alignment of the instruments assuming perfect alignment at the beginning of the maintenance period. Refine GTI hangle and vangle if available.
7.7.9. Calibrate radiation flux¶
Calibration and correction of measured voltage (U) is done via the following equations:
$ \mathrm{GHI} = \mathrm{U} * \mathrm{C}\mathrm{a} * \mathrm{C}\mathrm{c}(\mu_a) * \frac{\mu_0}{\mu_a}$,
$ \mathrm{GTI} = \mathrm{U} * \mathrm{C}\mathrm{a} * \mathrm{C}\mathrm{c}(\mu_a)$,
with the absolute calibration factor $\mathrm{C}\mathrm{a}$ $\left(\frac{W}{m^2}V^{-1}\right)$ and the cosine correction factor $\mathrm{C}\mathrm{c}$ which depends on the cosine of the apparent solar zenith angle ($\mu_a$). The fraction $\frac{\mu_0}{\mu_a}$ is applied as a factor to correct for misalignment (neglecting diffuse radiation) (Boers et al. (1998)).
Therefore, the full calibration including cosine and misalignment correction can only be applied if the apparent zenith angle is known. For GHI we apply the full calibration assuming $\mu_a = \mu_0$. For GTI, only the absolute calibration is added. GTI cosine correction is applied only if both hangle and vangle are known.
# 5. rad flux calibration
box = ds_l1b.station.values[0]
boxnumber, serial, cfac, CCcoef = pyrnet.pyrnet.meta_lookup(
ds_l1b.time.values[0],
box=box,
cfile=config['file_calibration'],
mapfile=config['file_mapping'],
)
print(f"Meta Lookup:")
print(f">> Box = {box}")
print(f">> serial(s) = {serial}")
print(f">> calibration factor(s) = {cfac}")
print(f">> cosine correction factor = {np.polynomial.polynomial.Polynomial(CCcoef)},\n>> with x = cos(apparent solar zenith angle)")
mu0 = np.cos(np.deg2rad(ds_l1b.szen.values))
# calibrate radiation flux with gain=300
for i, radflx in enumerate(config['radflux_varname']):
# all radflux related variables (including <radflux>_<resamplemethod> variables)
radflx_vars = [var for var in ds_l1b if var.startswith(radflx)]
if cfac[i] is None:
# drop if calibration/instrument don't exist (probably secondary pyranometer).
ds_l1b = ds_l1b.drop_vars(radflx_vars)
continue
# calc apparent zenith angle if possible
mua = mu0.copy()
if "vangle" in ds[radflx].attrs:
vangle = pyrnet.utils.make_iter(ds[radflx].attrs["vangle"])
hangle = pyrnet.utils.make_iter(ds[radflx].attrs["hangle"])
mua = pyrnet.utils.calc_apparent_coszen(
pitch=vangle,
yaw=hangle,
zen=ds_l1b.szen.values,
azi=ds_l1b.sazi.values
)
mua[mua<=0] = np.nan
mask_mua = ~np.isnan(mua)
Ca = 1e6/cfac[i]
Cc = np.polynomial.polynomial.polyval(mua, c=CCcoef)
Cmu = mu0/mua
# apply to all variables
for var in radflx_vars:
calib_func = "flux (W m-2) = flux (V) * Cabsolute (W m-2 V-1)"
C = np.ones(mu0.shape)*Ca
if radflx == "gti":
C[mask_mua] *= Cc[mask_mua]
calib_func += "" if np.all(np.isnan(mua)) else " * Ccoscorr(mua)"
else:
C[mask_mua] *= Cc[mask_mua] * Cmu[mask_mua]
calib_func += " * Ccoscorr(mua)"# * mu0/mua" (not implemented)
ds_l1b[var].values = ds_l1b[var].values*C
ds_l1b[var].attrs['units'] = "W m-2",
ds_l1b[var].attrs.update({
"units": "W m-2",
"serial": serial[i],
"calibration_Cabsolute": Ca,
"calibration_Ccoscorr": str(np.polynomial.polynomial.Polynomial(CCcoef)),
"calibration_function": calib_func
})
ds_l1b
Meta Lookup:
>> Box = 1.0
>> serial(s) = ['S12128.001', 'S12137.049']
>> calibration factor(s) = [7.73, 6.98]
>> cosine correction factor = 1.45 - 3.04·x + 5.59·x² - 3.01·x³,
>> with x = cos(apparent solar zenith angle)
<xarray.Dataset> Size: 668B
Dimensions: (station: 1, time: 5, maintenancetime: 1)
Coordinates:
* station (station) float32 4B 1.0
* time (time) datetime64[ns] 40B 2022-08-30T11:21:03 ... 2...
* maintenancetime (maintenancetime) datetime64[ns] 8B 2023-05-08T16:0...
Data variables: (12/18)
ghi (time, station) float64 40B 269.9 269.7 ... 280.9
gti (time, station) float64 40B 277.3 277.3 ... 288.6
ta (time, station) float64 40B 294.8 294.8 ... 294.8
rh (time, station) float64 40B 0.6263 0.6266 ... 0.629
battery_voltage (time, station) float64 40B 6.445 6.446 ... 6.458
ghi_min (time, station) float64 40B 269.9 268.5 ... 280.9
... ...
lon (time, station) float64 40B nan nan 11.89 11.89 11.89
maintenance_flag_ghi (maintenancetime, station) float32 4B 9.0
maintenance_flag_gti (maintenancetime, station) float32 4B 7.0
szen (time, station) float64 40B nan nan 42.51 42.51 42.51
sazi (time, station) float64 40B nan nan 182.9 182.9 182.9
esd (station) float64 8B 1.01
Attributes: (12/31)
Conventions: CF-1.10, ACDD-1.3
title: TROPOS pyranometer network (PyrNet) observatio...
history: 2025-10-16T11:47:26: Generated level l1a by p...
institution: Leibniz Institute for Tropospheric Research (T...
source: TROPOS pyranometer network (PyrNet)
references: https://doi.org/10.5194/amt-9-1153-2016
... ...
geospatial_lon_max: 11.885256666666667
geospatial_lon_units: degE
time_coverage_start: 2022-08-30T11:21:04.065000
time_coverage_end: 2022-08-30T11:21:09
time_coverage_duration: P0DT0H0M4.935S
time_coverage_resolution: P0DT0H0M1.23375S7.7.10. Add automatic quality flags¶
Add BSRN recommended limit checks and intercompare checks if multiple pyranometers are in the dataset.
7.7.11. Update variables and global attributes and encoding¶
7.8. to_l1b function¶
7.9. Test to_l1b function¶
# import cfchecker.cfchecks
# init = cfchecker.cfchecks.CFChecker()
# res = init.checker("../../example_data/to_l1b_output.nc")
#|dropout
import netCDF4
netCDF4.Dataset("../../example_data/to_l1b_output.nc",'r')
7.10. Merging¶
Merging datasets with xarray default merge overwrites or drops certain variable attributes (e.g., serial number, calibration factor). The methods here are specialized to merge PyrNet datasets and attributes.
# make 3 artificial datasets for testing
# original ds_l1b
ds1 = ds_l1b.copy()
ds1.attrs["site"] = "testsite1"
# ds_l1b but new times
ds2 = ds1.copy()
ds2 = ds2.assign_coords({
"time": ("time", ds2.time.values+(ds2.time.values[-1]-ds2.time.values[0]))
})
# ds_l1b, same time but other station
ds3 = ds1.copy()
ds3 = ds3.assign_coords({
"station": ("station", [2])
})
ds3.attrs["site"] = "testsite2"
for var in ds3:
if "ghi" in var:
ds3[var].attrs["serial"] = "test_ghi_ds3"
if "gti" in var:
ds3[var].attrs["serial"] = "test_gti_ds3"
dslist = [ds1, ds3, ds2]
7.10.1. Sort by station coordinates¶
Before merging, sort them by station
#|dropout
dslist = _sort_by_station(dslist)
[print(dslist[i].station.values) for i in range(len(dslist))]
7.10.2. Merge global and variable attributes¶
Some attributes are tied to a specific statin (e.g., calibration factor, serial number). These attributes are converted to a list and merged with respect to the station dimension.
#|dropout
merged_gattrs = _merge_gattrs_by_station(dslist, merge_gattrs={"site":""})
print(merged_gattrs)
_, merged_attrs = _merge_vattrs_by_station(
dslist,
merge_attrs={
"serial":dict(
apply_to=["ghi","gti"],
fill_value=""
),
"note_general":dict(
apply_to=["maintenance"],
fill_value=""
),})
print(merged_attrs)
7.10.3. Dataset unification¶
Reindex all kinds of dimension to unify datasets.
7.10.4. Snap Maintenance time coordinate¶
The Maintenancetime coordinate is given at report time, which is actually somewhere after the maintenance. Assuming the report takes place on same or next day, we snap the coordinate to matching data gaps of 10-120min.