##############################################################
# core routines for software spatialist
# John Truckenbrodt 2014-2026
##############################################################
"""
This script gathers central functions and classes for general applications
"""
import dill
import tempfile
import platform
import tblib.pickling_support
from io import StringIO
from types import TracebackType
from urllib.parse import urlparse, urlunparse, urlencode
from builtins import str
import re
import sys
import fnmatch
import inspect
import itertools
import os
import subprocess as sp
import tarfile as tf
import zipfile as zf
from collections.abc import Callable, Iterator
from typing import Any, Literal, TextIO, Self
import numpy as np
from numpy.typing import NDArray
import progressbar as pb
try:
import pathos.multiprocessing as mp
except ImportError:
pass
# type alias: the scalar types a value can have after being processed by `parse_literal`
ParsedLiteral = int | float | str | bytes
[docs]
class HiddenPrints:
"""
| Suppress console stdout prints, i.e. redirect them to a temporary string object.
| Adapted from https://stackoverflow.com/questions/8391411/suppress-calls-to-print-python
Examples
--------
>>> with HiddenPrints():
>>> print('foobar')
>>> print('foobar')
"""
_original_stdout: TextIO
def __enter__(self) -> Self:
self._original_stdout = sys.stdout
sys.stdout = StringIO()
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None
) -> None:
sys.stdout = self._original_stdout
def dictmerge(x: dict, y: dict) -> dict:
    """
    Merge two dictionaries into a new one.

    Parameters
    ----------
    x
        the base dictionary; a shallow copy is made, the object itself is not modified
    y
        the dictionary whose items are written into the copy;
        for keys present in both dictionaries the value of `y` is used

    Returns
    -------
    merged
        the shallow copy of `x` containing the additional items of `y`
    """
    merged = x.copy()
    for key, value in y.items():
        merged[key] = value
    return merged
# todo consider using itertools.chain like in function finder
[docs]
def dissolve(inlist: list[Any] | tuple[Any, ...]) -> list[Any]:
"""
list and tuple flattening
Parameters
----------
inlist
the list with sub-lists or tuples to be flattened
Returns
-------
out
the flattened result
Examples
--------
>>> dissolve([[1, 2], [3, 4]])
[1, 2, 3, 4]
>>> dissolve([(1, 2, (3, 4)), [5, (6, 7)]])
[1, 2, 3, 4, 5, 6, 7]
"""
out = []
for i in inlist:
i = list(i) if isinstance(i, tuple) else i
out.extend(dissolve(i)) if isinstance(i, list) else out.append(i)
return out
def parent_dirs(path: str) -> Iterator[str]:
    """
    Generator that yields the parent directories of a zipfile path.

    The directories are yielded from the outermost to the innermost one.
    The bare root of an absolute path (e.g. ``/``) is not yielded.

    Parameters
    ----------
    path
        a path to get parent directories from

    Yields
    ------
    dir_name
        parent directory names with trailing "/"

    Examples
    --------
    >>> list(parent_dirs('a/b/c.txt'))
    ['a/', 'a/b/']
    """
    ancestors = []
    current = path
    while True:
        parent = os.path.dirname(current)
        # os.path.dirname('/') returns '/' again, so walking up an absolute path never
        # reaches the empty string; stop as soon as no progress is made or only
        # separators are left
        if parent == current or not parent.strip('/'):
            break
        ancestors.append(parent)
        current = parent
    # the loop collected innermost first
    for ancestor in reversed(ancestors):
        yield ancestor + "/"
def namelist_with_implicit_dirs(root: zf.ZipFile) -> list[str]:
    """
    List the members of a zipfile archive, including implicit directories.

    Implicit directories are those which only appear as part of a member's
    path but have no entry of their own in the archive.

    Parameters
    ----------
    root
        zipfile archive to get the namelist from

    Returns
    -------
    names
        the zipfile folders and files in the archive without duplicates;
        the list is built from a set, so its order is arbitrary
    """
    names: set[str] = set()
    for member in root.namelist():
        # the member itself plus every directory on the way to it
        names |= {*parent_dirs(member), member}
    return list(names)
