Merge pull request #938 from conveyal/background-multi-grid
Schedule creation of geotiff zip files in the background
abyrd committed Apr 16, 2024
2 parents e30d765 + 91aa0e9 commit 471ffe2
Showing 3 changed files with 71 additions and 34 deletions.
@@ -86,7 +86,7 @@ public List<HttpController> standardHttpControllers () {
new GtfsController(gtfsCache),
new BundleController(this),
new OpportunityDatasetController(fileStorage, taskScheduler, censusExtractor, database),
new RegionalAnalysisController(broker, fileStorage),
new RegionalAnalysisController(broker, fileStorage, taskScheduler),
new AggregationAreaController(fileStorage, database, taskScheduler),
// This broker controller registers at least one handler at URL paths beginning with /internal, which
// is exempted from authentication and authorization, but should be hidden from the world
@@ -3,6 +3,7 @@
import com.conveyal.analysis.AnalysisServerException;
import com.conveyal.analysis.SelectingGridReducer;
import com.conveyal.analysis.UserPermissions;
import com.conveyal.analysis.components.TaskScheduler;
import com.conveyal.analysis.components.broker.Broker;
import com.conveyal.analysis.components.broker.JobStatus;
import com.conveyal.analysis.models.AnalysisRequest;
@@ -11,6 +12,7 @@
import com.conveyal.analysis.models.RegionalAnalysis;
import com.conveyal.analysis.persistence.Persistence;
import com.conveyal.analysis.results.CsvResultType;
import com.conveyal.analysis.util.HttpStatus;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.file.FileStorage;
import com.conveyal.file.FileStorageFormat;
@@ -22,6 +24,7 @@
import com.conveyal.r5.analyst.PointSet;
import com.conveyal.r5.analyst.PointSetCache;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.conveyal.r5.analyst.progress.Task;
import com.google.common.primitives.Ints;
import com.mongodb.QueryBuilder;
import gnu.trove.list.array.TIntArrayList;
@@ -36,6 +39,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
@@ -45,9 +49,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.zip.GZIPOutputStream;

import static com.conveyal.analysis.util.JsonUtil.toJson;
@@ -60,6 +67,7 @@
import static com.google.common.base.Preconditions.checkState;
import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON;
import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_HTML;
import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_PLAIN;

/**
* Spark HTTP handler methods that allow launching new regional analyses, as well as deleting them and fetching
@@ -80,10 +88,12 @@ public class RegionalAnalysisController implements HttpController {

private final Broker broker;
private final FileStorage fileStorage;
private final TaskScheduler taskScheduler;

public RegionalAnalysisController (Broker broker, FileStorage fileStorage) {
public RegionalAnalysisController (Broker broker, FileStorage fileStorage, TaskScheduler taskScheduler) {
this.broker = broker;
this.fileStorage = fileStorage;
this.taskScheduler = taskScheduler;
}

private Collection<RegionalAnalysis> getRegionalAnalysesForRegion(String regionId, UserPermissions userPermissions) {
@@ -254,8 +264,9 @@ private HumanKey getSingleCutoffGrid (
grid.writeGeotiff(fos);
break;
}

LOG.debug("Finished deriving single-cutoff grid {}. Transferring to storage.", singleCutoffKey);
fileStorage.moveIntoStorage(singleCutoffFileStorageKey, localFile);
LOG.debug("Finished transferring single-cutoff grid {} to storage.", singleCutoffKey);
}
String analysisHumanName = humanNameForEntity(analysis);
String destinationHumanName = humanNameForEntity(destinations);
@@ -266,6 +277,10 @@
return new HumanKey(singleCutoffFileStorageKey, resultHumanFilename);
}

// Prevent multiple requests from creating the same files in parallel.
// This could potentially be integrated into FileStorage with enum return values or an additional boolean method.
private Set<String> filesBeingPrepared = Collections.synchronizedSet(new HashSet<>());

private Object getAllRegionalResults (Request req, Response res) throws IOException {
final String regionalAnalysisId = req.params("_id");
final UserPermissions userPermissions = UserPermissions.from(req);
@@ -277,39 +292,61 @@ private Object getAllRegionalResults (Request req, Response res) throws IOException
throw AnalysisServerException.badRequest("Batch result download only available for gridded origins.");
}
FileStorageKey zippedResultsKey = new FileStorageKey(RESULTS, analysis._id + "_ALL.zip");
if (!fileStorage.exists(zippedResultsKey)) {
// Iterate over all dest, cutoff, percentile combinations and generate one geotiff grid output for each one.
List<HumanKey> humanKeys = new ArrayList<>();
for (String destinationPointSetId : analysis.destinationPointSetIds) {
OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions);
for (int cutoffMinutes : analysis.cutoffsMinutes) {
for (int percentile : analysis.travelTimePercentiles) {
HumanKey gridKey = getSingleCutoffGrid(
analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF
);
humanKeys.add(gridKey);
if (fileStorage.exists(zippedResultsKey)) {
res.type(APPLICATION_JSON.asString());
String analysisHumanName = humanNameForEntity(analysis);
return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip");
}
if (filesBeingPrepared.contains(zippedResultsKey.path)) {
res.type(TEXT_PLAIN.asString());
res.status(HttpStatus.ACCEPTED_202);
return "Geotiff zip is already being prepared in the background.";
}
// File did not exist. Create it in the background and ask caller to request it later.
filesBeingPrepared.add(zippedResultsKey.path);
Task task = Task.create("Zip all geotiffs for regional analysis " + analysis.name)
.forUser(userPermissions)
.withAction(progressListener -> {
int nSteps = analysis.destinationPointSetIds.length * analysis.cutoffsMinutes.length *
analysis.travelTimePercentiles.length * 2 + 1;
progressListener.beginTask("Creating and archiving geotiffs...", nSteps);
// Iterate over all dest, cutoff, percentile combinations and generate one geotiff for each combination.
List<HumanKey> humanKeys = new ArrayList<>();
for (String destinationPointSetId : analysis.destinationPointSetIds) {
OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions);
for (int cutoffMinutes : analysis.cutoffsMinutes) {
for (int percentile : analysis.travelTimePercentiles) {
HumanKey gridKey = getSingleCutoffGrid(
analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF
);
humanKeys.add(gridKey);
progressListener.increment();
}
}
}
}
File tempZipFile = File.createTempFile("regional", ".zip");
// Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition
// Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual entries.
// May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion.
tempZipFile.delete();
Map<String, String> env = Map.of("create", "true");
URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath());
try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) {
for (HumanKey key : humanKeys) {
Path storagePath = fileStorage.getFile(key.storageKey).toPath();
Path zipPath = zipFilesystem.getPath(key.humanName);
Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING);
File tempZipFile = File.createTempFile("regional", ".zip");
// Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition
// Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual
// entries. May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion.
tempZipFile.delete();
Map<String, String> env = Map.of("create", "true");
URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath());
try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) {
for (HumanKey key : humanKeys) {
Path storagePath = fileStorage.getFile(key.storageKey).toPath();
Path zipPath = zipFilesystem.getPath(key.humanName);
Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING);
progressListener.increment();
}
}
}
fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile);
}
res.type(APPLICATION_JSON.asString());
String analysisHumanName = humanNameForEntity(analysis);
return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip");
fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile);
progressListener.increment();
filesBeingPrepared.remove(zippedResultsKey.path);
});
taskScheduler.enqueue(task);
res.type(TEXT_PLAIN.asString());
res.status(HttpStatus.ACCEPTED_202);
return "Building geotiff zip in background.";
}

/**
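
Note on the getAllRegionalResults changes above. The progress accounting counts each (destination, cutoff, percentile) combination twice, once when its geotiff is generated and once when it is copied into the zip, plus one final step for the upload; for example, 1 destination layer, 5 cutoffs, and 3 percentiles yield 15 grids, so nSteps = 15 × 2 + 1 = 31. The deduplication itself rests on a synchronized set recording which files are already being built, so a second request for the same archive returns immediately instead of scheduling duplicate work. A standalone sketch of that dedupe-and-enqueue pattern follows (hypothetical class with a plain executor, not the PR's TaskScheduler); it uses a single atomic add() rather than the contains()/add() pair above, which closes the small race window the two-step check leaves open.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration, not code from this commit.
public class BackgroundFilePreparer {
    private final Set<String> filesBeingPrepared = Collections.synchronizedSet(new HashSet<>());
    private final ExecutorService backgroundPool = Executors.newFixedThreadPool(2);

    /** Returns true if a new background job was scheduled, false if one is already in progress. */
    public boolean prepareInBackground (String path, Runnable buildFile) {
        // add() is atomic on a synchronized set: only one concurrent caller wins the race.
        if (!filesBeingPrepared.add(path)) {
            return false;
        }
        backgroundPool.submit(() -> {
            try {
                buildFile.run();
            } finally {
                // Always clear the marker, even on failure, so a later request can retry.
                filesBeingPrepared.remove(path);
            }
        });
        return true;
    }
}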
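The FIXME comment in the zip-building block suggests ZipOutputStream as an alternative to the zipfs workaround (the temp file must be deleted first because the zip filesystem provider cannot create an archive over an existing empty file). A minimal sketch of that alternative, assuming entries are copied verbatim; the gzip-to-zip CSV conversion the comment mentions would hook into the copy step.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical sketch of the alternative mentioned in the FIXME, not code from this commit.
public class ZipStreamSketch {
    static void zipFiles (List<File> sources, File zipFile) throws IOException {
        // Streaming entries avoids zipfs entirely, so there is no pre-existing-file quirk.
        try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(zipFile))) {
            for (File source : sources) {
                zos.putNextEntry(new ZipEntry(source.getName()));
                try (FileInputStream in = new FileInputStream(source)) {
                    in.transferTo(zos); // a gzip-to-zip conversion would wrap this copy
                }
                zos.closeEntry();
            }
        }
    }
}
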
2 changes: 1 addition & 1 deletion src/main/java/com/conveyal/r5/analyst/progress/Task.java
@@ -162,7 +162,7 @@ protected void bubbleUpProgress() {
}

/**
* Check that all necesary fields have been set before enqueueing for execution, and check any invariants.
* Check that all necessary fields have been set before enqueueing for execution, and check any invariants.
*/
public void validate () {
if (this.user == null) {

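From a client's perspective, the endpoint now responds 202 Accepted (text/plain) while the zip is being assembled and only returns the JSON download URL once the archive exists in storage, so callers are expected to poll. A rough sketch of that loop using java.net.http follows; the endpoint path and response handling are illustrative assumptions, and the actual JSON shape comes from FileStorage.getJsonUrl.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client-side polling loop, not part of this commit.
public class RegionalResultsPoller {
    public static void main (String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Illustrative URL; the real path and auth headers depend on the deployment.
        URI uri = URI.create("http://localhost:7070/api/regional/REGIONAL_ANALYSIS_ID/all");
        while (true) {
            HttpResponse<String> response = client.send(
                HttpRequest.newBuilder(uri).GET().build(),
                HttpResponse.BodyHandlers.ofString()
            );
            if (response.statusCode() == 202) {
                Thread.sleep(5_000); // still being built in the background; try again shortly
            } else {
                // Once ready, the body is JSON containing the download URL for the zip.
                System.out.println(response.body());
                break;
            }
        }
    }
}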