from collections import defaultdict
from datetime import datetime
from pathlib import Path
import os
from pprint import pprint, pformat
import lasio
import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype
from scipy.interpolate import interp1d
def convert_numeric(v):
    """Convert one raw CSV field into a numeric value.

    Args:
        v: the field to convert, normally a string taken from a summary row.

    Returns:
        ``pandas.NA`` when *v* equals the literal string ``"NA"``, otherwise
        whatever :func:`pandas.to_numeric` returns for *v*.
    """
    return pd.NA if v == "NA" else pd.to_numeric(v)
def read(
    fn, encoding="utf-16", parse_dts=True, datefmt="auto", auto_revert_encoding="cp1252"
):
    """Read KorEXO sonde profile.

    Args:
        fn (str): filename
        encoding (str): file encoding. I believe raw KorEXO output is
            in UTF-16.
        parse_dts (bool): whether to attempt to parse datetimes
            or not.
        datefmt (str): use "auto" to use the column header to infer the date
            format, although note that this isn't always correct. In that case,
            set it here using Python datetime string formats e.g.
            ``"%d/%m/%Y"``
        auto_revert_encoding (str or None/False): attempt to check whether
            the file is UTF-16 and if it is not i.e. there is no BOM, then use
            whatever encoding is set here. Set to ``False`` only if you want the
            code to attempt *encoding* and fail messily if it is not.

    Returns:
        dict: with keys 'metadata', 'dataframe' and 'datasets'. See
        package documentation for more details.

    """
    if auto_revert_encoding:
        # Only the first two bytes are needed to detect a UTF-16 byte-order
        # mark; the context manager guarantees the handle is closed again.
        with open(fn, "rb") as f:
            bom = f.read(2)
        # Accept both the little-endian and the big-endian UTF-16 BOM.
        if bom not in (b"\xff\xfe", b"\xfe\xff"):
            encoding = auto_revert_encoding
    # The file used to be pre-scanned here to count commas and look for the
    # header row, but both outcomes dispatched to the same reader, so the scan
    # had no effect and has been removed.
    return _read_korexo_format(fn, encoding, parse_dts, datefmt)
def _median_or_first(values):
    """Return a single representative value for a column.

    The median is used when it can be computed. A column holding one distinct
    value, or values that cannot be sorted/averaged (strings, dates), is
    represented by its first element. An empty column yields ``nan``.
    """
    if len(values) == 0:
        return np.nan
    try:
        if len(np.unique(values)) == 1:
            return values[0]
        return np.median(values)
    except TypeError:
        # Raised for values that cannot be ordered or averaged.
        return values[0]


def _value_at(values, position, default):
    """Return ``values[position]``, or *default* if that entry is unavailable."""
    if values is None or position >= len(values):
        return default
    return values[position]


def _infer_datefmt(unitfmt, first_value):
    """Guess the ``strptime`` format of the Date column.

    Args:
        unitfmt (str): the text inside the parentheses of the Date column
            header, e.g. ``"MM/DD/YYYY"``.
        first_value: first entry of the Date column, or None if it is empty.

    Returns:
        str: a ``strptime`` format, or ``"auto"`` when *unitfmt* is not one of
        the two recognised layouts.
    """
    if unitfmt == "MM/DD/YYYY":
        fmt = "%m/%d/%Y"
    elif unitfmt == "DD/MM/YYYY":
        fmt = "%d/%m/%Y"
    else:
        return "auto"

    # The header is not always truthful, so cross-check it against the first
    # value. Anything that does not look like "<int>/<int>/..." is ignored.
    if not isinstance(first_value, str):
        return fmt
    try:
        first, second = first_value.split("/")[:2]
        first_num, second_num = int(first), int(second)
    except ValueError:
        return fmt

    if unitfmt == "MM/DD/YYYY" and first_num > 12:
        return "%d/%m/%Y"
    if unitfmt == "DD/MM/YYYY" and second_num > 12:
        return "%m/%d/%Y"
    if first_num <= 12 and second_num <= 12:
        # Ambiguous date: a zero-padded second field under an MM/DD header is
        # treated as the month.
        if unitfmt == "MM/DD/YYYY" and second.startswith("0"):
            return "%d/%m/%Y"
    return fmt


