Skip to content

Commit

Permalink
Merge pull request #6284 from leonardehrenfried/siri-light
Browse files Browse the repository at this point in the history
Implement SIRI Lite
  • Loading branch information
leonardehrenfried authored Jan 3, 2025
2 parents e1b3064 + de1e23d commit df53a35
Show file tree
Hide file tree
Showing 36 changed files with 1,014 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import static org.opentripplanner.standalone.config.routerconfig.UpdatersConfig.Type.SIRI_AZURE_ET_UPDATER;
import static org.opentripplanner.standalone.config.routerconfig.UpdatersConfig.Type.SIRI_AZURE_SX_UPDATER;
import static org.opentripplanner.standalone.config.routerconfig.UpdatersConfig.Type.SIRI_ET_GOOGLE_PUBSUB_UPDATER;
import static org.opentripplanner.standalone.config.routerconfig.UpdatersConfig.Type.SIRI_ET_LITE;
import static org.opentripplanner.standalone.config.routerconfig.UpdatersConfig.Type.SIRI_ET_UPDATER;
import static org.opentripplanner.standalone.config.routerconfig.UpdatersConfig.Type.SIRI_SX_LITE;
import static org.opentripplanner.standalone.config.routerconfig.UpdatersConfig.Type.SIRI_SX_UPDATER;
import static org.opentripplanner.standalone.config.routerconfig.UpdatersConfig.Type.STOP_TIME_UPDATER;
import static org.opentripplanner.standalone.config.routerconfig.UpdatersConfig.Type.VEHICLE_PARKING;
Expand All @@ -30,7 +32,9 @@
import org.opentripplanner.standalone.config.routerconfig.updaters.MqttGtfsRealtimeUpdaterConfig;
import org.opentripplanner.standalone.config.routerconfig.updaters.PollingTripUpdaterConfig;
import org.opentripplanner.standalone.config.routerconfig.updaters.SiriETGooglePubsubUpdaterConfig;
import org.opentripplanner.standalone.config.routerconfig.updaters.SiriETLiteUpdaterConfig;
import org.opentripplanner.standalone.config.routerconfig.updaters.SiriETUpdaterConfig;
import org.opentripplanner.standalone.config.routerconfig.updaters.SiriSXLiteUpdaterConfig;
import org.opentripplanner.standalone.config.routerconfig.updaters.SiriSXUpdaterConfig;
import org.opentripplanner.standalone.config.routerconfig.updaters.VehicleParkingUpdaterConfig;
import org.opentripplanner.standalone.config.routerconfig.updaters.VehiclePositionsUpdaterConfig;
Expand All @@ -44,6 +48,8 @@
import org.opentripplanner.updater.siri.updater.SiriETUpdaterParameters;
import org.opentripplanner.updater.siri.updater.SiriSXUpdaterParameters;
import org.opentripplanner.updater.siri.updater.google.SiriETGooglePubsubUpdaterParameters;
import org.opentripplanner.updater.siri.updater.lite.SiriETLiteUpdaterParameters;
import org.opentripplanner.updater.siri.updater.lite.SiriSXLiteUpdaterParameters;
import org.opentripplanner.updater.trip.MqttGtfsRealtimeUpdaterParameters;
import org.opentripplanner.updater.trip.PollingTripUpdaterParameters;
import org.opentripplanner.updater.vehicle_parking.VehicleParkingUpdaterParameters;
Expand Down Expand Up @@ -182,6 +188,16 @@ public List<SiriSXUpdaterParameters> getSiriSXUpdaterParameters() {
return getParameters(SIRI_SX_UPDATER);
}

@Override
public List<SiriETLiteUpdaterParameters> getSiriETLiteUpdaterParameters() {
return getParameters(SIRI_ET_LITE);
}

@Override
public List<SiriSXLiteUpdaterParameters> getSiriSXLiteUpdaterParameters() {
return getParameters(SIRI_SX_LITE);
}