[docs]
def finder(
target: str | list[str],
matchlist: list[str],
foldermode: int = 0,
regex: bool = False,
recursive: bool = True
) -> list[str]:
"""
function for finding files/folders in folders and their subdirectories
Parameters
----------
target
a directory, zip- or tar-archive or a list of them to be searched
matchlist
a list of search patterns
foldermode
* 0: only files
* 1: files and folders
* 2: only folders
regex
are the search patterns in matchlist regular expressions or unix shell standard (default)?
recursive
search target recursively into all subdirectories or only in the top level?
This is currently only implemented for parameter `target` being a directory.
Returns
-------
paths
the absolute names of files/folders matching the patterns
"""
if foldermode not in [0, 1, 2]:
raise ValueError("'foldermode' must be either 0, 1 or 2")
# match patterns
if isinstance(target, str):
pattern = r'|'.join(matchlist if regex else [fnmatch.translate(x) for x in matchlist])
if os.path.isdir(target):
if recursive:
out = dissolve([[os.path.join(root, x)
for x in dirs + files
if re.search(pattern, x)]
for root, dirs, files in os.walk(target)])
else:
out = [os.path.join(target, x)
for x in os.listdir(target)
if re.search(pattern, x)]
if foldermode == 0:
out = [x for x in out if not os.path.isdir(x)]
if foldermode == 2:
out = [x for x in out if os.path.isdir(x)]
return sorted(out)
elif os.path.isfile(target):
if zf.is_zipfile(target):
with zf.ZipFile(target, 'r') as zip:
out = [os.path.join(target, name)
for name in namelist_with_implicit_dirs(zip)
if re.search(pattern, os.path.basename(name.strip('/')))]
if foldermode == 0:
out = [x for x in out if not x.endswith('/')]
elif foldermode == 1:
out = [x.strip('/') for x in out]
elif foldermode == 2:
out = [x.strip('/') for x in out if x.endswith('/')]
return sorted(out)
elif tf.is_tarfile(target):
tar = tf.open(target)
out = [name for name in tar.getnames()
if re.search(pattern, os.path.basename(name.strip('/')))]
if foldermode == 0:
out = [x for x in out if not tar.getmember(x).isdir()]
elif foldermode == 2:
out = [x for x in out if tar.getmember(x).isdir()]
tar.close()
out = [os.path.join(target, x) for x in out]
return sorted(out)
else:
raise RuntimeError("if parameter 'target' is a file, "
"it must be a zip or tar archive:\n {}"
.format(target))
else:
raise RuntimeError("if parameter 'target' is of type str, "
"it must be a directory or a file:\n {}"
.format(target))
elif isinstance(target, list):
groups = [finder(x, matchlist, foldermode, regex, recursive) for x in target]
return list(itertools.chain(*groups))
else:
raise TypeError("parameter 'target' must be of type str or list")
[docs]
def multicore(
        function: Callable[..., Any],
        cores: int,
        multiargs: dict[str, Any],
        pbar: bool = False,
        **singleargs: Any
) -> list[Any] | None:
    """
    wrapper for multicore process execution

    Parameters
    ----------
    function
        individual function to be applied to each process item
    cores
        the number of subprocesses started/CPUs used;
        this value is reduced in case the number of subprocesses is smaller
    multiargs
        a dictionary containing sub-function argument names as keys and lists of arguments to be
        distributed among the processes as values
    pbar
        add a progress bar? Does not yet work on Windows.
    singleargs
        all remaining arguments which are invariant among the subprocesses

    Returns
    -------
    out
        the return of the function for all subprocesses

    Raises
    ------
    AttributeError
        if `multiargs` or `singleargs` contain names which `function` does not accept or
        if the `multiargs` lists are of different length
    RuntimeError
        if `multiargs` is empty or only contains empty lists;
        on Windows also if the input cannot be serialized or the helper script fails

    Notes
    -----
    - all `multiargs` value lists must be of same length, i.e. all argument keys must be explicitly defined for each
      subprocess
    - all function arguments passed via `singleargs` must be provided with the full argument name and its value
      (i.e. argname=argval); default function args are not accepted
    - if the processes return anything else than None, this function will return a list of results
    - if all processes return None, this function will be of type void
    - if a process raises an exception, the exception of the first failed process (in the order of
      the `multiargs` lists) is re-raised with a description of the call appended
      (not on Windows, where a failure of the helper script is reported as RuntimeError)
    - print messages of the processes are hidden (not on Windows)

    Examples
    --------
    >>> def add(x, y, z):
    >>>     return x + y + z
    >>> multicore(add, cores=2, multiargs={'x': [1, 2]}, y=5, z=9)
    [15, 16]
    >>> multicore(add, cores=2, multiargs={'x': [1, 2], 'y': [5, 6]}, z=9)
    [15, 17]

    See Also
    --------
    :mod:`pathos.multiprocessing`
    """
    tblib.pickling_support.install()

    # compare the function arguments with the multi and single arguments and raise errors if mismatches occur;
    # this is only possible if the function does not collect arbitrary arguments
    check = inspect.getfullargspec(function)
    if not check.varargs and not check.varkw:
        # keyword-only parameters are listed separately but can be passed by name just as well
        accepted = check.args + check.kwonlyargs
        multiargs_check = [x for x in multiargs if x not in accepted]
        singleargs_check = [x for x in singleargs if x not in accepted]
        if multiargs_check:
            raise AttributeError('incompatible multi arguments: {0}'.format(', '.join(multiargs_check)))
        if singleargs_check:
            raise AttributeError('incompatible single arguments: {0}'.format(', '.join(singleargs_check)))

    # compare the list lengths of the multi arguments and raise errors if they are of different length
    arglengths = {len(values) for values in multiargs.values()}
    if len(arglengths) > 1:
        raise AttributeError('multi argument lists of different length')
    # an empty multiargs dictionary is treated like one containing only empty lists
    jobs = arglengths.pop() if arglengths else 0
    if jobs == 0:
        raise RuntimeError('did not get any multiargs')

    # prevent starting more processes than necessary
    cores = min(cores, jobs)

    # create a list of dictionaries each containing the arguments for individual
    # function calls to be passed to the multicore processes
    processlist = [dictmerge({arg: values[i] for arg, values in multiargs.items()}, singleargs)
                   for i in range(jobs)]

    if platform.system() == 'Windows':
        # in Windows parallel processing needs to strictly be in a "if __name__ == '__main__':" wrapper
        # it was thus necessary to outsource this to a different script and try to serialize all input for sharing objects
        # https://stackoverflow.com/questions/38236211/why-multiprocessing-process-behave-differently-on-windows-and-linux-for-global-o

        # a helper script to perform the parallel processing
        script = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'multicore_helper.py')

        # a temporary file to write the serialized function variables
        # NOTE(review): the file name is fixed, so concurrent calls overwrite each other's data and
        # the content loaded below comes from a shared directory. The helper script presumably
        # reads the same path - confirm before switching to a unique temporary file.
        tmpfile = os.path.join(tempfile.gettempdir(), 'spatialist_dump')

        # check if everything can be serialized
        if not dill.pickles([function, cores, processlist]):
            raise RuntimeError('cannot fully serialize function arguments;\n'
                               ' see https://github.com/uqfoundation/dill for supported types')

        # write the serialized variables
        with open(tmpfile, 'wb') as tmp:
            dill.dump([function, cores, processlist], tmp, byref=False)

        # run the helper script
        proc = sp.Popen([sys.executable, script], stdin=sp.PIPE, stderr=sp.PIPE)
        _, err = proc.communicate()
        if proc.returncode != 0:
            raise RuntimeError(err.decode())

        # retrieve the serialized output of the processing which was written to the temporary file by the helper script
        with open(tmpfile, 'rb') as tmp:
            result = dill.load(tmp)
        return result

    def wrapper(**kwargs):
        try:
            # hide print messages in the sub-processes
            with HiddenPrints():
                out = function(**kwargs)
            return out
        except Exception as e:
            # return the exception instead of raising it so that it can be re-raised
            # in the main process together with the arguments of the failed call
            return ExceptionWrapper(e)

    progress = None
    if pbar:
        widgets = [pb.Percentage(), pb.Bar(), pb.Timer(), ' ', pb.ETA()]
        progress = pb.ProgressBar(max_value=jobs, widgets=widgets).start()

    # the number of jobs per chunk, needed to convert the number of open chunks into finished jobs
    chunksize, remainder = divmod(jobs, cores * 4)
    if remainder:
        chunksize += 1

    with mp.ProcessPool(processes=cores) as pool:
        results = pool.amap(lambda x: wrapper(**x), processlist)
        if progress is not None:
            while not results.ready():
                left = results._number_left * chunksize
                done = jobs - left if left <= jobs else 0
                progress.update(done)
                # wait a moment instead of polling in a tight loop, which fully occupies a CPU
                results.wait(0.1)
        results = results.get()

    if progress is not None:
        progress.finish()

    out = []
    for kwargs, item in zip(processlist, results):
        if isinstance(item, ExceptionWrapper):
            context = ("\n(called function '{}' with args {})"
                       .format(getattr(function, '__name__', repr(function)), kwargs))
            try:
                # recreate the exception with the call description appended to its message
                item.ee = type(item.ee)(str(item.ee) + context)
            except Exception:
                # not every exception type can be created from a single string;
                # keep the original object rather than hiding it behind a constructor error
                item.ee.add_note(context.strip())
            # this call always raises
            item.re_raise()
        out.append(item)

    # evaluate the return of the processing function;
    # if any value is not None then the whole list of results is returned
    if all(x is None for x in out):
        return None
    return out