def _read_korexo_format(fn, encoding, parse_dts=True, datefmt="auto"):
    """Parse a KorEXO CSV export into metadata, datasets and a dataframe.

    The rows above the column header are scanned for the "FILE CREATED",
    "MEAN VALUE:", "STANDARD DEVIATION:" and "SENSOR SERIAL NUMBER:" entries;
    the table itself starts at the row beginning with ``"Date ("``. Summary
    rows that are missing or too short are filled in with ``pandas.NA``
    (means, standard deviations) or ``""`` (sensors).

    Args:
        fn (str): filename
        encoding (str): encoding used to decode *fn*.
        parse_dts (bool): whether to convert the Date column to
            ``datetime.date`` objects. On any parsing failure the column is
            left as it was read.
        datefmt (str): ``"auto"`` to infer the format from the Date column
            header and first value, otherwise a ``strptime`` format string.

    Returns:
        dict: with keys 'metadata' (dict), 'datasets' (list of dicts with keys
        'name', 'column', 'sensor', 'mean', 'stdev', 'data' and 'median') and
        'dataframe' (pandas.DataFrame of the table as read by pandas).

    Raises:
        ValueError: if no row starting with ``"Date ("`` is found.
    """
    md = {
        "created_file": datetime.fromtimestamp(os.path.getctime(fn)),
        "modified_file": datetime.fromtimestamp(os.path.getmtime(fn)),
    }
    # Number of leading columns that are not sensor parameters; the summary
    # rows are sliced with the same offset.
    p_offset = 4

    with open(fn, "rb") as f:
        text = f.read().decode(encoding)

    header_fields = None
    sensors = means = stdevs = None
    for line_no, line in enumerate(text.splitlines()):
        if line.startswith("FILE CREATED"):
            md["created_info"] = line.partition(",")[2].strip()
        elif "MEAN VALUE:" in line:
            means = [convert_numeric(x) for x in line.split(",")[p_offset:]]
        elif "STANDARD DEVIATION:" in line:
            stdevs = [convert_numeric(x) for x in line.split(",")[p_offset:]]
        elif "SENSOR SERIAL NUMBER:" in line:
            sensors = line.split(",")[p_offset:]
        elif line.startswith("Date ("):
            md["header_line_no"] = line_no + 1
            header_fields = line.split(",")

    if header_fields is None:
        raise ValueError(
            f"No column header row starting with 'Date (' found in {fn!r}"
        )

    params = header_fields[p_offset:]
    indices = [pos for pos, param in enumerate(params) if param != ""]
    md["params"] = [params[pos] for pos in indices]
    md["sensors"] = [_value_at(sensors, pos, "") for pos in indices]
    md["means"] = [_value_at(means, pos, pd.NA) for pos in indices]
    md["stdevs"] = [_value_at(stdevs, pos, pd.NA) for pos in indices]

    df = pd.read_csv(
        fn,
        skiprows=(md["header_line_no"] - 1),
        encoding=encoding,
    )

    datasets = []
    for col_no, param in enumerate(header_fields):
        if param == "":
            continue
        data = df[param].values

        if col_no >= p_offset:
            # Sensor parameter column: attach the matching summary values.
            pos = col_no - p_offset
            datasets.append(
                {
                    "name": param,
                    "column": param,
                    "sensor": _value_at(sensors, pos, ""),
                    "mean": _value_at(means, pos, pd.NA),
                    "stdev": _value_at(stdevs, pos, pd.NA),
                    "data": data,
                    "median": _median_or_first(data),
                }
            )
            continue

        # Leading column: the name is the header text before any "(unit)".
        name = param.split("(", 1)[0].strip() if "(" in param else param
        if parse_dts and name == "Date":
            fmt = datefmt
            if fmt == "auto":
                unitfmt = param.partition("(")[2][:-1].strip()
                first_value = data[0] if len(data) > 0 else None
                fmt = _infer_datefmt(unitfmt, first_value)
            if fmt != "auto":
                # Best effort: any failure (including entries that cannot be
                # converted to a date) leaves the column as it was read.
                try:
                    data = [
                        ts.date()
                        for ts in pd.to_datetime(data, format=fmt, errors="coerce")
                    ]
                except (ValueError, TypeError):
                    pass

        datasets.append(
            {
                "name": name,
                "column": param,
                "sensor": "",
                "mean": pd.NA,
                "stdev": pd.NA,
                "data": data,
                "median": _median_or_first(data),
            }
        )

    return {"metadata": md, "datasets": datasets, "dataframe": df}
