-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #72 from GispoCoding/66-add-export-to-json-script
66 add export to json script
- Loading branch information
Showing
5 changed files
with
186 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
from __future__ import annotations | ||
|
||
import json | ||
from typing import Any | ||
|
||
from qgis.core import ( | ||
QgsProcessing, | ||
QgsProcessingAlgorithm, | ||
QgsProcessingContext, | ||
QgsProcessingFeedback, | ||
QgsProcessingParameterFileDestination, | ||
QgsProcessingParameterVectorLayer, | ||
) | ||
from qgis.PyQt.QtCore import QCoreApplication, QDateTime, QVariant | ||
|
||
|
||
class ExportToJSON(QgsProcessingAlgorithm): | ||
INPUT_GATES = "INPUT_GATES" | ||
OUTPUT_JSON = "OUTPUT_JSON" | ||
|
||
def __init__(self) -> None: | ||
super().__init__() | ||
|
||
self._name = "export_to_json" | ||
self._display_name = "Export to JSON" | ||
|
||
def tr(self, string) -> str: | ||
return QCoreApplication.translate("Processing", string) | ||
|
||
def createInstance(self): # noqa N802 | ||
return ExportToJSON() | ||
|
||
def name(self) -> str: | ||
return self._name | ||
|
||
def displayName(self) -> str: # noqa N802 | ||
return self.tr(self._display_name) | ||
|
||
def initAlgorithm(self, config=None): # noqa N802 | ||
self.addParameter( | ||
QgsProcessingParameterVectorLayer( | ||
name=self.INPUT_GATES, | ||
description="Input gates", | ||
types=[QgsProcessing.TypeVectorLine], | ||
) | ||
) | ||
|
||
self.addParameter( | ||
QgsProcessingParameterFileDestination( | ||
name=self.OUTPUT_JSON, | ||
description="Output JSON file", | ||
fileFilter="JSON files (*.json)", | ||
) | ||
) | ||
|
||
def processAlgorithm( # noqa N802 | ||
self, | ||
parameters: dict[str, Any], | ||
context: QgsProcessingContext, | ||
feedback: QgsProcessingFeedback, | ||
) -> dict: | ||
""" | ||
Here is where the processing itself takes place. | ||
""" | ||
|
||
# Initialize feedback if it is None | ||
if feedback is None: | ||
feedback = QgsProcessingFeedback() | ||
|
||
gate_layer = self.parameterAsVectorLayer(parameters, self.INPUT_GATES, context) | ||
output_json_path = self.parameterAsFile(parameters, self.OUTPUT_JSON, context) | ||
|
||
fields_to_exclude = ["fid", "counts_negative", "counts_positive"] | ||
|
||
features_data = [] | ||
|
||
for feature in gate_layer.getFeatures(): | ||
feature_dict = {} | ||
|
||
for field_name, field_value in zip(gate_layer.fields().names(), feature.attributes()): | ||
if field_name not in fields_to_exclude: | ||
if field_name == "name": | ||
field_name = "channel" # noqa: PLW2901 | ||
|
||
# Convert speed km/h -> m/s | ||
value = field_value / 3.6 if field_name == "speed_avg" else field_value | ||
|
||
# Convert QVariant nulls to None | ||
if isinstance(field_value, QVariant): | ||
value = None if value.isNull() else value.value() | ||
|
||
# Convert QTime objects to string | ||
if isinstance(field_value, QDateTime): | ||
value = value.toString("yyyy-MM-dd HH-mm-ss") | ||
|
||
feature_dict[field_name] = value | ||
|
||
features_data.append(feature_dict) | ||
|
||
with open(output_json_path, "w") as json_file: | ||
json.dump(features_data, json_file, indent=2) | ||
|
||
return {self.OUTPUT_JSON: output_json_path} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
try: | ||
import processing | ||
except ImportError: | ||
from qgis import processing | ||
|
||
import json | ||
from typing import TYPE_CHECKING | ||
|
||
if TYPE_CHECKING: | ||
from qgis.core import QgsVectorLayer | ||
|
||
from fvh3t.fvh3t_processing.traffic_trajectory_toolkit_provider import TTTProvider | ||
from tests.processing.test_count_trajectories import ( # noqa: F401 | ||
input_gate_layer_for_algorithm, | ||
input_point_layer_for_algorithm, | ||
) | ||
|
||
|
||
def test_export_to_json( | ||
qgis_app, | ||
qgis_processing, # noqa: ARG001 | ||
input_point_layer_for_algorithm, # noqa: F811 | ||
input_gate_layer_for_algorithm, # noqa: F811 | ||
): | ||
provider = TTTProvider() | ||
qgis_app.processingRegistry().removeProvider(provider) | ||
qgis_app.processingRegistry().addProvider(provider) | ||
|
||
params = { | ||
"INPUT_POINTS": input_point_layer_for_algorithm, | ||
"INPUT_LINES": input_gate_layer_for_algorithm, | ||
"TRAVELER_CLASS": "car", | ||
"START_TIME": None, | ||
"END_TIME": None, | ||
"OUTPUT_GATES": "TEMPORARY_OUTPUT", | ||
"OUTPUT_TRAJECTORIES": "TEMPORARY_OUTPUT", | ||
} | ||
|
||
result = processing.run( | ||
"traffic_trajectory_toolkit:count_trajectories", | ||
params, | ||
) | ||
|
||
output_gates: QgsVectorLayer = result["OUTPUT_GATES"] | ||
|
||
params = { | ||
"INPUT_GATES": output_gates, | ||
"OUTPUT_JSON": "TEMPORARY_OUTPUT", | ||
} | ||
|
||
result = processing.run( | ||
"traffic_trajectory_toolkit:export_to_json", | ||
params, | ||
) | ||
|
||
output_json = result["OUTPUT_JSON"] | ||
|
||
with open(output_json) as file: | ||
json_data = json.load(file) | ||
|
||
assert len(json_data) == 3 # Number of gates | ||
data = json_data[0] | ||
assert len(data) == 7 # Number of fields per gate | ||
|
||
keys = {"channel", "class", "interval_start", "interval_end", "vehicle_count", "speed_avg", "acceleration_avg"} | ||
assert set(data.keys()) == keys |