def add(x: int, y: int, z: int):
    """
    Dummy function returning the sum of its three arguments.

    It only exists for testing function `multicore`: defining it in the test script
    is not possible since it cannot be serialized with a reference module that does
    not exist (i.e. the test script).
    """
    partial_sum = x + y
    return partial_sum + z
class ExceptionWrapper(object):
"""
| class for enabling traceback pickling in function multiprocess
| https://stackoverflow.com/questions/6126007/python-getting-a-traceback-from-a-multiprocessing-process
| https://stackoverflow.com/questions/34463087/valid-syntax-in-both-python-2-x-and-3-x-for-raising-exception
"""
ee: BaseException
tb: TracebackType | None
def __init__(self, ee: BaseException) -> None:
self.ee = ee
__, __, self.tb = sys.exc_info()
def re_raise(self) -> None:
def reraise(tp, value, tb=None):
raise tp.with_traceback(tb)
reraise(self.ee, None, self.tb)
[docs]
def parse_literal(
        x: str | bytes | list[str | bytes]
) -> ParsedLiteral | list[ParsedLiteral]:
    """
    Convert a string (or each string in a list) to the most specific type it can
    be read as: int first, then float; otherwise the input is returned unchanged.

    Parameters
    ----------
    x
        a string, a bytes object or a (possibly nested) list of them to be parsed

    Returns
    -------
    out
        the parsing result; a list if `x` is a list

    Raises
    ------
    TypeError
        if `x` (or an item of the list) is neither str, bytes nor list

    Examples
    --------
    >>> isinstance(parse_literal('1.5'), float)
    True
    >>> isinstance(parse_literal('1'), int)
    True
    >>> isinstance(parse_literal('foobar'), str)
    True
    """
    if isinstance(x, list):
        return [parse_literal(item) for item in x]
    if not isinstance(x, (bytes, str)):
        raise TypeError(f'expected str|bytes, got {type(x)}')
    # try the converters from the most to the least restrictive one
    for converter in (int, float):
        try:
            return converter(x)
        except ValueError:
            continue
    return x
