Skip to content

Commit

Permalink
Revert "Change vulnerability statement storage to include all stateme…
Browse files Browse the repository at this point in the history
…nts, even"

This reverts commit 736a80c.
  • Loading branch information
MagielBruntink committed Dec 12, 2022
1 parent 010ff53 commit eab1a55
Showing 1 changed file with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class ParserManager {
private static final Map<String, String> supportedForges =
Map.of("mvn", "pkg:maven", "PyPI", "pkg:pypi", "debian", "pkg:deb/debian");
private final int noParseWorkers = 5;
private volatile boolean isPatchFinderDone = false;

public ParserManager(JavaHttpClient client,
MongoDatabase mongoDatabase,
Expand Down Expand Up @@ -104,6 +106,21 @@ public void getVulnerabilitiesFromParsers(boolean updatesOnly, int numThreadsToU
}

final var vulnerabilities = parseVulnerabilities(updatesOnly);
logger.info("Filtering out vulnerabilities without a forge and storing them to the disk");
final var smtQueue = new ConcurrentLinkedQueue<>(vulnerabilities.stream().filter((v) -> !vulnerabilityForgeIsSupported(v)).collect(Collectors.toList()));

// For writing vuln. statements without blocking other threads
Thread smtWriter = new Thread(() -> {
while (true) {
if (!smtQueue.isEmpty()) {
storeStatement(smtQueue.remove());
}
else if (isPatchFinderDone) {
break;
}
}
});
smtWriter.start();

logger.info("Injecting information about Patches using " + numThreadsToUse + " threads");
var vulnerabilitiesWithForge = vulnerabilities.stream().filter(this::vulnerabilityForgeIsSupported).collect(Collectors.toList());
Expand All @@ -112,25 +129,22 @@ public void getVulnerabilitiesFromParsers(boolean updatesOnly, int numThreadsToU
List<Callable<Object>> patchFindingTasks = new ArrayList<>();
try(ProgressBar pb = new ProgressBar("InjectPatches", vulnerabilitiesWithForge.size())) {
for (var v: vulnerabilitiesWithForge) {
patchFindingTasks.add(() -> {parsePatchesAndPublish(v, pb);
patchFindingTasks.add(() -> {parsePatchesAndPublish(smtQueue, v, pb);
return null;
});
}
try {
executor.invokeAll(patchFindingTasks);
executor.shutdown();
isPatchFinderDone = true;
smtWriter.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

logger.info("Storing vulnerability statements to disk.");
for(var v : vulnerabilities) {
storeStatement(v);
}
}

private void parsePatchesAndPublish(Vulnerability v, ProgressBar pb) {
private void parsePatchesAndPublish(ConcurrentLinkedQueue<Vulnerability> smtQueue, Vulnerability v, ProgressBar pb) {
try {
var patchFinder = patchFinderFactory.getNewPatchFinder();
logger.info("Inferring purls for " + v.getId());
Expand All @@ -139,6 +153,7 @@ private void parsePatchesAndPublish(Vulnerability v, ProgressBar pb) {
patchFinder.parseReferences(v, nitriteController);
logger.info("Publishing " + v.getId() + " to Kafka");
emitKafkaMsg(v.toJson());
smtQueue.add(v);
pb.step();
} catch (Exception e) {
logger.error("Could NOT process " + v.getId() + "\n" + e);
Expand Down

0 comments on commit eab1a55

Please sign in to comment.