Source code for spatialist.vector

# -*- coding: utf-8 -*-
################################################################
# OGR wrapper for convenient vector data handling and processing
# John Truckenbrodt 2015-2026
################################################################


from __future__ import annotations

import os
import yaml
from datetime import datetime, timezone, timedelta
from osgeo import ogr, osr, gdal
from osgeo.gdalconst import GDT_Byte
from types import TracebackType
from typing import Any, TYPE_CHECKING
from numpy.typing import NDArray

if TYPE_CHECKING:
    from .raster import Raster

from .auxil import crsConvert
from .ancillary import parse_literal
from .sqlite_util import sqlite_setup

import pandas as pd
import geopandas as gpd
from shapely.wkb import loads as wkb_loads

ogr.UseExceptions()
osr.UseExceptions()
gdal.UseExceptions()

CRS = int | str | osr.SpatialReference


[docs] class Vector: """ This is intended as a vector meta information handler with options for reading and writing vector data in a convenient manner by simplifying the numerous options provided by the OGR python binding. Parameters ---------- filename the vector file to read; if filename is `None`, a new in-memory Vector object is created. In this case `driver` is overridden and set to 'MEM'. The following file extensions are auto-detected: .. list_drivers:: vector driver the vector file format; needs to be defined if the format cannot be auto-detected from the filename extension """ filename: str | None def __init__(self, filename: str | None = None, driver: str | None = None) -> None: if filename is None: driver = 'MEM' elif isinstance(filename, str): if not os.path.isfile(filename): raise OSError('file does not exist') if driver is None: driver = self.__driver_autodetect(filename) else: raise TypeError('filename must either be str or None') self.filename = filename self.driver = ogr.GetDriverByName(driver) self.vector = self.driver.CreateDataSource('out') if driver == 'MEM' else self.driver.Open(filename) nlayers = self.vector.GetLayerCount() if nlayers > 1: raise RuntimeError('multiple layers are currently not supported') elif nlayers == 1: self.init_layer()
[docs] def __getitem__(self, expression: int | str) -> Vector | None: """ subset the vector object by index or attribute. Parameters ---------- expression the key or expression to be used for subsetting. See :meth:`osgeo.ogr.Layer.SetAttributeFilter` for details on the expression syntax. Returns ------- out a vector object matching the specified criteria Examples -------- Assuming we have a shapefile called `testsites.shp`, which has an attribute `sitename`, we can subset individual sites and write them to new files like so: >>> from spatialist import Vector >>> filename = 'testsites.shp' >>> with Vector(filename)["sitename='site1'"] as site1: >>> site1.write('site1.shp') """ if not isinstance(expression, (int, str)): raise RuntimeError('expression must be of type int or str') expression = parse_literal(expression) if isinstance(expression, str) else expression if isinstance(expression, int): feat = self.getFeatureByIndex(expression) else: self.layer.SetAttributeFilter(expression) feat = self.getfeatures() feat = feat if len(feat) > 0 else None self.layer.SetAttributeFilter('') if feat is None: return None else: return feature2vector(feat, ref=self)
def __enter__(self) -> Vector: return self def __exit__( self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> None: self.close() def __str__(self) -> str: vals = dict() vals['proj4'] = self.proj4 vals.update(self.extent) vals['filename'] = self.filename if self.filename is not None else 'memory' vals['geomtype'] = ', '.join(list(set(self.geomTypes))) info = 'class : spatialist Vector object\n' \ 'geometry type : {geomtype}\n' \ 'extent : {xmin:.3f}, {xmax:.3f}, {ymin:.3f}, {ymax:.3f} (xmin, xmax, ymin, ymax)\n' \ 'coord. ref. : {proj4}\n' \ 'data source : {filename}'.format(**vals) return info @staticmethod def __driver_autodetect(filename: str) -> str: path = os.path.dirname(os.path.realpath(__file__)) drivers = yaml.safe_load(open(os.path.join(path, 'drivers_vector.yml'))) extension = os.path.splitext(filename)[1][1:] if extension not in drivers.keys(): message = "the file extension '{}' is not supported. " \ "Please provide the OGR format descriptor via " \ "parameter 'driver' or use one of the supported extensions:\n- .{}" message = message.format(extension, '\n- .'.join(drivers.keys())) raise RuntimeError(message) else: return drivers[extension]
[docs] def addfeature(self, geometry: ogr.Geometry, fields: dict[str, Any] | None = None) -> None: """ add a feature to the vector object from a geometry Parameters ---------- geometry the geometry to add as a feature fields the field names and values to assign to the new feature """ feature = ogr.Feature(self.layerdef) feature.SetGeometry(geometry) if fields is not None: for field_name, value in fields.items(): if field_name not in self.fieldnames: raise IOError('field "{}" is missing'.format(field_name)) field_defn = feature.GetFieldDefnRef(field_name) field_type = field_defn.GetType() field_type_name = field_defn.GetTypeName() try: set_field(target=feature, name=field_name, type=field_type, values=value) except Exception as e: message = str(e) + (f'\ntrying to set field {field_name} ' f'(type {field_type_name}) to value ' f'{value} (type {type(value)})') raise RuntimeError(message) from e self.layer.CreateFeature(feature) feature = None self.init_features()
[docs] def addfield(self, name: str, type: int, width: int = 10, values: list[Any] | None = None) -> None: """ add a field to the vector layer Parameters ---------- name the field name type the OGR Field Type (OFT), e.g. ogr.OFTString. See :class:`osgeo.ogr.FieldDefn`. width the width of the new field (only for ogr.OFTString fields) values an optional list with values for each feature to assign to the new field. The length must be identical to the number of features. """ set_field(self, name, type, width=width, values=values)
[docs] def addlayer(self, name: str, srs: CRS, geomType: int) -> None: """ add a layer to the vector layer Parameters ---------- name the layer name srs the spatial reference system. See :func:`spatialist.auxil.crsConvert` for options. geomType an OGR well-known binary data type. See `Module ogr <https://gdal.org/python/osgeo.ogr-module.html>`_. """ self.vector.CreateLayer(name, crsConvert(srs, 'osr'), geomType) self.init_layer()
[docs] def addvector(self, vec: Vector) -> None: """ add a vector object to the layer of the current Vector object Parameters ---------- vec the vector object to add merge: bool merge overlapping polygons? """ vec.layer.ResetReading() for feature in vec.layer: self.layer.CreateFeature(feature) self.init_features() vec.layer.ResetReading()
[docs] def bbox( self, outname: str | None = None, driver: str | None = None, overwrite: bool = True ) -> Vector | None: """ create a bounding box from the extent of the Vector object Parameters ---------- outname the name of the vector file to be written; if None, a Vector object is returned driver the name of the file format to write overwrite overwrite an already existing file? Returns ------- if outname is None, the bounding box Vector object """ if outname is None: return bbox(self.extent, self.srs) else: bbox(self.extent, self.srs, outname=outname, driver=driver, overwrite=overwrite)
[docs] def clone(self) -> Vector: return feature2vector(self.getfeatures(), ref=self)
[docs] def close(self) -> None: """ closes the OGR vector file connection """ self.vector = None for feature in self.__features: if feature is not None: feature = None
[docs] def convert2wkt(self, set3D: bool = True) -> list[str]: """ export the geometry of each feature as a wkt string Parameters ---------- set3D keep the third (height) dimension? """ features = self.getfeatures() for feature in features: try: feature.geometry().Set3D(set3D) except AttributeError: dim = 3 if set3D else 2 feature.geometry().SetCoordinateDimension(dim) return [feature.geometry().ExportToWkt() for feature in features]
@property def extent(self) -> dict[str, float]: """ the extent of the vector object Returns ------- a dictionary with keys `xmin`, `xmax`, `ymin`, `ymax` """ return dict(zip(['xmin', 'xmax', 'ymin', 'ymax'], self.layer.GetExtent())) @property def fieldDefs(self) -> list[ogr.FieldDefn]: """ Returns ------- the field definition for each field of the Vector object """ return [self.layerdef.GetFieldDefn(x) for x in range(0, self.nfields)] @property def fieldnames(self) -> list[str]: """ Returns ------- the names of the fields """ return sorted([field.GetName() for field in self.fieldDefs]) @property def geomType(self) -> int: """ Returns ------- the layer geometry type """ return self.layerdef.GetGeomType() @property def geomTypes(self) -> list[str]: """ Returns ------- the geometry type of each feature """ return [feat.GetGeometryRef().GetGeometryName() for feat in self.getfeatures()]
[docs] def getArea(self) -> float: """ Returns ------- the area of the vector geometries """ return sum([x.GetGeometryRef().GetArea() for x in self.getfeatures()])
[docs] def getFeatureByAttribute( self, fieldname: str, attribute: int | str ) -> ogr.Feature | list[ogr.Feature] | None: """ get features by field attribute Parameters ---------- fieldname the name of the queried field attribute the field value of interest Returns ------- the feature(s) matching the search query """ attr = attribute.strip() if isinstance(attribute, str) else attribute if fieldname not in self.fieldnames: raise KeyError('invalid field name') out = [] self.layer.ResetReading() for feature in self.layer: field = feature.GetField(fieldname) field = field.strip() if isinstance(field, str) else field if field == attr: out.append(feature.Clone()) self.layer.ResetReading() if len(out) == 0: return None elif len(out) == 1: return out[0] else: return out
[docs] def getFeatureByIndex(self, index: int) -> ogr.Feature | None: """ get features by numerical (positional) index Parameters ---------- index the queried index Returns ------- the requested feature """ feature = self.layer[index] if feature is None: feature = self.getfeatures()[index] return feature
[docs] def getfeatures(self) -> list[ogr.Feature]: """ Returns ------- a list of cloned features """ self.layer.ResetReading() features = [x.Clone() for x in self.layer] self.layer.ResetReading() return features
[docs] def getProjection(self, type: str) -> CRS: """ get the CRS of the Vector object. See :func:`spatialist.auxil.crsConvert`. Parameters ---------- type the type of projection required. Returns ------- the output CRS """ return crsConvert(self.layer.GetSpatialRef(), type)
[docs] def getUniqueAttributes(self, fieldname: str) -> list[int | str]: """ Parameters ---------- fieldname the name of the field of interest Returns ------- the unique attributes of the field """ self.layer.ResetReading() attributes = list(set([x.GetField(fieldname) for x in self.layer])) self.layer.ResetReading() return sorted(attributes)
[docs] def init_features(self) -> None: """ delete all in-memory features """ del self.__features self.__features = [None] * self.nfeatures
[docs] def init_layer(self) -> None: """ initialize a layer object """ self.layer = self.vector.GetLayer() self.__features = [None] * self.nfeatures
@property def layerdef(self) -> ogr.FeatureDefn: """ Returns ------- the layer's feature definition """ return self.layer.GetLayerDefn() @property def layername(self) -> str: """ Returns ------- the name of the layer """ return self.layer.GetName()
[docs] def load(self) -> None: """ load all feature into memory """ self.layer.ResetReading() for i in range(self.nfeatures): if self.__features[i] is None: self.__features[i] = self.layer[i]
@property def nfeatures(self) -> int: """ Returns ------- the number of features """ return len(self.layer) @property def nfields(self) -> int: """ Returns ------- the number of fields """ return self.layerdef.GetFieldCount() @property def nlayers(self) -> int: """ Returns ------- the number of layers """ return self.vector.GetLayerCount() @property def proj4(self) -> str: """ Returns ------- the CRS in PRO4 format """ return self.srs.ExportToProj4().strip()
[docs] def reproject(self, projection: CRS) -> None: """ in-memory reprojection Parameters ---------- projection the target CRS. See :func:`spatialist.auxil.crsConvert`. """ srs_out = crsConvert(projection, 'osr') # the following check was found to not work in GDAL 3.0.1; likely a bug # if self.srs.IsSame(srs_out) == 0: if self.getProjection('epsg') != crsConvert(projection, 'epsg'): # create the CoordinateTransformation coordTrans = osr.CoordinateTransformation(self.srs, srs_out) layername = self.layername geomType = self.geomType features = self.getfeatures() feat_def = features[0].GetDefnRef() fields = [feat_def.GetFieldDefn(x) for x in range(0, feat_def.GetFieldCount())] self.__init__() self.addlayer(layername, srs_out, geomType) self.layer.CreateFields(fields) for feature in features: geom = feature.GetGeometryRef() geom.Transform(coordTrans) newfeature = feature.Clone() newfeature.SetGeometry(geom) self.layer.CreateFeature(newfeature) newfeature = None self.init_features()
[docs] def setCRS(self, crs: CRS) -> None: """ directly reset the spatial reference system of the vector object. This is not going to reproject the Vector object, see :meth:`reproject` instead. Parameters ---------- crs the input CRS Example ------- >>> site = Vector('shape.shp') >>> site.setCRS('+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs ') """ # try to convert the input crs to osr.SpatialReference srs_out = crsConvert(crs, 'osr') # save all relevant info from the existing vector object layername = self.layername geomType = self.geomType layer_definition = ogr.Feature(self.layer.GetLayerDefn()) fields = [layer_definition.GetFieldDefnRef(x) for x in range(layer_definition.GetFieldCount())] features = self.getfeatures() # initialize a new vector object and create a layer self.__init__() self.addlayer(layername, srs_out, geomType) # add the fields to new layer self.layer.CreateFields(fields) # add the features to the newly created layer for feat in features: self.layer.CreateFeature(feat) self.init_features()
@property def srs(self) -> osr.SpatialReference: """ Returns ------- the geometry's spatial reference system """ return self.layer.GetSpatialRef()
[docs] def to_geopandas(self) -> gpd.GeoDataFrame: """ Convert the object to a geopandas GeoDataFrame. `DateTime` fields are converted to :class:`pandas.Timestamp` using :func:`pandas.to_datetime`. Returns ------- the dataframe object See Also -------- osgeo.ogr.Feature.items """ field_types = {x.GetName(): x.GetTypeName() for x in self.fieldDefs} features = [] self.layer.ResetReading() for feature in self.layer: geom = feature.GetGeometryRef() geom_wkb = geom.ExportToWkb() properties = feature.items() properties["geometry"] = wkb_loads(bytes(geom_wkb)) features.append(properties) self.layer.ResetReading() gdf = gpd.GeoDataFrame(features, crs=self.srs.ExportToWkt()) for field_name, field_type in field_types.items(): if field_type == "DateTime": gdf[field_name] = pd.to_datetime(arg=gdf[field_name], format='ISO8601') return gdf
[docs] def write(self, outfile: str, driver: str | None = None, overwrite: bool = True) -> None: """ write the Vector object to a file Parameters ---------- outfile the name of the file to write; the following extensions are automatically detected for determining the format driver: .. list_drivers:: vector driver the output file format; default None: try to autodetect from the file name extension overwrite overwrite an already existing file? """ if driver is None: driver = self.__driver_autodetect(outfile) driver = ogr.GetDriverByName(driver) if os.path.exists(outfile): if overwrite: driver.DeleteDataSource(outfile) else: raise RuntimeError('target file already exists') ds_out = driver.CreateDataSource(outfile) ds_out.CopyLayer(self.layer, self.layer.GetName()) ds_out = driver = None
[docs] def bbox( coordinates: dict[str, int | float], crs: CRS, outname: str | None = None, driver: str | None = None, overwrite: bool = True, buffer: int | float | tuple[int | float, int | float] | None = None ) -> Vector | None: """ create a bounding box vector object or file. The CRS can be in either WKT, EPSG or PROJ4 format Parameters ---------- coordinates a dictionary containing numerical variables with keys `xmin`, `xmax`, `ymin` and `ymax`. crs the coordinate reference system of the `coordinates`. See :func:`~spatialist.auxil.crsConvert` for options. outname the file to write to. If `None`, the bounding box is returned as :class:`~spatialist.vector.Vector` object. driver the output file format; needs to be defined if the format cannot be auto-detected from the filename extension. overwrite overwrite an existing file? buffer a buffer to add around `coordinates`. Default None: do not add a buffer. A tuple is interpreted as (x buffer, y buffer). Returns ------- the bounding box Vector object """ if buffer is not None: coordinates = coordinates.copy() if isinstance(buffer, tuple): xbuffer, ybuffer = buffer else: xbuffer = ybuffer = buffer coordinates['xmin'] -= xbuffer coordinates['xmax'] += xbuffer coordinates['ymin'] -= ybuffer coordinates['ymax'] += ybuffer srs = crsConvert(crs, 'osr') ring = ogr.Geometry(ogr.wkbLinearRing) ring.AddPoint(coordinates['xmin'], coordinates['ymin']) ring.AddPoint(coordinates['xmax'], coordinates['ymin']) ring.AddPoint(coordinates['xmax'], coordinates['ymax']) ring.AddPoint(coordinates['xmin'], coordinates['ymax']) ring.CloseRings() geom = ogr.Geometry(ogr.wkbPolygon) geom.AddGeometry(ring) geom.FlattenTo2D() bbox = Vector() bbox.addlayer('bbox', srs, geom.GetGeometryType()) bbox.addfield('area', ogr.OFTReal) bbox.addfeature(geom, fields={'area': geom.Area()}) geom = None if outname is None: return bbox else: bbox.write(outfile=outname, driver=driver, overwrite=overwrite)
[docs] def boundary( vectorobject: Vector, expression: str | None = None, outname: str | None = None ) -> Vector | None: """ Get the boundary of the largest geometry as new vector object. The following steps are performed: - find the largest geometry matching the expression - compute the geometry's boundary using :meth:`osgeo.ogr.Geometry.Boundary`, returning a MULTILINE geometry - select the longest line of the MULTILINE geometry - create a closed linear ring from this longest line - create a polygon from this linear ring - create a new :class:`Vector` object and add the newly created polygon Parameters ---------- vectorobject the vector object containing multiple polygon geometries, e.g. all geometries with a certain value in one field. expression the SQL expression to select the candidates for the largest geometry. outname the name of the output vector file; if None, an in-memory object of type :class:`Vector` is returned. Returns ------- if `outname` is `None`, a vector object pointing to an in-memory dataset else `None` """ largest = None area = None vectorobject.layer.ResetReading() if expression is not None: vectorobject.layer.SetAttributeFilter(expression) for feat in vectorobject.layer: geom = feat.GetGeometryRef() geom_area = geom.GetArea() if (largest is None) or (geom_area > area): largest = feat.GetFID() area = geom_area if expression is not None: vectorobject.layer.SetAttributeFilter('') vectorobject.layer.ResetReading() feat_major = vectorobject.layer.GetFeature(largest) major = feat_major.GetGeometryRef() boundary = major.Boundary() if boundary.GetGeometryName() == 'LINESTRING': longest = boundary else: # MULTILINESTRING longest = None for line in boundary: if (longest is None) or (line.Length() > longest.Length()): longest = line points = longest.GetPoints() ring = ogr.Geometry(ogr.wkbLinearRing) for point in points: ring.AddPoint_2D(*point) ring.CloseRings() poly = ogr.Geometry(ogr.wkbPolygon) poly.AddGeometry(ring) vec_out = Vector() vec_out.addlayer('layer', vectorobject.layer.GetSpatialRef(), poly.GetGeometryType()) vec_out.addfield('area', ogr.OFTReal) fields = {'area': poly.Area()} vec_out.addfeature(poly, fields=fields) ring = None geom = None line = None longest = None poly = None boundary = None major = None feat_major = None if outname is not None: vec_out.write(outname) vec_out.close() else: return vec_out
[docs] def centerdist(obj1: Vector, obj2: Vector) -> float: """ Get the center distance between two vector objects. Parameters ---------- obj1 obj2 Returns ------- the distance in units of the source object's CRS """ if not isinstance(obj1, Vector) or isinstance(obj2, Vector): raise IOError('both objects must be of type Vector') feature1 = obj1.getFeatureByIndex(0) geometry1 = feature1.GetGeometryRef() center1 = geometry1.Centroid() feature2 = obj2.getFeatureByIndex(0) geometry2 = feature2.GetGeometryRef() center2 = geometry2.Centroid() return center1.Distance(center2)
[docs] def dissolve(infile: str, outfile: str, field: str, layername: str | None = None) -> None: """ dissolve the polygons of a vector file by an attribute field Parameters ---------- infile the input vector file outfile the output shapefile field the field name to merge the polygons by layername the name of the output vector layer; If set to None the layername will be the basename of infile without extension """ with Vector(infile) as vec: srs = vec.srs feat = vec.layer[0] d = feat.GetFieldDefnRef(field) width = d.width type = d.type feat = None layername = layername if layername is not None else os.path.splitext(os.path.basename(infile))[0] # the following can be used if GDAL was compiled with the spatialite extension # not tested, might need some additional/different lines # with Vector(infile) as vec: # vec.vector.ExecuteSQL('SELECT ST_Union(geometry), {0} FROM {1} GROUP BY {0}'.format(field, vec.layername), # dialect='SQLITE') # vec.write(outfile) conn = sqlite_setup(extensions=['spatialite', 'gdal']) conn.execute('CREATE VIRTUAL TABLE merge USING VirtualOGR("{}");'.format(infile)) select = conn.execute('SELECT {0},asText(ST_Union(geometry)) as geometry FROM merge GROUP BY {0};'.format(field)) fetch = select.fetchall() with Vector() as merge: merge.addlayer(layername, srs, ogr.wkbPolygon) merge.addfield(field, type=type, width=width) for i in range(len(fetch)): merge.addfeature(ogr.CreateGeometryFromWkt(fetch[i][1]), {field: fetch[i][0]}) merge.write(outfile) conn.close()
[docs] def feature2vector( feature: ogr.Feature | list[ogr.Feature], ref: Vector, layername: str | None = None ) -> Vector: """ create a Vector object from ogr features Parameters ---------- feature a single feature or a list of features ref a reference Vector object to retrieve geo information from layername the name of the output layer; retrieved from `ref` if `None` Returns ------- the new Vector object """ features = feature if isinstance(feature, list) else [feature] layername = layername if layername is not None else ref.layername vec = Vector() vec.addlayer(layername, ref.srs, ref.geomType) feat_def = features[0].GetDefnRef() fields = [feat_def.GetFieldDefn(x) for x in range(0, feat_def.GetFieldCount())] vec.layer.CreateFields(fields) for feat in features: feat2 = ogr.Feature(vec.layer.GetLayerDefn()) feat2.SetFrom(feat) vec.layer.CreateFeature(feat2) vec.init_features() return vec
[docs] def intersect(obj1: Vector, obj2: Vector) -> Vector | None: """ intersect two Vector objects Parameters ---------- obj1 the first vector object; this object is reprojected to the CRS of obj2 if necessary obj2 the second vector object Returns ------- the intersection of obj1 and obj2 if both intersect and None otherwise """ if not isinstance(obj1, Vector) or not isinstance(obj2, Vector): raise RuntimeError('both objects must be of type Vector') obj1 = obj1.clone() obj2 = obj2.clone() obj1.reproject(obj2.srs) ####################################################### # create basic overlap def union(vector: Vector) -> ogr.Geometry: out = ogr.Geometry(ogr.wkbMultiPolygon) for feat in vector.layer: geom = feat.GetGeometryRef() type = geom.GetGeometryName() if type == 'MULTIPOLYGON': for subgeom in geom: out.AddGeometry(subgeom) else: out.AddGeometry(geom) vector.layer.ResetReading() out.Simplify(0) return out # get the union of all polygons of the two layers union1 = union(obj1) union2 = union(obj2) # intersection intersect_base = union1.Intersection(union2) union1 = None union2 = None ####################################################### # compute detailed per-geometry overlaps if intersect_base.GetArea() > 0: intersection = Vector() intersection.addlayer('intersect', obj1.srs, ogr.wkbPolygon) fieldmap = [] for index, fielddef in enumerate([obj1.fieldDefs, obj2.fieldDefs]): for field in fielddef: name = field.GetName() i = 2 while name in intersection.fieldnames: name = '{}_{}'.format(field.GetName(), i) i += 1 fieldmap.append((index, field.GetName(), name)) intersection.addfield(name, type=field.GetType(), width=field.GetWidth()) for feature1 in obj1.layer: geom1 = feature1.GetGeometryRef() if geom1.Intersects(intersect_base): for feature2 in obj2.layer: geom2 = feature2.GetGeometryRef() # select only the intersections if geom2.Intersects(intersect_base): intersect = geom2.Intersection(geom1) fields = {} for item in fieldmap: if item[0] == 0: fields[item[2]] = feature1.GetField(item[1]) else: fields[item[2]] = feature2.GetField(item[1]) intersection.addfeature(intersect, fields) intersect_base = None return intersection
[docs] def set_field( target: Vector | ogr.Feature, name: str, type: int, width: int = 10, values: Any = None ) -> None: """ Wrapper for setting a field. DateTime fields are rounded to milliseconds. Parameters ---------- target the object for which to set the field name the field name type the OGR Field Type (OFT), e.g. `ogr.OFTString`. See :class:`osgeo.ogr.FieldDefn`. width the width of the new field (only for `ogr.OFTString` fields) values an optional list with values for each feature to assign to the new field. If `target` is of type :class:`~spatialist.vector.Vector`, the length must be identical to the number of features. """ type_name = ogr.GetFieldTypeName(type) field_defn = ogr.FieldDefn(name, type) if type == ogr.OFTString: field_defn.SetWidth(width) if isinstance(target, Vector): target.layer.CreateField(field_defn) if type_name in ['String', 'Integer', 'Real', 'Binary', 'DateTime']: method_name = 'SetField' elif type_name in ['StringList', 'IntegerList', 'Integer64', 'Integer64List']: method_name = f'SetField{type_name}' elif type_name == 'RealList': method_name = 'SetFieldDoubleList' else: raise ValueError(f'Unsupported field type: {type_name}') def setter(feature: ogr.Feature, value: Any, field_name: str, method_name: str, type_name: str) -> None: def tz_to_nTZFlag(dt: datetime) -> int: """ Determine OGR nTZFlag from a timezone-aware datetime. """ if dt.tzinfo is None: return 0 # unknown offset = dt.utcoffset() if offset == timezone.utc.utcoffset(None): return 100 # UTC return 1 # assume local (non-UTC, but known) index = feature.GetFieldIndex(field_name) method = getattr(feature, method_name) if type_name == 'DateTime': if isinstance(value, datetime): # Round to milliseconds and normalize value = value + timedelta(microseconds=500) # for rounding, not truncation value = value.replace(microsecond=(value.microsecond // 1000) * 1000) value = [ value.year, value.month, value.day, value.hour, value.minute, value.second + value.microsecond / 1_000_000, tz_to_nTZFlag(value) ] else: raise TypeError("If 'type' is 'DateTime', the value " "must be a datetime.datetime object") method(index, *value) else: method(index, value) if values is not None: if isinstance(target, Vector): if len(values) != target.nfeatures: raise RuntimeError('number of values does not match number of features') target.layer.ResetReading() for i, feature in enumerate(target.layer): setter(feature=feature, value=values[i], field_name=name, method_name=method_name, type_name=type_name) target.layer.SetFeature(feature) target.layer.ResetReading() elif isinstance(target, ogr.Feature): setter(feature=target, value=values, field_name=name, method_name=method_name, type_name=type_name) else: raise TypeError("'target' must be of type spatialist.vector.Vector " "or osgeo.ogr.Feature")
[docs] def wkt2vector(wkt: str | list[str], srs: CRS, layername: str = 'wkt') -> Vector: """ convert well-known text geometries to a Vector object. Parameters ---------- wkt the well-known text description(s). Each geometry will be placed in a separate feature. srs the spatial reference system; see :func:`spatialist.auxil.crsConvert` for options. layername the name of the internal :class:`osgeo.ogr.Layer` object. Returns ------- the vector representation Examples -------- >>> from spatialist.vector import wkt2vector >>> wkt1 = 'POLYGON ((0. 0., 0. 1., 1. 1., 1. 0., 0. 0.))' >>> with wkt2vector(wkt1, srs=4326) as vec: >>> print(vec.getArea()) 1.0 >>> wkt2 = 'POLYGON ((1. 1., 1. 2., 2. 2., 2. 1., 1. 1.))' >>> with wkt2vector([wkt1, wkt2], srs=4326) as vec: >>> print(vec.getArea()) 2.0 """ if isinstance(wkt, str): wkt = [wkt] srs = crsConvert(srs, 'osr') vec = Vector() area = [] for item in wkt: geom = ogr.CreateGeometryFromWkt(item) geom.FlattenTo2D() if not hasattr(vec, 'layer'): vec.addlayer(layername, srs, geom.GetGeometryType()) if geom.GetGeometryName() != 'POINT': area.append(geom.Area()) else: area.append(None) vec.addfeature(geom) geom = None vec.addfield('area', ogr.OFTReal, values=area) return vec
[docs] def vectorize( target: NDArray[Any], reference: Raster, outname: str | None = None, layername: str = 'layer', fieldname: str = 'value', driver: str | None = None ) -> Vector | None: """ Vectorization of an array using :func:`osgeo.gdal.Polygonize`. Parameters ---------- target the input array. Each identified object of pixels with the same value will be converted into a vector feature. reference a reference Raster object to retrieve geo information and extent from. outname the name of the vector file. If `None` a vector object is returned. layername the name of the vector object layer. fieldname the name of the field to contain the raster value for the respective vector feature. driver the vector file type of `outname`. Several extensions are read automatically (see :meth:`Vector.write`). Is ignored if ``outname=None``. """ cols = reference.cols rows = reference.rows meta = reference.raster.GetMetadata() geo = reference.raster.GetGeoTransform() proj = reference.raster.GetProjection() tmp_driver = gdal.GetDriverByName('MEM') tmp = tmp_driver.Create(layername, cols, rows, 1, GDT_Byte) tmp.SetMetadata(meta) tmp.SetGeoTransform(geo) tmp.SetProjection(proj) outband = tmp.GetRasterBand(1) outband.WriteArray(target, 0, 0) try: with Vector() as vec: vec.addlayer(name=layername, srs=proj, geomType=ogr.wkbPolygon) vec.addfield(fieldname, ogr.OFTInteger) gdal.Polygonize(srcBand=outband, maskBand=None, outLayer=vec.layer, iPixValField=0) if outname is not None: vec.write(outfile=outname, driver=driver) else: return vec.clone() except Exception as e: raise e finally: outband = None tmp_driver = None out = None