Merge pull request #573 from ibi-group/flex-merge-feeds
Flex merge feeds
miles-grant-ibigroup authored Feb 8, 2024
2 parents 78f0710 + ac17bc2 commit e77c9a1
Showing 44 changed files with 790 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -271,8 +271,8 @@
<dependency>
<groupId>com.github.ibi-group</groupId>
<artifactId>gtfs-lib</artifactId>
- <!-- Latest dev build on jitpack.io -->
- <version>5e24a64c4266c5cc9866eaa4b11a1231270a2498</version>
+ <version>2bd924c6e5f5244a279c9d94298c14c0b3d47b35</version>
+ <!-- Latest dev-flex build on jitpack.io -->
<!-- Exclusions added in order to silence SLF4J warnings about multiple bindings:
http://www.slf4j.org/codes.html#multiple_bindings
-->
MergeFeedsJob.java
@@ -18,8 +18,12 @@
import com.conveyal.datatools.manager.persistence.Persistence;
import com.conveyal.datatools.manager.utils.ErrorUtils;
import com.conveyal.gtfs.loader.Feed;
import com.conveyal.gtfs.loader.JdbcGtfsExporter;
import com.conveyal.gtfs.loader.Table;
import com.conveyal.gtfs.model.Location;
import com.conveyal.gtfs.model.LocationShape;
import com.conveyal.gtfs.model.StopTime;
import com.conveyal.gtfs.util.GeoJsonUtil;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
@@ -38,13 +42,17 @@
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import static com.conveyal.datatools.manager.jobs.feedmerge.MergeFeedsType.SERVICE_PERIOD;
import static com.conveyal.datatools.manager.jobs.feedmerge.MergeFeedsType.REGIONAL;
import static com.conveyal.datatools.manager.jobs.feedmerge.MergeStrategy.CHECK_STOP_TIMES;
import static com.conveyal.datatools.manager.models.FeedRetrievalMethod.REGIONAL_MERGE;
import static com.conveyal.datatools.manager.utils.MergeFeedUtils.*;
import static com.conveyal.datatools.manager.utils.MergeFeedUtils.getMergedVersion;
import static com.conveyal.datatools.manager.utils.MergeFeedUtils.stopTimesMatchSimplified;
import static com.conveyal.datatools.manager.utils.StringUtils.getCleanName;
import static com.conveyal.gtfs.loader.Table.LOCATION_GEO_JSON_FILE_NAME;

/**
* This job handles merging two or more feed versions according to logic specific to the specified merge type.
@@ -204,6 +212,8 @@ public void jobLogic() {
}
}

mergeLocations(out);

// Loop over GTFS tables and merge each feed one table at a time.
for (int i = 0; i < numberOfTables; i++) {
Table table = tablesToMerge.get(i);
@@ -225,7 +235,7 @@ public void jobLogic() {
status.fail(message, e);
} finally {
try {
- feedMergeContext.close();
+ if (feedMergeContext != null) feedMergeContext.close();
} catch (IOException e) {
logAndReportToBugsnag(e, "Error closing FeedMergeContext object");
}
@@ -245,6 +255,39 @@ public void jobLogic() {
}
}

/**
* Merge locations.geojson files. These files are not compatible with the CSV merging strategy. Instead, each
* locations.geojson file is flattened into locations and location shapes. Each location id is then prefixed with the
* scope id to keep feed locations unique; the results are converted back into GeoJSON and written to the zip file.
*
* Locations must be processed prior to other CSV files so the location ids are available for foreign reference
* checks.
*/
void mergeLocations(ZipOutputStream out) throws IOException {
Set<Location> mergedLocations = new HashSet<>();
Set<LocationShape> mergedLocationShapes = new HashSet<>();
for (FeedToMerge feed : feedMergeContext.feedsToMerge) {
ZipEntry locationGeoJsonFile = feed.zipFile.getEntry(LOCATION_GEO_JSON_FILE_NAME);
if (locationGeoJsonFile != null) {
String idScope = getCleanName(feed.version.parentFeedSource().name) + feed.version.version;
List<Location> locations = GeoJsonUtil.getLocationsFromGeoJson(feed.zipFile, locationGeoJsonFile, null);
for (Location location : locations) {
location.location_id = String.join(":", idScope, location.location_id);
mergedLocations.add(location);
feedMergeContext.locationIds.add(location.location_id);
}
List<LocationShape> locationShapes = GeoJsonUtil.getLocationShapesFromGeoJson(feed.zipFile, locationGeoJsonFile, null);
for (LocationShape locationShape : locationShapes) {
locationShape.location_id = String.join(":", idScope, locationShape.location_id);
mergedLocationShapes.add(locationShape);
}
}
}
if (!mergedLocations.isEmpty()) {
JdbcGtfsExporter.writeLocationsToFile(out, new ArrayList<>(mergedLocations), new ArrayList<>(mergedLocationShapes));
}
}
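
To make the id scoping concrete, here is a minimal, self-contained sketch of the rule applied above; the feed-source name and version number are hypothetical, and "myagency" stands in for the output of getCleanName:

    public class IdScopeExample {
        public static void main(String[] args) {
            // "myagency" stands in for getCleanName(feed.version.parentFeedSource().name)
            // and 2 for feed.version.version; both values are hypothetical.
            String idScope = "myagency" + 2;                       // "myagency2"
            String scopedId = String.join(":", idScope, "area_1"); // "myagency2:area_1"
            System.out.println(scopedId);
        }
    }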

/**
* Obtains trip ids whose entries in the stop_times table differ between the active and future feeds.
*/
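
As a rough sketch of the shape of that comparison (the real implementation is collapsed above; Objects.equals stands in for the imported stopTimesMatchSimplified helper, and the List<String> row encoding is hypothetical):

    import java.util.*;

    class StopTimesDiffSketch {
        // Returns the shared trip ids whose ordered stop_times rows differ between feeds.
        static Set<String> tripsWithDifferingStopTimes(
            Map<String, List<String>> activeStopTimes, // trip_id -> ordered rows (hypothetical encoding)
            Map<String, List<String>> futureStopTimes,
            Set<String> sharedTripIds
        ) {
            Set<String> differing = new HashSet<>();
            for (String tripId : sharedTripIds) {
                if (!Objects.equals(activeStopTimes.get(tripId), futureStopTimes.get(tripId))) {
                    differing.add(tripId);
                }
            }
            return differing;
        }
    }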
@@ -281,6 +324,10 @@ private boolean shouldSkipTable(String tableName) {
LOG.warn("Skipping editor-only table {}.", tableName);
return true;
}
if (tableName.equals(Table.LOCATIONS.name) || tableName.equals(Table.LOCATION_SHAPES.name)) {
LOG.warn("{} detected. Skipping traditional merge in favour of bespoke merge.", LOCATION_GEO_JSON_FILE_NAME);
return true;
}
return false;
}

FeedMergeContext.java
@@ -9,6 +9,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.time.LocalDate;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@@ -23,6 +24,8 @@ public class FeedMergeContext implements Closeable {
public final boolean tripIdsMatch;
public final LocalDate futureFirstCalendarStartDate;
public final Set<String> sharedTripIds;
public Set<String> locationIds = new HashSet<>();


public FeedMergeContext(Set<FeedVersion> feedVersions, Auth0UserProfile owner) throws IOException {
feedsToMerge = MergeFeedUtils.collectAndSortFeeds(feedVersions, owner);
MergeFeedsResult.java
@@ -36,6 +36,12 @@ public class MergeFeedsResult implements Serializable {
*/
public Set<String> calendarDatesServiceIds = new HashSet<>();

/**
* Track various table ids for resolving foreign references.
*/
public Set<String> stopIds = new HashSet<>();
public Set<String> stopAreaIds = new HashSet<>();

/**
* Track the set of route IDs to end up in the merged feed in order to determine which route_attributes
* records should be retained in the merged result.
MergeLineContext.java
@@ -229,6 +229,7 @@ public boolean iterateOverRows() throws IOException {

public void startNewRow() throws IOException {
keyValue = csvReader.get(keyFieldIndex);
setForeignReferenceKeyValues();
// Get the spec fields to export
List<Field> specFields = table.specFields();
// Filter the spec fields on the set of fields found in all feeds to be merged.
@@ -238,8 +239,24 @@
}

/**
- * Determine which reference table to use. If there is only one reference use this. If there are multiple references
- * determine the context and then the correct reference table to use.
+ * Build a list of table key id values to be used in foreign key field look-ups.
*/
private void setForeignReferenceKeyValues() {
switch (table.name) {
case "stops":
mergeFeedsResult.stopIds.add(getIdWithScope(keyValue));
break;
case "stop_areas":
mergeFeedsResult.stopAreaIds.add(getIdWithScope(keyValue));
break;
default:
// No foreign-reference key tracking needed for other tables.
}
}
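
The point of this tracking: in a merged feed, a bare id such as one found in stop_times.stop_id could belong to stops, locations, or stop_areas, and the merger cannot tell which without remembering which table each scoped id came from. A compact sketch of the population side, assuming getIdWithScope applies the same scope-prefix convention used in mergeLocations (all values hypothetical):

    // While merging the stops table of a feed scoped "myagency2"...
    Set<String> stopIds = new HashSet<>();
    String scopedKey = String.join(":", "myagency2", "stop_5"); // stand-in for getIdWithScope(keyValue)
    stopIds.add(scopedKey); // stopIds now holds "myagency2:stop_5" for the look-ups below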

/**
* Determine which reference table to use. If there is only one reference use this. If there are multiple
* references, determine the context and then the correct reference table to use.
*/
private Table getReferenceTable(FieldContext fieldContext, Field field) {
if (field.referenceTables.size() == 1) {
@@ -255,8 +272,17 @@ private Table getReferenceTable(FieldContext fieldContext, Field field) {
getTableScopedValue(Table.CALENDAR_DATES, fieldContext.getValue())
);
case STOP_TIMES_STOP_ID_KEY:
return ReferenceTableDiscovery.getStopTimeStopIdReferenceTable(
fieldContext.getValueToWrite(),
mergeFeedsResult,
feedMergeContext.locationIds
);
case STOP_AREA_STOP_ID_KEY:
// Add further cases here as tables with multiple foreign references (e.g. flex tables) are introduced.
return ReferenceTableDiscovery.getStopAreaAreaIdReferenceTable(
fieldContext.getValueToWrite(),
mergeFeedsResult,
feedMergeContext.locationIds
);
default:
return null;
}
ReferenceTableDiscovery.java
@@ -3,6 +3,7 @@
import com.conveyal.gtfs.loader.Field;
import com.conveyal.gtfs.loader.Table;

import java.util.Set;
import java.util.stream.Collectors;

import static com.conveyal.datatools.manager.jobs.feedmerge.MergeLineContext.SERVICE_ID;
@@ -11,6 +12,9 @@ public class ReferenceTableDiscovery {

public static final String REF_TABLE_SEPARATOR = "#~#";

/**
* Tables that have two or more foreign references.
*/
public enum ReferenceTableKey {

TRIP_SERVICE_ID_KEY(
@@ -106,4 +110,38 @@ public static Table getTripServiceIdReferenceTable(
}
return null;
}

/**
* Define the reference table for a stop area's area id. This will be either a stop or a location.
*/
public static Table getStopAreaAreaIdReferenceTable(
String fieldValue,
MergeFeedsResult mergeFeedsResult,
Set<String> locationIds
) {
if (mergeFeedsResult.stopIds.contains(fieldValue)) {
return Table.STOPS;
} else if (locationIds.contains(fieldValue)) {
return Table.LOCATIONS;
}
return null;
}

/**
* Define the reference table for a stop time's stop id. This will be either a stop, a location, or a stop area.
*/
public static Table getStopTimeStopIdReferenceTable(
String fieldValue,
MergeFeedsResult mergeFeedsResult,
Set<String> locationIds
) {
if (mergeFeedsResult.stopIds.contains(fieldValue)) {
return Table.STOPS;
} else if (locationIds.contains(fieldValue)) {
return Table.LOCATIONS;
} else if (mergeFeedsResult.stopAreaIds.contains(fieldValue)) {
return Table.STOP_AREAS;
}
return null;
}
}
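
Taken together, these helpers give the merge a fixed resolution order: stops first, then locations, then stop areas. A self-contained sketch of that chain, using table names in place of gtfs-lib Table objects and hypothetical scoped ids:

    import java.util.Set;

    public class StopIdResolutionSketch {
        // Mirrors the precedence implemented in getStopTimeStopIdReferenceTable above.
        static String resolve(String id, Set<String> stopIds, Set<String> locationIds, Set<String> stopAreaIds) {
            if (stopIds.contains(id)) return "stops";
            if (locationIds.contains(id)) return "locations";
            if (stopAreaIds.contains(id)) return "stop_areas";
            return null; // unresolved: the id matches no merged table
        }

        public static void main(String[] args) {
            Set<String> stopIds = Set.of("myagency2:stop_5");     // hypothetical scoped ids
            Set<String> locationIds = Set.of("myagency2:area_1");
            Set<String> stopAreaIds = Set.of("myagency2:sa_9");
            System.out.println(resolve("myagency2:area_1", stopIds, locationIds, stopAreaIds)); // locations
        }
    }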
FeedVersion.java
@@ -423,15 +423,14 @@ public void validateMobility(MonitorableJob.Status status) {
// Wait for the file to be entirely copied into the directory.
// 5 seconds + ~1 second per 10mb
Thread.sleep(5000 + (this.fileSize / 10000));
- File gtfsZip = this.retrieveGtfsFile();
// Namespace based folders avoid clash for validation being run on multiple versions of a feed.
// TODO: do we know that there will always be a namespace?
String validatorOutputDirectory = "/tmp/datatools_gtfs/" + this.namespace + "/";

status.update("MobilityData Analysis...", 20);
// Set up MobilityData validator.
ValidationRunnerConfig.Builder builder = ValidationRunnerConfig.builder();
- builder.setGtfsSource(gtfsZip.toURI());
+ builder.setGtfsSource(this.retrieveGtfsFile().toURI());
builder.setOutputDirectory(Path.of(validatorOutputDirectory));
ValidationRunnerConfig mbValidatorConfig = builder.build();

@@ -443,8 +442,10 @@
status.update("MobilityData Analysis...", 80);
// Read generated report and save to Mongo.
String json;
- try (FileReader fr = new FileReader(validatorOutputDirectory + "report.json")) {
-     BufferedReader in = new BufferedReader(fr);
+ try (
+     FileReader fr = new FileReader(validatorOutputDirectory + "report.json");
+     BufferedReader in = new BufferedReader(fr)
+ ) {
json = in.lines().collect(Collectors.joining(System.lineSeparator()));
}

(The remaining changed files in this commit are not shown here.)
