Task running order in pipeline #181

Closed
jrs65 opened this issue Jul 2, 2021 · 11 comments
Labels: enhancement, priority/medium (This issue is fairly important and should be addressed)

Comments

@jrs65
Contributor

jrs65 commented Jul 2, 2021

I've been finding and thinking about a few issues with the task running order within caput.pipeline, and I figured I would start a discussion about potential fixes or mitigations.

Products kept around longer than needed

First let's think about an example config:

tasks:
  - type: MakeStream
    out: stream

  - type: FilterA
    in: stream
    out: filtered_stream

  - type: FilterB
    in: stream
    out: filtered_stream

  - type: MakeMap
    in: filtered_stream

In this the initial stream is generated, each filter task is run (together adding two entries to the filtered_stream queue), and then finally the MakeMap task consumes them. While all of this is technically correct, it has kept the filtered_stream entries around for longer than necessary as each task is run in turn. In theory MakeMap could have run for the first time immediately after FilterA, which would have allowed it to drop the first item in filtered_stream. This may seem like a small issue, but we are often memory constrained, so running tasks in an order which uses more memory than required is a problem.

In the example above this could be fixed by the user by changing the task order in the file to MakeStream, FilterA, MakeMap, FilterB; but although that would work here, if we added a FilterC there would be no good place to put the MakeMap task.

I can see a few potential changes that would help here, all of which add a concept of priority to the task running order, meaning that prioritised tasks run ahead of other tasks provided their requirements are satisfied (i.e. they become greedy):

  1. prioritise tasks that have the most items in their queue already. This wouldn't help the problem above, but would definitely mean you couldn't construct an even worse version by adding more FilterX steps.
  2. analyse the task graph and prioritise tasks with more connections to the input queue. This might be hard to do correctly, and would be tricky if there are different input queues. In the example above the pipeline would realise that multiple tasks produce input for MakeMap and would thus prioritise running MakeMap.
  3. add a priority option to the task that allows the config writer to give specific tasks higher priority. In the example above the user would specify a higher priority for MakeMap, which would mean that it is run ahead of other tasks whenever it has work to do. A sketch of what this might look like is given below the list.
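
For example, option 3 might look something like the following for the config above. This is just a sketch of the idea; the priority key is hypothetical and doesn't exist yet:

tasks:
  - type: MakeStream
    out: stream

  - type: FilterA
    in: stream
    out: filtered_stream

  - type: FilterB
    in: stream
    out: filtered_stream

  - type: MakeMap
    in: filtered_stream
    priority: 10  # hypothetical key: run this task whenever it has input, ahead of lower-priority tasks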

Pairing up / producing output more often than you get input

Let's say I want to stack catalog sources on a set of maps. I might have something like this:

tasks:
  - type: LoadCatalogs
    out: catalog
    params:
      files:
      - catalogA
      - catalogB
    
  - type: LoadMaps
    out: map
    params:
      files:
      - map1
      - map2
      - map3

  - type: Stack
    in: [catalog, map]

What I want this to do is to stack all pairs of catalogs and maps, but what it will do is stack (catalogA, map1), (catalogB, map2), and then stop because it never gets anything to put with map3. In some sense this is fair; there is no way the pipeline could know that is what I want it to do. I could attempt to write a task that sits before Stack and does the pairing for me, but it turns out that it is impossible to even write a task that would allow this pairing up (a sketch of the pairing I actually want follows the list below). There are two reasons you can't do this:

  1. Tasks that accept input can only produce one piece of output for every input they get. There is a partial workaround in that a task can take a single input as a requires key and then produce an unlimited amount of output, but that doesn't cover the example above: there is no way to make it work within this restriction, since there are five inputs in total but six possible pairs.
  2. Tasks always pop items from their input queues at the same rate. This means there aren't any good ways of efficiently threading the above items.
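
For reference, the pairing I actually want is the full outer product of the two queues, which is easy to write down outside the pipeline. A minimal sketch (the print is a stand-in for whatever Stack.process would do):

    from itertools import product

    catalogs = ["catalogA", "catalogB"]
    maps = ["map1", "map2", "map3"]

    # The desired behaviour: every catalog paired with every map (six pairs),
    # rather than consuming the queues item-by-item, which pairs up only two
    # of them before the catalog queue runs dry.
    for cat, m in product(catalogs, maps):
        print(f"stack {cat} with {m}")  # stand-in for Stack.process(catalog, map)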

Other issues

I have a few other problems, so I'll try and add them here later on.

@jrs65
Contributor Author

jrs65 commented Aug 27, 2021

Here's another one that is a bit more pernicious.

The ordering of tasks can do very strange things to the way in which things are run. Here's a non-minimal example that goes crazy. You would think that this config would feed each generated mock catalog into the input of RingMapBeamForm; however, it actually generates all of the mock catalogs before starting to feed them into that task.

  tasks:
    - type: draco.core.io.LoadProductManager
      out: manager
      params:
        product_directory: /project/rpp-chime/chime/beam_transfers/chime_4cyl_585-800_I_XX-YY_{anbeam}/

    - type: draco.core.io.LoadFITSCatalog
      out: cat_for_selfunc
      params:
        catalogs:
          - tag: "rcat"
            files:
              - "/project/rpp-chime/chime/catalogs/eBOSS_{tracer}_clustering_random-{field}-vDR16.fits"
        freq_range: [585.0, 800.0]

    - type: draco.synthesis.mockcatalog.SelectionFunctionEstimator
      in: cat_for_selfunc
      out: selfunc
      params:
        save: false
        output_name: selfunc.h5

    - type: draco.core.io.LoadBasicCont
      out: source_map
      params:
        files:
          - /project/rpp-chime/chime/stacking/sims/lss/{lss}/maps/map_{tracer}_log.h5

    - type: draco.synthesis.mockcatalog.ResizeSelectionFunctionMap
      in: [selfunc, source_map]
      out: resized_selfunc
      params:
        smooth: true
        save: false
        output_name: resized_selfunc.h5

    - type: draco.synthesis.mockcatalog.PdfGeneratorWithSelectionFunction
      in: [source_map, resized_selfunc]
      out: pdf_map
      params:
        tracer: {tracer}
        save: false

    - type: draco.synthesis.mockcatalog.MockCatalogGenerator
      requires: pdf_map
      out: mock_cat
      params:
        nsource: *{tracer}_{field}
        tag: "{{count:04d}}"
        ncat: 10000

    - type: draco.synthesis.mockcatalog.AddEBOSSZErrorsToCatalog
      in: mock_cat
      out: mock_cat_zerror
      params:
        save: false
        output_name: "mockcatalog_{{tag}}.h5"

    - type: draco.core.io.LoadFilesFromParams
      out: ringmap
      params:
        files:
            - "/project/rpp-chime/chime/stacking/sims/analysis_ringmaps/{simbeam}/{anbeam}/{ss}_{lss}/dcringmap_{state}filter.h5"
        distributed: true

    - type: draco.core.io.LoadFilesFromParams
      out: mask
      params:
        files:
          - "/project/rpp-chime/chime/stacking/data/mask_rad_03_from_ringmap_intercyl_dayenu_relaxed_deconvolve_psbeam_fullstack.h5"
        distributed: true

    - type: draco.analysis.flagging.MaskBeamformedOutliers
      in: [ringmap, mask]
      out: ringmap_masked

    - type: draco.analysis.beamform.RingMapBeamForm
      requires: [manager, ringmap_masked]
      in: mock_cat_zerror
      out: formed_beam
      params:
        save: false
        output_name: "formedbeam_filtered_{{tag}}.h5"

    - type: draco.analysis.sourcestack.SourceStack
      in: formed_beam
      out: stack
      params:
        freqside: 50

If the tasks for loading the ringmap and mask and applying the mask are moved up towards the top of the config, it works fine.

  tasks:                                                                                                                                                                                                                                                                                                                                                         
    - type: draco.core.io.LoadProductManager
      out: manager
      params:
        product_directory: /project/rpp-chime/chime/beam_transfers/chime_4cyl_585-800_I_XX-YY_psbeam/

    - type: draco.core.io.LoadFilesFromParams
      out: ringmap
      params:
        files:
            - "/project/rpp-chime/chime/stacking/sims/analysis_ringmaps/psbeam/psbeam/dataweight_compderiv-00-FoG/dcringmap_postfilter.h5"
        distributed: true

    - type: draco.core.io.LoadFilesFromParams
      out: mask
      params:
        files:
          - "/project/rpp-chime/chime/stacking/data/mask_rad_03_from_ringmap_intercyl_dayenu_relaxed_deconvolve_psbeam_fullstack.h5"
        distributed: true

    - type: draco.analysis.flagging.MaskBeamformedOutliers
      in: [ringmap, mask]
      out: ringmap_masked

    - type: draco.core.io.LoadFITSCatalog
      out: cat_for_selfunc
      params:
        catalogs:
          - tag: "rcat"
            files:
              - "/project/rpp-chime/chime/catalogs/eBOSS_QSO_clustering_random-NGC-vDR16.fits"
        freq_range: [585.0, 800.0]

    - type: draco.synthesis.mockcatalog.SelectionFunctionEstimator
      in: cat_for_selfunc
      out: selfunc
      params:
        save: false
        output_name: selfunc.h5

    - type: draco.core.io.LoadBasicCont
      out: source_map
      params:
        files:
          - /project/rpp-chime/chime/stacking/sims/lss/compderiv-00-FoG/maps/map_QSO_log.h5

    - type: draco.synthesis.mockcatalog.ResizeSelectionFunctionMap
      in: [selfunc, source_map]
      out: resized_selfunc
      params:
        smooth: true
        save: false
        output_name: resized_selfunc.h5

    - type: draco.synthesis.mockcatalog.PdfGeneratorWithSelectionFunction
      in: [source_map, resized_selfunc]
      out: pdf_map
      params:
        tracer: QSO
        save: false

    - type: draco.synthesis.mockcatalog.MockCatalogGenerator
      requires: pdf_map
      out: mock_cat
      params:
        nsource: *QSO_NGC
        tag: "{count:04d}"
        ncat: 10

    - type: draco.synthesis.mockcatalog.AddEBOSSZErrorsToCatalog
      in: mock_cat
      out: mock_cat_zerror
      params:
        save: true
        output_name: "mockcatalog_{tag}.h5"

    - type: draco.analysis.beamform.RingMapBeamForm
      requires: [manager, ringmap_masked]
      in: mock_cat_zerror
      out: formed_beam
      params:
        save: false
        output_name: "formedbeam_filtered_{tag}.h5"

    - type: draco.analysis.sourcestack.SourceStack
      in: formed_beam
      out: stack
      params:
        freqside: 50

The reason this happens is a little obscure. When a task reports that it has no input data with which to do anything (i.e. it raises `_PipelineMissingData`), the runner, rather than simply skipping this task and attempting to run any following ones, breaks out of the loop (at the `break` statement) and starts back at the beginning of the list. In the case above it runs through a cycle until it hits MaskBeamformedOutliers; at that point the task knows it currently doesn't have any data it can run on, so it raises the `_PipelineMissingData` exception, and that causes the runner not to advance to the next task (which I think would be the right choice), but to skip straight back to the start of the list.
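
To make the distinction concrete, here is a heavily simplified sketch of the runner loop (hypothetical code, not the actual caput source):

    class _PipelineMissingData(Exception):
        """Stand-in for caput.pipeline's internal exception of the same name."""


    def run_cycle(tasks, restart_on_missing=True):
        """Run one pass over the task list (a sketch only).

        restart_on_missing=True mimics the current behaviour: a starved task
        aborts the pass (`break`) and the runner starts again from the top of
        the list. False mimics the proposed change: skip only the starved
        task (`continue`) and carry on with the tasks below it.
        """
        for task in tasks:
            try:
                task()  # stand-in for running one step of the task
            except _PipelineMissingData:
                if restart_on_missing:
                    break
                else:
                    continue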

I think there are a few potential resolutions here:

  • Change the break to a continue. This will probably change the order in which current pipelines are evaluated, but shouldn't really break anything (I think). A little risky though.
  • Evaluate which data keys can no longer be generated and remove tasks that will no longer get input. This requires some smarter logic in the code to track which tasks produce output into which queues, and then remove tasks whose input comes from tasks which can no longer generate output.

I think @kiyo-masui is the only person out there who is likely to understand the ins and outs of this one. I think the first one is an easy fix, but I fear it may break things in ways I am not foreseeing.

@kiyo-masui
Contributor

In many ways it's a miracle that we made it this far (7 years by my count) before my convoluted dependency resolution pipeliner stopped meeting our needs!

I've given these a single read, but need to dive deeper before I can comment. Will get back to you.

@jrs65
Copy link
Contributor Author

jrs65 commented Aug 27, 2021

Awesome. Thanks for looking through it Kiyo. I'll try and reduce the configs down to something which makes the problem more manifest. I just wanted to dump something in there before I forgot how to reproduce the issue!

@jrs65
Copy link
Contributor Author

jrs65 commented Aug 28, 2021

I've been trying to think about what to do here. Beyond changing the break to a continue, I think there are a few simple tweaks we could make to improve this. Notably, I would not round-robin through the tasks to run, but would choose the next one to run based on a priority system. I suggest sorting on four criteria lexicographically (a sketch of the resulting sort key follows the list). They are, from most important to least important:

  • Number of items currently in a task's input queue. This would be the minimum number of items across all input queues for a task.
  • Hunger factor, i.e. how many items a task will eat. This is the difference between the number of input queues and the number of output queues, e.g. a task that purely consumes items from a single queue and doesn't produce anything would get +1. A pure producer gets a -1.
  • Task run position. The manager increments a counter every time it runs something, and each task gets assigned this counter when it runs. Lower numbers are higher priority (i.e. the longer ago a task was run, the higher its priority).
  • Position in task list. I think this one would only be relevant before all tasks have been run once (as the previous criteria would have no ties as soon as a task has run).
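
A minimal sketch of the resulting sort key (the attribute names input_queues, output_queues, last_run and position are hypothetical, not the real caput internals):

    def priority_key(task):
        """Lexicographic sort key for picking the next task to run (lower sorts first)."""
        queued = min((len(q) for q in task.input_queues), default=0)
        hunger = len(task.input_queues) - len(task.output_queues)
        last_run = task.last_run if task.last_run is not None else -1
        # Negate the first two criteria so that more queued items and hungrier
        # tasks sort first; an older run counter and an earlier position in
        # the task list break any remaining ties.
        return (-queued, -hunger, last_run, task.position)

    # next_task = min(runnable_tasks, key=priority_key)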

@kiyo-masui
Contributor

A few things:

First, just for reference, here is the link to the docs describing the current rules: https://caput.readthedocs.io/en/latest/_autosummary/caput.pipeline.html#execution-order

Next, I want to assure you that the current execution ordering rules were developed for ease and simplicity, rather than optimality. I explicitly don't do any inspection of the input and output queues. The only information the current logic uses is the pipeline order, and the "state" of each task (is it at the setup, next, or finish stage). I think making some sort of priority system would be great... I didn't do it just because it was too complicated for the use cases envisaged at the time.

I think my choice to use break over continue was just a preference and I can't think of anything that would break if they were switched (although it's certainly possible something would). I had envisaged that dependencies would be generated higher in the task list than they were consumed, so the break made sense.

In fact, with the current logic it is a requirement that dependencies are generated further up in the task list than they are consumed. In the example you provided, the workaround you suggested of moving FilterB below MakeMap would not work, since MakeMap would be forced to finish before all of its input had been generated. The pipeline is not smart enough to know that there is still an opportunity for input to be generated further down (even though that information is apparent from the config).

In any case, if you make a change you should build in a legacy mode for execution order. The conservative thing would be to keep the current order as the default and only change the behaviour if a certain key appears in the pipeline section of the config. If you like to live on the edge, change the default behaviour and revert to the current one only if execution_order: legacy is present.
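
Something like this, say (the key name and placement here are just a suggestion, not an existing option):

pipeline:
  execution_order: legacy  # hypothetical key; omit it to get the new behaviour
  tasks:
    - type: MakeStream
      out: stream
    # ... rest of the task list unchanged ...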

@jrs65
Contributor Author

jrs65 commented Aug 30, 2021

In any case, if you make a change you should build in a legacy mode for execution order. The conservative thing would be to keep the current order as the default and only change the behaviour if a certain key appears in the pipeline section of the config. If you like to live on the edge, change the default behaviour and revert to the current one only if execution_order: legacy is present.

Yeah, definitely agree with this. I'm not in a huge rush to make changes here, but I think we could make some useful improvements to it.

Do you think that changing break -> continue is safe enough to do without the legacy mode?

@kiyo-masui
Contributor

I think that is as dangerous as any other change you might do, so you might as well build in the legacy mode from the start. (Luckily it's a single if statement in this case... unlike the priority system which might involve substantial refactoring).

@jrs65
Contributor Author

jrs65 commented Aug 31, 2021

That's fair. So I guess the first thing to do is to abstract the current ordering code to allow that change to be done and have an optional legacy mode which retains the break behaviour.

@anjakefala
Contributor

Something we ran into related to this bug: radiocosmology/draco#133, with this config:

     # Measure the observed fluxes of the point sources in the catalogs
     - type: draco.analysis.beamform.BeamFormCat
       requires: [manager, sstream_inter]
       in: source_catalog
       params:
         timetrack: 300.0
         save: true
         output_name: "sourceflux_{tag}.h5"

On days with no data, the setup() for BeamFormCat does not run. However, its process() does run, and the crash occurs because setup() did not set attributes that process() relies on.

The pipeline framework should not be starting process() when requirements in requires have not been met either.
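
In the meantime a task can defend itself with a guard along these lines (a sketch with hypothetical names only, not the real draco task; the actual draco fix, referenced in the commit below, sets its flag in `_process_config` instead):

    class BeamFormCatLike:
        """Sketch of a defensive pattern, not the real draco task."""

        def __init__(self):
            # Assume nothing is available until setup() has actually run.
            self._setup_done = False

        def setup(self, manager, sstream):
            self._manager = manager
            self._sstream = sstream
            self._setup_done = True

        def process(self, catalog):
            if not self._setup_done:
                # setup() never ran (e.g. its `requires` were never produced
                # on a day with no data), so there is nothing sensible to do.
                return None
            # ... beamform `catalog` using self._manager and self._sstream ...
            return catalog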

anjakefala added a commit to radiocosmology/draco that referenced this issue Oct 18, 2021
data_available is set in `_process_config`, instead of in `_process_data`,
bc of this bug:
radiocosmology/caput#181 (comment)

For `BeamFormCat`, `_process_data` is in `setup()`, and `setup()` is not
called if there is no data available.

Closes #133
@ljgray
Contributor

ljgray commented May 4, 2023

I've found another interesting snippet from the daily config that has some weird behaviour.

    # Mask out intercylinder baselines before beam forming to minimise cross
    # talk. This creates a copy of the input that shares the vis dataset (but
    # with a distinct weight dataset) to save memory
    - type: draco.analysis.flagging.MaskBaselines
      requires: manager
      in: sstream_mask
      out: sstream_inter
      params:
        share: vis
        mask_short_ew: 1.0

    # Load the source catalogs to measure fluxes of
    - type: draco.core.io.LoadBasicCont
      out: source_catalog
      params:
        files:
        - "{catalogs[0]}"
        - "{catalogs[1]}"
        - "{catalogs[2]}"
        - "{catalogs[3]}"

    # Measure the observed fluxes of the point sources in the catalogs
    - type: draco.analysis.beamform.BeamFormCat
      requires: [manager, sstream_inter]
      in: source_catalog
      params:
        timetrack: 300.0
        save: true
        output_name: "sourceflux_{{tag}}.h5"

    # Mask out day time data
    - type: ch_pipeline.analysis.flagging.DayMask
      in: sstream_mask
      out: sstream_mask1

    - type: ch_pipeline.analysis.flagging.MaskMoon
      in: sstream_mask1
      out: sstream_mask2

    # Remove ranges of time known to be bad that may affect the delay power
    # spectrum estimate
    - type: ch_pipeline.analysis.flagging.DataFlagger
      in: sstream_mask2
      out: sstream_mask3
      params:
        flag_type:
          - acjump
          - bad_calibration_acquisition_restart
          - bad_calibration_fpga_restart
          - bad_calibration_gains
          - decorrelated_cylinder
          - globalflag
          - rain1mm

    # Load the stack that we will blend into the daily data
    - type: draco.core.io.LoadBasicCont
      out: sstack
      params:
        files:
            - "{blend_stack_file}"
        selections:
            freq_range: [{freq[0]:d}, {freq[1]:d}]

    - type: draco.analysis.flagging.BlendStack
      requires: sstack
      in: sstream_mask3
      out: sstream_blend1
      params:
        frac: 1e-4

Through this series of tasks, what we would think/want to happen is for LoadBasicCont to load a catalog, BeamFormCat to process and save the sourceflux file, and for that to repeat for all 4 catalogs. Only once that is done should the pipeline continue on.

Instead, the processing order goes:

  1. LoadBasicCont loads the first file
  2. BeamFormCat calls setup (this creates an extra copy of the vis dataset)
  3. LoadBasicCont loads the second file
  4. BeamFormCat processes and saves the first file
  5. MaskMoon, MaskDay, and DataFlagger are processed
  6. The stack file gets loaded
  7. LoadBasicCont loads the third file
  8. BeamFormCat processes the second file
  9. LoadBasicCont loads the fourth file
  10. BeamFormCat processes the third file
  11. BeamFormCat processes the fourth file

As a result, we end up having two copies of the sidereal stream (with shared vis dataset), plus a copy of the vis dataset held by BeamFormCat, plus the stack all in memory at the same time - essentially three sidereal streams.

I'll update this once I have a proper config that can be run on its own to replicate this.

@ljgray
Contributor

ljgray commented May 8, 2023

I'm just going to dump another note in here so I remember it. A fairly significant inefficiency I've noticed is that tasks are kept around for one full pipeline iteration after their .finish method is called and any output is processed. This means that any task which stores copies of data in a class attribute will keep those copies around for an extra iteration, even though, once .finish has been called, the task should not expect to do anything else. This has become problematic in a couple of places in the daily pipeline, where full copies of a timestream/sidereal stream are not cleared for a long time, sometimes not until after other tasks which may create copies or load new datasets have run. For example, in some cases the SiderealGrouper task would not clear its list of timestream files until sometime after the regridder and ringmapmaker tasks had run.
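
Until the framework drops finished tasks promptly, a partial mitigation is for such a task to release its own references when it finishes. A sketch with hypothetical names (not the real SiderealGrouper):

    class GrouperLike:
        """Sketch of a task that accumulates inputs in a class attribute."""

        def __init__(self):
            self._streams = []

        def process(self, stream):
            # Accumulate the incoming streams until finish() is called.
            self._streams.append(stream)

        def finish(self):
            grouped = self._streams  # stand-in for the real grouping step
            # Drop our own reference so the accumulated data can be freed as
            # soon as the output is consumed, even if the task object itself
            # lingers for another pipeline iteration.
            self._streams = []
            return grouped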

@ljgray ljgray assigned jrs65 and ljgray and unassigned jrs65 May 8, 2023
@ljgray ljgray added enhancement priority/medium This issue is fairly important and should be addressed labels May 8, 2023
@ljgray ljgray closed this as completed in fe5b8bb May 24, 2024