def rescale(
inlist: list[int | float],
newrange: tuple[int | float, int | float] = (0, 1)
) -> list[float]:
"""
rescale the values in a list between the values in newrange (a tuple with the new minimum and maximum)
"""
OldMax = max(inlist)
OldMin = min(inlist)
if OldMin == OldMax:
raise RuntimeError('list contains of only one unique value')
OldRange = OldMax - OldMin
NewRange = newrange[1] - newrange[0]
result = [(((float(x) - OldMin) * NewRange) / OldRange) + newrange[0] for x in inlist]
return result
[docs]
def run(
        cmd: list[Any],
        outdir: str | None = None,
        logfile: str | None = None,
        inlist: list[Any] | None = None,
        void: bool = True,
        errorpass: bool = False,
        env: dict[str, Any] | None = None
) -> tuple[int, str, str] | None:
    """
    | Execute a command as subprocess with optional logfile writing and stdin piping.
    | This is a convenience wrapper around the :mod:`subprocess` module and calls
      its class :class:`~subprocess.Popen` internally.

    Parameters
    ----------
    cmd:
        the command arguments; nested lists/tuples are flattened and all items converted to strings
    outdir:
        the directory to execute the command in; the current working directory if None
    logfile:
        a file to append stdout to; after a successful run (or if `errorpass=True`)
        a line of '#' characters is appended to separate the entries of repeated calls
    inlist:
        a list of arguments passed to stdin, i.e., arguments passed to interactive input of the program;
        each item is sent as one line
    void:
        if True, nothing is returned; if False, return code, stdout and stderr are returned
    errorpass:
        if False, a :class:`subprocess.CalledProcessError` is raised if the command fails
    env:
        the environment to be passed to the subprocess

    Returns
    -------
        a tuple of (returncode, stdout, stderr) if `void=False` otherwise `None`.
        stdout is an empty string if it was written to a logfile.
    """
    arguments = [str(item) for item in dissolve(cmd)]
    workdir = os.getcwd() if outdir is None else outdir

    # stdout either goes to the logfile or is captured
    if logfile is None:
        sink = sp.PIPE
    else:
        sink = open(logfile, 'a')

    try:
        proc = sp.Popen(args=arguments, stdin=sp.PIPE, stdout=sink, stderr=sp.PIPE,
                        cwd=workdir, env=env, text=True, encoding='utf-8')
        if inlist is None:
            stdin_text = None
        else:
            stdin_text = ''.join(str(entry) + '\n' for entry in inlist)
        out, err = proc.communicate(input=stdin_text)
        failed = proc.returncode != 0
        if failed and not errorpass:
            raise sp.CalledProcessError(returncode=proc.returncode,
                                        cmd=arguments, output=out, stderr=err)
        # add line for separating log entries of repeated function calls
        if logfile:
            sink.write('#' * 70 + '\n')
    finally:
        # the logfile is closed no matter whether the command succeeded
        if logfile is not None:
            sink.close()

    if void:
        return None
    # streams which were not captured are reported as empty strings
    stdout_text = '' if out is None else out
    stderr_text = '' if err is None else err
    return proc.returncode, stdout_text, stderr_text
