Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

perform_later will reset CurrentAttributes #1548

Open
gap777 opened this issue Nov 23, 2024 · 4 comments
Open

perform_later will reset CurrentAttributes #1548

gap777 opened this issue Nov 23, 2024 · 4 comments

Comments

@gap777
Copy link
Contributor

gap777 commented Nov 23, 2024

We're using CurrentAttributes to track user_id:

class Current < ActiveSupport::CurrentAttributes
  attribute  :user_id
end

When we call perform_later on a job, this is reset in the caller's context:

puts Current.user_id # "abcdef"
MyJob.perform_later
puts Current.user_id # nil

I don't think that should happen.


GoodJob::Adapter#enqueue_at invokes Rails.application.executor.wrap -- perhaps this is the cause?

@gap777
Copy link
Contributor Author

gap777 commented Nov 23, 2024

FYI, my config.good_job.execution_mode is set to :async.

@gap777
Copy link
Contributor Author

gap777 commented Nov 23, 2024

Perhaps this is related?
rails/rails#49227

@Earlopain
Copy link
Collaborator

Yes, that issue looks vaguely familiar. I adapted the repro for good_job and the result is the same:

Repro

# frozen_string_literal: true

require "bundler/inline"

gemfile(true) do
  source "https://rubygems.org"

  git_source(:github) { |repo| "https://github.com/#{repo}.git" }

  gem "rails", github: "rails/rails", branch: "main"
  gem "pg"
  gem "good_job", require: false
end

require "active_support/all"
require "active_support/railtie"
require 'active_record/railtie'
require "active_job/railtie"
require "minitest/autorun"
require "good_job"

ENV["DATABASE_URL"] = "postgresql://postgres@localhost:5432/test"

class TestApp < Rails::Application
  config.load_defaults 7.1

  config.active_job.queue_adapter = :inline
  config.eager_load = false

  config.good_job.execution_mode = :async
  config.active_job.queue_adapter = :good_job
end

Rails.application.initialize!

