Remove MongoJack and consolidate MongoDB utils #837

Open
wants to merge 44 commits into
base: dev
44 commits
aa1b9cd
Remove MongoJack and consolidate utils
trevorgerhardt Nov 8, 2022
bd77ee5
Remove unnecessary `toString()`s
trevorgerhardt Nov 19, 2022
a929020
Retrieve each modification type manually
trevorgerhardt Nov 19, 2022
8f4a40c
Prefer Lists over arrays for MongoDB parsing
trevorgerhardt Nov 19, 2022
efb4ec5
Refactor to handle new MongoDB changes
trevorgerhardt Nov 19, 2022
f5c632f
Update MongoDB driver
trevorgerhardt Nov 19, 2022
de55f9f
Prefer `var`
trevorgerhardt Nov 19, 2022
6b3fd5e
Compare to first grid's zoom
trevorgerhardt Nov 19, 2022
44e2d47
Use native MongoDB Geometry type
trevorgerhardt Nov 19, 2022
325eea9
Simplify delete source set
trevorgerhardt Nov 20, 2022
06e7cb4
Additional GridResultWriter clean up
trevorgerhardt Nov 20, 2022
ea12ed3
Create a utility method to gzip a File
trevorgerhardt Nov 20, 2022
341f317
Add zero argument constructor back
trevorgerhardt Nov 20, 2022
b29a750
Clean up result writers
trevorgerhardt Nov 20, 2022
8d215b7
Pass components directly to `BundleController`
trevorgerhardt Nov 21, 2022
7546f20
Use BsonDiscriminators for modifications
trevorgerhardt Nov 21, 2022
71eedc0
Remove unnecessary Codecs
trevorgerhardt Nov 21, 2022
c4b9ca9
Update src/main/java/com/conveyal/analysis/datasource/derivation/Aggr…
trevorgerhardt Dec 7, 2022
486abb7
Update src/main/java/com/conveyal/analysis/models/BaseModel.java
trevorgerhardt Dec 7, 2022
d5c1620
Update src/main/java/com/conveyal/analysis/results/GridResultWriter.java
trevorgerhardt Dec 7, 2022
8e1a0d1
Update src/main/java/com/conveyal/analysis/controllers/RegionalAnalys…
trevorgerhardt Dec 7, 2022
7d9a45e
Update src/main/java/com/conveyal/analysis/controllers/RegionalAnalys…
trevorgerhardt Dec 7, 2022
d885ba6
Update src/main/java/com/conveyal/analysis/controllers/RegionalAnalys…
trevorgerhardt Dec 7, 2022
3f4bb12
Remove now unnecessary FIXME comment
trevorgerhardt Dec 7, 2022
5a3d49d
Merge branch 'remove-mongojack' of https://github.com/conveyal/r5 int…
trevorgerhardt Dec 7, 2022
9ac1659
Add accidentally removed `@Override`
trevorgerhardt Dec 7, 2022
1dfc172
Use the actual return type, instead of the generic `Object` type
trevorgerhardt Dec 7, 2022
432d961
Add `BsonDiscriminators` to `DecayFunction`s
trevorgerhardt Dec 8, 2022
b66a4be
Add synchronized back to terminate method
trevorgerhardt Dec 8, 2022
5187485
Revert comment auto-formatting
trevorgerhardt Dec 9, 2022
c2ae60b
Revert comment auto-formatting
trevorgerhardt Dec 9, 2022
423f05e
Revert comment auto-formatting
trevorgerhardt Dec 9, 2022
29ed72b
Convert `toStreetModeSet` to take a single `Set`
trevorgerhardt Dec 9, 2022
de9ed5f
Switch `AnalysisRequest` back to taking strings as modes
trevorgerhardt Dec 9, 2022
dd3e49a
Update getScenarioJsonUrl
trevorgerhardt Dec 9, 2022
1c4e3a6
Clean up EnumSet changes
trevorgerhardt Dec 9, 2022
7a0b1bf
Remove `gtfsCache` component dependency
trevorgerhardt Dec 26, 2022
7865c74
Implement TODO added in last commit
trevorgerhardt Dec 26, 2022
99e848d
Merge branch 'dev' into remove-mongojack
trevorgerhardt Dec 29, 2022
552b3d7
Throw a "not found error" when grid does not exist
trevorgerhardt Dec 30, 2022
b7eedd9
Fix tests
trevorgerhardt Dec 30, 2022
12d76ef
Merge branch 'dev' into remove-mongojack
trevorgerhardt Jan 25, 2023
378e8a1
Remove unused imports
trevorgerhardt Jan 25, 2023
02f9319
Merge branch 'dev' into remove-mongojack
trevorgerhardt Mar 30, 2023
5 changes: 1 addition & 4 deletions build.gradle
@@ -147,10 +147,7 @@ dependencies {
}

// Database driver.
implementation 'org.mongodb:mongo-java-driver:3.11.0'

// Legacy system for storing Java objects, this functionality is now provided by the MongoDB driver itself.
implementation 'org.mongojack:mongojack:2.10.1'
Comment on lines -152 to -153
Member

👋

implementation 'org.mongodb:mongodb-driver-sync:4.7.2'

// JSON serialization and deserialization from and to Java objects
implementation 'com.fasterxml.jackson.core:jackson-core:2.10.3'
2 changes: 0 additions & 2 deletions src/main/java/com/conveyal/analysis/BackendMain.java
@@ -2,7 +2,6 @@

import com.conveyal.analysis.components.BackendComponents;
import com.conveyal.analysis.components.LocalBackendComponents;
import com.conveyal.analysis.persistence.Persistence;
import com.conveyal.r5.SoftwareVersion;
import com.conveyal.r5.analyst.PointSetCache;
import com.conveyal.r5.analyst.WorkerCategory;
@@ -44,7 +43,6 @@ private static void startServerInternal (BackendComponents components, TaskActio
// TODO migrate to non-static Components.
// TODO remove the static ApiMain abstraction layer. We do not use it anywhere but in handling GraphQL queries.
// TODO we could move this to something like BackendComponents.initialize()
Persistence.initializeStatically(components.config);
PointSetCache.initializeStatically(components.fileStorage);

// TODO handle this via components without explicit "if (offline)"
@@ -83,20 +83,20 @@ public List<HttpController> standardHttpControllers () {
return Lists.newArrayList(
// These handlers are at paths beginning with /api
// and therefore subject to authentication and authorization.
new GtfsController(gtfsCache),
new BundleController(this),
new GtfsController(database, gtfsCache),
new BundleController(database, fileStorage, taskScheduler),
new OpportunityDatasetController(fileStorage, taskScheduler, censusExtractor, database),
new RegionalAnalysisController(broker, fileStorage),
new RegionalAnalysisController(broker, database, fileStorage),
new AggregationAreaController(fileStorage, database, taskScheduler),
// This broker controller registers at least one handler at URL paths beginning with /internal, which
// is exempted from authentication and authorization, but should be hidden from the world
// outside the cluster by the reverse proxy. Perhaps we should serve /internal on a separate
// port so they can't be accidentally exposed by the reverse proxy. It could even be a separate
// InternalHttpApi component with its own spark service, renaming this ExternalHttpApi.
new BrokerController(broker, eventBus),
new BrokerController(broker, database, eventBus),
new UserActivityController(taskScheduler),
new DataSourceController(fileStorage, database, taskScheduler, censusExtractor),
new WorkerProxyController(broker)
new WorkerProxyController(broker, database)
);
}
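
The controller wiring above replaces `new BundleController(this)` with a constructor that names each component it needs. A minimal sketch of why that narrower style can help, assuming hypothetical `Store`, `Components`, and `ReportController` types that are not part of this codebase: a controller that takes only its dependencies can be built with a one-line stub instead of a fully initialized components object.

```java
/** Hypothetical stand-in for a database component; not a class from this PR. */
interface Store {
    String get(String key);
}

/** Hypothetical aggregate that holds every component, similar in spirit to a components class. */
class Components {
    final Store store;

    Components(Store store) {
        this.store = store;
    }
}

/** The constructor signature documents exactly which component this controller uses. */
class ReportController {
    private final Store store;

    ReportController(Store store) {
        this.store = store;
    }

    String describe(String key) {
        return key + "=" + store.get(key);
    }
}

class WiringDemo {
    public static void main(String[] args) {
        // Production-style wiring: pull the one needed component out of the aggregate.
        var components = new Components(key -> "value-for-" + key);
        var controller = new ReportController(components.store);
        System.out.println(controller.describe("region"));

        // Test-style wiring: no aggregate required, a lambda stub is enough.
        var stubbed = new ReportController(key -> "stub");
        System.out.println(stubbed.describe("region"));
    }
}
```

The trade-off is a longer constructor call at the wiring site in exchange for dependencies that are visible in the signature.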

@@ -34,14 +34,13 @@ public LocalBackendComponents () {
authentication = new LocalAuthentication();
// TODO add nested LocalWorkerComponents here, to reuse some components, and pass it into the LocalWorkerLauncher?
workerLauncher = new LocalWorkerLauncher(config, fileStorage, gtfsCache, osmCache);
broker = new Broker(config, fileStorage, eventBus, workerLauncher);
broker = new Broker(config, eventBus, workerLauncher);
censusExtractor = new SeamlessCensusGridExtractor(config);
// Instantiate the HttpControllers last, when all the components except the HttpApi are already created.
List<HttpController> httpControllers = standardHttpControllers();
httpControllers.add(new LocalFilesController(fileStorage));
httpApi = new HttpApi(fileStorage, authentication, eventBus, config, httpControllers);
// compute = new LocalCompute();
// persistence = persistence(local_Mongo)
eventBus.addHandlers(new ErrorLogger());
}

87 changes: 11 additions & 76 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -6,17 +6,11 @@
import com.conveyal.analysis.components.eventbus.EventBus;
import com.conveyal.analysis.components.eventbus.RegionalAnalysisEvent;
import com.conveyal.analysis.components.eventbus.WorkerEvent;
import com.conveyal.analysis.models.RegionalAnalysis;
import com.conveyal.analysis.results.MultiOriginAssembler;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.file.FileStorage;
import com.conveyal.file.FileStorageKey;
import com.conveyal.file.FileUtils;
import com.conveyal.r5.analyst.WorkerCategory;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
import com.conveyal.r5.analyst.cluster.WorkerStatus;
import com.conveyal.r5.analyst.scenario.Scenario;
import com.conveyal.r5.util.ExceptionUtils;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.MultimapBuilder;
@@ -27,8 +21,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -42,7 +34,6 @@
import static com.conveyal.analysis.components.eventbus.WorkerEvent.Action.REQUESTED;
import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.REGIONAL;
import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.SINGLE_POINT;
import static com.conveyal.file.FileCategory.BUNDLES;
import static com.google.common.base.Preconditions.checkNotNull;

/**
@@ -93,7 +84,6 @@ public interface Config {
private Config config;

// Component Dependencies
private final FileStorage fileStorage;
private final EventBus eventBus;
private final WorkerLauncher workerLauncher;

@@ -143,9 +133,8 @@ public interface Config {
public TObjectLongMap<WorkerCategory> recentlyRequestedWorkers =
TCollections.synchronizedMap(new TObjectLongHashMap<>());

public Broker (Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) {
public Broker(Config config, EventBus eventBus, WorkerLauncher workerLauncher) {
this.config = config;
this.fileStorage = fileStorage;
this.eventBus = eventBus;
this.workerLauncher = workerLauncher;
}
@@ -154,89 +143,35 @@ public Broker (Config config, FileStorage fileStorage, EventBus eventBus, Worker
* Enqueue a set of tasks for a regional analysis.
* Only a single task is passed in, which the broker will expand into all the individual tasks for a regional job.
*/
public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAnalysis) {

// Make a copy of the regional task inside the RegionalAnalysis, replacing the scenario with a scenario ID.
RegionalTask templateTask = templateTaskFromRegionalAnalysis(regionalAnalysis);

LOG.info("Enqueuing tasks for job {} using template task.", templateTask.jobId);
if (findJob(templateTask.jobId) != null) {
LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId);
throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId);
public synchronized void enqueueTasksForRegionalJob(Job job, MultiOriginAssembler assembler) {
Member

Any synchronization concerns we should keep in mind?

Member Author

Not that I can think of here. This method was already synchronized.

// Once the assembler has been created, enqueue the job.
LOG.info("Enqueuing tasks for job {} using template task.", job.jobId);
if (findJob(job.jobId) != null) {
LOG.error("Someone tried to enqueue job {} but it already exists.", job.jobId);
throw new RuntimeException("Enqueued duplicate job " + job.jobId);
}
WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis);
Job job = new Job(templateTask, workerTags);
jobs.put(job.workerCategory, job);

// Register the regional job so results received from multiple workers can be assembled into one file.
// TODO encapsulate MultiOriginAssemblers in a new Component
// Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler.
// That is not catastrophic, but the user may need to recognize and delete the stalled regional job.
MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage);
resultAssemblers.put(templateTask.jobId, assembler);
resultAssemblers.put(job.jobId, assembler);

if (config.testTaskRedelivery()) {
// This is a fake job for testing, don't confuse the worker startup code below with null graph ID.
return;
}

if (workerCatalog.noWorkersAvailable(job.workerCategory, config.offline())) {
createOnDemandWorkerInCategory(job.workerCategory, workerTags);
createOnDemandWorkerInCategory(job.workerCategory, job.workerTags);
} else {
// Workers exist in this category, clear out any record that we're waiting for one to start up.
recentlyRequestedWorkers.remove(job.workerCategory);
}
eventBus.send(new RegionalAnalysisEvent(templateTask.jobId, STARTED).forUser(workerTags.user, workerTags.group));
}

/**
* The single RegionalTask object represents a lot of individual accessibility tasks at many different origin
* points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to
* workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID
* to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task,
* as they are often quite voluminous. The workers will fetch the scenario once from S3 and cache it based on
* its ID only. We protectively clone this task because we're going to null out its scenario field, and don't
* want to affect the original object which contains all the scenario details.
* TODO Why is all this detail added after the Persistence call?
* We don't want to store all the details added below in Mongo?
*/
private RegionalTask templateTaskFromRegionalAnalysis (RegionalAnalysis regionalAnalysis) {
RegionalTask templateTask = regionalAnalysis.request.clone();
// First replace the inline scenario with a scenario ID, storing the scenario for retrieval by workers.
Scenario scenario = templateTask.scenario;
templateTask.scenarioId = scenario.id;
// Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON.
templateTask.scenario = null;
String fileName = String.format("%s_%s.json", regionalAnalysis.bundleId, scenario.id);
FileStorageKey fileStorageKey = new FileStorageKey(BUNDLES, fileName);
try {
File localScenario = FileUtils.createScratchFile("json");
JsonUtil.objectMapper.writeValue(localScenario, scenario);
// FIXME this is using a network service in a method called from a synchronized broker method.
// Move file into storage before entering the synchronized block.
fileStorage.moveIntoStorage(fileStorageKey, localScenario);
} catch (IOException e) {
LOG.error("Error storing scenario for retrieval by workers.", e);
}
// Fill in all the fields in the template task that will remain the same across all tasks in a job.
// I am not sure why we are re-setting all these fields, it seems like they are already set when the task is
// initialized by AnalysisRequest.populateTask. But we'd want to thoroughly check that assumption before
// eliminating or moving these lines.
templateTask.jobId = regionalAnalysis._id;
templateTask.graphId = regionalAnalysis.bundleId;
templateTask.workerVersion = regionalAnalysis.workerVersion;
templateTask.height = regionalAnalysis.height;
templateTask.width = regionalAnalysis.width;
templateTask.north = regionalAnalysis.north;
templateTask.west = regionalAnalysis.west;
templateTask.zoom = regionalAnalysis.zoom;
return templateTask;
eventBus.send(new RegionalAnalysisEvent(job.jobId, STARTED).forUser(job.workerTags.user, job.workerTags.group));
}

/**
* Create on-demand worker for a given job.
*/
public void createOnDemandWorkerInCategory(WorkerCategory category, WorkerTags workerTags){
public void createOnDemandWorkerInCategory(WorkerCategory category, WorkerTags workerTags) {
createWorkersInCategory(category, workerTags, 1, 0);
}
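
The reworked `enqueueTasksForRegionalJob` receives an already-built `Job` and `MultiOriginAssembler`, which suggests that slow preparation (such as the scenario file write flagged by the removed FIXME) can now happen before the synchronized method is entered. A minimal sketch of that pattern, using hypothetical `MiniJob` and `MiniBroker` classes rather than the real broker: the caller prepares everything outside the lock, and the lock is held only for the duplicate check and in-memory bookkeeping.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal job carrying only an ID; not the real Job class. */
class MiniJob {
    final String jobId;

    MiniJob(String jobId) {
        this.jobId = jobId;
    }
}

/** Hypothetical, simplified stand-in for the broker. */
class MiniBroker {
    private final Map<String, MiniJob> jobs = new HashMap<>();
    private final Map<String, Object> assemblers = new HashMap<>();

    /**
     * Callers build the job and assembler before calling this method, so the lock
     * is held only for the duplicate check and two map insertions.
     */
    synchronized void enqueue(MiniJob job, Object assembler) {
        if (jobs.containsKey(job.jobId)) {
            throw new IllegalStateException("Enqueued duplicate job " + job.jobId);
        }
        jobs.put(job.jobId, job);
        assemblers.put(job.jobId, assembler);
    }

    synchronized int jobCount() {
        return jobs.size();
    }
}

class MiniBrokerDemo {
    public static void main(String[] args) {
        var broker = new MiniBroker();
        // Any slow preparation (file or network I/O) would happen here, outside the lock.
        var job = new MiniJob("job-1");
        var assembler = new Object();
        broker.enqueue(job, assembler);
        System.out.println("Jobs enqueued: " + broker.jobCount());
    }
}
```

Because the duplicate check and the insertion sit in the same synchronized method, two threads enqueuing the same job ID cannot both succeed.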

47 changes: 33 additions & 14 deletions src/main/java/com/conveyal/analysis/components/broker/Job.java
@@ -128,23 +128,42 @@ private RegionalTask makeOneTask (int taskNumber) {
*/
public final Set<String> errors = new HashSet();

public Job (RegionalTask templateTask, WorkerTags workerTags) {
this.jobId = templateTask.jobId;
this.templateTask = templateTask;
this.workerCategory = new WorkerCategory(templateTask.graphId, templateTask.workerVersion);
this.nTasksCompleted = 0;
this.nextTaskToDeliver = 0;

if (templateTask.originPointSetKey != null) {
checkNotNull(templateTask.originPointSet);
this.nTasksTotal = templateTask.originPointSet.featureCount();
public Job(RegionalTask task, WorkerTags workerTags) {
templateTask = templateTaskFromRegionalTask(task);
jobId = templateTask.jobId;
workerCategory = new WorkerCategory(templateTask.graphId, templateTask.workerVersion);
nTasksCompleted = 0;
nextTaskToDeliver = 0;
nTasksTotal = getTasksTotal(templateTask);
completedTasks = new BitSet(nTasksTotal);
this.workerTags = workerTags;
}

public static int getTasksTotal(RegionalTask task) {
if (task.originPointSetKey != null) {
checkNotNull(task.originPointSet);
return task.originPointSet.featureCount();
} else {
this.nTasksTotal = templateTask.width * templateTask.height;
return task.width * task.height;
}
}

this.completedTasks = new BitSet(nTasksTotal);
this.workerTags = workerTags;

/**
* The single RegionalTask object represents a lot of individual accessibility tasks at many different origin
* points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to
* workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID
* to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task,
* as they are often quite voluminous. The workers will fetch the scenario once from S3 and cache it based on
* its ID only. We protectively clone this task because we're going to null out its scenario field, and don't
* want to affect the original object which contains all the scenario details.
*/
private static RegionalTask templateTaskFromRegionalTask(RegionalTask task) {
RegionalTask templateTask = task.clone();
// First replace the inline scenario with a scenario ID, storing the scenario for retrieval by workers.
templateTask.scenarioId = templateTask.scenario.id;
// Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON.
templateTask.scenario = null;
return templateTask;
}

public boolean markTaskCompleted(int taskId) {
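
The Javadoc on `templateTaskFromRegionalTask` describes a clone-then-strip step: copy the task, keep only the scenario's ID, and null out the inline scenario so the caller's object is left untouched. A minimal sketch of that step and of the task count, assuming hypothetical `MiniScenario` and `MiniTask` classes with far fewer fields than the real `Scenario` and `RegionalTask`; `originFeatureCount` is an invented stand-in for the origin point set.

```java
/** Hypothetical, pared-down stand-in for a scenario. */
class MiniScenario {
    final String id;

    MiniScenario(String id) {
        this.id = id;
    }
}

/** Hypothetical, pared-down stand-in for a regional task. */
class MiniTask implements Cloneable {
    MiniScenario scenario;
    String scenarioId;
    int width;
    int height;
    // Invented field: a negative value means "origins are grid cells", not a point set.
    int originFeatureCount = -1;

    @Override
    public MiniTask clone() {
        try {
            // Shallow copy: fields are copied, referenced objects are shared.
            return (MiniTask) super.clone();
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // Unreachable because this class is Cloneable.
        }
    }
}

class TemplateTasks {
    /** Returns a copy that carries only the scenario ID; the argument is not modified. */
    static MiniTask templateFrom(MiniTask task) {
        MiniTask template = task.clone();
        template.scenarioId = template.scenario.id;
        template.scenario = null; // Only the copy's field is cleared.
        return template;
    }

    /** One task per origin feature if a point set is given, else one per grid cell. */
    static int tasksTotal(MiniTask task) {
        return task.originFeatureCount >= 0
                ? task.originFeatureCount
                : task.width * task.height;
    }

    public static void main(String[] args) {
        var task = new MiniTask();
        task.scenario = new MiniScenario("scenario-1");
        task.width = 4;
        task.height = 3;
        var template = templateFrom(task);
        System.out.println("Template scenario ID: " + template.scenarioId);
        System.out.println("Original still has scenario: " + (task.scenario != null));
        System.out.println("Total tasks: " + tasksTotal(template));
    }
}
```

A shallow clone is enough here because the sketch only reassigns fields on the copy and never mutates the shared scenario object.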
@@ -1,13 +1,16 @@
package com.conveyal.analysis.components.broker;

import com.conveyal.analysis.UserPermissions;
import com.conveyal.analysis.components.BackendComponents;
import com.conveyal.analysis.components.LocalBackendComponents;
import com.conveyal.analysis.models.RegionalAnalysis;
import com.conveyal.analysis.results.MultiOriginAssembler;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.UUID;

/**
@@ -64,9 +67,11 @@ private static void sendFakeJob(Broker broker) {
templateTask.height = 1;
templateTask.width = N_TASKS_PER_JOB;
templateTask.scenarioId = "FAKE";
RegionalAnalysis regionalAnalysis = new RegionalAnalysis();
RegionalAnalysis regionalAnalysis = new RegionalAnalysis(new UserPermissions("[email protected]", false, "testing"), "test");
regionalAnalysis.request = templateTask;
broker.enqueueTasksForRegionalJob(regionalAnalysis);
var job = new Job(templateTask, WorkerTags.fromRegionalAnalysis(regionalAnalysis));
var assembler = new MultiOriginAssembler(job, new ArrayList<>());
Comment on lines +72 to +73
Member

Should we update our style guide re: use of var keyword?

Member Author

Yes, although I haven't looked at our style guide in ages. My short opinion (and proposed addition to the style guide): prefer `var` for all non-primitive types, except where we want to show a type explicitly.

In the examples above, writing the types instead of `var` would repeat information already on the same line: `Job` and `MultiOriginAssembler` are already there.

One case where we might want to be more explicit is when a method returns a value and we want to distinguish that type from neighboring types.
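
The guideline proposed in this comment can be illustrated with a small sketch. The `VarStyleDemo` class and its `groupByParity` method are invented for this example: `var` is used where the constructor already names the type, a primitive keeps its explicit type, and a method call result is declared explicitly because its type is not visible on that line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class VarStyleDemo {
    static Map<String, List<Integer>> groupByParity(List<Integer> values) {
        // The constructor already names the type, so `var` drops only redundant text.
        var groups = new TreeMap<String, List<Integer>>();
        // Primitives keep their explicit type under the proposed guideline.
        for (int value : values) {
            var key = (value % 2 == 0) ? "even" : "odd";
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Explicit type: the return type of the call is not visible on this line.
        Map<String, List<Integer>> groups = groupByParity(List.of(1, 2, 3, 4));
        System.out.println(groups);
    }
}
```

Local variable type inference with `var` requires Java 10 or later and applies only to local variables with an initializer.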

broker.enqueueTasksForRegionalJob(job, assembler);
}

public static String compactUUID() {