# Default translation from raw KorEXO column headers to short column names.
# This is a plain dict: headers that are not listed here are absent, so callers
# must decide how to treat unmapped columns.
COL_MAPPING = {
    "Date (MM/DD/YYYY)": "date",
    "Date (DD/MM/YYYY)": "date",
    "Time (HH:mm:ss)": "time",
    "Time (Fract. Sec)": "time_sec",
    "Site Name": "site",
    "Cond µS/cm": "cond",
    "Depth m": "water_depth",
    "nLF Cond µS/cm": "cond_nlf",
    "ODO % sat": "do_sat",
    "ODO % local": "do_local",
    "ODO mg/L": "do_conc",
    "ORP mV": "orp_mv",
    "Pressure psi a": "press",
    "Sal psu": "sal_psu",
    "SpCond µS/cm": "spcond",
    "TDS mg/L": "tds",
    "pH": "ph",
    "pH mV": "ph_mv",
    "Temp °C": "temp",
    "Vertical Position m": "vert_pos",
    "Battery V": "battery",
    "Cable Pwr V": "cable_power",
}
def convert_datasets_to_df(datasets, mapping=COL_MAPPING):
    """Build a dataframe from a list of datasets, renaming columns on the way.

    Args:
        datasets (list): see output of :func:`korexo_profile.read`.
        mapping (dict): optional translation from original column header to
            output column name. The default mapping is stored in
            korexo_profile.COL_MAPPING. Columns absent from the mapping keep
            their original header.

    Returns:
        pandas.DataFrame: one column per dataset, preceded by a "datetime"
        column built from the "date" and "time" columns.

    """
    # Work on a private copy so the caller's mapping is never modified.
    names = dict(mapping)
    for entry in datasets:
        # Unmapped columns survive under their own header.
        names.setdefault(entry["column"], entry["column"])

    columns = {}
    for entry in datasets:
        columns[names[entry["column"]]] = entry["data"]
    frame = pd.DataFrame(columns)

    stamp_text = frame["date"].astype(str) + " " + frame["time"].astype(str)
    stamps = pd.to_datetime(stamp_text, format="%Y-%m-%d %H:%M:%S")
    frame.insert(0, "datetime", stamps)
    return frame
def make_regularly_spaced(df, index_col="dtw", step=0.05, step_precision=5):
    """Convert dataframe to regular spacing based on an index column.

    Args:
        df (pandas DataFrame)
        index_col (str): column of *df* for which a regularly-spaced set of
            values should be created at *step* and then all other data
            interpolated against.
        step (float): interval desired in *index_col*; must be positive.
        step_precision (int): number of decimals the first and last values of
            the new index are rounded to.

    Returns:
        pandas.DataFrame: dataframe, where the newly created *index_col* values are set
        as the dataframe index. All other numeric columns of *df* are included
        as columns, linearly interpolated at the new *index_col* values
        (``nan`` outside the measured range). Non-numeric columns are dropped.

    Raises:
        ValueError: if *step* is not positive, or *index_col* holds no valid
            values.

    """
    # A zero or negative step would make the alignment loops below spin forever.
    if not step > 0:
        raise ValueError(f"step must be a positive number, got {step!r}")

    lowest = df[index_col].min()
    highest = df[index_col].max()
    if pd.isna(lowest) or pd.isna(highest):
        raise ValueError(f"column {index_col!r} contains no valid values")

    # Walk up from a whole number below the data in *step* increments, then
    # back off one step so the grid starts just below the smallest value.
    index_min = np.round(lowest, 0) - 1
    while index_min < lowest:
        index_min += step
    index_min = np.round(index_min - step, step_precision)

    # Mirror image for the upper end of the grid.
    index_max = np.round(highest, 0) + 1
    while index_max > highest:
        index_max -= step
    index_max = np.round(index_max + step, step_precision)

    # Round rather than truncate: a quotient such as 0.3 / 0.1 evaluates to
    # 2.9999999999999996, and truncating it would drop a grid point and
    # silently change the spacing.
    n_intervals = int(np.round((index_max - index_min) / step))
    index_new = np.linspace(index_min, index_max, n_intervals + 1)

    new_df = {}
    groupby = df.groupby(index_col)
    for col in df.columns:
        if col == index_col or not is_numeric_dtype(df[col]):
            continue
        # Rows sharing an index value are averaged before interpolating.
        series = groupby[col].mean()
        interpolate = interp1d(
            series.index, series.values, assume_sorted=True, bounds_error=False
        )
        new_df[col] = interpolate(index_new)
    return pd.DataFrame(new_df, index=index_new).rename_axis(index_col)