ActiveRecord::Schema.define(version: 2024_08_01_143343) do
  # These are extensions that must be enabled in order to support this database
  enable_extension "pgcrypto"
  enable_extension "plpgsql"

  create_table "good_job_batches", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
    t.datetime "created_at", null: false
    t.datetime "updated_at", null: false
    t.text "description"
    t.jsonb "serialized_properties"
    t.text "on_finish"
    t.text "on_success"
    t.text "on_discard"
    t.text "callback_queue_name"
    t.integer "callback_priority"
    t.datetime "enqueued_at"
    t.datetime "discarded_at"
    t.datetime "finished_at"
    t.datetime "jobs_finished_at"
  end

  create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
    t.datetime "created_at", null: false
    t.datetime "updated_at", null: false
    t.uuid "active_job_id", null: false
    t.text "job_class"
    t.text "queue_name"
    t.jsonb "serialized_params"
    t.datetime "scheduled_at"
    t.datetime "finished_at"
    t.text "error"
    t.integer "error_event", limit: 2
    t.text "error_backtrace", array: true
    t.uuid "process_id"
    t.interval "duration"
    t.index ["active_job_id", "created_at"], name: "index_good_job_executions_on_active_job_id_and_created_at"
    t.index ["process_id", "created_at"], name: "index_good_job_executions_on_process_id_and_created_at"
  end

  create_table "good_job_processes", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
    t.datetime "created_at", null: false
    t.datetime "updated_at", null: false
    t.jsonb "state"
    t.integer "lock_type", limit: 2
  end

  create_table "good_job_settings", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
    t.datetime "created_at", null: false
    t.datetime "updated_at", null: false
    t.text "key"
    t.jsonb "value"
    t.index ["key"], name: "index_good_job_settings_on_key", unique: true
  end

  create_table "good_jobs", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
    t.text "queue_name"
    t.integer "priority"
    t.jsonb "serialized_params"
    t.datetime "scheduled_at", precision: nil
    t.datetime "performed_at", precision: nil
    t.datetime "finished_at", precision: nil
    t.text "error"
    t.datetime "created_at", precision: nil, null: false
    t.datetime "updated_at", precision: nil, null: false
    t.uuid "active_job_id"
    t.text "concurrency_key"
    t.text "cron_key"
    t.uuid "retried_good_job_id"
    t.datetime "cron_at", precision: nil
    t.uuid "batch_id"
    t.uuid "batch_callback_id"
    t.boolean "is_discrete"
    t.integer "executions_count"
    t.text "job_class"
    t.integer "error_event", limit: 2
    t.text "labels", array: true
    t.uuid "locked_by_id"
    t.datetime "locked_at"
    t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at"
    t.index ["batch_callback_id"], name: "index_good_jobs_on_batch_callback_id", where: "(batch_callback_id IS NOT NULL)"
    t.index ["batch_id"], name: "index_good_jobs_on_batch_id", where: "(batch_id IS NOT NULL)"
    t.index ["concurrency_key"], name: "index_good_jobs_on_concurrency_key_when_unfinished", where: "(finished_at IS NULL)"
    t.index ["cron_key", "created_at"], name: "index_good_jobs_on_cron_key_and_created_at_cond", where: "(cron_key IS NOT NULL)"
    t.index ["cron_key", "cron_at"], name: "index_good_jobs_on_cron_key_and_cron_at_cond", unique: true, where: "(cron_key IS NOT NULL)"
    t.index ["finished_at"], name: "index_good_jobs_jobs_on_finished_at", where: "((retried_good_job_id IS NULL) AND (finished_at IS NOT NULL))"
    t.index ["labels"], name: "index_good_jobs_on_labels", where: "(labels IS NOT NULL)", using: :gin
    t.index ["locked_by_id"], name: "index_good_jobs_on_locked_by_id", where: "(locked_by_id IS NOT NULL)"
    t.index ["priority", "created_at"], name: "index_good_job_jobs_for_candidate_lookup", where: "(finished_at IS NULL)"
    t.index ["priority", "created_at"], name: "index_good_jobs_jobs_on_priority_created_at_when_unfinished", order: { priority: "DESC NULLS LAST" }, where: "(finished_at IS NULL)"
    t.index ["priority", "scheduled_at"], name: "index_good_jobs_on_priority_scheduled_at_unfinished_unlocked", where: "((finished_at IS NULL) AND (locked_by_id IS NULL))"
    t.index ["queue_name", "scheduled_at"], name: "index_good_jobs_on_queue_name_and_scheduled_at", where: "(finished_at IS NULL)"
    t.index ["scheduled_at"], name: "index_good_jobs_on_scheduled_at", where: "(finished_at IS NULL)"
  end
end


class Current < ActiveSupport::CurrentAttributes
  attribute :user
end

class DummyJob < ActiveJob::Base
  def perform
  end
end

class CurrentAttributeTest < ActiveSupport::TestCase
  test "CurrentAttributes" do
    Current.user = "test"
    DummyJob.perform_later

    assert_equal("test", Current.user, "CurrentAttributes reset")
  end
end

Not wrapping in the executor does make the test pass. I still am not really understanding its purpose but suspect removing it would result in some other consequences.

@bensheldon
Copy link
Owner

I was looking back through Active Job history and it seems like at one time Active Job would run its inline jobs like:

Thread.new { job.perform }.join

There's a good explanation of the downsides here: rails/rails#40626

Looking at the current state of CurrentAttributes, I think it's possible to collect (all/most?) of them up and re-assign them after the job:

module PreserveCurrentAttributes
  def self.wrap
    stored_values = {}
 
    instances = ActiveSupport::CurrentAttributes.send(:current_instances)
    instances.each do |key, instance|
      stored_values[key] = instance.attributes.dup
    end

    yield

    stored_values.each do |key, values|
      instance = ActiveSupport::CurrentAttributes.send(:current_instances)[key]
      values.each do |attr_name, value|
        instance.public_send("#{attr_name}=", value)
      end
    end
  end
end

That seems dangerously straightforward 🤷🏻

@bensheldon bensheldon moved this from Inbox to In Progress in GoodJob Backlog v2 Jan 26, 2025
# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
Status: In Progress
Development

No branches or pull requests

3 participants