Skip to content

Commit

Permalink
Merge pull request #89 from aws/GSP
Browse files Browse the repository at this point in the history
Switch RDF exports to use Graph Store Protocol
  • Loading branch information
charlesivie authored Nov 15, 2023
2 parents 1ab85d2 + 89d2575 commit c2a7f43
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 5 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Amazon Neptune Export CHANGELOG

## Neptune Export v1.1.2 (Release Date: TBD):

### Bug Fixes:

### New Features and Improvements:

- Use Graph Store Protocol for complete RDF graph exports, improving performance for large exports.

## Neptune Export v1.1.1 (Release Date: November 3, 2023):

### Bug Fixes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ public enum FeatureToggle {
Simulate_Cloned_Cluster,
Keep_Rewritten_Files,
Infer_RDF_Prefixes,
No_GSP,
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public OutputWriter createOutputWriter(Supplier<Path> pathSupplier, KinesisConfi
File file = filePath.toFile();

return new FileToStreamOutputWriter(
new KinesisStreamPrintOutputWriter(file.getAbsolutePath(), new FileWriter(file)),
new KinesisStreamPrintOutputWriter(file.getAbsolutePath(), new BufferedWriter(new FileWriter(file))),
filePath,
kinesisConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void execute() throws Exception {
Timer.timedActivity("exporting RDF as " + targetConfig.format().description(),
(CheckedActivity.Runnable) () -> {
System.err.println("Creating statement files");
client.executeTupleQuery("SELECT * WHERE { GRAPH ?g { ?s ?p ?o } }", targetConfig);
client.executeCompleteExport(targetConfig);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.neptune.auth.NeptuneSigV4SignerException;
import com.amazonaws.services.neptune.cluster.ConnectionConfig;
import com.amazonaws.services.neptune.export.FeatureToggle;
import com.amazonaws.services.neptune.export.FeatureToggles;
import com.amazonaws.services.neptune.io.OutputWriter;
import com.amazonaws.services.neptune.rdf.io.NeptuneExportSparqlRepository;
import com.amazonaws.services.neptune.rdf.io.RdfTargetConfig;
import com.amazonaws.services.neptune.util.EnvironmentVariableUtils;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.eclipse.rdf4j.http.client.HttpClientSessionManager;
import org.eclipse.rdf4j.http.client.RDF4JProtocolSession;
import org.eclipse.rdf4j.http.client.SPARQLProtocolSession;
Expand All @@ -31,11 +34,15 @@
import org.eclipse.rdf4j.repository.sparql.SPARQLRepository;
import org.eclipse.rdf4j.rio.ParserConfig;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.RDFParser;
import org.eclipse.rdf4j.rio.RDFWriter;
import org.eclipse.rdf4j.rio.Rio;
import org.eclipse.rdf4j.rio.helpers.BasicParserSettings;
import org.joda.time.DateTime;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -65,7 +72,7 @@ public static NeptuneSparqlClient create(ConnectionConfig config, FeatureToggles
)
.peek(AbstractRepository::init)
.collect(Collectors.toList()),
featureToggles);
featureToggles, config);
}

private static SPARQLRepository updateParser(SPARQLRepository repository) {
Expand Down Expand Up @@ -106,10 +113,12 @@ private static String sparqlEndpoint(String endpoint, int port) {
private final List<SPARQLRepository> repositories;
private final Random random = new Random(DateTime.now().getMillis());
private final FeatureToggles featureToggles;
private final ConnectionConfig connectionConfig;

private NeptuneSparqlClient(List<SPARQLRepository> repositories, FeatureToggles featureToggles) {
/**
 * Creates a client over a fixed set of initialized SPARQL repositories.
 *
 * @param repositories     repositories to spread queries across (one per endpoint)
 * @param featureToggles   feature flags controlling export behavior (e.g. No_GSP)
 * @param connectionConfig connection details used to build Graph Store Protocol URLs
 */
private NeptuneSparqlClient(List<SPARQLRepository> repositories, FeatureToggles featureToggles, ConnectionConfig connectionConfig) {
    this.repositories = repositories;
    this.featureToggles = featureToggles;
    this.connectionConfig = connectionConfig;
}

public void executeTupleQuery(String sparql, RdfTargetConfig targetConfig) throws IOException {
Expand Down Expand Up @@ -153,8 +162,35 @@ public void executeGraphQuery(String sparql, RdfTargetConfig targetConfig) throw
}
}

public void executeCompleteExport(RdfTargetConfig targetConfig) throws IOException {
if(featureToggles.containsFeature(FeatureToggle.No_GSP)) {
executeTupleQuery("SELECT * WHERE { GRAPH ?g { ?s ?p ?o } }", targetConfig);
} else {
HttpClient httpClient = chooseRepository().getHttpClient();
HttpUriRequest request = new HttpGet(getGSPEndpoint("default"));
request.addHeader("Accept", "application/n-quads");

private SPARQLRepository chooseRepository() {
org.apache.http.HttpResponse response = httpClient.execute(request);
InputStream responseBody = response.getEntity().getContent();

RDFParser rdfParser = Rio.createParser(RDFFormat.NQUADS);
OutputWriter outputWriter = targetConfig.createOutputWriter();
RDFWriter writer = targetConfig.createRDFWriter(outputWriter, new FeatureToggles(Collections.emptyList()));
rdfParser.setRDFHandler(writer);

try {
rdfParser.parse(responseBody);
}
catch (IOException e) {
throw e;
}
finally {
responseBody.close();
}
}
}

/** Picks one of the configured repositories uniformly at random to spread load. */
SPARQLRepository chooseRepository() {
    int index = random.nextInt(repositories.size());
    return repositories.get(index);
}

Expand All @@ -163,4 +199,11 @@ public void close() {
repositories.forEach(AbstractRepository::shutDown);
}

/**
 * Builds the Graph Store Protocol URL for the given graph selector,
 * e.g. {@code https://host:port/sparql/gsp/?default}.
 *
 * @param graphName graph selector appended verbatim as the query string
 *                  (e.g. "default"); note it is not URL-encoded here —
 *                  callers currently pass only "default"
 * @return the GSP endpoint URL for the first configured endpoint
 */
private String getGSPEndpoint(String graphName) {
    String host = String.valueOf(connectionConfig.endpoints().iterator().next());
    return "https://" + host + ":" + connectionConfig.port() + "/sparql/gsp/?" + graphName;
}

}

0 comments on commit c2a7f43

Please sign in to comment.