Source code for spatialist.vector

# -*- coding: utf-8 -*-
# OGR wrapper for convenient vector data handling and processing
# John Truckenbrodt 2015-2019

import os
import yaml
from osgeo import ogr, osr

from .auxil import crsConvert
from .ancillary import parse_literal
from .sqlite_util import sqlite_setup


[docs]class Vector(object): """ This is intended as a vector meta information handler with options for reading and writing vector data in a convenient manner by simplifying the numerous options provided by the OGR python binding. Parameters ---------- filename: str or None the vector file to read; if filename is `None`, a new in-memory Vector object is created. In this case `driver` is overridden and set to 'Memory'. The following file extensions are auto-detected: .. list_drivers:: vector driver: str the vector file format; needs to be defined if the format cannot be auto-detected from the filename extension """ def __init__(self, filename=None, driver=None): if filename is None: driver = 'Memory' elif isinstance(filename, str): if not os.path.isfile(filename): raise OSError('file does not exist') if driver is None: driver = self.__driver_autodetect(filename) else: raise TypeError('filename must either be str or None') self.filename = filename self.driver = ogr.GetDriverByName(driver) self.vector = self.driver.CreateDataSource('out') if driver == 'Memory' else self.driver.Open(filename) nlayers = self.vector.GetLayerCount() if nlayers > 1: raise RuntimeError('multiple layers are currently not supported') elif nlayers == 1: self.init_layer()
[docs] def __getitem__(self, expression): """ subset the vector object by index or attribute. Parameters ---------- expression: int or str the key or expression to be used for subsetting. See :osgeo:meth:`ogr.Layer.SetAttributeFilter` for details on the expression syntax. Returns ------- Vector a vector object matching the specified criteria Examples -------- Assuming we have a shapefile called `testsites.shp`, which has an attribute `sitename`, we can subset individual sites and write them to new files like so: >>> from spatialist import Vector >>> filename = 'testsites.shp' >>> with Vector(filename)["sitename='site1'"] as site1: >>> site1.write('site1.shp') """ if not isinstance(expression, (int, str)): raise RuntimeError('expression must be of type int or str') expression = parse_literal(expression) if isinstance(expression, str) else expression if isinstance(expression, int): feat = self.getFeatureByIndex(expression) else: self.layer.SetAttributeFilter(expression) feat = self.getfeatures() feat = feat if len(feat) > 0 else None self.layer.SetAttributeFilter('') if feat is None: return None else: return feature2vector(feat, ref=self)
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def __str__(self): vals = dict() vals['proj4'] = self.proj4 vals.update(self.extent) vals['filename'] = self.filename if self.filename is not None else 'memory' vals['geomtype'] = ', '.join(list(set(self.geomTypes))) info = 'class : spatialist Vector object\n' \ 'geometry type : {geomtype}\n' \ 'extent : {xmin:.3f}, {xmax:.3f}, {ymin:.3f}, {ymax:.3f} (xmin, xmax, ymin, ymax)\n' \ 'coord. ref. : {proj4}\n' \ 'data source : {filename}'.format(**vals) return info @staticmethod def __driver_autodetect(filename): path = os.path.dirname(os.path.realpath(__file__)) drivers = yaml.safe_load(open(os.path.join(path, 'drivers_vector.yml'))) extension = os.path.splitext(filename)[1][1:] if extension not in drivers.keys(): message = "the file extension '{}' is not supported. " \ "Please provide the OGR format descriptor via " \ "parameter 'driver' or use one of the supported extensions:\n- .{}" message = message.format(extension, '\n- .'.join(drivers.keys())) raise RuntimeError(message) else: return drivers[extension]
[docs] def addfeature(self, geometry, fields=None): """ add a feature to the vector object from a geometry Parameters ---------- geometry: :osgeo:class:`ogr.Geometry` the geometry to add as a feature fields: dict or None the field names and values to assign to the new feature Returns ------- """ feature = ogr.Feature(self.layerdef) feature.SetGeometry(geometry) if fields is not None: for fieldname, value in fields.items(): if fieldname not in self.fieldnames: raise IOError('field "{}" is missing'.format(fieldname)) try: feature.SetField(fieldname, value) except NotImplementedError as e: fieldindex = self.fieldnames.index(fieldname) fieldtype = feature.GetFieldDefnRef(fieldindex).GetTypeName() message = str(e) + '\ntrying to set field {} (type {}) to value {} (type {})' message = message.format(fieldname, fieldtype, value, type(value)) raise (NotImplementedError(message)) self.layer.CreateFeature(feature) feature = None self.init_features()
[docs] def addfield(self, name, type, width=10): """ add a field to the vector layer Parameters ---------- name: str the field name type: int the OGR Field Type (OFT), e.g. ogr.OFTString. See `Module ogr <>`_. width: int the width of the new field (only for ogr.OFTString fields) Returns ------- """ fieldDefn = ogr.FieldDefn(name, type) if type == ogr.OFTString: fieldDefn.SetWidth(width) self.layer.CreateField(fieldDefn)
[docs] def addlayer(self, name, srs, geomType): """ add a layer to the vector layer Parameters ---------- name: str the layer name srs: int, str or :osgeo:class:`osr.SpatialReference` the spatial reference system. See :func:`spatialist.auxil.crsConvert` for options. geomType: int an OGR well-known binary data type. See `Module ogr <>`_. Returns ------- """ self.vector.CreateLayer(name, srs, geomType) self.init_layer()
[docs] def addvector(self, vec): """ add a vector object to the layer of the current Vector object Parameters ---------- vec: Vector the vector object to add merge: bool merge overlapping polygons? Returns ------- """ vec.layer.ResetReading() for feature in vec.layer: self.layer.CreateFeature(feature) self.init_features() vec.layer.ResetReading()
[docs] def bbox(self, outname=None, driver=None, overwrite=True): """ create a bounding box from the extent of the Vector object Parameters ---------- outname: str or None the name of the vector file to be written; if None, a Vector object is returned driver: str the name of the file format to write overwrite: bool overwrite an already existing file? Returns ------- Vector or None if outname is None, the bounding box Vector object """ if outname is None: return bbox(self.extent, self.srs) else: bbox(self.extent, self.srs, outname=outname, driver=driver, overwrite=overwrite)
def clone(self): return feature2vector(self.getfeatures(), ref=self)
[docs] def close(self): """ closes the OGR vector file connection Returns ------- """ self.vector = None for feature in self.__features: if feature is not None: feature = None
[docs] def convert2wkt(self, set3D=True): """ export the geometry of each feature as a wkt string Parameters ---------- set3D: bool keep the third (height) dimension? Returns ------- """ features = self.getfeatures() for feature in features: try: feature.geometry().Set3D(set3D) except AttributeError: dim = 3 if set3D else 2 feature.geometry().SetCoordinateDimension(dim) return [feature.geometry().ExportToWkt() for feature in features]
@property def extent(self): """ the extent of the vector object Returns ------- dict a dictionary with keys `xmin`, `xmax`, `ymin`, `ymax` """ return dict(zip(['xmin', 'xmax', 'ymin', 'ymax'], self.layer.GetExtent())) @property def fieldDefs(self): """ Returns ------- list of :osgeo:class:`ogr.FieldDefn` the field definition for each field of the Vector object """ return [self.layerdef.GetFieldDefn(x) for x in range(0, self.nfields)] @property def fieldnames(self): """ Returns ------- list of str the names of the fields """ return sorted([field.GetName() for field in self.fieldDefs]) @property def geomType(self): """ Returns ------- int the layer geometry type """ return self.layerdef.GetGeomType() @property def geomTypes(self): """ Returns ------- list the geometry type of each feature """ return [feat.GetGeometryRef().GetGeometryName() for feat in self.getfeatures()]
[docs] def getArea(self): """ Returns ------- float the area of the vector geometries """ return sum([x.GetGeometryRef().GetArea() for x in self.getfeatures()])
[docs] def getFeatureByAttribute(self, fieldname, attribute): """ get features by field attribute Parameters ---------- fieldname: str the name of the queried field attribute: int or str the field value of interest Returns ------- list of :osgeo:class:`ogr.Feature` or :osgeo:class:`ogr.Feature` the feature(s) matching the search query """ attr = attribute.strip() if isinstance(attribute, str) else attribute if fieldname not in self.fieldnames: raise KeyError('invalid field name') out = [] self.layer.ResetReading() for feature in self.layer: field = feature.GetField(fieldname) field = field.strip() if isinstance(field, str) else field if field == attr: out.append(feature.Clone()) self.layer.ResetReading() if len(out) == 0: return None elif len(out) == 1: return out[0] else: return out
[docs] def getFeatureByIndex(self, index): """ get features by numerical (positional) index Parameters ---------- index: int the queried index Returns ------- :osgeo:class:`ogr.Feature` the requested feature """ feature = self.layer[index] if feature is None: feature = self.getfeatures()[index] return feature
[docs] def getfeatures(self): """ Returns ------- list of :osgeo:class:`ogr.Feature` a list of cloned features """ self.layer.ResetReading() features = [x.Clone() for x in self.layer] self.layer.ResetReading() return features
[docs] def getProjection(self, type): """ get the CRS of the Vector object. See :func:`spatialist.auxil.crsConvert`. Parameters ---------- type: str the type of projection required. Returns ------- int, str or :osgeo:class:`osr.SpatialReference` the output CRS """ return crsConvert(self.layer.GetSpatialRef(), type)
[docs] def getUniqueAttributes(self, fieldname): """ Parameters ---------- fieldname: str the name of the field of interest Returns ------- list of str or int the unique attributes of the field """ self.layer.ResetReading() attributes = list(set([x.GetField(fieldname) for x in self.layer])) self.layer.ResetReading() return sorted(attributes)
[docs] def init_features(self): """ delete all in-memory features Returns ------- """ del self.__features self.__features = [None] * self.nfeatures
[docs] def init_layer(self): """ initialize a layer object Returns ------- """ self.layer = self.vector.GetLayer() self.__features = [None] * self.nfeatures
@property def layerdef(self): """ Returns ------- :osgeo:class:`ogr.FeatureDefn` the layer's feature definition """ return self.layer.GetLayerDefn() @property def layername(self): """ Returns ------- str the name of the layer """ return self.layer.GetName()
[docs] def load(self): """ load all feature into memory Returns ------- """ self.layer.ResetReading() for i in range(self.nfeatures): if self.__features[i] is None: self.__features[i] = self.layer[i]
@property def nfeatures(self): """ Returns ------- int the number of features """ return len(self.layer) @property def nfields(self): """ Returns ------- int the number of fields """ return self.layerdef.GetFieldCount() @property def nlayers(self): """ Returns ------- int the number of layers """ return self.vector.GetLayerCount() @property def proj4(self): """ Returns ------- str the CRS in PRO4 format """ return self.srs.ExportToProj4().strip()
[docs] def reproject(self, projection): """ in-memory reprojection Parameters ---------- projection: int, str, :osgeo:class:`osr.SpatialReference` the target CRS. See :func:`spatialist.auxil.crsConvert`. Returns ------- """ srs_out = crsConvert(projection, 'osr') # the following check was found to not work in GDAL 3.0.1; likely a bug # if self.srs.IsSame(srs_out) == 0: if self.getProjection('epsg') != crsConvert(projection, 'epsg'): # create the CoordinateTransformation coordTrans = osr.CoordinateTransformation(self.srs, srs_out) layername = self.layername geomType = self.geomType features = self.getfeatures() feat_def = features[0].GetDefnRef() fields = [feat_def.GetFieldDefn(x) for x in range(0, feat_def.GetFieldCount())] self.__init__() self.addlayer(layername, srs_out, geomType) self.layer.CreateFields(fields) for feature in features: geom = feature.GetGeometryRef() geom.Transform(coordTrans) newfeature = feature.Clone() newfeature.SetGeometry(geom) self.layer.CreateFeature(newfeature) newfeature = None self.init_features()
[docs] def setCRS(self, crs): """ directly reset the spatial reference system of the vector object. This is not going to reproject the Vector object, see :meth:`reproject` instead. Parameters ---------- crs: int, str, :osgeo:class:`osr.SpatialReference` the input CRS Returns ------- Example ------- >>> site = Vector('shape.shp') >>> site.setCRS('+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs ') """ # try to convert the input crs to osr.SpatialReference srs_out = crsConvert(crs, 'osr') # save all relevant info from the existing vector object layername = self.layername geomType = self.geomType layer_definition = ogr.Feature(self.layer.GetLayerDefn()) fields = [layer_definition.GetFieldDefnRef(x) for x in range(layer_definition.GetFieldCount())] features = self.getfeatures() # initialize a new vector object and create a layer self.__init__() self.addlayer(layername, srs_out, geomType) # add the fields to new layer self.layer.CreateFields(fields) # add the features to the newly created layer for feat in features: self.layer.CreateFeature(feat) self.init_features()
@property def srs(self): """ Returns ------- :osgeo:class:`osr.SpatialReference` the geometry's spatial reference system """ return self.layer.GetSpatialRef()
[docs] def write(self, outfile, driver=None, overwrite=True): """ write the Vector object to a file Parameters ---------- outfile: the name of the file to write; the following extensions are automatically detected for determining the format driver: .. list_drivers:: vector driver: str the output file format; needs to be defined if the format cannot be auto-detected from the filename extension overwrite: bool overwrite an already existing file? Returns ------- """ if driver is None: driver = self.__driver_autodetect(outfile) driver = ogr.GetDriverByName(driver) if os.path.exists(outfile): if overwrite: driver.DeleteDataSource(outfile) else: raise RuntimeError('target file already exists') outdataset = driver.CreateDataSource(outfile) outlayer = outdataset.CreateLayer(name=self.layername, srs=self.srs, geom_type=self.geomType) outlayerdef = outlayer.GetLayerDefn() for fieldDef in self.fieldDefs: outlayer.CreateField(fieldDef) self.layer.ResetReading() for feature in self.layer: outFeature = ogr.Feature(outlayerdef) outFeature.SetGeometry(feature.GetGeometryRef()) for name in self.fieldnames: outFeature.SetField(name, feature.GetField(name)) # add the feature to the shapefile outlayer.CreateFeature(outFeature) outFeature = None self.layer.ResetReading() outdataset = None
[docs]def bbox(coordinates, crs, outname=None, driver=None, overwrite=True): """ create a bounding box vector object or shapefile from coordinates and coordinate reference system. The CRS can be in either WKT, EPSG or PROJ4 format Parameters ---------- coordinates: dict a dictionary containing numerical variables with keys `xmin`, `xmax`, `ymin` and `ymax` crs: int, str, :osgeo:class:`osr.SpatialReference` the CRS of the `coordinates`. See :func:`~spatialist.auxil.crsConvert` for options. outname: str the file to write to. If `None`, the bounding box is returned as :class:`~spatialist.vector.Vector` object driver: str the output file format; needs to be defined if the format cannot be auto-detected from the filename extension overwrite: bool overwrite an existing file? Returns ------- Vector or None the bounding box Vector object """ srs = crsConvert(crs, 'osr') ring = ogr.Geometry(ogr.wkbLinearRing) ring.AddPoint(coordinates['xmin'], coordinates['ymin']) ring.AddPoint(coordinates['xmin'], coordinates['ymax']) ring.AddPoint(coordinates['xmax'], coordinates['ymax']) ring.AddPoint(coordinates['xmax'], coordinates['ymin']) ring.CloseRings() geom = ogr.Geometry(ogr.wkbPolygon) geom.AddGeometry(ring) geom.FlattenTo2D() bbox = Vector(driver='Memory') bbox.addlayer('bbox', srs, geom.GetGeometryType()) bbox.addfield('area', ogr.OFTReal) bbox.addfeature(geom, fields={'area': geom.Area()}) geom = None if outname is None: return bbox else: bbox.write(outfile=outname, driver=driver, overwrite=overwrite)
def centerdist(obj1, obj2): if not isinstance(obj1, Vector) or isinstance(obj2, Vector): raise IOError('both objects must be of type Vector') feature1 = obj1.getFeatureByIndex(0) geometry1 = feature1.GetGeometryRef() center1 = geometry1.Centroid() feature2 = obj2.getFeatureByIndex(0) geometry2 = feature2.GetGeometryRef() center2 = geometry2.Centroid() return center1.Distance(center2) def dissolve(infile, outfile, field, layername=None): """ dissolve the polygons of a vector file by an attribute field Parameters ---------- infile: str the input vector file outfile: str the output shapefile field: str the field name to merge the polygons by layername: str the name of the output vector layer; If set to None the layername will be the basename of infile without extension Returns ------- """ with Vector(infile) as vec: srs = vec.srs feat = vec.layer[0] d = feat.GetFieldDefnRef(field) width = d.width type = d.type feat = None layername = layername if layername is not None else os.path.splitext(os.path.basename(infile))[0] # the following can be used if GDAL was compiled with the spatialite extension # not tested, might need some additional/different lines # with Vector(infile) as vec: # vec.vector.ExecuteSQL('SELECT ST_Union(geometry), {0} FROM {1} GROUP BY {0}'.format(field, vec.layername), # dialect='SQLITE') # vec.write(outfile) conn = sqlite_setup(extensions=['spatialite', 'gdal']) conn.execute('CREATE VIRTUAL TABLE merge USING VirtualOGR("{}");'.format(infile)) select = conn.execute('SELECT {0},asText(ST_Union(geometry)) as geometry FROM merge GROUP BY {0};'.format(field)) fetch = select.fetchall() with Vector(driver='Memory') as merge: merge.addlayer(layername, srs, ogr.wkbPolygon) merge.addfield(field, type=type, width=width) for i in range(len(fetch)): merge.addfeature(ogr.CreateGeometryFromWkt(fetch[i][1]), {field: fetch[i][0]}) merge.write(outfile) conn.close()
[docs]def feature2vector(feature, ref, layername=None): """ create a Vector object from ogr features Parameters ---------- feature: list of :osgeo:class:`ogr.Feature` or :osgeo:class:`ogr.Feature` a single feature or a list of features ref: Vector a reference Vector object to retrieve geo information from layername: str or None the name of the output layer; retrieved from `ref` if `None` Returns ------- Vector the new Vector object """ features = feature if isinstance(feature, list) else [feature] layername = layername if layername is not None else ref.layername vec = Vector(driver='Memory') vec.addlayer(layername, ref.srs, ref.geomType) feat_def = features[0].GetDefnRef() fields = [feat_def.GetFieldDefn(x) for x in range(0, feat_def.GetFieldCount())] vec.layer.CreateFields(fields) for feat in features: vec.layer.CreateFeature(feat) vec.init_features() return vec
[docs]def intersect(obj1, obj2): """ intersect two Vector objects Parameters ---------- obj1: Vector the first vector object; this object is reprojected to the CRS of obj2 if necessary obj2: Vector the second vector object Returns ------- Vector or None the intersect of obj1 and obj2 if both intersect and None otherwise """ if not isinstance(obj1, Vector) or not isinstance(obj2, Vector): raise RuntimeError('both objects must be of type Vector') obj1 = obj1.clone() obj2 = obj2.clone() obj1.reproject(obj2.srs) ####################################################### # create basic overlap union1 = ogr.Geometry(ogr.wkbMultiPolygon) # union all the geometrical features of layer 1 for feat in obj1.layer: union1.AddGeometry(feat.GetGeometryRef()) obj1.layer.ResetReading() union1.Simplify(0) # same for layer2 union2 = ogr.Geometry(ogr.wkbMultiPolygon) for feat in obj2.layer: union2.AddGeometry(feat.GetGeometryRef()) obj2.layer.ResetReading() union2.Simplify(0) # intersection intersect_base = union1.Intersection(union2) union1 = None union2 = None ####################################################### # compute detailed per-geometry overlaps if intersect_base.GetArea() > 0: intersection = Vector(driver='Memory') intersection.addlayer('intersect', obj1.srs, ogr.wkbPolygon) fieldmap = [] for index, fielddef in enumerate([obj1.fieldDefs, obj2.fieldDefs]): for field in fielddef: name = field.GetName() i = 2 while name in intersection.fieldnames: name = '{}_{}'.format(field.GetName(), i) i += 1 fieldmap.append((index, field.GetName(), name)) intersection.addfield(name, type=field.GetType(), width=field.GetWidth()) for feature1 in obj1.layer: geom1 = feature1.GetGeometryRef() if geom1.Intersects(intersect_base): for feature2 in obj2.layer: geom2 = feature2.GetGeometryRef() # select only the intersections if geom2.Intersects(intersect_base): intersect = geom2.Intersection(geom1) fields = {} for item in fieldmap: if item[0] == 0: fields[item[2]] = feature1.GetField(item[1]) else: fields[item[2]] = feature2.GetField(item[1]) intersection.addfeature(intersect, fields) intersect_base = None return intersection
[docs]def wkt2vector(wkt, srs, layername='wkt'): """ convert a well-known text string geometry to a Vector object Parameters ---------- wkt: str the well-known text description srs: int, str the spatial reference system; see :func:`spatialist.auxil.crsConvert` for options. layername: str the name of the internal :osgeo:class:`ogr.Layer` object Returns ------- Vector the vector representation Examples -------- >>> from spatialist.vector import wkt2vector >>> wkt = 'POLYGON ((0. 0., 0. 1., 1. 1., 1. 0., 0. 0.))' >>> with wkt2vector(wkt, srs=4326) as vec: >>> print(vec.getArea()) 1.0 """ geom = ogr.CreateGeometryFromWkt(wkt) geom.FlattenTo2D() srs = crsConvert(srs, 'osr') vec = Vector(driver='Memory') vec.addlayer(layername, srs, geom.GetGeometryType()) if geom.GetGeometryName() != 'POINT': vec.addfield('area', ogr.OFTReal) fields = {'area': geom.Area()} else: fields = None vec.addfeature(geom, fields=fields) geom = None return vec