5. Maintenance Reports

Parse Maintenance reports from LimeSurvey and legacy spreadsheet formats.

#|export
import pandas as pd
from pandas._typing import (
    FilePath,
    ReadCsvBuffer,
)
import datetime as dt
import numpy as np
from toolz import assoc_in

from pyrnet import utils

5.1. Survey Export

In the following, the functions are designed to work with the survey response export in the .csv format:

  • Field separator: “Semicolon”

  • Responses: “Answer codes”

  • Headings: “Question code”

The responses can be exported manually from the website …

fn = "../../example_data/results-survey224783.csv"

# Semicolon-separated export; unanswered fields are replaced by the string "None".
df = pd.read_csv(fn, sep=';').fillna("None")
df
id submitdate lastpage startlanguage seed startdate datestamp Q00 Q01 MainQ01 ... interviewtime groupTime57 Q00Time Q01Time groupTime59 MainQ01Time MainQ02Time groupTime58 ExtraQ01Time ExtraQ02Time
0 1 2023-05-08 15:13:33 1 en 1428982518 2023-05-08 15:02:53 2023-05-08 15:13:33 1 this is a test AO01 ... 837.45 None None None None None None None None None
1 2 2023-05-08 16:08:20 1 en 852861659 2023-05-08 16:08:09 2023-05-08 16:08:20 2 222 AO03 ... 11.13 None None None None None None None None None
2 3 2023-05-08 16:09:06 1 en 632878730 2023-05-08 16:08:46 2023-05-08 16:09:06 1 222 AO02 ... 19.89 None None None None None None None None None

3 rows × 27 columns

… or via the limepy python package.

import limepy
import getpass
from io import StringIO

pwd = getpass.getpass("LimeSurvey BeRichter Password: ")
if pwd == '':
    # Without a password there is nothing to download.
    print("No password, no data.")
else:
    url = "https://lgs-car.limesurvey.net/admin/remotecontrol"
    # Fetch the raw csv text of all responses of the survey.
    csv = limepy.download.get_responses(
        base_url=url, user_name="BeRichter", password=pwd, user_id=1, sid=224783
    )
    # Same parsing as for a manually exported file.
    df = pd.read_csv(StringIO(csv), sep=';').fillna("None")
    df
#|export
def get_responses(
        *,
        fn: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str]|None = None,
        online: dict|None = None
) -> pd.DataFrame:
    """
    Get LimeSurvey responses as pandas Dataframe providing a file or online download information.

    Parameters
    ----------
    fn: str, path object or file-like object
        Any pandas readable representation of the LimeSurvey response export file
        (.csv, sep=;, answer and question codes).
    online: dict
        Dictionary of information required to download the responses via limepy:
            * base_url -> limesurvey remote_control url
            * user_name -> account name
            * password
            * user_id -> ID of account user (usually 1)
            * sid -> Survey ID

        Minimal information stored in *online* is the base_url, other information will then be filled via user input promt.

    Returns
    -------
    pd.Dataframe
        parsed responses csv file
    """
    if fn is not None:
        # legacy support:
        if fn.endswith(".xls") or fn.endswith(".xlsx"):
            return parse_legacy_logbook(fn)
        filepath_or_buffer = fn
    elif online is not None:
        import limepy
        import getpass
        from io import StringIO
        if "base_url" not in online:
            raise ValueError
        if "user_name" not in online:
            online.update({'user_name': input("LimeSurvey Account Name: ")})
        if "password" not in online:
            online.update({'password': getpass.getpass("LimeSurvey Password: ")})
        if "user_id" not in online:
            online.update({"user_id": input("LimeSurvey User ID: ")})
        if "sid" not in online:
            online.update({'sid': input("LimeSurvey Survey ID: ")})

        csv = limepy.download.get_responses(**online)
        filepath_or_buffer = StringIO(csv)
    else:
        raise ValueError
    df = pd.read_csv(filepath_or_buffer, sep=';')
    df = df.fillna("None")
    return df
url = "https://lgs-car.limesurvey.net/admin/remotecontrol"
# Account name and password are not given here, so get_responses prompts for them.
get_responses(online={"base_url": url, "user_id": 1, "sid": 224783})
id submitdate lastpage startlanguage seed startdate datestamp Q00 Q01 MainQ01 MainQ01[comment] MainQ02 MainQ02[comment] ExtraQ01 ExtraQ01[comment] ExtraQ02 ExtraQ02[comment]
0 1 2023-05-08 15:13:33 1 en 1428982518 2023-05-08 15:02:53 2023-05-08 15:13:33 1 this is a test AO01 None AO02 testing notes AO03 None AO01 None
1 2 2023-05-08 16:08:20 1 en 852861659 2023-05-08 16:08:09 2023-05-08 16:08:20 2 222 AO03 None None None None None None None
2 3 2023-05-08 16:09:06 1 en 632878730 2023-05-08 16:08:46 2023-05-08 16:09:06 1 222 AO02 test AO03 None AO04 None AO02 None