def _set_header_value(section, key, value):
    """Set *key* to *value* in a lasio header section, appending it if absent."""
    if key in section:
        section[key] = value
    else:
        section.append(lasio.HeaderItem(key, value=value))


def to_las(
    df,
    fn,
    encoding="utf-16",
    col_metadata=None,
    well_metadata=None,
    param_metadata=None,
    add_mtime_date="DATE",
    auto_revert_encoding="cp1252",
):
    """Convert a KorEXO profile file to Log ASCII Standard (LAS).

    Args:
        df (pandas DataFrame): a regularly-spaced output from
            reading a KorEXO profile CSV file.
        fn (str): the original CSV file
        encoding (str): encoding used to decode *fn* when copying its
            contents into the LAS file.
        col_metadata (dict): optional metadata for the columns. The keys should
            refer to columns of *df* and each value should be a tuple.
            The first item of the tuple is a string for the unit
            e.g. ``"m"``, and the second item is the description.
        well_metadata (dict): dict of metadata to add to the LAS
            file's ~Well section.
        param_metadata (dict): dict of metadata to add to the LAS
            file's ~Param section
        add_mtime_date (str): add the file modified time of *fn*
            as a value in the ~Well section under this mnemonic. Set to
            False or None to prevent adding it at all. It is also skipped
            when *well_metadata* already supplies the same mnemonic, so that
            the caller's value is kept and no duplicate item is written.
        auto_revert_encoding (str or None/False): attempt to check whether
            the file is UTF-16 and if it is not i.e. there is no BOM, then use
            this encoding instead. Set to ``False`` only if you want the code
            to fail messily if you have the encoding wrong.

    Returns:
        lasio.LASFile object

    The contents of the original KorEXO profile CSV file will be
    recorded in the LAS file's ~Other block.

    Example:

    .. code-block::

        >>> import korexo_profile
        >>> data = korexo_profile.read(fn, datefmt="%d/%m/%Y")
        >>> df = korexo_profile.convert_datasets_to_df(data["datasets"])
        >>> df["water_depth"] += WELL_DEPTH_TO_WATER_MEASUREMENT
        >>> df2 = korexo_profile.make_regularly_spaced(df, "water_depth", step=0.05)
        >>> las = korexo_profile.to_las(df2, fn)

    """
    if auto_revert_encoding:
        # Only the first two bytes are needed to detect a UTF-16 byte-order
        # mark; the context manager guarantees the handle is closed again.
        with open(fn, "rb") as f:
            bom = f.read(2)
        # Accept both the little-endian and the big-endian UTF-16 BOM.
        if bom not in (b"\xff\xfe", b"\xfe\xff"):
            encoding = auto_revert_encoding
    if col_metadata is None:
        col_metadata = {}
    if well_metadata is None:
        well_metadata = {}
    if param_metadata is None:
        param_metadata = {}

    las = lasio.LASFile()
    las.set_data_from_df(df)
    for curve in las.curves:
        if curve.mnemonic in col_metadata:
            curve.unit, curve.descr = col_metadata[curve.mnemonic]

    p = Path(fn)
    stat = p.stat()
    ctime = pd.Timestamp(datetime.fromtimestamp(stat.st_ctime))
    mtime = pd.Timestamp(datetime.fromtimestamp(stat.st_mtime))
    with open(fn, "rb") as f:
        contents = f.read().decode(encoding)
    las.other = (
        f"Filename: {p.absolute()}"
        f"\nFile creation date: {ctime}"
        f"\nFile modified date: {mtime}"
        "\nFile contents follow.\n"
        f"{contents}"
    )

    # Drop the DATE item already present on the new LAS object (presumably
    # lasio's placeholder) once, before applying caller metadata. Doing this
    # inside the loop would also delete a DATE supplied by the caller as soon
    # as the next key is processed.
    if well_metadata and "DATE" in las.well:
        del las.well["DATE"]
    for key, value in well_metadata.items():
        _set_header_value(las.well, key, value)
    for key, value in param_metadata.items():
        _set_header_value(las.params, key, value)

    # Set rather than blindly append, so an existing item with the same
    # mnemonic is not duplicated; an explicit caller value takes precedence.
    if add_mtime_date and add_mtime_date not in well_metadata:
        _set_header_value(las.well, add_mtime_date, mtime.strftime("%Y-%m-%d"))
    return las