Merge pull request #1 from procore/make_query_return_ids
Modify worker query to return ids of inserted records
Peter Loomis authored Mar 19, 2018
2 parents 6471d0c + b355108 commit 7322020
Showing 7 changed files with 166 additions and 23 deletions.
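In short, the PR adds a return_primary_keys option to bulk_insert. A minimal usage sketch of the new option (the Testing model and the attribute values are illustrative, borrowed from the gem's dummy test app; ids only come back on PostgreSQL/PostGIS, where the worker appends a RETURNING clause to the insert):

# Build a worker that keeps the ids of the rows it inserts.
worker = Testing.bulk_insert(return_primary_keys: true)
worker.add greeting: "hello", age: 30
worker.add greeting: "hi", age: 25
worker.save!

# Each save! pushes one ActiveRecord::Result onto result_sets;
# flatten them to read the new ids in insertion order.
new_ids = worker.result_sets.flat_map(&:to_a).map { |row| row["id"].to_i }
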
4 changes: 4 additions & 0 deletions Gemfile
@@ -13,3 +13,7 @@ gemspec
# To use a debugger
# gem 'byebug', group: [:development, :test]

group :test do
gem "pg", "< 1.0"
end

4 changes: 2 additions & 2 deletions lib/bulk_insert.rb
@@ -4,9 +4,9 @@ module BulkInsert
extend ActiveSupport::Concern

module ClassMethods
def bulk_insert(*columns, values: nil, set_size:500, ignore: false, update_duplicates: false)
def bulk_insert(*columns, values: nil, set_size:500, ignore: false, update_duplicates: false, return_primary_keys: false)
columns = default_bulk_columns if columns.empty?
worker = BulkInsert::Worker.new(connection, table_name, columns, set_size, ignore, update_duplicates)
worker = BulkInsert::Worker.new(connection, table_name, primary_key, columns, set_size, ignore, update_duplicates, return_primary_keys)

if values.present?
transaction do
25 changes: 22 additions & 3 deletions lib/bulk_insert/worker.rb
@@ -5,27 +5,30 @@ class Worker
attr_accessor :before_save_callback
attr_accessor :after_save_callback
attr_accessor :adapter_name
attr_reader :ignore, :update_duplicates
attr_reader :ignore, :update_duplicates, :result_sets

def initialize(connection, table_name, column_names, set_size=500, ignore=false, update_duplicates=false)
def initialize(connection, table_name, primary_key, column_names, set_size=500, ignore=false, update_duplicates=false, return_primary_keys=false)
@connection = connection
@set_size = set_size

@adapter_name = connection.adapter_name
# INSERT IGNORE only fails inserts with duplicate keys or disallowed NULLs, not the whole set of inserts
@ignore = ignore
@update_duplicates = update_duplicates
@return_primary_keys = return_primary_keys

columns = connection.columns(table_name)
column_map = columns.inject({}) { |h, c| h.update(c.name => c) }

@primary_key = primary_key
@columns = column_names.map { |name| column_map[name.to_s] }
@table_name = connection.quote_table_name(table_name)
@column_names = column_names.map { |name| connection.quote_column_name(name) }.join(",")

@before_save_callback = nil
@after_save_callback = nil

@result_sets = []
@set = []
end

@@ -76,14 +79,21 @@ def after_save(&block)
def save!
if pending?
@before_save_callback.(@set) if @before_save_callback
compose_insert_query.tap { |query| @connection.execute(query) if query }
execute_query
@after_save_callback.() if @after_save_callback
@set.clear
end

self
end

def execute_query
if query = compose_insert_query
result_set = @connection.exec_query(query)
@result_sets.push(result_set) if @return_primary_keys
end
end

def compose_insert_query
sql = insert_sql_statement
@now = Time.now
@@ -107,6 +117,7 @@ def compose_insert_query
if !rows.empty?
sql << rows.join(",")
sql << on_conflict_statement
sql << primary_key_return_statement
sql
else
false
@@ -130,6 +141,14 @@ def insert_ignore
end
end

def primary_key_return_statement
if @return_primary_keys && adapter_name =~ /\APost(?:greSQL|GIS)/i
" RETURNING #{@primary_key}"
else
''
end
end

def on_conflict_statement
if (adapter_name =~ /\APost(?:greSQL|GIS)/i && ignore )
' ON CONFLICT DO NOTHING'
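The Worker constructor gains two arguments in this change: primary_key (now the third positional argument) and return_primary_keys (the last). A small sketch of driving the worker directly with the new signature (connection, table, and column names are borrowed from the dummy Testing model used in the tests below):

# primary_key is now passed explicitly, and the final flag asks the
# worker to keep the RETURNING result sets around after each save!.
worker = BulkInsert::Worker.new(
  Testing.connection,
  Testing.table_name,
  'id',                                                # primary key
  %w(greeting age happy created_at updated_at color),
  500,    # set_size
  false,  # ignore
  false,  # update_duplicates
  true    # return_primary_keys
)
worker.add ["Hello", 25, true, nil, nil]
worker.save!

# On PostgreSQL/PostGIS each result set row carries the generated id,
# e.g. [{"id" => 1}] (the value may come back as a string depending on the adapter).
worker.result_sets.flat_map(&:to_a)
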
122 changes: 117 additions & 5 deletions test/bulk_insert/worker_test.rb
@@ -5,6 +5,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
@insert = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color))
@now = Time.now
end
@@ -121,6 +122,103 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
assert_equal true, hello.happy?
end

test "save! does not add to result sets when not returning primary keys" do
worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500,
false,
false,
false
)
worker.add greeting: "first"
worker.add greeting: "second"
worker.save!

assert_equal 0, worker.result_sets.count
end

test "save! adds to result sets when returning primary keys" do
worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500,
false,
false,
true
)
worker.add greeting: "first"
worker.add greeting: "second"
worker.save!
assert_equal 1, worker.result_sets.count
assert_equal 2, worker.result_sets.map(&:to_a).flatten.count

worker.add greeting: "third"
worker.add greeting: "fourth"
worker.save!
assert_equal 2, worker.result_sets.count
assert_equal 4, worker.result_sets.map(&:to_a).flatten.count
end

test "save! does not change worker result sets if there are no pending rows" do
worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500,
false,
false,
true
)
assert_no_difference -> { worker.result_sets.count } do
worker.save!
end
end

test "results in the same order as the records appear in the insert statement" do
worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500,
false,
false,
true
)

attributes_for_insertion = (0..20).map { |i| { age: i } }
worker.add_all attributes_for_insertion
results = worker.result_sets.map(&:to_a).flatten

returned_ids = results.map {|result| result.fetch("id").to_i }
expected_age_for_id_hash = {}
returned_ids.map.with_index do |id, index|
expected_age_for_id_hash[id] = index
end

new_saved_records = Testing.find(returned_ids)
new_saved_records.each do |record|
assert_same(expected_age_for_id_hash[record.id], record.age)
end
end

test "initialized with empty result_sets array" do
new_worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color)
)
assert_instance_of(Array, new_worker.result_sets)
assert_empty new_worker.result_sets
end

test "save! calls the after_save handler" do
x = 41

@@ -214,7 +312,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
end

test "adapter dependent default methods" do
assert_equal @insert.adapter_name, 'SQLite'
assert_equal @insert.adapter_name, 'PostgreSQL'
assert_equal @insert.insert_sql_statement, "INSERT INTO \"testings\" (\"greeting\",\"age\",\"happy\",\"created_at\",\"updated_at\",\"color\") VALUES "

@insert.add ["Yo", 15, false, nil, nil]
@@ -225,6 +323,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
mysql_worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500, # batch size
true) # ignore
@@ -244,6 +343,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
mysql_worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500, # batch size
true, # ignore
@@ -262,6 +362,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
mysql_worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500, # batch size
true) # ignore
@@ -278,32 +379,41 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
pgsql_worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500, # batch size
true) # ignore
true, # ignore
false, # update duplicates
true # return primary key
)
pgsql_worker.adapter_name = 'PostgreSQL'
pgsql_worker.add ["Yo", 15, false, nil, nil]

assert_equal pgsql_worker.compose_insert_query, "INSERT INTO \"testings\" (\"greeting\",\"age\",\"happy\",\"created_at\",\"updated_at\",\"color\") VALUES ('Yo',15,'f',NULL,NULL,'chartreuse') ON CONFLICT DO NOTHING"
assert_equal pgsql_worker.compose_insert_query, "INSERT INTO \"testings\" (\"greeting\",\"age\",\"happy\",\"created_at\",\"updated_at\",\"color\") VALUES ('Yo',15,'f',NULL,NULL,'chartreuse') ON CONFLICT DO NOTHING RETURNING id"
end

test "adapter dependent PostGIS methods" do
pgsql_worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500, # batch size
true) # ignore
true, # ignore
false, # update duplicates
true # return primary key
)
pgsql_worker.adapter_name = 'PostGIS'
pgsql_worker.add ["Yo", 15, false, nil, nil]

assert_equal pgsql_worker.compose_insert_query, "INSERT INTO \"testings\" (\"greeting\",\"age\",\"happy\",\"created_at\",\"updated_at\",\"color\") VALUES ('Yo',15,'f',NULL,NULL,'chartreuse') ON CONFLICT DO NOTHING"
assert_equal pgsql_worker.compose_insert_query, "INSERT INTO \"testings\" (\"greeting\",\"age\",\"happy\",\"created_at\",\"updated_at\",\"color\") VALUES ('Yo',15,'f',NULL,NULL,'chartreuse') ON CONFLICT DO NOTHING RETURNING id"
end

test "adapter dependent sqlite3 methods (with lowercase adapter name)" do
sqlite_worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500, # batch size
true) # ignore
@@ -317,6 +427,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
sqlite_worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500, # batch size
true) # ignore
@@ -330,6 +441,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
mysql_worker = BulkInsert::Worker.new(
Testing.connection,
Testing.table_name,
'id',
%w(greeting age happy created_at updated_at color),
500, # batch size
false, # ignore
16 changes: 15 additions & 1 deletion test/bulk_insert_test.rb
@@ -20,10 +20,24 @@ class BulkInsertTest < ActiveSupport::TestCase
end
end

test "worker should not have any result sets without option for returning primary keys" do
worker = Testing.bulk_insert
worker.add greeting: "hello"
worker.save!
assert_empty worker.result_sets
end

test "with option to return primary keys, worker should have result sets" do
worker = Testing.bulk_insert(return_primary_keys: true)
worker.add greeting: "yo"
worker.save!
assert_equal 1, worker.result_sets.count
end

test "bulk_insert with array should save the array immediately" do
assert_difference "Testing.count", 2 do
Testing.bulk_insert values: [
[ "Hello", 15, true, "green" ],
[ "Hello", 15, true, Time.now, Time.now, "green" ],
{ greeting: "Hey", age: 20, happy: false }
]
end
15 changes: 3 additions & 12 deletions test/dummy/config/database.yml
@@ -1,25 +1,16 @@
# SQLite version 3.x
# gem install sqlite3
#
# Ensure the SQLite 3 gem is defined in your Gemfile
# gem 'sqlite3'
#
default: &default
adapter: sqlite3
adapter: postgresql
pool: 5
timeout: 5000

development:
<<: *default
database: db/development.sqlite3
database: bulk_insert_development

# Warning: The database defined as "test" will be erased and
# re-generated from your development database when you run "rake".
# Do not set this db to the same as development or production.
test:
<<: *default
database: db/test.sqlite3
database: bulk_insert_test

production:
<<: *default
database: db/production.sqlite3
3 changes: 3 additions & 0 deletions test/dummy/db/schema.rb
@@ -13,6 +13,9 @@

ActiveRecord::Schema.define(version: 20151028194232) do

# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"

create_table "testings", force: :cascade do |t|
t.string "greeting"
t.integer "age"