5.1.1. Support legacy xls or xlsx logbook file

Prior to LimeSurvey, the maintenance notes and marks were collected in a spreadsheet file. The following functions add legacy support by transforming the xls notation into a pandas Dataframe that is ready to be parsed with `parse_report`.

#|export
def read_logbook(lfile):
    '''
    Load logbook file and store it as dictionary of rec arrays with box number keys.

    Parameters
    ----------
    lfile: string, path object, file-like object or dict
        Path and filename of the logbook file -> should be an .xls/.xlsx file in
        which each sheet represents one maintenance period.
        Alternatively, an already loaded mapping ``{sheet name: DataFrame}`` as
        returned by ``pandas.read_excel(..., sheet_name=None)``.
        The first row of each sheet has to include the column names, e.g.
        *box*, *date*, *clean*, *clean_tilt*, *level*, *level_tilt*, *Hangle*, *Vangle*, *notes*.
        Column names are matched case-insensitively against a list of aliases;
        unknown columns are ignored.

    Returns
    -------
    logbook: dict
        dict of recarray for each box number (as string) including quality flags
        from each maintenance cycle. Missing marks are stored as 9, missing text
        as empty string.

    Raises
    ------
    KeyError
        If a sheet has no date column.
    '''
    # NOTE: text fields are fixed-width; numpy silently truncates longer strings.
    dtype_log =[
        ('box',         np.uint8),
        ('site',        'U50'),
        ('serial_pyr',  'U50'),
        ('serial_pyr_tilt', 'U50'),
        ('user',          'U50'),
        ('campaign',    'U50'),
        ('date',        'datetime64[ms]' ),
        ('clean',       np.uint8),
        ('clean_tilt',  np.uint8),
        ('level',       np.uint8),
        ('level_tilt',  np.uint8),
        ('Hangle',      'f8'),
        ('Vangle',      'f8'),
        ('notes',       'U50')
    ]
    mark_fields = {name for name, kind in dtype_log if kind is np.uint8}
    text_fields = {name for name, kind in dtype_log if kind == 'U50'}

    # Accepted (lower-case) spellings of the spreadsheet column headers.
    aliases = {
        'date': ['date'],
        'clean': ['clean', 'cleanliness', 'clean(pyr1)', 'clean1'],
        'clean_tilt': ['clean_tilt', 'clean2', 'clean(pyr2)'],
        'level': ['level', 'level(pyr1)', 'level1'],
        'level_tilt': ['level_tilt', 'level(pyr2)', 'level2'],
        'box': ['box', 'station', 'id', 'pyrbox', 'pyranometerbox'],
        'Hangle': ['hangle', 'azimuth', 'azi', 'horizontal_angle'],
        'Vangle': ['vangle', 'zenith', 'zen', 'vertical_angle'],
        'notes': ['notes', 'note', 'description'],
        'serial_pyr': ['serial', 'serial1', 'serial_pyr', 'pyranometerid', 'pyrid'],
        'serial_pyr_tilt': ['serial2', 'serial_pyr2', 'serial_pyr_tilt'],
        'site': ['site', 'location'],
        'user': ['user', 'author'],
        'campaign': ['campaign'],
    }
    field_of = {
        alias: field for field, names in aliases.items() for alias in names
    }

    def _parse_name(name):
        # Headers are compared lower-cased and stripped; None for unknown columns.
        return field_of.get(str(name).lower().strip())

    if isinstance(lfile, dict):
        sheets = lfile
    else:
        sheets = pd.read_excel(lfile, sheet_name=None)

    rows_by_box = {}
    for sheet_name, sheet in sheets.items():  # read all sheets from xls file
        date_columns = [c for c in sheet.columns if _parse_name(c) == 'date']
        if not date_columns:
            raise KeyError(f"Logbook sheet {sheet_name!r} has no 'date' column.")
        sheet = sheet.dropna(axis=0, how='all', subset=date_columns)  # remove empty lines
        sheet = sheet.dropna(axis=1, how='all')  # remove empty columns

        # Resolve headers once per sheet. Iterating the columns directly (instead
        # of itertuples) keeps headers such as 'clean(pyr1)' intact.
        columns = [(c, _parse_name(c)) for c in sheet.columns]
        columns = [(c, field) for c, field in columns if field is not None]

        for values in zip(*(sheet[c] for c, _ in columns)):
            record = np.zeros(1, dtype=dtype_log).view(np.recarray)
            for (_, field), value in zip(columns, values):
                if field in mark_fields:
                    if pd.isna(value):
                        value = 9  # marker for a missing mark
                elif field in text_fields:
                    if pd.isna(value):
                        value = ''
                elif field == 'date':
                    value = pd.Timestamp(value).to_datetime64()
                record[field] = value
            rows_by_box.setdefault(str(record.box[0]), []).append(record)

    # Stack the entries of every box once, in order of appearance.
    return {
        box: np.concatenate(rows).view(np.recarray)
        for box, rows in rows_by_box.items()
    }
