Source code for spatialist.vector

# -*- coding: utf-8 -*-
################################################################
# OGR wrapper for convenient vector data handling and processing
# John Truckenbrodt 2015-2026
################################################################


import os
import yaml
from datetime import datetime, timezone, timedelta
from osgeo import ogr, osr, gdal
from osgeo.gdalconst import GDT_Byte
from .auxil import crsConvert
from .ancillary import parse_literal
from .sqlite_util import sqlite_setup

import pandas as pd
import geopandas as gpd
from shapely.wkb import loads as wkb_loads

ogr.UseExceptions()
osr.UseExceptions()
gdal.UseExceptions()


[docs] class Vector(object): """ This is intended as a vector meta information handler with options for reading and writing vector data in a convenient manner by simplifying the numerous options provided by the OGR python binding. Parameters ---------- filename: str or None the vector file to read; if filename is `None`, a new in-memory Vector object is created. In this case `driver` is overridden and set to 'MEM'. The following file extensions are auto-detected: .. list_drivers:: vector driver: str the vector file format; needs to be defined if the format cannot be auto-detected from the filename extension """ def __init__(self, filename=None, driver=None): if filename is None: driver = 'MEM' elif isinstance(filename, str): if not os.path.isfile(filename): raise OSError('file does not exist') if driver is None: driver = self.__driver_autodetect(filename) else: raise TypeError('filename must either be str or None') self.filename = filename self.driver = ogr.GetDriverByName(driver) self.vector = self.driver.CreateDataSource('out') if driver == 'MEM' else self.driver.Open(filename) nlayers = self.vector.GetLayerCount() if nlayers > 1: raise RuntimeError('multiple layers are currently not supported') elif nlayers == 1: self.init_layer()
[docs] def __getitem__(self, expression): """ subset the vector object by index or attribute. Parameters ---------- expression: int or str the key or expression to be used for subsetting. See :meth:`osgeo.ogr.Layer.SetAttributeFilter` for details on the expression syntax. Returns ------- Vector a vector object matching the specified criteria Examples -------- Assuming we have a shapefile called `testsites.shp`, which has an attribute `sitename`, we can subset individual sites and write them to new files like so: >>> from spatialist import Vector >>> filename = 'testsites.shp' >>> with Vector(filename)["sitename='site1'"] as site1: >>> site1.write('site1.shp') """ if not isinstance(expression, (int, str)): raise RuntimeError('expression must be of type int or str') expression = parse_literal(expression) if isinstance(expression, str) else expression if isinstance(expression, int): feat = self.getFeatureByIndex(expression) else: self.layer.SetAttributeFilter(expression) feat = self.getfeatures() feat = feat if len(feat) > 0 else None self.layer.SetAttributeFilter('') if feat is None: return None else: return feature2vector(feat, ref=self)
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def __str__(self): vals = dict() vals['proj4'] = self.proj4 vals.update(self.extent) vals['filename'] = self.filename if self.filename is not None else 'memory' vals['geomtype'] = ', '.join(list(set(self.geomTypes))) info = 'class : spatialist Vector object\n' \ 'geometry type : {geomtype}\n' \ 'extent : {xmin:.3f}, {xmax:.3f}, {ymin:.3f}, {ymax:.3f} (xmin, xmax, ymin, ymax)\n' \ 'coord. ref. : {proj4}\n' \ 'data source : {filename}'.format(**vals) return info @staticmethod def __driver_autodetect(filename): path = os.path.dirname(os.path.realpath(__file__)) drivers = yaml.safe_load(open(os.path.join(path, 'drivers_vector.yml'))) extension = os.path.splitext(filename)[1][1:] if extension not in drivers.keys(): message = "the file extension '{}' is not supported. " \ "Please provide the OGR format descriptor via " \ "parameter 'driver' or use one of the supported extensions:\n- .{}" message = message.format(extension, '\n- .'.join(drivers.keys())) raise RuntimeError(message) else: return drivers[extension]
[docs] def addfeature(self, geometry, fields=None): """ add a feature to the vector object from a geometry Parameters ---------- geometry: osgeo.ogr.Geometry the geometry to add as a feature fields: dict or None the field names and values to assign to the new feature Returns ------- """ feature = ogr.Feature(self.layerdef) feature.SetGeometry(geometry) if fields is not None: for field_name, value in fields.items(): if field_name not in self.fieldnames: raise IOError('field "{}" is missing'.format(field_name)) field_defn = feature.GetFieldDefnRef(field_name) field_type = field_defn.GetType() field_type_name = field_defn.GetTypeName() try: set_field(target=feature, name=field_name, type=field_type, values=value) except Exception as e: message = str(e) + (f'\ntrying to set field {field_name} ' f'(type {field_type_name}) to value ' f'{value} (type {type(value)})') raise RuntimeError(message) from e self.layer.CreateFeature(feature) feature = None self.init_features()
[docs] def addfield(self, name, type, width=10, values=None): """ add a field to the vector layer Parameters ---------- name: str the field name type: int the OGR Field Type (OFT), e.g. ogr.OFTString. See :class:`osgeo.ogr.FieldDefn`. width: int the width of the new field (only for ogr.OFTString fields) values: List or None an optional list with values for each feature to assign to the new field. The length must be identical to the number of features. Returns ------- """ set_field(self, name, type, width=width, values=values)
[docs] def addlayer(self, name, srs, geomType): """ add a layer to the vector layer Parameters ---------- name: str the layer name srs: int or str or osgeo.osr.SpatialReference the spatial reference system. See :func:`spatialist.auxil.crsConvert` for options. geomType: int an OGR well-known binary data type. See `Module ogr <https://gdal.org/python/osgeo.ogr-module.html>`_. Returns ------- """ self.vector.CreateLayer(name, crsConvert(srs, 'osr'), geomType) self.init_layer()
[docs] def addvector(self, vec): """ add a vector object to the layer of the current Vector object Parameters ---------- vec: Vector the vector object to add merge: bool merge overlapping polygons? Returns ------- """ vec.layer.ResetReading() for feature in vec.layer: self.layer.CreateFeature(feature) self.init_features() vec.layer.ResetReading()
[docs] def bbox(self, outname=None, driver=None, overwrite=True): """ create a bounding box from the extent of the Vector object Parameters ---------- outname: str or None the name of the vector file to be written; if None, a Vector object is returned driver: str the name of the file format to write overwrite: bool overwrite an already existing file? Returns ------- Vector or None if outname is None, the bounding box Vector object """ if outname is None: return bbox(self.extent, self.srs) else: bbox(self.extent, self.srs, outname=outname, driver=driver, overwrite=overwrite)
[docs] def clone(self): return feature2vector(self.getfeatures(), ref=self)
[docs] def close(self): """ closes the OGR vector file connection Returns ------- """ self.vector = None for feature in self.__features: if feature is not None: feature = None
[docs] def convert2wkt(self, set3D=True): """ export the geometry of each feature as a wkt string Parameters ---------- set3D: bool keep the third (height) dimension? Returns ------- """ features = self.getfeatures() for feature in features: try: feature.geometry().Set3D(set3D) except AttributeError: dim = 3 if set3D else 2 feature.geometry().SetCoordinateDimension(dim) return [feature.geometry().ExportToWkt() for feature in features]
@property def extent(self): """ the extent of the vector object Returns ------- dict a dictionary with keys `xmin`, `xmax`, `ymin`, `ymax` """ return dict(zip(['xmin', 'xmax', 'ymin', 'ymax'], self.layer.GetExtent())) @property def fieldDefs(self): """ Returns ------- list[osgeo.ogr.FieldDefn] the field definition for each field of the Vector object """ return [self.layerdef.GetFieldDefn(x) for x in range(0, self.nfields)] @property def fieldnames(self): """ Returns ------- list of str the names of the fields """ return sorted([field.GetName() for field in self.fieldDefs]) @property def geomType(self): """ Returns ------- int the layer geometry type """ return self.layerdef.GetGeomType() @property def geomTypes(self): """ Returns ------- list the geometry type of each feature """ return [feat.GetGeometryRef().GetGeometryName() for feat in self.getfeatures()]
[docs] def getArea(self): """ Returns ------- float the area of the vector geometries """ return sum([x.GetGeometryRef().GetArea() for x in self.getfeatures()])
[docs] def getFeatureByAttribute(self, fieldname, attribute): """ get features by field attribute Parameters ---------- fieldname: str the name of the queried field attribute: int or str the field value of interest Returns ------- list[osgeo.ogr.Feature] or osgeo.ogr.Feature the feature(s) matching the search query """ attr = attribute.strip() if isinstance(attribute, str) else attribute if fieldname not in self.fieldnames: raise KeyError('invalid field name') out = [] self.layer.ResetReading() for feature in self.layer: field = feature.GetField(fieldname) field = field.strip() if isinstance(field, str) else field if field == attr: out.append(feature.Clone()) self.layer.ResetReading() if len(out) == 0: return None elif len(out) == 1: return out[0] else: return out
[docs] def getFeatureByIndex(self, index): """ get features by numerical (positional) index Parameters ---------- index: int the queried index Returns ------- osgeo.ogr.Feature the requested feature """ feature = self.layer[index] if feature is None: feature = self.getfeatures()[index] return feature
[docs] def getfeatures(self): """ Returns ------- list[osgeo.ogr.Feature] a list of cloned features """ self.layer.ResetReading() features = [x.Clone() for x in self.layer] self.layer.ResetReading() return features
[docs] def getProjection(self, type): """ get the CRS of the Vector object. See :func:`spatialist.auxil.crsConvert`. Parameters ---------- type: str the type of projection required. Returns ------- int or str or osgeo.osr.SpatialReference the output CRS """ return crsConvert(self.layer.GetSpatialRef(), type)
[docs] def getUniqueAttributes(self, fieldname): """ Parameters ---------- fieldname: str the name of the field of interest Returns ------- list of str or int the unique attributes of the field """ self.layer.ResetReading() attributes = list(set([x.GetField(fieldname) for x in self.layer])) self.layer.ResetReading() return sorted(attributes)
[docs] def init_features(self): """ delete all in-memory features Returns ------- """ del self.__features self.__features = [None] * self.nfeatures
[docs] def init_layer(self): """ initialize a layer object Returns ------- """ self.layer = self.vector.GetLayer() self.__features = [None] * self.nfeatures
@property def layerdef(self): """ Returns ------- osgeo.ogr.FeatureDefn the layer's feature definition """ return self.layer.GetLayerDefn() @property def layername(self): """ Returns ------- str the name of the layer """ return self.layer.GetName()
[docs] def load(self): """ load all feature into memory Returns ------- """ self.layer.ResetReading() for i in range(self.nfeatures): if self.__features[i] is None: self.__features[i] = self.layer[i]
@property def nfeatures(self): """ Returns ------- int the number of features """ return len(self.layer) @property def nfields(self): """ Returns ------- int the number of fields """ return self.layerdef.GetFieldCount() @property def nlayers(self): """ Returns ------- int the number of layers """ return self.vector.GetLayerCount() @property def proj4(self): """ Returns ------- str the CRS in PRO4 format """ return self.srs.ExportToProj4().strip()
[docs] def reproject(self, projection): """ in-memory reprojection Parameters ---------- projection: int or str or osgeo.osr.SpatialReference the target CRS. See :func:`spatialist.auxil.crsConvert`. Returns ------- """ srs_out = crsConvert(projection, 'osr') # the following check was found to not work in GDAL 3.0.1; likely a bug # if self.srs.IsSame(srs_out) == 0: if self.getProjection('epsg') != crsConvert(projection, 'epsg'): # create the CoordinateTransformation coordTrans = osr.CoordinateTransformation(self.srs, srs_out) layername = self.layername geomType = self.geomType features = self.getfeatures() feat_def = features[0].GetDefnRef() fields = [feat_def.GetFieldDefn(x) for x in range(0, feat_def.GetFieldCount())] self.__init__() self.addlayer(layername, srs_out, geomType) self.layer.CreateFields(fields) for feature in features: geom = feature.GetGeometryRef() geom.Transform(coordTrans) newfeature = feature.Clone() newfeature.SetGeometry(geom) self.layer.CreateFeature(newfeature) newfeature = None self.init_features()
[docs] def setCRS(self, crs): """ directly reset the spatial reference system of the vector object. This is not going to reproject the Vector object, see :meth:`reproject` instead. Parameters ---------- crs: int or str or osgeo.osr.SpatialReference the input CRS Returns ------- Example ------- >>> site = Vector('shape.shp') >>> site.setCRS('+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs ') """ # try to convert the input crs to osr.SpatialReference srs_out = crsConvert(crs, 'osr') # save all relevant info from the existing vector object layername = self.layername geomType = self.geomType layer_definition = ogr.Feature(self.layer.GetLayerDefn()) fields = [layer_definition.GetFieldDefnRef(x) for x in range(layer_definition.GetFieldCount())] features = self.getfeatures() # initialize a new vector object and create a layer self.__init__() self.addlayer(layername, srs_out, geomType) # add the fields to new layer self.layer.CreateFields(fields) # add the features to the newly created layer for feat in features: self.layer.CreateFeature(feat) self.init_features()
@property def srs(self): """ Returns ------- osgeo.osr.SpatialReference the geometry's spatial reference system """ return self.layer.GetSpatialRef()
[docs] def to_geopandas(self): """ Convert the object to a geopandas GeoDataFrame. `DateTime` fields are converted to :class:`pandas.Timestamp` using :func:`pandas.to_datetime`. Returns ------- geopandas.GeoDataFrame See Also -------- osgeo.ogr.Feature.items """ field_types = {x.GetName(): x.GetTypeName() for x in self.fieldDefs} features = [] self.layer.ResetReading() for feature in self.layer: geom = feature.GetGeometryRef() geom_wkb = geom.ExportToWkb() properties = feature.items() properties["geometry"] = wkb_loads(bytes(geom_wkb)) features.append(properties) self.layer.ResetReading() gdf = gpd.GeoDataFrame(features, crs=self.srs.ExportToWkt()) for field_name, field_type in field_types.items(): if field_type == "DateTime": gdf[field_name] = pd.to_datetime(arg=gdf[field_name], format='ISO8601') return gdf
[docs] def write(self, outfile, driver=None, overwrite=True): """ write the Vector object to a file Parameters ---------- outfile: the name of the file to write; the following extensions are automatically detected for determining the format driver: .. list_drivers:: vector driver: str or None the output file format; default None: try to autodetect from the file name extension overwrite: bool overwrite an already existing file? Returns ------- """ if driver is None: driver = self.__driver_autodetect(outfile) driver = ogr.GetDriverByName(driver) if os.path.exists(outfile): if overwrite: driver.DeleteDataSource(outfile) else: raise RuntimeError('target file already exists') ds_out = driver.CreateDataSource(outfile) ds_out.CopyLayer(self.layer, self.layer.GetName()) ds_out = driver = None
[docs] def bbox(coordinates, crs, outname=None, driver=None, overwrite=True, buffer=None): """ create a bounding box vector object or file. The CRS can be in either WKT, EPSG or PROJ4 format Parameters ---------- coordinates: dict a dictionary containing numerical variables with keys `xmin`, `xmax`, `ymin` and `ymax`. crs: int or str or osgeo.osr.SpatialReference the coordinate reference system of the `coordinates`. See :func:`~spatialist.auxil.crsConvert` for options. outname: str the file to write to. If `None`, the bounding box is returned as :class:`~spatialist.vector.Vector` object. driver: str the output file format; needs to be defined if the format cannot be auto-detected from the filename extension. overwrite: bool overwrite an existing file? buffer: None or int or float or tuple[int or float] a buffer to add around `coordinates`. Default None: do not add a buffer. A tuple is interpreted as (x buffer, y buffer). Returns ------- Vector or None the bounding box Vector object """ if buffer is not None: coordinates = coordinates.copy() if isinstance(buffer, tuple): xbuffer, ybuffer = buffer else: xbuffer = ybuffer = buffer coordinates['xmin'] -= xbuffer coordinates['xmax'] += xbuffer coordinates['ymin'] -= ybuffer coordinates['ymax'] += ybuffer srs = crsConvert(crs, 'osr') ring = ogr.Geometry(ogr.wkbLinearRing) ring.AddPoint(coordinates['xmin'], coordinates['ymin']) ring.AddPoint(coordinates['xmax'], coordinates['ymin']) ring.AddPoint(coordinates['xmax'], coordinates['ymax']) ring.AddPoint(coordinates['xmin'], coordinates['ymax']) ring.CloseRings() geom = ogr.Geometry(ogr.wkbPolygon) geom.AddGeometry(ring) geom.FlattenTo2D() bbox = Vector() bbox.addlayer('bbox', srs, geom.GetGeometryType()) bbox.addfield('area', ogr.OFTReal) bbox.addfeature(geom, fields={'area': geom.Area()}) geom = None if outname is None: return bbox else: bbox.write(outfile=outname, driver=driver, overwrite=overwrite)
[docs] def boundary(vectorobject, expression=None, outname=None): """ Get the boundary of the largest geometry as new vector object. The following steps are performed: - find the largest geometry matching the expression - compute the geometry's boundary using :meth:`osgeo.ogr.Geometry.Boundary`, returning a MULTILINE geometry - select the longest line of the MULTILINE geometry - create a closed linear ring from this longest line - create a polygon from this linear ring - create a new :class:`Vector` object and add the newly created polygon Parameters ---------- vectorobject: Vector the vector object containing multiple polygon geometries, e.g. all geometries with a certain value in one field. expression: str or None the SQL expression to select the candidates for the largest geometry. outname: str or None the name of the output vector file; if None, an in-memory object of type :class:`Vector` is returned. Returns ------- Vector or None if `outname` is `None`, a vector object pointing to an in-memory dataset else `None` """ largest = None area = None vectorobject.layer.ResetReading() if expression is not None: vectorobject.layer.SetAttributeFilter(expression) for feat in vectorobject.layer: geom = feat.GetGeometryRef() geom_area = geom.GetArea() if (largest is None) or (geom_area > area): largest = feat.GetFID() area = geom_area if expression is not None: vectorobject.layer.SetAttributeFilter('') vectorobject.layer.ResetReading() feat_major = vectorobject.layer.GetFeature(largest) major = feat_major.GetGeometryRef() boundary = major.Boundary() if boundary.GetGeometryName() == 'LINESTRING': longest = boundary else: # MULTILINESTRING longest = None for line in boundary: if (longest is None) or (line.Length() > longest.Length()): longest = line points = longest.GetPoints() ring = ogr.Geometry(ogr.wkbLinearRing) for point in points: ring.AddPoint_2D(*point) ring.CloseRings() poly = ogr.Geometry(ogr.wkbPolygon) poly.AddGeometry(ring) vec_out = Vector() vec_out.addlayer('layer', vectorobject.layer.GetSpatialRef(), poly.GetGeometryType()) vec_out.addfield('area', ogr.OFTReal) fields = {'area': poly.Area()} vec_out.addfeature(poly, fields=fields) ring = None geom = None line = None longest = None poly = None boundary = None major = None feat_major = None if outname is not None: vec_out.write(outname) vec_out.close() else: return vec_out
[docs] def centerdist(obj1, obj2): """ Get the center distance between two vector objects. Parameters ---------- obj1: Vector obj2: Vector Returns ------- float """ if not isinstance(obj1, Vector) or isinstance(obj2, Vector): raise IOError('both objects must be of type Vector') feature1 = obj1.getFeatureByIndex(0) geometry1 = feature1.GetGeometryRef() center1 = geometry1.Centroid() feature2 = obj2.getFeatureByIndex(0) geometry2 = feature2.GetGeometryRef() center2 = geometry2.Centroid() return center1.Distance(center2)
[docs] def dissolve(infile, outfile, field, layername=None): """ dissolve the polygons of a vector file by an attribute field Parameters ---------- infile: str the input vector file outfile: str the output shapefile field: str the field name to merge the polygons by layername: str the name of the output vector layer; If set to None the layername will be the basename of infile without extension Returns ------- """ with Vector(infile) as vec: srs = vec.srs feat = vec.layer[0] d = feat.GetFieldDefnRef(field) width = d.width type = d.type feat = None layername = layername if layername is not None else os.path.splitext(os.path.basename(infile))[0] # the following can be used if GDAL was compiled with the spatialite extension # not tested, might need some additional/different lines # with Vector(infile) as vec: # vec.vector.ExecuteSQL('SELECT ST_Union(geometry), {0} FROM {1} GROUP BY {0}'.format(field, vec.layername), # dialect='SQLITE') # vec.write(outfile) conn = sqlite_setup(extensions=['spatialite', 'gdal']) conn.execute('CREATE VIRTUAL TABLE merge USING VirtualOGR("{}");'.format(infile)) select = conn.execute('SELECT {0},asText(ST_Union(geometry)) as geometry FROM merge GROUP BY {0};'.format(field)) fetch = select.fetchall() with Vector() as merge: merge.addlayer(layername, srs, ogr.wkbPolygon) merge.addfield(field, type=type, width=width) for i in range(len(fetch)): merge.addfeature(ogr.CreateGeometryFromWkt(fetch[i][1]), {field: fetch[i][0]}) merge.write(outfile) conn.close()
[docs] def feature2vector(feature, ref, layername=None): """ create a Vector object from ogr features Parameters ---------- feature: list[osgeo.ogr.Feature] or osgeo.ogr.Feature a single feature or a list of features ref: Vector a reference Vector object to retrieve geo information from layername: str or None the name of the output layer; retrieved from `ref` if `None` Returns ------- Vector the new Vector object """ features = feature if isinstance(feature, list) else [feature] layername = layername if layername is not None else ref.layername vec = Vector() vec.addlayer(layername, ref.srs, ref.geomType) feat_def = features[0].GetDefnRef() fields = [feat_def.GetFieldDefn(x) for x in range(0, feat_def.GetFieldCount())] vec.layer.CreateFields(fields) for feat in features: feat2 = ogr.Feature(vec.layer.GetLayerDefn()) feat2.SetFrom(feat) vec.layer.CreateFeature(feat2) vec.init_features() return vec
[docs] def intersect(obj1, obj2): """ intersect two Vector objects Parameters ---------- obj1: Vector the first vector object; this object is reprojected to the CRS of obj2 if necessary obj2: Vector the second vector object Returns ------- Vector or None the intersection of obj1 and obj2 if both intersect and None otherwise """ if not isinstance(obj1, Vector) or not isinstance(obj2, Vector): raise RuntimeError('both objects must be of type Vector') obj1 = obj1.clone() obj2 = obj2.clone() obj1.reproject(obj2.srs) ####################################################### # create basic overlap def union(vector): out = ogr.Geometry(ogr.wkbMultiPolygon) for feat in vector.layer: geom = feat.GetGeometryRef() type = geom.GetGeometryName() if type == 'MULTIPOLYGON': for subgeom in geom: out.AddGeometry(subgeom) else: out.AddGeometry(geom) vector.layer.ResetReading() out.Simplify(0) return out # get the union of all polygons of the two layers union1 = union(obj1) union2 = union(obj2) # intersection intersect_base = union1.Intersection(union2) union1 = None union2 = None ####################################################### # compute detailed per-geometry overlaps if intersect_base.GetArea() > 0: intersection = Vector() intersection.addlayer('intersect', obj1.srs, ogr.wkbPolygon) fieldmap = [] for index, fielddef in enumerate([obj1.fieldDefs, obj2.fieldDefs]): for field in fielddef: name = field.GetName() i = 2 while name in intersection.fieldnames: name = '{}_{}'.format(field.GetName(), i) i += 1 fieldmap.append((index, field.GetName(), name)) intersection.addfield(name, type=field.GetType(), width=field.GetWidth()) for feature1 in obj1.layer: geom1 = feature1.GetGeometryRef() if geom1.Intersects(intersect_base): for feature2 in obj2.layer: geom2 = feature2.GetGeometryRef() # select only the intersections if geom2.Intersects(intersect_base): intersect = geom2.Intersection(geom1) fields = {} for item in fieldmap: if item[0] == 0: fields[item[2]] = feature1.GetField(item[1]) else: fields[item[2]] = feature2.GetField(item[1]) intersection.addfeature(intersect, fields) intersect_base = None return intersection
[docs] def set_field(target, name, type, width=10, values=None): """ Wrapper for setting a field. DateTime fields are rounded to milliseconds. Parameters ---------- target: Vector or osgeo.ogr.Feature the object for which to set the field name: str the field name type: int the OGR Field Type (OFT), e.g. `ogr.OFTString`. See :class:`osgeo.ogr.FieldDefn`. width: int the width of the new field (only for `ogr.OFTString` fields) values: List or None an optional list with values for each feature to assign to the new field. If `target` is of type :class:`~spatialist.vector.Vector`, the length must be identical to the number of features. Returns ------- """ type_name = ogr.GetFieldTypeName(type) field_defn = ogr.FieldDefn(name, type) if type == ogr.OFTString: field_defn.SetWidth(width) if isinstance(target, Vector): target.layer.CreateField(field_defn) if type_name in ['String', 'Integer', 'Real', 'Binary', 'DateTime']: method_name = 'SetField' elif type_name in ['StringList', 'IntegerList', 'Integer64', 'Integer64List']: method_name = f'SetField{type_name}' elif type_name == 'RealList': method_name = 'SetFieldDoubleList' else: raise ValueError(f'Unsupported field type: {type_name}') def setter(feature, value, field_name, method_name, type_name): def tz_to_nTZFlag(dt): """ Determine OGR nTZFlag from a timezone-aware datetime. """ if dt.tzinfo is None: return 0 # unknown offset = dt.utcoffset() if offset == timezone.utc.utcoffset(None): return 100 # UTC return 1 # assume local (non-UTC, but known) index = feature.GetFieldIndex(field_name) method = getattr(feature, method_name) if type_name == 'DateTime': if isinstance(value, datetime): # Round to milliseconds and normalize value = value + timedelta(microseconds=500) # for rounding, not truncation value = value.replace(microsecond=(value.microsecond // 1000) * 1000) value = [ value.year, value.month, value.day, value.hour, value.minute, value.second + value.microsecond / 1_000_000, tz_to_nTZFlag(value) ] else: raise TypeError("If 'type' is 'DateTime', the value " "must be a datetime.datetime object") method(index, *value) else: method(index, value) if values is not None: if isinstance(target, Vector): if len(values) != target.nfeatures: raise RuntimeError('number of values does not match number of features') target.layer.ResetReading() for i, feature in enumerate(target.layer): setter(feature=feature, value=values[i], field_name=name, method_name=method_name, type_name=type_name) target.layer.SetFeature(feature) target.layer.ResetReading() elif isinstance(target, ogr.Feature): setter(feature=target, value=values, field_name=name, method_name=method_name, type_name=type_name) else: raise TypeError("'target' must be of type spatialist.vector.Vector " "or osgeo.ogr.Feature")
[docs] def wkt2vector(wkt, srs, layername='wkt'): """ convert well-known text geometries to a Vector object. Parameters ---------- wkt: str or list[str] the well-known text description(s). Each geometry will be placed in a separate feature. srs: int or str the spatial reference system; see :func:`spatialist.auxil.crsConvert` for options. layername: str the name of the internal :class:`osgeo.ogr.Layer` object. Returns ------- Vector the vector representation Examples -------- >>> from spatialist.vector import wkt2vector >>> wkt1 = 'POLYGON ((0. 0., 0. 1., 1. 1., 1. 0., 0. 0.))' >>> with wkt2vector(wkt1, srs=4326) as vec: >>> print(vec.getArea()) 1.0 >>> wkt2 = 'POLYGON ((1. 1., 1. 2., 2. 2., 2. 1., 1. 1.))' >>> with wkt2vector([wkt1, wkt2], srs=4326) as vec: >>> print(vec.getArea()) 2.0 """ if isinstance(wkt, str): wkt = [wkt] srs = crsConvert(srs, 'osr') vec = Vector() area = [] for item in wkt: geom = ogr.CreateGeometryFromWkt(item) geom.FlattenTo2D() if not hasattr(vec, 'layer'): vec.addlayer(layername, srs, geom.GetGeometryType()) if geom.GetGeometryName() != 'POINT': area.append(geom.Area()) else: area.append(None) vec.addfeature(geom) geom = None vec.addfield('area', ogr.OFTReal, values=area) return vec
[docs] def vectorize(target, reference, outname=None, layername='layer', fieldname='value', driver=None): """ Vectorization of an array using :func:`osgeo.gdal.Polygonize`. Parameters ---------- target: numpy.ndarray the input array. Each identified object of pixels with the same value will be converted into a vector feature. outname: str or None the name of the vector file. If `None` a vector object is returned. reference: Raster a reference Raster object to retrieve geo information and extent from. layername: str the name of the vector object layer. fieldname: str the name of the field to contain the raster value for the respective vector feature. driver: str or None the vector file type of `outname`. Several extensions are read automatically (see :meth:`Vector.write`). Is ignored if ``outname=None``. Returns ------- """ cols = reference.cols rows = reference.rows meta = reference.raster.GetMetadata() geo = reference.raster.GetGeoTransform() proj = reference.raster.GetProjection() tmp_driver = gdal.GetDriverByName('MEM') tmp = tmp_driver.Create(layername, cols, rows, 1, GDT_Byte) tmp.SetMetadata(meta) tmp.SetGeoTransform(geo) tmp.SetProjection(proj) outband = tmp.GetRasterBand(1) outband.WriteArray(target, 0, 0) try: with Vector() as vec: vec.addlayer(name=layername, srs=proj, geomType=ogr.wkbPolygon) vec.addfield(fieldname, ogr.OFTInteger) gdal.Polygonize(srcBand=outband, maskBand=None, outLayer=vec.layer, iPixValField=0) if outname is not None: vec.write(outfile=outname, driver=driver) else: return vec.clone() except Exception as e: raise e finally: outband = None tmp_driver = None out = None