From 12b49957ece1db3f3295c251c93186ac0527d964 Mon Sep 17 00:00:00 2001 From: Michael Tsang Date: Tue, 19 Nov 2024 11:37:22 +0000 Subject: [PATCH 1/8] make all polling updater wait for graph update finish --- .../alert/GtfsRealtimeAlertsUpdater.java | 45 +++++++++---------- .../updater/trip/PollingTripUpdater.java | 5 ++- .../VehicleParkingAvailabilityUpdater.java | 5 ++- .../VehicleParkingUpdater.java | 5 ++- .../PollingVehiclePositionUpdater.java | 5 ++- .../vehicle_rental/VehicleRentalUpdater.java | 5 ++- 6 files changed, 36 insertions(+), 34 deletions(-) diff --git a/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java b/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java index b71dad6a656..0e7ab35cb13 100644 --- a/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java @@ -2,6 +2,7 @@ import com.google.transit.realtime.GtfsRealtime.FeedMessage; import java.net.URI; +import java.util.concurrent.ExecutionException; import org.opentripplanner.framework.io.OtpHttpClient; import org.opentripplanner.framework.io.OtpHttpClientFactory; import org.opentripplanner.routing.impl.TransitAlertServiceImpl; @@ -63,32 +64,28 @@ public String toString() { } @Override - protected void runPolling() { - try { - final FeedMessage feed = otpHttpClient.getAndMap( - URI.create(url), - this.headers.asMap(), - FeedMessage.PARSER::parseFrom - ); + protected void runPolling() throws InterruptedException, ExecutionException { + final FeedMessage feed = otpHttpClient.getAndMap( + URI.create(url), + this.headers.asMap(), + FeedMessage.PARSER::parseFrom + ); - long feedTimestamp = feed.getHeader().getTimestamp(); - if (feedTimestamp == lastTimestamp) { - LOG.debug("Ignoring feed with a timestamp that has not been updated from {}", url); - return; - } - if (feedTimestamp < lastTimestamp) { - LOG.info("Ignoring feed with older than previous timestamp from {}", url); - return; - } + long feedTimestamp = feed.getHeader().getTimestamp(); + if (feedTimestamp == lastTimestamp) { + LOG.debug("Ignoring feed with a timestamp that has not been updated from {}", url); + return; + } + if (feedTimestamp < lastTimestamp) { + LOG.info("Ignoring feed with older than previous timestamp from {}", url); + return; + } - // Handle update in graph writer runnable - saveResultOnGraph.execute(context -> - updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher()) - ); + // Handle update in graph writer runnable + saveResultOnGraph + .execute(context -> updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher())) + .get(); - lastTimestamp = feedTimestamp; - } catch (Exception e) { - LOG.error("Error reading gtfs-realtime feed from " + url, e); - } + lastTimestamp = feedTimestamp; } } diff --git a/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java b/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java index c725c8b1088..42f24031839 100644 --- a/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java @@ -2,6 +2,7 @@ import com.google.transit.realtime.GtfsRealtime.TripUpdate; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import org.opentripplanner.updater.spi.PollingGraphUpdater; import org.opentripplanner.updater.spi.UpdateResult; @@ -73,7 +74,7 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { * applies those updates to the graph. */ @Override - public void runPolling() { + public void runPolling() throws InterruptedException, ExecutionException { // Get update lists from update source List updates = updateSource.getUpdates(); var incrementality = updateSource.incrementalityOfLastUpdates(); @@ -89,7 +90,7 @@ public void runPolling() { feedId, recordMetrics ); - saveResultOnGraph.execute(runnable); + saveResultOnGraph.execute(runnable).get(); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java index b23f46522c3..bc5217f7534 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java @@ -2,6 +2,7 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import org.opentripplanner.routing.vehicle_parking.VehicleParking; @@ -49,12 +50,12 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { } @Override - protected void runPolling() { + protected void runPolling() throws InterruptedException, ExecutionException { if (source.update()) { var updates = source.getUpdates(); var graphWriterRunnable = new AvailabilityUpdater(updates); - saveResultOnGraph.execute(graphWriterRunnable); + saveResultOnGraph.execute(graphWriterRunnable).get(); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java index 830429151b4..60a7b306052 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java @@ -6,6 +6,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; import org.opentripplanner.routing.graph.Graph; @@ -69,7 +70,7 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { } @Override - protected void runPolling() { + protected void runPolling() throws InterruptedException, ExecutionException { LOG.debug("Updating vehicle parkings from {}", source); if (!source.update()) { LOG.debug("No updates"); @@ -81,7 +82,7 @@ protected void runPolling() { VehicleParkingGraphWriterRunnable graphWriterRunnable = new VehicleParkingGraphWriterRunnable( vehicleParkings ); - saveResultOnGraph.execute(graphWriterRunnable); + saveResultOnGraph.execute(graphWriterRunnable).get(); } private class VehicleParkingGraphWriterRunnable implements GraphWriterRunnable { diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java index 4c487ac997b..a4d36f1d1c3 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java @@ -3,6 +3,7 @@ import com.google.transit.realtime.GtfsRealtime.VehiclePosition; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; import org.opentripplanner.service.realtimevehicles.RealtimeVehicleRepository; import org.opentripplanner.service.realtimevehicles.model.RealtimeVehicle; import org.opentripplanner.standalone.config.routerconfig.updaters.VehiclePositionsUpdaterConfig; @@ -64,7 +65,7 @@ public void setup(WriteToGraphCallback writeToGraphCallback) { * applies those updates to the graph. */ @Override - public void runPolling() { + public void runPolling() throws InterruptedException, ExecutionException { // Get update lists from update source List updates = vehiclePositionSource.getPositions(); @@ -77,7 +78,7 @@ public void runPolling() { fuzzyTripMatching, updates ); - saveResultOnGraph.execute(runnable); + saveResultOnGraph.execute(runnable).get(); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java index 24686edce6c..c8030e5492a 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java @@ -8,6 +8,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; import org.opentripplanner.routing.linking.DisposableEdgeCollection; @@ -124,7 +125,7 @@ public String getConfigRef() { } @Override - protected void runPolling() { + protected void runPolling() throws InterruptedException, ExecutionException { LOG.debug("Updating vehicle rental stations from {}", nameForLogging); if (!source.update()) { LOG.debug("No updates from {}", nameForLogging); @@ -138,7 +139,7 @@ protected void runPolling() { stations, geofencingZones ); - saveResultOnGraph.execute(graphWriterRunnable); + saveResultOnGraph.execute(graphWriterRunnable).get(); } private class VehicleRentalGraphWriterRunnable implements GraphWriterRunnable { From 0944921250cfc2a36d422a24c9af341865162ad0 Mon Sep 17 00:00:00 2001 From: Michael Tsang Date: Thu, 12 Dec 2024 10:41:40 +0000 Subject: [PATCH 2/8] extract future handling logic to a new method --- .../updater/alert/GtfsRealtimeAlertsUpdater.java | 8 +++++--- .../opentripplanner/updater/spi/PollingGraphUpdater.java | 7 +++++++ .../opentripplanner/updater/trip/PollingTripUpdater.java | 2 +- .../VehicleParkingAvailabilityUpdater.java | 2 +- .../updater/vehicle_parking/VehicleParkingUpdater.java | 2 +- .../vehicle_position/PollingVehiclePositionUpdater.java | 2 +- .../updater/vehicle_rental/VehicleRentalUpdater.java | 2 +- 7 files changed, 17 insertions(+), 8 deletions(-) diff --git a/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java b/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java index 0e7ab35cb13..bd20d8ddcc3 100644 --- a/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java @@ -82,9 +82,11 @@ protected void runPolling() throws InterruptedException, ExecutionException { } // Handle update in graph writer runnable - saveResultOnGraph - .execute(context -> updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher())) - .get(); + processGraphUpdaterResult( + saveResultOnGraph.execute(context -> + updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher()) + ) + ); lastTimestamp = feedTimestamp; } diff --git a/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java b/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java index e0859371de8..21eb60c2737 100644 --- a/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java @@ -2,6 +2,8 @@ import java.time.Duration; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,4 +102,9 @@ public String getConfigRef() { * with pauses in between. The length of the pause is defined in the preference frequency. */ protected abstract void runPolling() throws Exception; + + protected void processGraphUpdaterResult(Future result) + throws ExecutionException, InterruptedException { + result.get(); + } } diff --git a/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java b/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java index 42f24031839..1f88044d51a 100644 --- a/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java @@ -90,7 +90,7 @@ public void runPolling() throws InterruptedException, ExecutionException { feedId, recordMetrics ); - saveResultOnGraph.execute(runnable).get(); + processGraphUpdaterResult(saveResultOnGraph.execute(runnable)); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java index 64f8ca81763..c65ff7bc4c1 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java @@ -55,7 +55,7 @@ protected void runPolling() throws InterruptedException, ExecutionException { var updates = source.getUpdates(); var graphWriterRunnable = new AvailabilityUpdater(updates); - saveResultOnGraph.execute(graphWriterRunnable).get(); + processGraphUpdaterResult(saveResultOnGraph.execute(graphWriterRunnable)); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java index 4116486ee2d..ea356200f3b 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java @@ -82,7 +82,7 @@ protected void runPolling() throws InterruptedException, ExecutionException { VehicleParkingGraphWriterRunnable graphWriterRunnable = new VehicleParkingGraphWriterRunnable( vehicleParkings ); - saveResultOnGraph.execute(graphWriterRunnable).get(); + processGraphUpdaterResult(saveResultOnGraph.execute(graphWriterRunnable)); } private class VehicleParkingGraphWriterRunnable implements GraphWriterRunnable { diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java index a4d36f1d1c3..c787989c5d2 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java @@ -78,7 +78,7 @@ public void runPolling() throws InterruptedException, ExecutionException { fuzzyTripMatching, updates ); - saveResultOnGraph.execute(runnable).get(); + processGraphUpdaterResult(saveResultOnGraph.execute(runnable)); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java index c8030e5492a..efcf9812feb 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java @@ -139,7 +139,7 @@ protected void runPolling() throws InterruptedException, ExecutionException { stations, geofencingZones ); - saveResultOnGraph.execute(graphWriterRunnable).get(); + processGraphUpdaterResult(saveResultOnGraph.execute(graphWriterRunnable)); } private class VehicleRentalGraphWriterRunnable implements GraphWriterRunnable { From 4df65a19cc68d495e5aedfecea0b134b798c39eb Mon Sep 17 00:00:00 2001 From: Michael Tsang Date: Thu, 12 Dec 2024 10:55:00 +0000 Subject: [PATCH 3/8] add feature flag --- .../opentripplanner/framework/application/OTPFeature.java | 6 ++++++ doc/user/Configuration.md | 1 + 2 files changed, 7 insertions(+) diff --git a/application/src/main/java/org/opentripplanner/framework/application/OTPFeature.java b/application/src/main/java/org/opentripplanner/framework/application/OTPFeature.java index 6631287613b..add5d690e2e 100644 --- a/application/src/main/java/org/opentripplanner/framework/application/OTPFeature.java +++ b/application/src/main/java/org/opentripplanner/framework/application/OTPFeature.java @@ -97,6 +97,12 @@ public enum OTPFeature { false, "Whether the @async annotation in the GraphQL schema should lead to the fetch being executed asynchronously. This allows batch or alias queries to run in parallel at the cost of consuming extra threads." ), + WaitForGraphUpdateInPollingUpdaters( + true, + false, + "Make all polling updaters wait for graph updates to complete before finishing. " + + "If this is not enabled, the updaters will finish after submitting the task to update the graph." + ), Co2Emissions(false, true, "Enable the emissions sandbox module."), DataOverlay( false, diff --git a/doc/user/Configuration.md b/doc/user/Configuration.md index f80966b8cb3..1d65c2af939 100644 --- a/doc/user/Configuration.md +++ b/doc/user/Configuration.md @@ -238,6 +238,7 @@ Here is a list of all features which can be toggled on/off and their default val | `TransmodelGraphQlApi` | Enable the [Transmodel (NeTEx) GraphQL API](apis/TransmodelApi.md). | ✓️ | ✓️ | | `ActuatorAPI` | Endpoint for actuators (service health status). | | ✓️ | | `AsyncGraphQLFetchers` | Whether the @async annotation in the GraphQL schema should lead to the fetch being executed asynchronously. This allows batch or alias queries to run in parallel at the cost of consuming extra threads. | | | +| `WaitForGraphUpdateInPollingUpdaters` | Make all polling updaters wait for graph updates to complete before finishing. If this is not enabled, the updaters will finish after submitting the task to update the graph. | ✓️ | | | `Co2Emissions` | Enable the emissions sandbox module. | | ✓️ | | `DataOverlay` | Enable usage of data overlay when calculating costs for the street network. | | ✓️ | | `FaresV2` | Enable import of GTFS-Fares v2 data. | | ✓️ | From d8a34ddcfe80dabfba1c08a47f95adaa900d643d Mon Sep 17 00:00:00 2001 From: Michael Tsang Date: Thu, 12 Dec 2024 12:50:06 +0000 Subject: [PATCH 4/8] refactor update graph logic to PollingGraphUpdater --- .../updater/alert/GtfsRealtimeAlertsUpdater.java | 12 +----------- .../updater/siri/updater/SiriETUpdater.java | 9 --------- .../updater/siri/updater/SiriSXUpdater.java | 9 +-------- .../updater/spi/PollingGraphUpdater.java | 15 ++++++++++++--- .../updater/trip/PollingTripUpdater.java | 12 +----------- .../VehicleParkingAvailabilityUpdater.java | 10 +--------- .../vehicle_parking/VehicleParkingUpdater.java | 9 +-------- .../PollingVehiclePositionUpdater.java | 12 +----------- .../vehicle_rental/VehicleRentalUpdater.java | 10 +--------- 9 files changed, 19 insertions(+), 79 deletions(-) diff --git a/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java b/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java index bd20d8ddcc3..b217c60a05b 100644 --- a/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/alert/GtfsRealtimeAlertsUpdater.java @@ -27,7 +27,6 @@ public class GtfsRealtimeAlertsUpdater extends PollingGraphUpdater implements Tr private final TransitAlertService transitAlertService; private final HttpHeaders headers; private final OtpHttpClient otpHttpClient; - private WriteToGraphCallback saveResultOnGraph; private Long lastTimestamp = Long.MIN_VALUE; public GtfsRealtimeAlertsUpdater( @@ -49,11 +48,6 @@ public GtfsRealtimeAlertsUpdater( LOG.info("Creating real-time alert updater running every {}: {}", pollingPeriod(), url); } - @Override - public void setup(WriteToGraphCallback writeToGraphCallback) { - this.saveResultOnGraph = writeToGraphCallback; - } - public TransitAlertService getTransitAlertService() { return transitAlertService; } @@ -82,11 +76,7 @@ protected void runPolling() throws InterruptedException, ExecutionException { } // Handle update in graph writer runnable - processGraphUpdaterResult( - saveResultOnGraph.execute(context -> - updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher()) - ) - ); + updateGraph(context -> updateHandler.update(feed, context.gtfsRealtimeFuzzyTripMatcher())); lastTimestamp = feedTimestamp; } diff --git a/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriETUpdater.java b/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriETUpdater.java index 087bf28e875..38efa18c177 100644 --- a/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriETUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriETUpdater.java @@ -28,10 +28,6 @@ public class SiriETUpdater extends PollingGraphUpdater { * Feed id that is used for the trip ids in the TripUpdates */ private final String feedId; - /** - * Parent update manager. Is used to execute graph writer runnables. - */ - protected WriteToGraphCallback saveResultOnGraph; private final EstimatedTimetableHandler estimatedTimetableHandler; @@ -61,11 +57,6 @@ public SiriETUpdater( recordMetrics = TripUpdateMetrics.streaming(config); } - @Override - public void setup(WriteToGraphCallback writeToGraphCallback) { - this.saveResultOnGraph = writeToGraphCallback; - } - /** * Repeatedly makes blocking calls to an UpdateStreamer to retrieve new stop time updates, and * applies those updates to the graph. diff --git a/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriSXUpdater.java b/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriSXUpdater.java index 83200db30d3..8311a385e9c 100644 --- a/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriSXUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/siri/updater/SiriSXUpdater.java @@ -13,7 +13,6 @@ import org.opentripplanner.updater.alert.TransitAlertProvider; import org.opentripplanner.updater.siri.SiriAlertsUpdateHandler; import org.opentripplanner.updater.spi.PollingGraphUpdater; -import org.opentripplanner.updater.spi.WriteToGraphCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import uk.org.siri.siri20.ServiceDelivery; @@ -33,7 +32,6 @@ public class SiriSXUpdater extends PollingGraphUpdater implements TransitAlertPr // TODO RT_AB: Document why SiriAlertsUpdateHandler is a separate instance that persists across // many graph update operations. private final SiriAlertsUpdateHandler updateHandler; - private WriteToGraphCallback writeToGraphCallback; private ZonedDateTime lastTimestamp = ZonedDateTime.now().minusWeeks(1); private String requestorRef; /** @@ -78,11 +76,6 @@ public SiriSXUpdater(SiriSXUpdaterParameters config, TimetableRepository timetab ); } - @Override - public void setup(WriteToGraphCallback writeToGraphCallback) { - this.writeToGraphCallback = writeToGraphCallback; - } - public TransitAlertService getTransitAlertService() { return transitAlertService; } @@ -141,7 +134,7 @@ private void updateSiri() { // All that said, out of all the update types, Alerts (and SIRI SX) are probably the ones // that would be most tolerant of non-versioned application-wide storage since they don't // participate in routing and are tacked on to already-completed routing responses. - writeToGraphCallback.execute(context -> { + saveResultOnGraph.execute(context -> { updateHandler.update(serviceDelivery, context); if (markPrimed) { primed = true; diff --git a/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java b/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java index 21eb60c2737..0525406505e 100644 --- a/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java @@ -3,7 +3,7 @@ import java.time.Duration; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import org.opentripplanner.updater.GraphWriterRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +36,10 @@ public abstract class PollingGraphUpdater implements GraphUpdater { * removed that. */ protected volatile boolean primed; + /** + * Parent update manager. Is used to execute graph writer runnables. + */ + protected WriteToGraphCallback saveResultOnGraph; /** Shared configuration code for all polling graph updaters. */ protected PollingGraphUpdater(PollingGraphUpdaterParameters config) { @@ -97,14 +101,19 @@ public String getConfigRef() { return configRef; } + @Override + public final void setup(WriteToGraphCallback writeToGraphCallback) { + this.saveResultOnGraph = writeToGraphCallback; + } + /** * Mirrors GraphUpdater.run method. Only difference is that runPolling will be run multiple times * with pauses in between. The length of the pause is defined in the preference frequency. */ protected abstract void runPolling() throws Exception; - protected void processGraphUpdaterResult(Future result) + protected final void updateGraph(GraphWriterRunnable task) throws ExecutionException, InterruptedException { - result.get(); + saveResultOnGraph.execute(task).get(); } } diff --git a/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java b/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java index 1f88044d51a..8331bf19d3b 100644 --- a/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/trip/PollingTripUpdater.java @@ -6,7 +6,6 @@ import java.util.function.Consumer; import org.opentripplanner.updater.spi.PollingGraphUpdater; import org.opentripplanner.updater.spi.UpdateResult; -import org.opentripplanner.updater.spi.WriteToGraphCallback; import org.opentripplanner.updater.trip.metrics.BatchTripUpdateMetrics; import org.opentripplanner.utils.tostring.ToStringBuilder; import org.slf4j.Logger; @@ -34,10 +33,6 @@ public class PollingTripUpdater extends PollingGraphUpdater { private final BackwardsDelayPropagationType backwardsDelayPropagationType; private final Consumer recordMetrics; - /** - * Parent update manager. Is used to execute graph writer runnables. - */ - private WriteToGraphCallback saveResultOnGraph; /** * Set only if we should attempt to match the trip_id from other data in TripDescriptor */ @@ -64,11 +59,6 @@ public PollingTripUpdater( ); } - @Override - public void setup(WriteToGraphCallback writeToGraphCallback) { - this.saveResultOnGraph = writeToGraphCallback; - } - /** * Repeatedly makes blocking calls to an UpdateStreamer to retrieve new stop time updates, and * applies those updates to the graph. @@ -90,7 +80,7 @@ public void runPolling() throws InterruptedException, ExecutionException { feedId, recordMetrics ); - processGraphUpdaterResult(saveResultOnGraph.execute(runnable)); + updateGraph(runnable); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java index c65ff7bc4c1..29e820ff952 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingAvailabilityUpdater.java @@ -13,7 +13,6 @@ import org.opentripplanner.updater.RealTimeUpdateContext; import org.opentripplanner.updater.spi.DataSource; import org.opentripplanner.updater.spi.PollingGraphUpdater; -import org.opentripplanner.updater.spi.WriteToGraphCallback; import org.opentripplanner.utils.tostring.ToStringBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,8 +27,6 @@ public class VehicleParkingAvailabilityUpdater extends PollingGraphUpdater { VehicleParkingAvailabilityUpdater.class ); private final DataSource source; - private WriteToGraphCallback saveResultOnGraph; - private final VehicleParkingRepository repository; public VehicleParkingAvailabilityUpdater( @@ -44,18 +41,13 @@ public VehicleParkingAvailabilityUpdater( LOG.info("Creating vehicle-parking updater running every {}: {}", pollingPeriod(), source); } - @Override - public void setup(WriteToGraphCallback writeToGraphCallback) { - this.saveResultOnGraph = writeToGraphCallback; - } - @Override protected void runPolling() throws InterruptedException, ExecutionException { if (source.update()) { var updates = source.getUpdates(); var graphWriterRunnable = new AvailabilityUpdater(updates); - processGraphUpdaterResult(saveResultOnGraph.execute(graphWriterRunnable)); + updateGraph(graphWriterRunnable); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java index ea356200f3b..5513e224f57 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_parking/VehicleParkingUpdater.java @@ -27,7 +27,6 @@ import org.opentripplanner.updater.RealTimeUpdateContext; import org.opentripplanner.updater.spi.DataSource; import org.opentripplanner.updater.spi.PollingGraphUpdater; -import org.opentripplanner.updater.spi.WriteToGraphCallback; import org.opentripplanner.utils.tostring.ToStringBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +42,6 @@ public class VehicleParkingUpdater extends PollingGraphUpdater { private final Map> tempEdgesByPark = new HashMap<>(); private final DataSource source; private final List oldVehicleParkings = new ArrayList<>(); - private WriteToGraphCallback saveResultOnGraph; private final VertexLinker linker; private final VehicleParkingRepository parkingRepository; @@ -64,11 +62,6 @@ public VehicleParkingUpdater( LOG.info("Creating vehicle-parking updater running every {}: {}", pollingPeriod(), source); } - @Override - public void setup(WriteToGraphCallback writeToGraphCallback) { - this.saveResultOnGraph = writeToGraphCallback; - } - @Override protected void runPolling() throws InterruptedException, ExecutionException { LOG.debug("Updating vehicle parkings from {}", source); @@ -82,7 +75,7 @@ protected void runPolling() throws InterruptedException, ExecutionException { VehicleParkingGraphWriterRunnable graphWriterRunnable = new VehicleParkingGraphWriterRunnable( vehicleParkings ); - processGraphUpdaterResult(saveResultOnGraph.execute(graphWriterRunnable)); + updateGraph(graphWriterRunnable); } private class VehicleParkingGraphWriterRunnable implements GraphWriterRunnable { diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java index c787989c5d2..f53d3010e59 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_position/PollingVehiclePositionUpdater.java @@ -8,7 +8,6 @@ import org.opentripplanner.service.realtimevehicles.model.RealtimeVehicle; import org.opentripplanner.standalone.config.routerconfig.updaters.VehiclePositionsUpdaterConfig; import org.opentripplanner.updater.spi.PollingGraphUpdater; -import org.opentripplanner.updater.spi.WriteToGraphCallback; import org.opentripplanner.utils.tostring.ToStringBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,10 +27,6 @@ public class PollingVehiclePositionUpdater extends PollingGraphUpdater { private final GtfsRealtimeHttpVehiclePositionSource vehiclePositionSource; private final Set vehiclePositionFeatures; - /** - * Parent update manager. Is used to execute graph writer runnables. - */ - private WriteToGraphCallback saveResultOnGraph; private final String feedId; private final RealtimeVehicleRepository realtimeVehicleRepository; private final boolean fuzzyTripMatching; @@ -55,11 +50,6 @@ public PollingVehiclePositionUpdater( ); } - @Override - public void setup(WriteToGraphCallback writeToGraphCallback) { - this.saveResultOnGraph = writeToGraphCallback; - } - /** * Repeatedly makes blocking calls to an UpdateStreamer to retrieve new stop time updates, and * applies those updates to the graph. @@ -78,7 +68,7 @@ public void runPolling() throws InterruptedException, ExecutionException { fuzzyTripMatching, updates ); - processGraphUpdaterResult(saveResultOnGraph.execute(runnable)); + updateGraph(runnable); } } diff --git a/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java b/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java index efcf9812feb..8419d004138 100644 --- a/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/vehicle_rental/VehicleRentalUpdater.java @@ -31,7 +31,6 @@ import org.opentripplanner.updater.RealTimeUpdateContext; import org.opentripplanner.updater.spi.PollingGraphUpdater; import org.opentripplanner.updater.spi.UpdaterConstructionException; -import org.opentripplanner.updater.spi.WriteToGraphCallback; import org.opentripplanner.updater.vehicle_rental.datasources.VehicleRentalDatasource; import org.opentripplanner.utils.lang.ObjectUtils; import org.opentripplanner.utils.logging.Throttle; @@ -54,8 +53,6 @@ public class VehicleRentalUpdater extends PollingGraphUpdater { private final VehicleRentalDatasource source; private final String nameForLogging; - private WriteToGraphCallback saveResultOnGraph; - private Map latestModifiedEdges = Map.of(); private Set latestAppliedGeofencingZones = Set.of(); private final Map verticesByStation = new HashMap<>(); @@ -109,11 +106,6 @@ public VehicleRentalUpdater( } } - @Override - public void setup(WriteToGraphCallback writeToGraphCallback) { - this.saveResultOnGraph = writeToGraphCallback; - } - @Override public String toString() { return ToStringBuilder.of(VehicleRentalUpdater.class).addObj("source", source).toString(); @@ -139,7 +131,7 @@ protected void runPolling() throws InterruptedException, ExecutionException { stations, geofencingZones ); - processGraphUpdaterResult(saveResultOnGraph.execute(graphWriterRunnable)); + updateGraph(graphWriterRunnable); } private class VehicleRentalGraphWriterRunnable implements GraphWriterRunnable { From 69ac4854678bb964715d4b7d143e457cc6b77dda Mon Sep 17 00:00:00 2001 From: Michael Tsang Date: Thu, 12 Dec 2024 12:50:17 +0000 Subject: [PATCH 5/8] add test for desired feature flag --- .../updater/spi/PollingGraphUpdaterTest.java | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java diff --git a/application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java b/application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java new file mode 100644 index 00000000000..6132988d92b --- /dev/null +++ b/application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java @@ -0,0 +1,78 @@ +package org.opentripplanner.updater.spi; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opentripplanner.framework.application.OTPFeature; +import org.opentripplanner.updater.GraphWriterRunnable; + +public class PollingGraphUpdaterTest { + + private static final PollingGraphUpdaterParameters config = new PollingGraphUpdaterParameters() { + @Override + public Duration frequency() { + return Duration.ZERO; + } + + @Override + public String configRef() { + return ""; + } + }; + + private static final PollingGraphUpdater subject = new PollingGraphUpdater(config) { + @Override + protected void runPolling() {} + }; + + private boolean updateCompleted; + + @BeforeAll + static void beforeAll() { + subject.setup(runnable -> CompletableFuture.runAsync(() -> runnable.run(null))); + } + + @BeforeEach + void setUp() { + updateCompleted = false; + } + + private final GraphWriterRunnable graphWriterRunnable = context -> { + try { + Thread.sleep(100); + updateCompleted = true; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + + @Test + void testUpdateGraphWithWaitFeatureOn() { + OTPFeature.WaitForGraphUpdateInPollingUpdaters.testOn(() -> { + callUpdater(); + assertTrue(updateCompleted); + }); + } + + @Test + void testProcessGraphUpdaterResultWithWaitFeatureOff() { + OTPFeature.WaitForGraphUpdateInPollingUpdaters.testOff(() -> { + callUpdater(); + assertFalse(updateCompleted); + }); + } + + private void callUpdater() { + try { + subject.updateGraph(graphWriterRunnable); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} From 0037b5aab805b8eb719593387881fd242556744c Mon Sep 17 00:00:00 2001 From: Michael Tsang Date: Thu, 12 Dec 2024 12:55:27 +0000 Subject: [PATCH 6/8] implement feature flag --- .../opentripplanner/updater/spi/PollingGraphUpdater.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java b/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java index 0525406505e..e8faa4b6aba 100644 --- a/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java @@ -3,6 +3,7 @@ import java.time.Duration; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import org.opentripplanner.framework.application.OTPFeature; import org.opentripplanner.updater.GraphWriterRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,6 +115,9 @@ public final void setup(WriteToGraphCallback writeToGraphCallback) { protected final void updateGraph(GraphWriterRunnable task) throws ExecutionException, InterruptedException { - saveResultOnGraph.execute(task).get(); + var result = saveResultOnGraph.execute(task); + if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) { + result.get(); + } } } From 51080a8510956dcd40cd578a4ac1320fb1de01ce Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Tue, 7 Jan 2025 11:05:31 +0000 Subject: [PATCH 7/8] fix(deps): update dependency com.google.guava:guava to v33.4.0-jre --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index fbc65c1e53a..58af808d0e1 100644 --- a/pom.xml +++ b/pom.xml @@ -422,7 +422,7 @@ com.google.guava guava - 33.3.1-jre + 33.4.0-jre From 7163f6a9154ede5f384c91ceec450b403f82d4e6 Mon Sep 17 00:00:00 2001 From: OTP Changelog Bot Date: Thu, 9 Jan 2025 15:21:03 +0000 Subject: [PATCH 8/8] Add changelog entry for #6262 [ci skip] --- doc/user/Changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/user/Changelog.md b/doc/user/Changelog.md index b859f87f19a..14a9f6d83ed 100644 --- a/doc/user/Changelog.md +++ b/doc/user/Changelog.md @@ -70,6 +70,7 @@ based on merged pull requests. Search GitHub issues and pull requests for smalle - Add fallback name for corridors [#6303](https://github.com/opentripplanner/OpenTripPlanner/pull/6303) - Implement SIRI Lite [#6284](https://github.com/opentripplanner/OpenTripPlanner/pull/6284) - Add a matcher API for filters in the transit service used for regularStop lookup [#6234](https://github.com/opentripplanner/OpenTripPlanner/pull/6234) +- Make all polling updaters wait for graph update finish [#6262](https://github.com/opentripplanner/OpenTripPlanner/pull/6262) [](AUTOMATIC_CHANGELOG_PLACEHOLDER_DO_NOT_REMOVE) ## 2.6.0 (2024-09-18)