Skip to content

Commit

Permalink
Merge pull request #141 from cemsbv/remove_xmlschema
Browse files Browse the repository at this point in the history
- Removed library `xmlschema` to improve performance, used  `lxml` instead
- removed `resources` folder
  • Loading branch information
martinapippi authored Dec 7, 2021
2 parents 4e90332 + 1969c13 commit 89fc616
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 735 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
Simple parser for \*.gef files. These are ASCII based files used for soil properties measurements.
Compatible with Python 3.7.

Recently added parsing of xml boreholes file compatible with schema's defined in `./pygef/resources/`.
Recently added the parsing of xml boreholes file, the xml parsing is still in a preliminary phase,
not all the files are supported. If you find a file that doesn't work with pygef, please make an issue about it or PR :)

## Installation

Expand All @@ -18,7 +19,7 @@ Latest stable version:

Cutting-edge version (might break):

`$ pip install git+https://github.com/ritchie46/pygef.git`
`$ pip install git+https://github.com/cemsbv/pygef.git`

### Read BORE and CPT files

Expand Down
2 changes: 1 addition & 1 deletion pygef/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.7.0"
__version__ = "0.7.1"
175 changes: 65 additions & 110 deletions pygef/broxml.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import os.path
from abc import ABC
from datetime import datetime

import numpy as np
import polars as pl
import xmlschema
from lxml import etree

NS_MAP_VALUES = [
"http://www.broservices.nl/xsd/brocommon/",
"http://www.broservices.nl/xsd/isbhr-gt/",
"http://www.opengis.net/gml/",
"http://www.broservices.nl/xsd/bhrgtcommon/",
]


class _BroXml(ABC):
Expand All @@ -24,8 +30,12 @@ def __init__(self, path: str = None, string: str = None):
), "One of [path, string] should be not none."
self.path = path
if string is None and path is not None:
with open(path, "rb") as f:
self.s_bin = f.read()
with open(path, encoding="utf-8", errors="ignore") as f:
string = f.read()
elif string is not None:
self.s_bin = string.encode("ascii")

self.s = string
# Initialize attributes
Expand All @@ -52,128 +62,73 @@ def __init__(self, path=None, string=None):
super().__init__(path=path, string=string)
self.nen_version = "NEN-EN-ISO 14688"
self.type = "bore"
self._root = etree.fromstring(self.s_bin)
self._ns_map_keys = list(self._root.nsmap.keys())

if (
self.s.find(
'isbhrgt:registrationRequest xmlns:isbhrgt="http://www.broservices.nl/xsd/isbhr-gt/1.0"'
)
> 0.0
): # version 1.0.
schema = xmlschema.XMLSchema(
os.path.join(
os.path.dirname(__file__), "resources", "isbhr-gt-messages_v1.xsd"
)
)

self._schema_type = "isbhrgt:"
self._schema2 = "bhrgtcom"
self._schema3 = "gml"
self._schema4 = "brocom"

self._validate(schema)

self._parse_attributes()
self.df = self._parse_df()
elif (
self.s.find(
'ns1:registrationRequest xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"'
)
> 0
): # version 2.1
schema = xmlschema.XMLSchema(
os.path.join(
os.path.dirname(__file__), "resources", "isbhr-gt-messages_v2.1.xsd"
)
)
self._validate(schema)
self._schema_type = "ns1:"
self._schema2 = "ns3"
self._schema3 = "ns2"
self._schema4 = "ns"

self._parse_attributes()
self.df = self._parse_df()
elif (
self.s.find(
'registrationRequest xmlns="http://www.broservices.nl/xsd/isbhr-gt/2.1"'
)
> 0
): # version 2.1
schema = xmlschema.XMLSchema(
os.path.join(
os.path.dirname(__file__), "resources", "isbhr-gt-messages_v2.1.xsd"
)
)