def union(a: list[Any], b: list[Any]) -> list[Any]:
    """
    union of two lists

    Parameters
    ----------
    a
        the first list; its items must be hashable
    b
        the second list; its items must be hashable

    Returns
    -------
    out
        all unique items that occur in `a`, in `b` or in both.
        The result is built from a set, so its order is arbitrary.
    """
    # the set operator for a union is `|`; the previously used `&` computes the
    # intersection, which contradicts the name and description of this function
    return list(set(a) | set(b))
def urlQueryParser(url: str, querydict: dict[str, str]) -> str:
    """
    Build a URL whose query string is created from a dictionary.

    Parameters
    ----------
    url
        the base URL; a query it already contains is replaced
    querydict
        the query parameter names and values; they are URL-encoded

    Returns
    -------
    out
        the URL with all other components unchanged and the encoded query inserted
    """
    components = urlparse(url)
    encoded_query = urlencode(querydict)
    rebuilt = urlunparse(components._replace(query=encoded_query))
    return str(rebuilt)
[docs]
def parallel_apply_along_axis(
        func1d: Callable[..., Any],
        axis: int,
        arr: NDArray[Any],
        cores: int = 4,
        *args: Any,
        **kwargs: Any
) -> NDArray[Any]:
    """
    Like :func:`numpy.apply_along_axis()` but using multiple processes.
    Adapted from `here <https://stackoverflow.com/questions/45526700/
    easy-parallelization-of-numpy-apply-along-axis>`_.

    Parameters
    ----------
    func1d
        the function to be applied
    axis
        the axis along which to apply `func1d`
    arr
        the input array
    cores
        the number of parallel cores. If 1, or if `arr` has fewer than two dimensions
        (in which case there is nothing to distribute), :func:`numpy.apply_along_axis`
        is called directly in the current process.
    args
        Additional arguments to `func1d`.
    kwargs
        Additional named arguments to `func1d`.

    Returns
    -------
    out
        the result; it has the same layout as that of :func:`numpy.apply_along_axis`

    Raises
    ------
    ValueError
        if `cores` is smaller than 1
    """
    if cores <= 0:
        raise ValueError('cores must be larger than 0')

    ndim = arr.ndim
    axis_in_range = -ndim <= axis < ndim
    # serial execution on the untouched input; an invalid axis is also passed on
    # so that numpy raises its own error for it
    if cores == 1 or ndim < 2 or not axis_in_range:
        return np.apply_along_axis(func1d, axis, arr, *args, **kwargs)

    def unpack(arguments):
        func1d, axis, arr, args, kwargs = arguments
        return np.apply_along_axis(func1d, axis, arr, *args, **kwargs)

    # The array is split along axis 0 to distribute it among the workers, so the function
    # must be applied along a different axis. Negative axes are converted first because
    # e.g. -ndim also refers to axis 0.
    axis = axis % ndim
    swapped = axis == 0
    if swapped:
        effective_axis = 1
        arr = arr.swapaxes(0, 1)
    else:
        effective_axis = axis

    # never create more chunks than there are rows; empty chunks cannot be processed
    n_chunks = max(1, min(mp.cpu_count(), arr.shape[0]))
    chunks = [(func1d, effective_axis, sub_arr, args, kwargs)
              for sub_arr in np.array_split(arr, n_chunks)]

    pool = mp.Pool(cores)
    try:
        individual_results = pool.map(unpack, chunks)
    finally:
        # Freeing the workers, also if a worker failed:
        pool.close()
        pool.join()

    result = np.concatenate(individual_results)
    if swapped:
        # Because of the swap the result starts with the former axis 1, followed by the
        # dimensions returned by func1d. numpy puts the func1d dimensions in place of the
        # applied axis, i.e. first. Nothing to do if func1d returns scalars.
        func_dims = result.ndim - (ndim - 1)
        if func_dims > 0:
            result = np.moveaxis(result, 0, func_dims)
    return result