#|export
def parse_legacy_logbook(fn):
    """
    Convert a legacy spreadsheet logbook into the LimeSurvey response layout.

    Parameters
    ----------
    fn: str, path object or file-like object
        Legacy logbook file, passed on to ``read_logbook``.

    Returns
    -------
    pd.DataFrame
        One row per logbook entry with the columns *datestamp*, *Q00* (box),
        *Q01* (notes), the four mark columns (answer codes ``AO01``..``AO04``
        or ``"None"``) and their empty comment columns.
        An empty logbook yields an empty frame with the same columns.
    """
    columns = [
        "datestamp", "Q00", "Q01",
        "MainQ01[comment]", "MainQ02[comment]",
        "ExtraQ01[comment]", "ExtraQ02[comment]",
        "MainQ01", "MainQ02", "ExtraQ01", "ExtraQ02",
    ]
    # Logbook marks that translate to an answer code; anything else (e.g. the
    # 0/9 fill values) becomes "None".
    # NOTE(review): 4 is also accepted for the level columns — confirm that the
    # resulting code "AO04" is a valid alignment answer for downstream parsing.
    accepted = (1, 2, 3, 4)

    def _answer_codes(marks):
        return [f"AO0{int(m)}" if m in accepted else "None" for m in marks]

    frames = []
    for box, entries in read_logbook(fn).items():
        count = entries['date'].shape[0]
        blank = np.repeat("", count)  # the legacy logbook has no per-mark comments
        frames.append(pd.DataFrame({
            "datestamp": entries['date'],
            "Q00": box,
            "Q01": entries['notes'],
            "MainQ01[comment]": blank,
            "MainQ02[comment]": blank,
            "ExtraQ01[comment]": blank,
            "ExtraQ02[comment]": blank,
            "MainQ01": _answer_codes(entries['clean']),
            "MainQ02": _answer_codes(entries['level']),
            "ExtraQ01": _answer_codes(entries['clean_tilt']),
            "ExtraQ02": _answer_codes(entries['level_tilt']),
        }))

    if not frames:
        return pd.DataFrame(columns=columns)
    # Concatenate once instead of growing the frame box by box.
    return pd.concat(frames, ignore_index=True).fillna("None")
#|dropout
fn_lb = "../../example_data/legacy_logbook.xls"

# Raw logbook: dictionary of record arrays, keyed by box number.
lb = read_logbook(fn_lb)
lb
#|dropout
# The same logbook converted to the survey-response column layout.
parse_legacy_logbook(fn_lb)

5.2. Parse Responses to dict

Maintenance flags shall be parsed to a dictionary sorted by the PyrNet box number. The survey reports are collected over the entire campaign. Therefore, only reports within a certain time interval around the maintenance time (2 days) are considered for quality flagging the measurement period.

Reports within ±2 days around the maintenance time are considered, which gives the opportunity for corrections within this time frame by issuing another response (or via an insert in the website interface). For example, the first report at 1 PM includes the quality marks and some notes. Later, if one wants to add notes for this station or correct marks, another report can be filled in. The values will then be updated by the parsing function:

  • Marks: the valid marks (not None) of the latest report within ±2 days are used.

  • Notes: the notes of multiple reports are attached (separated by “;”), starting with the notes of the oldest report.