self._validate(schema)
self._schema_type = ""
self._schema2 = "bhrgtcom"
self._schema3 = "gml"
self._schema4 = "brocom"
# find versions
self._ns_map_values = []
for pos in NS_MAP_VALUES:
for val in list(self._root.nsmap.values()):
if pos in val:
self._ns_map_values.append(val)
break

self._parse_attributes()
self.df = self._parse_df()
else:
raise ValueError("This xml schema type is not supported yet.")
self._parse_attributes()

def _validate(self, schema: xmlschema.XMLSchema) -> None:
"""
Validate xml file with xsd.
Parameters
----------
schema
xmlschema.XMLSchema based on xsd file
"""
schema.validate(self.s)
self._isvalid = schema.is_valid(self.s)
self._decoded = schema.to_dict(self.s)
self.df = self._parse_df()

def _parse_attributes(self) -> None:
report_name = "BHR_GT_CompleteReport_V1"

self._main_dict = self._decoded[f"{self._schema_type}sourceDocument"][
f"{self._schema_type}{report_name}"
]

# Main attributes
self.zid = self._main_dict[f"{self._schema_type}deliveredVerticalPosition"][
f"{self._schema2}:offset"
]["$"]
coords = self._main_dict[f"{self._schema_type}deliveredLocation"][
f"{self._schema2}:location"
][f"{self._schema3}:Point"][f"{self._schema3}:pos"]

self.x = coords[0]
self.y = coords[1]
self.test_id = list(
self._root.iter(
"{" + self._ns_map_values[1] + "}" + "objectIdAccountableParty"
)
)[0].text
self.zid = float(
list(self._root.iter("{" + self._ns_map_values[3] + "}" + "offset"))[0].text
)
self.file_date = datetime.strptime(
self._main_dict[f"{self._schema_type}boring"][
f"{self._schema2}:boringStartDate"
][f"{self._schema4}:date"],
list(
list(
self._root.iter("{" + self._ns_map_values[3] + "}boringStartDate")
)[0].iter("{" + self._ns_map_values[0] + "}" + "date")
)[0].text,
"%Y-%m-%d",
).date()
self.test_id = self._main_dict[f"{self._schema_type}objectIdAccountableParty"]

coords = list(self._root.iter("{" + self._ns_map_values[2] + "}pos"))[
0
].text.split()
self.x = float(coords[0])
self.y = float(coords[1])

def _parse_df(self) -> pl.DataFrame:
depth_top = []
depth_bottom = []
soil_name = []
for i, layer in enumerate(
self._main_dict[f"{self._schema_type}boreholeSampleDescription"][
f"{self._schema2}:descriptiveBoreholeLog"
][0][f"{self._schema2}:layer"]
):
depth_top.append(layer[f"{self._schema2}:upperBoundary"]["$"])
depth_bottom.append(layer[f"{self._schema2}:lowerBoundary"]["$"])
if f"{self._schema2}:specialMaterial" in layer.keys():
soil_name.append(layer[f"{self._schema2}:specialMaterial"]["$"])
elif f"{self._schema2}:soil" in layer.keys():
soil_name.append(
layer[f"{self._schema2}:soil"][
f"{self._schema2}:geotechnicalSoilName"
]["$"]
for layer in self._root.iter("{" + self._ns_map_values[3] + "}layer"):
depth_top.append(
float(
list(layer.iter("{" + self._ns_map_values[3] + "}upperBoundary"))[
0
].text
)
else:
)
depth_bottom.append(
float(
list(layer.iter("{" + self._ns_map_values[3] + "}lowerBoundary"))[
0
].text
)
)
for loc in layer:
if loc.tag == "{" + self._ns_map_values[3] + "}soil":
s = list(
loc.iter("{" + self._ns_map_values[3] + "}geotechnicalSoilName")
)[0].text
soil_name.append(s)
elif loc.tag == "{" + self._ns_map_values[3] + "}specialMaterial":
soil_name.append(loc.text)
if len(soil_name) < len(depth_bottom):
soil_name.append("not classified")

df = pl.DataFrame(
Expand Down
Loading

0 comments on commit 89fc616

Please sign in to comment.