[docs]
def sampler(
mask: NDArray[np.bool_],
samples: int | None = None,
dim: Literal[1, 2] = 1,
replace: bool = False,
seed: int = 42
) -> NDArray[np.integer[Any]]: # any kind of integer array
"""
General function to select random sample indexes from arrays.
Adapted from package `S1_ARD <https://github.com/johntruckenbrodt/S1_ARD>`_.
Parameters
----------
mask
A 2D boolean mask to limit the sample selection.
samples
The number of samples to select. If None, the positions of all matching values are returned.
If there are fewer values than required samples, the positions of all values are returned.
dim
The dimensions of the output array and its indexes. If 1, the returned array has one
dimension and the indexes refer to the one-dimensional (i.e., flattened) representation
of the input mask. If 2, the output array is of shape `(2, samples)` with two separate
2D arrays for y (index 0) and x respectively, which reference positions in the original
2D shape of the input array.
replace
Draw samples with or without replacement?
seed
Seed used to initialize the pseudo-random number generator.
Returns
-------
idx
The index positions of the generated random samples as 1D or 2D array.
Examples
--------
>>> import numpy as np
>>> from spatialist.ancillary import sampler
>>> array = np.array([[1, 2], [3, 4], [5, 6], [7, 8]])
>>> mask = array > 2
>>> s1d = sampler(mask=mask, samples=2, dim=1)
>>> s2d = sampler(mask=mask, samples=2, dim=2)
>>> print(s1d)
[2 3]
>>> print(s2d)
[[1 1]
[0 1]]
>>> print(array.flatten()[s1d] == array[s2d[0], s2d[1]])
[ True True]
See Also
--------
numpy.random.seed
numpy.random.choice
"""
cols, rows = mask.shape
indices = np.where(mask.flatten())[0]
samplesize = min(indices.size, samples) if samples is not None else indices.size
np.random.seed(seed)
sample_ids = np.random.choice(a=indices, size=samplesize, replace=replace)
if dim == 1:
return sample_ids
elif dim == 2:
out = np.ndarray(shape=(2, samples), dtype=np.uint)
out[0] = sample_ids // rows
out[1] = sample_ids % rows
return out
else:
raise ValueError("'dim' must either be 1 or 2")