From 28cb70a092ca2c038b657ddc404d666b1298a802 Mon Sep 17 00:00:00 2001
From: Trevor Gerhardt
Date: Mon, 17 Feb 2025 16:56:37 +0100
Subject: [PATCH] Refactor results writers to support dual access grids

The `MultiGridResultWriter` and `GridResultWriter` can be reused for dual
access grids with minor changes. Regular accessibility grids use time
cutoffs, while dual accessibility grids use thresholds. This change mainly
hoists the `channels` parameter up to the callers so that it can count
either cutoffs or thresholds (the channels/cutoffs/thresholds naming here
could be confusing).

This change additionally does some minor refactoring so that the file name
is passed to `BaseResultWriter`'s constructor, which means metadata no
longer needs to be stored and passed along until `finish()`.
---
 .../results/AccessCsvResultWriter.java        |  7 +--
 .../analysis/results/BaseResultWriter.java    | 21 ++-----
 .../analysis/results/CsvResultWriter.java     | 15 ++---
 .../analysis/results/GridResultWriter.java    | 49 ++++++++-------
 .../results/MultiGridResultWriter.java        | 61 ++++++++-----------
 .../results/MultiOriginAssembler.java         | 10 ++-
 .../analysis/results/PathCsvResultWriter.java |  7 +--
 .../TemporalDensityCsvResultWriter.java       |  7 +--
 .../analysis/results/TimeCsvResultWriter.java |  8 +--
 9 files changed, 72 insertions(+), 113 deletions(-)

diff --git a/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java
index e208ac827..8d569e410 100644
--- a/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java
+++ b/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java
@@ -11,12 +11,7 @@ public class AccessCsvResultWriter extends CsvResultWriter {
 
     public AccessCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException {
-        super(task, fileStorage);
-    }
-
-    @Override
-    public CsvResultType resultType () {
-        return CsvResultType.ACCESS;
+        super(task, CsvResultType.ACCESS, fileStorage);
     }
 
     @Override
diff --git a/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java b/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java
index 8bcf94d26..081717a2e 100644
--- a/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java
+++ b/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java
@@ -1,6 +1,5 @@
 package com.conveyal.analysis.results;
 
-import com.conveyal.file.FileCategory;
 import com.conveyal.file.FileStorage;
 import com.conveyal.file.FileStorageKey;
 import com.conveyal.file.FileUtils;
@@ -31,30 +30,20 @@ public abstract class BaseResultWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(BaseResultWriter.class);
 
+    public final String fileName;
     private final FileStorage fileStorage;
-    protected File bufferFile;
+    protected File bufferFile = FileUtils.createScratchFile();
 
-    public BaseResultWriter (FileStorage fileStorage) {
+    public BaseResultWriter(String fileName, FileStorage fileStorage) {
+        this.fileName = fileName;
         this.fileStorage = fileStorage;
     }
 
-    // Can this be merged into the constructor?
-    protected void prepare (String jobId) {
-        try {
-            bufferFile = File.createTempFile(jobId + "_", ".results");
-            // On unexpected server shutdown, these files should be deleted.
-            // We could attempt to recover from shutdowns but that will take a lot of changes and persisted data.
-            bufferFile.deleteOnExit();
-        } catch (IOException e) {
-            LOG.error("Exception while creating buffer file for multi-origin assembler: " + e.toString());
-        }
-    }
-
     /**
      * Gzip the access grid and store it.
     */
-    protected synchronized void finish (String fileName) throws IOException {
+    protected synchronized void finish() throws IOException {
         LOG.info("Compressing {} and moving into file storage.", fileName);
         FileStorageKey fileStorageKey = new FileStorageKey(RESULTS, fileName);
         File gzippedResultFile = FileUtils.createScratchFile();
diff --git a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java
index 1bd9ff9ac..1c3ea9313 100644
--- a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java
+++ b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java
@@ -13,7 +13,6 @@
 import java.io.IOException;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 /**
@@ -25,7 +24,6 @@ public abstract class CsvResultWriter extends BaseResultWriter implements Region
 
     private static final Logger LOG = LoggerFactory.getLogger(CsvResultWriter.class);
 
-    public final String fileName;
     private final CsvWriter csvWriter;
     private int nDataColumns;
 
@@ -39,7 +37,7 @@ public abstract class CsvResultWriter extends BaseResultWriter implements Region
      * Override to return an Enum (usable as String) identifying the kind of results recorded by this CSV writer.
      * This serves as a filename suffix to distinguish between different CSVs generated by a single regional analysis.
      */
-    public abstract CsvResultType resultType ();
+    public final CsvResultType resultType;
 
     /**
      * Override to provide column names for this CSV writer.
@@ -60,16 +58,15 @@ public abstract class CsvResultWriter extends BaseResultWriter implements Region
      * "origin", "destination", and the supplied indicator.
      * FIXME it's strange we're manually passing injectable components into objects not wired up at application construction.
     */
-    CsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException {
-        super(fileStorage);
+    CsvResultWriter(RegionalTask task, CsvResultType resultType, FileStorage fileStorage) throws IOException {
+        super(task.jobId + "_" + resultType + ".csv", fileStorage);
         checkArgument(task.originPointSet != null, "CsvResultWriters require FreeFormPointSet origins.");
-        super.prepare(task.jobId);
-        this.fileName = task.jobId + "_" + resultType() +".csv";
         BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bufferFile));
         csvWriter = new CsvWriter(bufferedWriter, ',');
         setDataColumns(columnHeaders());
+        this.resultType = resultType;
         this.task = task;
-        LOG.info("Created CSV file to hold {} results for regional job {}", resultType(), task.jobId);
+        LOG.info("Created CSV file to hold {} results for regional job {}", resultType, task.jobId);
     }
 
     /**
@@ -89,7 +86,7 @@ protected void setDataColumns(String... columns) throws IOException {
     @Override
     public synchronized void finish () throws IOException {
         csvWriter.close();
-        super.finish(this.fileName);
+        super.finish();
     }
 
     /**
diff --git a/src/main/java/com/conveyal/analysis/results/GridResultWriter.java b/src/main/java/com/conveyal/analysis/results/GridResultWriter.java
index 88b1a8c08..d94aad95b 100644
--- a/src/main/java/com/conveyal/analysis/results/GridResultWriter.java
+++ b/src/main/java/com/conveyal/analysis/results/GridResultWriter.java
@@ -2,7 +2,7 @@
 
 import com.conveyal.file.FileStorage;
 import com.conveyal.r5.analyst.LittleEndianIntOutputStream;
-import com.conveyal.r5.analyst.cluster.RegionalTask;
+import com.conveyal.r5.analyst.WebMercatorExtents;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +41,7 @@ public class GridResultWriter extends BaseResultWriter {
 
     private static final Logger LOG = LoggerFactory.getLogger(GridResultWriter.class);
 
-    private RandomAccessFile randomAccessFile;
+    private final RandomAccessFile randomAccessFile;
 
     /** The version of the access grids we produce */
     private static final int ACCESS_GRID_VERSION = 0;
@@ -50,32 +50,31 @@ public class GridResultWriter extends BaseResultWriter {
     private static final long HEADER_LENGTH_BYTES = 9 * Integer.BYTES;
 
     /**
-     * The number of different travel time cutoffs being applied when computing accessibility for each origin.
+     * The number of different travel time cutoffs or dual access thresholds being applied when computing values for
+     * each origin.
      * The number of values stored per origin cell in an accessibility results grid.
-     * Note that we're storing only the number of different cutoffs, but not the cutoff values themselves in the file.
-     * This means that the files can only be properly interpreted with the Mongo metadata from the regional analysis.
+     * Note that we're storing only the number of different channels, but not the values themselves in the file.
+     * This means that the files can only be properly interpreted with the metadata from the regional analysis.
      * This is an intentional choice to avoid changing the file format, and in any case these files are not expected
-     * to ever be used separately from an environment where the Mongo database is available.
+     * to ever be used separately from an environment where the metadata is available.
     */
     private final int channels;
 
     /**
-     * Construct an writer for a single regional analysis result grid, using the proprietary
+     * Construct a writer for a single regional analysis result grid, using the proprietary
      * Conveyal grid format. This also creates the on-disk scratch buffer into which the results
      * from the workers will be accumulated.
     */
-    GridResultWriter (RegionalTask task, FileStorage fileStorage) {
-        super(fileStorage);
-        int width = task.width;
-        int height = task.height;
-        this.channels = task.cutoffsMinutes.length;
+    GridResultWriter(WebMercatorExtents ext, int channels, String fileName, FileStorage fileStorage) {
+        super(fileName, fileStorage);
+        long bodyBytes = (long) ext.width * ext.height * channels * Integer.BYTES;
+        this.channels = channels;
         LOG.info(
             "Expecting multi-origin results for grid with width {}, height {}, {} values per origin.",
-            width,
-            height,
+            ext.width,
+            ext.height,
             channels
         );
-        super.prepare(task.jobId);
         try {
             // Write the access grid file header to the temporary file.
@@ -83,11 +82,11 @@ public class GridResultWriter extends BaseResultWriter {
             LittleEndianIntOutputStream data = new LittleEndianIntOutputStream(fos);
             data.writeAscii("ACCESSGR");
             data.writeInt(ACCESS_GRID_VERSION);
-            data.writeInt(task.zoom);
-            data.writeInt(task.west);
-            data.writeInt(task.north);
-            data.writeInt(width);
-            data.writeInt(height);
+            data.writeInt(ext.zoom);
+            data.writeInt(ext.west);
+            data.writeInt(ext.north);
+            data.writeInt(ext.width);
+            data.writeInt(ext.height);
             data.writeInt(channels);
             data.close();
 
@@ -100,8 +99,8 @@ public class GridResultWriter extends BaseResultWriter {
         // IO limits on cloud servers with network storage. Even without initialization, any complete regional analysis
         // would overwrite every byte in the file with a result for some origin point, so the initial values are only
         // important when visualizing or debugging partially completed analysis results.
-        this.randomAccessFile = new RandomAccessFile(bufferFile, "rw");
-        randomAccessFile.setLength(HEADER_LENGTH_BYTES + (width * height * channels * Integer.BYTES));
+        randomAccessFile = new RandomAccessFile(bufferFile, "rw");
+        randomAccessFile.setLength(HEADER_LENGTH_BYTES + bodyBytes);
         LOG.info(
             "Created temporary file to accumulate results from workers, size is {}.",
             human(randomAccessFile.length(), "B")
         );
@@ -113,8 +112,8 @@ public class GridResultWriter extends BaseResultWriter {
 
     /** Gzip the access grid and upload it to file storage (such as AWS S3). */
     @Override
-    protected synchronized void finish (String fileName) throws IOException {
-        super.finish(fileName);
+    protected synchronized void finish() throws IOException {
+        super.finish();
         randomAccessFile.close();
     }
 
@@ -138,7 +137,7 @@ synchronized void writeOneOrigin (int taskNumber, int[] values) throws IOExcepti
         if (values.length != channels) {
             throw new IllegalArgumentException("Number of channels to be written does not match this writer.");
         }
-        long offset = HEADER_LENGTH_BYTES + (taskNumber * channels * Integer.BYTES);
+        long offset = HEADER_LENGTH_BYTES + ((long) taskNumber * channels * Integer.BYTES);
         // RandomAccessFile is not threadsafe and multiple threads may call this, so synchronize.
         // TODO why is the method also synchronized then?
         synchronized (this) {
diff --git a/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java b/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java
index 5f4d90f8a..8d2f8c271 100644
--- a/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java
+++ b/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java
@@ -2,6 +2,7 @@
 
 import com.conveyal.analysis.models.RegionalAnalysis;
 import com.conveyal.file.FileStorage;
+import com.conveyal.r5.analyst.WebMercatorExtents;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
 import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
 
@@ -10,37 +11,34 @@
  * same interface as our CSV writers, so CSV and Grids can be processed similarly in MultiOriginAssembler.
 */
 public class MultiGridResultWriter implements RegionalResultWriter {
-
-    private final RegionalAnalysis regionalAnalysis;
-
-    private final RegionalTask task;
-
     /**
      * We create one GridResultWriter for each destination pointset and percentile.
-     * Each of those output files contains data for all travel time cutoffs at each origin.
+     * Each of those output files contains data for all channels at each origin.
     */
-    private final GridResultWriter[][] accessibilityGridWriters;
-
-    /** The number of different percentiles for which we're calculating accessibility on the workers. */
-    private final int nPercentiles;
-
-    /** The number of destination pointsets to which we're calculating accessibility */
-    private final int nDestinationPointSets;
+    private final GridResultWriter[][] gridResultWriters;
 
     /** Constructor */
     public MultiGridResultWriter (
-        RegionalAnalysis regionalAnalysis, RegionalTask task, FileStorage fileStorage
+        RegionalAnalysis regionalAnalysis, RegionalTask task, int channels, FileStorage fileStorage
     ) {
-        // We are storing the regional analysis just to get its pointset IDs (not keys) and its own ID.
-        this.regionalAnalysis = regionalAnalysis;
-        this.task = task;
-        this.nPercentiles = task.percentiles.length;
-        this.nDestinationPointSets = task.makeTauiSite ? 0 : task.destinationPointSetKeys.length;
+        int nPercentiles = task.percentiles.length;
+        int nDestinationPointSets = task.makeTauiSite ? 0 : task.destinationPointSetKeys.length;
         // Create one grid writer per percentile and destination pointset.
-        accessibilityGridWriters = new GridResultWriter[nDestinationPointSets][nPercentiles];
+        gridResultWriters = new GridResultWriter[nDestinationPointSets][nPercentiles];
         for (int d = 0; d < nDestinationPointSets; d++) {
             for (int p = 0; p < nPercentiles; p++) {
-                accessibilityGridWriters[d][p] = new GridResultWriter(task, fileStorage);
+                String fileName = String.format(
+                    "%s_%s_P%d.access",
+                    regionalAnalysis._id,
+                    regionalAnalysis.destinationPointSetIds[d],
+                    task.percentiles[p]
+                );
+                gridResultWriters[d][p] = new GridResultWriter(
+                    WebMercatorExtents.forTask(task),
+                    channels,
+                    fileName,
+                    fileStorage
+                );
             }
         }
     }
@@ -51,20 +49,18 @@ public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception
         // TODO more efficient way to write little-endian integers
         // TODO check monotonic increasing invariants here rather than in worker.
         // Infer x and y cell indexes based on the template task
-        int taskNumber = workResult.taskId;
         for (int d = 0; d < workResult.accessibilityValues.length; d++) {
             int[][] percentilesForGrid = workResult.accessibilityValues[d];
-            for (int p = 0; p < nPercentiles; p++) {
-                int[] cutoffsForPercentile = percentilesForGrid[p];
-                GridResultWriter gridWriter = accessibilityGridWriters[d][p];
-                gridWriter.writeOneOrigin(taskNumber, cutoffsForPercentile);
+            for (int p = 0; p < percentilesForGrid.length; p++) {
+                GridResultWriter gridWriter = gridResultWriters[d][p];
+                gridWriter.writeOneOrigin(workResult.taskId, percentilesForGrid[p]);
             }
         }
     }
 
     @Override
     public void terminate () throws Exception {
-        for (GridResultWriter[] writers : accessibilityGridWriters) {
+        for (GridResultWriter[] writers : gridResultWriters) {
             for (GridResultWriter writer : writers) {
                 writer.terminate();
             }
@@ -73,14 +69,9 @@ public void terminate () throws Exception {
 
     @Override
     public void finish () throws Exception {
-        for (int d = 0; d < nDestinationPointSets; d++) {
-            for (int p = 0; p < nPercentiles; p++) {
-                int percentile = task.percentiles[p];
-                String destinationPointSetId = regionalAnalysis.destinationPointSetIds[d];
-                // TODO verify that regionalAnalysis._id is the same as job.jobId
-                String gridFileName =
-                    String.format("%s_%s_P%d.access", regionalAnalysis._id, destinationPointSetId, percentile);
-                accessibilityGridWriters[d][p].finish(gridFileName);
+        for (GridResultWriter[] gridResultWriterRow : gridResultWriters) {
+            for (GridResultWriter resultWriter : gridResultWriterRow) {
+                resultWriter.finish();
             }
         }
     }
diff --git a/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java b/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java
index dc2b0b150..f03675583 100644
--- a/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java
+++ b/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java
@@ -10,7 +10,6 @@
 import com.conveyal.r5.analyst.cluster.PathResult;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
 import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
-import com.conveyal.r5.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,7 +120,12 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto
                 resultWriters.add(new AccessCsvResultWriter(job.templateTask, fileStorage));
             } else {
                 // Gridded origins - create gridded regional analysis results
-                resultWriters.add(new MultiGridResultWriter(regionalAnalysis, job.templateTask, fileStorage));
+                resultWriters.add(new MultiGridResultWriter(
+                    regionalAnalysis,
+                    job.templateTask,
+                    job.templateTask.cutoffsMinutes.length,
+                    fileStorage
+                ));
             }
         }
 
@@ -157,7 +161,7 @@ public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileSto
             // FIXME instanceof+cast is ugly, do this some other way or even record the Grids
             if (writer instanceof CsvResultWriter) {
                 CsvResultWriter csvWriter = (CsvResultWriter) writer;
-                regionalAnalysis.resultStorage.put(csvWriter.resultType(), csvWriter.fileName);
+                regionalAnalysis.resultStorage.put(csvWriter.resultType, csvWriter.fileName);
             }
         }
     } catch (AnalysisServerException e) {
diff --git a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java
index 6a7c9ffc7..114a7086d 100644
--- a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java
+++ b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java
@@ -15,12 +15,7 @@ public class PathCsvResultWriter extends CsvResultWriter {
 
     public PathCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException {
-        super(task, fileStorage);
-    }
-
-    @Override
-    public CsvResultType resultType () {
-        return CsvResultType.PATHS;
+        super(task, CsvResultType.PATHS, fileStorage);
     }
 
     @Override
diff --git a/src/main/java/com/conveyal/analysis/results/TemporalDensityCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/TemporalDensityCsvResultWriter.java
index 02a6168ad..d98a414fb 100644
--- a/src/main/java/com/conveyal/analysis/results/TemporalDensityCsvResultWriter.java
+++ b/src/main/java/com/conveyal/analysis/results/TemporalDensityCsvResultWriter.java
@@ -18,15 +18,10 @@ public class TemporalDensityCsvResultWriter extends CsvResultWriter {
 
     private final int dualThreshold;
 
     public TemporalDensityCsvResultWriter(RegionalTask task, FileStorage fileStorage) throws IOException {
-        super(task, fileStorage);
+        super(task, CsvResultType.TDENSITY, fileStorage);
         dualThreshold = task.dualAccessibilityThreshold;
     }
 
-    @Override
-    public CsvResultType resultType () {
-        return CsvResultType.TDENSITY;
-    }
-
     @Override
     public String[] columnHeaders () {
         List<String> headers = new ArrayList<>();
diff --git a/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java
index 144da7713..e92e94558 100644
--- a/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java
+++ b/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java
@@ -9,18 +9,12 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 public class TimeCsvResultWriter extends CsvResultWriter {
 
     public TimeCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException {
-        super(task, fileStorage);
-    }
-
-    @Override
-    public CsvResultType resultType () {
-        return CsvResultType.TIMES;
+        super(task, CsvResultType.TIMES, fileStorage);
     }
 
     @Override
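
With `channels` hoisted out of `GridResultWriter`, the same grid writers can serve dual access output. A minimal sketch of what such a call site could look like, assuming a hypothetical `dualAccessThresholds` array on the task — this patch only hoists the channel count, it does not add that field:

```java
// Hypothetical dual access call site in MultiOriginAssembler.
// `dualAccessThresholds` is an assumed field name, used here only for illustration.
resultWriters.add(new MultiGridResultWriter(
    regionalAnalysis,
    job.templateTask,
    job.templateTask.dualAccessThresholds.length, // thresholds as channels
    fileStorage
));
```

The writer itself is agnostic: it only needs to know how many int values arrive per origin, and the regional analysis metadata determines whether those values are interpreted as cutoffs or thresholds.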
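For reference, each grid written above starts with a 36-byte header (`HEADER_LENGTH_BYTES = 9 * Integer.BYTES`): the ASCII magic "ACCESSGR" followed by seven little-endian ints — version, zoom, west, north, width, height, channels. A self-contained sketch of a header decoder; the class name is chosen here for illustration and is not part of the codebase:

```java
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Decodes the header written by GridResultWriter. DataInputStream reads
// big-endian, while the grid stores little-endian ints, hence reverseBytes.
public class AccessGridHeaderReader {
    public static void main(String[] args) throws IOException {
        try (DataInputStream in = new DataInputStream(new FileInputStream(args[0]))) {
            byte[] magic = new byte[8];
            in.readFully(magic); // "ACCESSGR"
            int[] h = new int[7]; // version, zoom, west, north, width, height, channels
            for (int i = 0; i < h.length; i++) {
                h[i] = Integer.reverseBytes(in.readInt());
            }
            System.out.printf("%s v%d zoom=%d west=%d north=%d width=%d height=%d channels=%d%n",
                    new String(magic, StandardCharsets.US_ASCII),
                    h[0], h[1], h[2], h[3], h[4], h[5], h[6]);
        }
    }
}
```

Note this reads the raw buffer file; `finish()` gzips the copy placed in file storage, so a stored grid would need to be gunzipped first.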
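One easy-to-miss fix above: the write offset in `writeOneOrigin` is now computed in long arithmetic. Without the `(long)` cast the product is evaluated in int and wraps around for large grids, even though the result is assigned to a `long`. A small demonstration with made-up numbers:

```java
public class OffsetOverflowDemo {
    public static void main(String[] args) {
        int taskNumber = 120_000_000; // illustrative origin index in a very large grid
        int channels = 5;
        // The multiplication happens entirely in int and wraps before widening.
        long wrong = taskNumber * channels * Integer.BYTES;        // -1894967296
        long right = (long) taskNumber * channels * Integer.BYTES; // 2400000000
        System.out.println(wrong + " vs " + right);
    }
}
```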