Skip to content

Commit

Permalink
Merge pull request #6262 from Jnction/fix-update-lag
Browse files Browse the repository at this point in the history
Make all polling updaters wait for graph update finish
  • Loading branch information
t2gran authored Jan 9, 2025
2 parents e439f87 + 74241bd commit b0b8661
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ public enum OTPFeature {
false,
"Whether the @async annotation in the GraphQL schema should lead to the fetch being executed asynchronously. This allows batch or alias queries to run in parallel at the cost of consuming extra threads."
),
WaitForGraphUpdateInPollingUpdaters(
true,
false,
"Make all polling updaters wait for graph updates to complete before finishing. " +
"If this is not enabled, the updaters will finish after submitting the task to update the graph."
),
Co2Emissions(false, true, "Enable the emissions sandbox module."),
DataOverlay(
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.transit.realtime.GtfsRealtime.FeedMessage;
import java.net.URI;
import java.util.concurrent.ExecutionException;
import org.opentripplanner.framework.io.OtpHttpClient;
import org.opentripplanner.framework.io.OtpHttpClientFactory;
import org.opentripplanner.routing.impl.TransitAlertServiceImpl;
Expand All @@ -26,7 +27,6 @@ public class GtfsRealtimeAlertsUpdater extends PollingGraphUpdater implements Tr
private final TransitAlertService transitAlertService;
private final HttpHeaders headers;
private final OtpHttpClient otpHttpClient;
private WriteToGraphCallback saveResultOnGraph;
private Long lastTimestamp = Long.MIN_VALUE;

public GtfsRealtimeAlertsUpdater(
Expand All @@ -48,11 +48,6 @@ public GtfsRealtimeAlertsUpdater(
LOG.info("Creating real-time alert updater running every {}: {}", pollingPeriod(), url);
}

@Override
public void setup(WriteToGraphCallback writeToGraphCallback) {
this.saveResultOnGraph = writeToGraphCallback;
}

public TransitAlertService getTransitAlertService() {
return transitAlertService;
}
Expand All @@ -63,32 +58,26 @@ public String toString() {
}

@Override
protected void runPolling() {
try {
final FeedMessage feed = otpHttpClient.getAndMap(
URI.create(url),
this.headers.asMap(),
FeedMessage.PARSER::parseFrom
);
protected void runPolling() throws InterruptedException, ExecutionException {
final FeedMessage feed = otpHttpClient.getAndMap(
URI.create(url),
this.headers.asMap(),
FeedMessage.PARSER::parseFrom
);

long feedTimestamp = feed.getHeader().getTimestamp();
if (feedTimestamp == lastTimestamp) {
LOG.debug("Ignoring feed with a timestamp that has not been updated from {}", url);
return;
}
if (feedTimestamp < lastTimestamp) {
LOG.info("Ignoring feed with older than previous timestamp from {}", url);
return;
}
long feedTimestamp = feed.getHeader().getTimestamp();
if (feedTimestamp == lastTimestamp) {
LOG.debug("Ignoring feed with a timestamp that has not been updated from {}", url);
return;
}
if (feedTimestamp < lastTimestamp) {
LOG.info("Ignoring feed with older than previous timestamp from {}", url);
return;
}

// Handle update in graph writer runnable
saveResultOnGraph.execute(context ->
updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher())
);
// Handle update in graph writer runnable
updateGraph(context -> updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher()));

lastTimestamp = feedTimestamp;
} catch (Exception e) {
LOG.error("Failed to process GTFS-RT Alerts feed from {}", url, e);
}
lastTimestamp = feedTimestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@ public class SiriETUpdater extends PollingGraphUpdater {
* Feed id that is used for the trip ids in the TripUpdates
*/
private final String feedId;
/**
* Parent update manager. Is used to execute graph writer runnables.
*/
protected WriteToGraphCallback saveResultOnGraph;

private final EstimatedTimetableHandler estimatedTimetableHandler;

Expand All @@ -60,11 +56,6 @@ public SiriETUpdater(
this.metricsConsumer = metricsConsumer;
}

@Override
public void setup(WriteToGraphCallback writeToGraphCallback) {
this.saveResultOnGraph = writeToGraphCallback;
}

/**
* Repeatedly makes blocking calls to an UpdateStreamer to retrieve new stop time updates, and
* applies those updates to the graph.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.opentripplanner.updater.siri.SiriAlertsUpdateHandler;
import org.opentripplanner.updater.spi.PollingGraphUpdater;
import org.opentripplanner.updater.spi.PollingGraphUpdaterParameters;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.opentripplanner.updater.trip.UrlUpdaterParameters;
import org.opentripplanner.utils.tostring.ToStringBuilder;
import org.slf4j.Logger;
Expand All @@ -36,7 +35,6 @@ public class SiriSXUpdater extends PollingGraphUpdater implements TransitAlertPr
// TODO RT_AB: Document why SiriAlertsUpdateHandler is a separate instance that persists across
// many graph update operations.
private final SiriAlertsUpdateHandler updateHandler;
private WriteToGraphCallback writeToGraphCallback;
private ZonedDateTime lastTimestamp = ZonedDateTime.now().minusWeeks(1);
private String requestorRef;
/**
Expand Down Expand Up @@ -80,11 +78,6 @@ public SiriSXUpdater(
LOG.info("Creating SIRI-SX updater running every {}: {}", pollingPeriod(), url);
}

@Override
public void setup(WriteToGraphCallback writeToGraphCallback) {
this.writeToGraphCallback = writeToGraphCallback;
}

public TransitAlertService getTransitAlertService() {
return transitAlertService;
}
Expand Down Expand Up @@ -148,7 +141,7 @@ private void updateSiri() {
// All that said, out of all the update types, Alerts (and SIRI SX) are probably the ones
// that would be most tolerant of non-versioned application-wide storage since they don't
// participate in routing and are tacked on to already-completed routing responses.
writeToGraphCallback.execute(context -> {
saveResultOnGraph.execute(context -> {
updateHandler.update(serviceDelivery, context);
if (markPrimed) {
primed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import org.opentripplanner.framework.application.OTPFeature;
import org.opentripplanner.updater.GraphWriterRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -34,6 +37,10 @@ public abstract class PollingGraphUpdater implements GraphUpdater {
* removed that.
*/
protected volatile boolean primed;
/**
* Parent update manager. Is used to execute graph writer runnables.
*/
protected WriteToGraphCallback saveResultOnGraph;

/** Shared configuration code for all polling graph updaters. */
protected PollingGraphUpdater(PollingGraphUpdaterParameters config) {
Expand Down Expand Up @@ -95,9 +102,22 @@ public String getConfigRef() {
return configRef;
}

@Override
public final void setup(WriteToGraphCallback writeToGraphCallback) {
this.saveResultOnGraph = writeToGraphCallback;
}

/**
* Mirrors GraphUpdater.run method. Only difference is that runPolling will be run multiple times
* with pauses in between. The length of the pause is defined in the preference frequency.
*/
protected abstract void runPolling() throws Exception;

protected final void updateGraph(GraphWriterRunnable task)
throws ExecutionException, InterruptedException {
var result = saveResultOnGraph.execute(task);
if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) {
result.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import com.google.transit.realtime.GtfsRealtime.TripUpdate;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.opentripplanner.updater.spi.PollingGraphUpdater;
import org.opentripplanner.updater.spi.UpdateResult;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.opentripplanner.updater.trip.metrics.BatchTripUpdateMetrics;
import org.opentripplanner.utils.tostring.ToStringBuilder;
import org.slf4j.Logger;
Expand Down Expand Up @@ -33,10 +33,6 @@ public class PollingTripUpdater extends PollingGraphUpdater {
private final BackwardsDelayPropagationType backwardsDelayPropagationType;
private final Consumer<UpdateResult> recordMetrics;

/**
* Parent update manager. Is used to execute graph writer runnables.
*/
private WriteToGraphCallback saveResultOnGraph;
/**
* Set only if we should attempt to match the trip_id from other data in TripDescriptor
*/
Expand All @@ -63,17 +59,12 @@ public PollingTripUpdater(
);
}

@Override
public void setup(WriteToGraphCallback writeToGraphCallback) {
this.saveResultOnGraph = writeToGraphCallback;
}

/**
* Repeatedly makes blocking calls to an UpdateStreamer to retrieve new stop time updates, and
* applies those updates to the graph.
*/
@Override
public void runPolling() {
public void runPolling() throws InterruptedException, ExecutionException {
// Get update lists from update source
List<TripUpdate> updates = updateSource.getUpdates();
var incrementality = updateSource.incrementalityOfLastUpdates();
Expand All @@ -89,7 +80,7 @@ public void runPolling() {
feedId,
recordMetrics
);
saveResultOnGraph.execute(runnable);
updateGraph(runnable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.opentripplanner.service.vehicleparking.VehicleParkingRepository;
Expand All @@ -12,7 +13,6 @@
import org.opentripplanner.updater.RealTimeUpdateContext;
import org.opentripplanner.updater.spi.DataSource;
import org.opentripplanner.updater.spi.PollingGraphUpdater;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.opentripplanner.utils.tostring.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -27,8 +27,6 @@ public class VehicleParkingAvailabilityUpdater extends PollingGraphUpdater {
VehicleParkingAvailabilityUpdater.class
);
private final DataSource<AvailabiltyUpdate> source;
private WriteToGraphCallback saveResultOnGraph;

private final VehicleParkingRepository repository;

public VehicleParkingAvailabilityUpdater(
Expand All @@ -44,17 +42,12 @@ public VehicleParkingAvailabilityUpdater(
}

@Override
public void setup(WriteToGraphCallback writeToGraphCallback) {
this.saveResultOnGraph = writeToGraphCallback;
}

@Override
protected void runPolling() {
protected void runPolling() throws InterruptedException, ExecutionException {
if (source.update()) {
var updates = source.getUpdates();

var graphWriterRunnable = new AvailabilityUpdater(updates);
saveResultOnGraph.execute(graphWriterRunnable);
updateGraph(graphWriterRunnable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.opentripplanner.routing.graph.Graph;
Expand All @@ -26,7 +27,6 @@
import org.opentripplanner.updater.RealTimeUpdateContext;
import org.opentripplanner.updater.spi.DataSource;
import org.opentripplanner.updater.spi.PollingGraphUpdater;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.opentripplanner.utils.tostring.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,7 +42,6 @@ public class VehicleParkingUpdater extends PollingGraphUpdater {
private final Map<VehicleParking, List<DisposableEdgeCollection>> tempEdgesByPark = new HashMap<>();
private final DataSource<VehicleParking> source;
private final List<VehicleParking> oldVehicleParkings = new ArrayList<>();
private WriteToGraphCallback saveResultOnGraph;
private final VertexLinker linker;

private final VehicleParkingRepository parkingRepository;
Expand All @@ -64,12 +63,7 @@ public VehicleParkingUpdater(
}

@Override
public void setup(WriteToGraphCallback writeToGraphCallback) {
this.saveResultOnGraph = writeToGraphCallback;
}

@Override
protected void runPolling() {
protected void runPolling() throws InterruptedException, ExecutionException {
LOG.debug("Updating vehicle parkings from {}", source);
if (!source.update()) {
LOG.debug("No updates");
Expand All @@ -81,7 +75,7 @@ protected void runPolling() {
VehicleParkingGraphWriterRunnable graphWriterRunnable = new VehicleParkingGraphWriterRunnable(
vehicleParkings
);
saveResultOnGraph.execute(graphWriterRunnable);
updateGraph(graphWriterRunnable);
}

private class VehicleParkingGraphWriterRunnable implements GraphWriterRunnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import com.google.transit.realtime.GtfsRealtime.VehiclePosition;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.opentripplanner.service.realtimevehicles.RealtimeVehicleRepository;
import org.opentripplanner.service.realtimevehicles.model.RealtimeVehicle;
import org.opentripplanner.standalone.config.routerconfig.updaters.VehiclePositionsUpdaterConfig;
import org.opentripplanner.updater.spi.PollingGraphUpdater;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.opentripplanner.utils.tostring.ToStringBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -27,10 +27,6 @@ public class PollingVehiclePositionUpdater extends PollingGraphUpdater {
private final GtfsRealtimeHttpVehiclePositionSource vehiclePositionSource;
private final Set<VehiclePositionsUpdaterConfig.VehiclePositionFeature> vehiclePositionFeatures;

/**
* Parent update manager. Is used to execute graph writer runnables.
*/
private WriteToGraphCallback saveResultOnGraph;
private final String feedId;
private final RealtimeVehicleRepository realtimeVehicleRepository;
private final boolean fuzzyTripMatching;
Expand All @@ -54,17 +50,12 @@ public PollingVehiclePositionUpdater(
);
}

@Override
public void setup(WriteToGraphCallback writeToGraphCallback) {
this.saveResultOnGraph = writeToGraphCallback;
}

/**
* Repeatedly makes blocking calls to an UpdateStreamer to retrieve new stop time updates, and
* applies those updates to the graph.
*/
@Override
public void runPolling() {
public void runPolling() throws InterruptedException, ExecutionException {
// Get update lists from update source
List<VehiclePosition> updates = vehiclePositionSource.getPositions();

Expand All @@ -77,7 +68,7 @@ public void runPolling() {
fuzzyTripMatching,
updates
);
saveResultOnGraph.execute(runnable);
updateGraph(runnable);
}
}

Expand Down
Loading

0 comments on commit b0b8661

Please sign in to comment.