@Override
public List<MqttGtfsRealtimeUpdaterParameters> getMqttGtfsRealtimeUpdaterParameters() {
return getParameters(MQTT_GTFS_RT_UPDATER);
Expand Down Expand Up @@ -218,8 +234,10 @@ public enum Type {
REAL_TIME_ALERTS(GtfsRealtimeAlertsUpdaterConfig::create),
VEHICLE_POSITIONS(VehiclePositionsUpdaterConfig::create),
SIRI_ET_UPDATER(SiriETUpdaterConfig::create),
SIRI_ET_LITE(SiriETLiteUpdaterConfig::create),
SIRI_ET_GOOGLE_PUBSUB_UPDATER(SiriETGooglePubsubUpdaterConfig::create),
SIRI_SX_UPDATER(SiriSXUpdaterConfig::create),
SIRI_SX_LITE(SiriSXLiteUpdaterConfig::create),
SIRI_AZURE_ET_UPDATER(SiriAzureETUpdaterConfig::create),
SIRI_AZURE_SX_UPDATER(SiriAzureSXUpdaterConfig::create);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.opentripplanner.standalone.config.routerconfig.updaters;

import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_7;

import java.time.Duration;
import org.opentripplanner.standalone.config.framework.json.NodeAdapter;
import org.opentripplanner.updater.siri.updater.lite.SiriETLiteUpdaterParameters;

public class SiriETLiteUpdaterConfig {

public static SiriETLiteUpdaterParameters create(String configRef, NodeAdapter c) {
return new SiriETLiteUpdaterParameters(
configRef,
c.of("feedId").since(V2_7).summary("The ID of the feed to apply the updates to.").asString(),
c
.of("url")
.since(V2_7)
.summary("The URL to send the HTTP requests to.")
.description(SiriSXUpdaterConfig.URL_DESCRIPTION)
.asUri(),
c
.of("frequency")
.since(V2_7)
.summary("How often the updates should be retrieved.")
.asDuration(Duration.ofMinutes(1)),
c
.of("timeout")
.since(V2_7)
.summary("The HTTP timeout to download the updates.")
.asDuration(Duration.ofSeconds(15)),
c
.of("fuzzyTripMatching")
.since(V2_7)
.summary("If the fuzzy trip matcher should be used to match trips.")
.asBoolean(false),
HttpHeadersConfig.headers(c, V2_7)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.opentripplanner.standalone.config.routerconfig.updaters;

import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_0;
import static org.opentripplanner.standalone.config.framework.json.OtpVersion.V2_7;

import java.time.Duration;
import org.opentripplanner.standalone.config.framework.json.NodeAdapter;
import org.opentripplanner.updater.siri.updater.lite.SiriSXLiteUpdaterParameters;

public class SiriSXLiteUpdaterConfig {

public static SiriSXLiteUpdaterParameters create(String configRef, NodeAdapter c) {
return new SiriSXLiteUpdaterParameters(
configRef,
c.of("feedId").since(V2_7).summary("The ID of the feed to apply the updates to.").asString(),
c
.of("url")
.since(V2_7)
.summary("The URL to send the HTTP requests to.")
.description(SiriSXUpdaterConfig.URL_DESCRIPTION)
.asUri(),
c
.of("frequency")
.since(V2_7)
.summary("How often the updates should be retrieved.")
.asDuration(Duration.ofMinutes(1)),
c
.of("timeout")
.since(V2_7)
.summary("The HTTP timeout to download the updates.")
.asDuration(Duration.ofSeconds(15)),
c
.of("earlyStart")
.since(V2_0)
.summary("This value is subtracted from the actual validity defined in the message.")
.description(
"""
Normally the planned departure time is used, so setting this to 10s will cause the
SX-message to be included in trip-results 10 seconds before the the planned departure
time."""
)
.asDuration(Duration.ZERO),
HttpHeadersConfig.headers(c, V2_7)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.opentripplanner.updater.siri.updater.SiriETUpdaterParameters;
import org.opentripplanner.updater.siri.updater.SiriSXUpdaterParameters;
import org.opentripplanner.updater.siri.updater.google.SiriETGooglePubsubUpdaterParameters;
import org.opentripplanner.updater.siri.updater.lite.SiriETLiteUpdaterParameters;
import org.opentripplanner.updater.siri.updater.lite.SiriSXLiteUpdaterParameters;
import org.opentripplanner.updater.trip.MqttGtfsRealtimeUpdaterParameters;
import org.opentripplanner.updater.trip.PollingTripUpdaterParameters;
import org.opentripplanner.updater.vehicle_parking.VehicleParkingUpdaterParameters;
Expand All @@ -33,6 +35,10 @@ public interface UpdatersParameters {

List<SiriSXUpdaterParameters> getSiriSXUpdaterParameters();

List<SiriETLiteUpdaterParameters> getSiriETLiteUpdaterParameters();

List<SiriSXLiteUpdaterParameters> getSiriSXLiteUpdaterParameters();

List<MqttGtfsRealtimeUpdaterParameters> getMqttGtfsRealtimeUpdaterParameters();

List<VehicleParkingUpdaterParameters> getVehicleParkingUpdaterParameters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.opentripplanner.updater.UpdatersParameters;
import org.opentripplanner.updater.alert.GtfsRealtimeAlertsUpdater;
import org.opentripplanner.updater.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.updater.siri.updater.SiriETUpdater;
import org.opentripplanner.updater.siri.updater.SiriHttpLoader;
import org.opentripplanner.updater.siri.updater.SiriSXUpdater;
import org.opentripplanner.updater.siri.updater.configure.SiriUpdaterModule;
import org.opentripplanner.updater.siri.updater.google.SiriETGooglePubsubUpdater;
import org.opentripplanner.updater.siri.updater.lite.SiriLiteHttpLoader;
import org.opentripplanner.updater.spi.GraphUpdater;
import org.opentripplanner.updater.spi.TimetableSnapshotFlush;
import org.opentripplanner.updater.trip.MqttGtfsRealtimeUpdater;
Expand Down Expand Up @@ -182,13 +184,23 @@ private List<GraphUpdater> createUpdatersFromConfig() {
updaters.add(new PollingVehiclePositionUpdater(configItem, realtimeVehicleRepository));
}
for (var configItem : updatersParameters.getSiriETUpdaterParameters()) {
updaters.add(new SiriETUpdater(configItem, provideSiriTimetableSnapshot()));
updaters.add(
SiriUpdaterModule.createSiriETUpdater(configItem, provideSiriTimetableSnapshot())
);
}
for (var configItem : updatersParameters.getSiriETLiteUpdaterParameters()) {
updaters.add(
SiriUpdaterModule.createSiriETUpdater(configItem, provideSiriTimetableSnapshot())
);
}
for (var configItem : updatersParameters.getSiriETGooglePubsubUpdaterParameters()) {
updaters.add(new SiriETGooglePubsubUpdater(configItem, provideSiriTimetableSnapshot()));
}
for (var configItem : updatersParameters.getSiriSXUpdaterParameters()) {
updaters.add(new SiriSXUpdater(configItem, timetableRepository));
updaters.add(SiriUpdaterModule.createSiriSXUpdater(configItem, timetableRepository));
}
for (var configItem : updatersParameters.getSiriSXLiteUpdaterParameters()) {
updaters.add(SiriUpdaterModule.createSiriSXUpdater(configItem, timetableRepository));
}
for (var configItem : updatersParameters.getMqttGtfsRealtimeUpdaterParameters()) {
updaters.add(new MqttGtfsRealtimeUpdater(configItem, provideGtfsTimetableSnapshot()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,4 @@ public interface EstimatedTimetableSource {
* {@link UpdateIncrementality}
*/
UpdateIncrementality incrementalityOfLastUpdates();

String getFeedId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opentripplanner.framework.io.OtpHttpClientException;
import org.opentripplanner.updater.spi.HttpHeaders;
import org.opentripplanner.updater.trip.UpdateIncrementality;
import org.opentripplanner.utils.tostring.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.Siri;
Expand All @@ -19,11 +20,6 @@ public class SiriETHttpTripUpdateSource implements EstimatedTimetableSource {

private static final Logger LOG = LoggerFactory.getLogger(SiriETHttpTripUpdateSource.class);

/**
* Feed id that is used to match trip ids in the TripUpdates
*/
private final String feedId;

private final String url;

private final SiriLoader siriLoader;
Expand All @@ -35,16 +31,15 @@ public class SiriETHttpTripUpdateSource implements EstimatedTimetableSource {
private UpdateIncrementality updateIncrementality = FULL_DATASET;
private ZonedDateTime lastTimestamp = ZonedDateTime.now().minusMonths(1);

public SiriETHttpTripUpdateSource(Parameters parameters) {
this.feedId = parameters.feedId();
public SiriETHttpTripUpdateSource(Parameters parameters, SiriLoader siriLoader) {
this.url = parameters.url();

this.requestorRef =
parameters.requestorRef() == null || parameters.requestorRef().isEmpty()
? "otp-" + UUID.randomUUID()
: parameters.requestorRef();

this.siriLoader = createLoader(url, parameters);
this.siriLoader = siriLoader;
}

@Override
Expand Down Expand Up @@ -82,28 +77,8 @@ public UpdateIncrementality incrementalityOfLastUpdates() {
}

@Override
public String getFeedId() {
return this.feedId;
}

public String toString() {
return "SiriETHttpTripUpdateSource(" + url + ")";
}

private static SiriLoader createLoader(String url, Parameters parameters) {
// Load real-time updates from a file.
if (SiriFileLoader.matchesUrl(url)) {
return new SiriFileLoader(url);
}
// Fallback to default loader
else {
return new SiriHttpLoader(
url,
parameters.timeout(),
parameters.httpRequestHeaders(),
parameters.previewInterval()
);
}
return ToStringBuilder.of(SiriETHttpTripUpdateSource.class).addStr("url", url).toString();
}

public interface Parameters {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import java.util.function.Consumer;
import org.opentripplanner.updater.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.updater.spi.PollingGraphUpdater;
import org.opentripplanner.updater.spi.PollingGraphUpdaterParameters;
import org.opentripplanner.updater.spi.ResultLogger;
import org.opentripplanner.updater.spi.UpdateResult;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics;
import org.opentripplanner.updater.trip.UrlUpdaterParameters;
import org.opentripplanner.utils.tostring.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure;
Expand Down Expand Up @@ -35,30 +37,27 @@ public class SiriETUpdater extends PollingGraphUpdater {

private final EstimatedTimetableHandler estimatedTimetableHandler;

private final Consumer<UpdateResult> recordMetrics;
private final Consumer<UpdateResult> metricsConsumer;

public SiriETUpdater(
SiriETUpdaterParameters config,
SiriTimetableSnapshotSource timetableSnapshotSource
Parameters config,
SiriTimetableSnapshotSource timetableSnapshotSource,
EstimatedTimetableSource source,
Consumer<UpdateResult> metricsConsumer
) {
super(config);
// Create update streamer from preferences
this.feedId = config.feedId();

this.updateSource = new SiriETHttpTripUpdateSource(config.sourceParameters());
this.updateSource = source;

this.blockReadinessUntilInitialized = config.blockReadinessUntilInitialized();

LOG.info(
"Creating stop time updater (SIRI ET) running every {} seconds : {}",
pollingPeriod(),
updateSource
);
LOG.info("Creating SIRI-ET updater running every {}: {}", pollingPeriod(), updateSource);

estimatedTimetableHandler =
new EstimatedTimetableHandler(timetableSnapshotSource, config.fuzzyTripMatching(), feedId);

recordMetrics = TripUpdateMetrics.streaming(config);
this.metricsConsumer = metricsConsumer;
}

@Override
Expand Down Expand Up @@ -87,7 +86,7 @@ public void runPolling() {
saveResultOnGraph.execute(context -> {
var result = estimatedTimetableHandler.applyUpdate(etds, incrementality, context);
ResultLogger.logUpdateResult(feedId, "siri-et", result);
recordMetrics.accept(result);
metricsConsumer.accept(result);
if (markPrimed) {
primed = true;
}
Expand All @@ -97,8 +96,20 @@ public void runPolling() {
} while (moreData);
}

@Override
public String toString() {
String s = (updateSource == null) ? "NONE" : updateSource.toString();
return "Polling SIRI ET updater with update source = " + s;
return ToStringBuilder
.of(SiriETUpdater.class)
.addStr("source", updateSource.toString())
.addDuration("frequency", pollingPeriod())
.toString();
}

public interface Parameters extends UrlUpdaterParameters, PollingGraphUpdaterParameters {
String url();

boolean blockReadinessUntilInitialized();

boolean fuzzyTripMatching();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import java.time.Duration;
import org.opentripplanner.updater.spi.HttpHeaders;
import org.opentripplanner.updater.spi.PollingGraphUpdaterParameters;
import org.opentripplanner.updater.trip.UrlUpdaterParameters;

public record SiriETUpdaterParameters(
String configRef,
Expand All @@ -18,8 +16,7 @@ public record SiriETUpdaterParameters(
HttpHeaders httpRequestHeaders,
boolean producerMetrics
)
implements
PollingGraphUpdaterParameters, UrlUpdaterParameters, SiriETHttpTripUpdateSource.Parameters {
implements SiriETUpdater.Parameters, SiriETHttpTripUpdateSource.Parameters {
public SiriETHttpTripUpdateSource.Parameters sourceParameters() {
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import uk.org.siri.siri20.Siri;

/**
* Load real-time updates from SIRI-SX and SIRI-ET feeds over HTTP.
* Load real-time updates from SIRI-SX and SIRI-ET feeds over HTTP using the request/response
* flow, which asks the server to only send the latest updates for a given requestor ref.
*/
public class SiriHttpLoader implements SiriLoader {

Expand Down
Loading

0 comments on commit df53a35

Please sign in to comment.