#|export
_pollution_marks = {
    "None":4,
    "AO01":0,
    "AO02":1,
    "AO03":2,
    "AO04":3,
}
_alignment_marks = {
    "None":4,
    "AO01":0,
    "AO02":1,
    "AO03":2,
}
_note_keys = {
    "note_general": "Q01",
    "note_align": "MainQ01[comment]",
    "note_clean": "MainQ02[comment]",
    "note_align2": "ExtraQ01[comment]",
    "note_clean2": "ExtraQ02[comment]",
}
_mark_keys = {
    "clean": "MainQ01",
    "align": "MainQ02",
    "clean2": "ExtraQ01",
    "align2": "ExtraQ02",
}

def parse_report(
        df:  pd.DataFrame,
        date_of_maintenance: float | dt.datetime | np.datetime64 | None,
) -> dict:
    """
    Use pandas.read_csv (sep=;) to parse the survey report.

    Parameters
    ----------
    df: Dataframe
        LimeSurvey response parsed as pandas Dataframe.
    date_of_maintenance: float, datetime, datetime64 or None
        A rough date of maintenance (at least day resolution).
        If float, interpreted as Julian day from 2000-01-01T12:00.
        If None, the most recent logbook entries will be parsed.

    Returns
    -------
    dict
        Dictionary storing maintenance flags and notes by PyrNet box number.
    """
    if date_of_maintenance is not None:
        date_of_maintenance = utils.to_datetime64(date_of_maintenance)

    results = {}
    for i in range(df.shape[0]):
        box = int(df["Q00"].values[i])
        key = f"{box:03d}"

        # consider only reports +-2 days around date of maintenance
        mdate = pd.to_datetime(df['datestamp'][i])
        if date_of_maintenance is None:
            dtime = np.abs(mdate - np.max(df['datestamp']))
        else:
            dtime = np.abs(mdate - date_of_maintenance)

        if dtime > np.timedelta64(2,'D'):
            continue

        # store report in dictionary

        if key not in results:
            # initialize marks
            for mkey in _mark_keys:
                results = assoc_in(results, [key,mkey], 4)
            # initialize notes
            for nkey in _note_keys:
                results = assoc_in(results, [key,nkey], "")

        # merge notes if multiple reports exist
        for nkey in _note_keys:
            new_note = df[_note_keys[nkey]].values[i]
            update_note = (results[key][nkey]+'; '+new_note).strip('; ')
            results = assoc_in(results, [key,nkey], update_note)

        # update marks with most recent report if not None
        for mkey in _mark_keys:
            new_mark = df[_mark_keys[mkey]][i]
            if new_mark=="None":
                continue
            if mkey.startswith("clean"):
                new_mark = _pollution_marks[new_mark]
            else:
                new_mark = _alignment_marks[new_mark]
            results = assoc_in(results, [key,mkey], new_mark)
    return results
#|dropout
# Parse LimeSurvey report
fn = "../../example_data/results-survey224783.csv"

df = pd.read_csv(fn, sep=';')
df = df.fillna("None")

# Only reports within two days of the given maintenance date are evaluated.
parse_report(df, np.datetime64("2023-05-08T12:00"))
#|dropout
fn_lb = "../../example_data/legacy_logbook.xls"
# Parse legacy xls notebook
parse_report(parse_legacy_logbook(fn_lb), np.datetime64("2019-06-17T12:00"))

5.3. Make aggregated quality flags

Aggregate quality marks to a binary number according to CF-Convention section 3.5 for quality flags.

QC flag binary representation, bits: XXYY with:

  • XX - level - [00,01,10] - good, slight out of level, bad out of level

  • YY - clean - [00,01,10,11] good, slight-, moderate-, strong covered

flag_masks = '3b, 3b, 3b, 12b, 12b'
flag_values = '1b, 2b, 3b, 4b, 8b'
flag_meanings = "
    soiling_light
    soiling_moderate
    soiling_heavy
    level_problematic
    level_bad"
#|export
def get_qcflag(qc_clean, qc_level):
    """
    Combine the cleanliness and the level mark into a single quality flag.

    The level mark is moved two bits to the left and the cleanliness mark is
    added on top, i.e. ``flag = qc_level * 4 + qc_clean`` for integer input.

    Parameters
    ----------
    qc_clean: int
        [0,1,2,3] [clean, slight-, moderate-, strong covered]
    qc_level: int
        [0,1,2] [good, slight misalignment, strong misalignment]

    Returns
    -------
    int
        aggregated quality flag, [0-11] for marks within the ranges above
    """
    level_bits = qc_level << 2
    return level_bits + qc_clean
# Works element-wise on arrays: these inputs yield [0, 5, 10, 11].
get_qcflag(np.array([0,1,2,3]),np.array([0,1,2,2]))