diff --git a/Gemfile b/Gemfile
index ebfe98b..5cd9b94 100644
--- a/Gemfile
+++ b/Gemfile
@@ -13,3 +13,7 @@ gemspec
 
 # To use a debugger
 # gem 'byebug', group: [:development, :test]
+group :test do
+  gem "pg", "< 1.0"
+end
+
diff --git a/lib/bulk_insert.rb b/lib/bulk_insert.rb
index 6f207b3..7907aff 100644
--- a/lib/bulk_insert.rb
+++ b/lib/bulk_insert.rb
@@ -4,9 +4,9 @@ module BulkInsert
   extend ActiveSupport::Concern
 
   module ClassMethods
-    def bulk_insert(*columns, values: nil, set_size:500, ignore: false, update_duplicates: false)
+    def bulk_insert(*columns, values: nil, set_size:500, ignore: false, update_duplicates: false, return_primary_keys: false)
       columns = default_bulk_columns if columns.empty?
-      worker = BulkInsert::Worker.new(connection, table_name, columns, set_size, ignore, update_duplicates)
+      worker = BulkInsert::Worker.new(connection, table_name, primary_key, columns, set_size, ignore, update_duplicates, return_primary_keys)
 
       if values.present?
         transaction do
diff --git a/lib/bulk_insert/worker.rb b/lib/bulk_insert/worker.rb
index ba428fc..f4ba7c2 100644
--- a/lib/bulk_insert/worker.rb
+++ b/lib/bulk_insert/worker.rb
@@ -5,9 +5,9 @@ class Worker
     attr_accessor :before_save_callback
     attr_accessor :after_save_callback
     attr_accessor :adapter_name
-    attr_reader :ignore, :update_duplicates
+    attr_reader :ignore, :update_duplicates, :result_sets
 
-    def initialize(connection, table_name, column_names, set_size=500, ignore=false, update_duplicates=false)
+    def initialize(connection, table_name, primary_key, column_names, set_size=500, ignore=false, update_duplicates=false, return_primary_keys=false)
       @connection = connection
       @set_size = set_size
 
@@ -15,10 +15,12 @@ def initialize(connection, table_name, column_names, set_size=500, ignore=false,
       # INSERT IGNORE only fails inserts with duplicate keys or unallowed nulls not the whole set of inserts
       @ignore = ignore
       @update_duplicates = update_duplicates
+      @return_primary_keys = return_primary_keys
 
       columns = connection.columns(table_name)
       column_map = columns.inject({}) { |h, c| h.update(c.name => c) }
 
+      @primary_key = primary_key
       @columns = column_names.map { |name| column_map[name.to_s] }
       @table_name = connection.quote_table_name(table_name)
       @column_names = column_names.map { |name| connection.quote_column_name(name) }.join(",")
@@ -26,6 +28,7 @@ def initialize(connection, table_name, column_names, set_size=500, ignore=false,
       @before_save_callback = nil
       @after_save_callback = nil
 
+      @result_sets = []
       @set = []
     end
 
@@ -76,7 +79,7 @@ def after_save(&block)
     def save!
       if pending?
         @before_save_callback.(@set) if @before_save_callback
-        compose_insert_query.tap { |query| @connection.execute(query) if query }
+        execute_query
         @after_save_callback.() if @after_save_callback
         @set.clear
       end
@@ -84,6 +87,13 @@ def save!
       self
     end
 
+    def execute_query
+      if query = compose_insert_query
+        result_set = @connection.exec_query(query)
+        @result_sets.push(result_set) if @return_primary_keys
+      end
+    end
+
     def compose_insert_query
       sql = insert_sql_statement
       @now = Time.now
@@ -107,6 +117,7 @@ def compose_insert_query
       if !rows.empty?
         sql << rows.join(",")
         sql << on_conflict_statement
+        sql << primary_key_return_statement
         sql
       else
         false
@@ -130,6 +141,14 @@ def insert_ignore
       end
     end
 
+    def primary_key_return_statement
+      if @return_primary_keys && adapter_name =~ /\APost(?:greSQL|GIS)/i
+        " RETURNING #{@primary_key}"
+      else
+        ''
+      end
+    end
+
     def on_conflict_statement
       if (adapter_name =~ /\APost(?:greSQL|GIS)/i && ignore )
         ' ON CONFLICT DO NOTHING'
diff --git a/test/bulk_insert/worker_test.rb b/test/bulk_insert/worker_test.rb
index 35622d6..bdd2dff 100644
--- a/test/bulk_insert/worker_test.rb
+++ b/test/bulk_insert/worker_test.rb
@@ -5,6 +5,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
     @insert = BulkInsert::Worker.new(
       Testing.connection,
       Testing.table_name,
+      'id',
       %w(greeting age happy created_at updated_at color))
     @now = Time.now
   end
@@ -121,6 +122,103 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
     assert_equal true, hello.happy?
   end
 
+  test "save! does not add to result sets when not returning primary keys" do
+    worker = BulkInsert::Worker.new(
+      Testing.connection,
+      Testing.table_name,
+      'id',
+      %w(greeting age happy created_at updated_at color),
+      500,
+      false,
+      false,
+      false
+    )
+    worker.add greeting: "first"
+    worker.add greeting: "second"
+    worker.save!
+
+    assert_equal 0, worker.result_sets.count
+  end
+
+  test "save! adds to result sets when returning primary keys" do
+    worker = BulkInsert::Worker.new(
+      Testing.connection,
+      Testing.table_name,
+      'id',
+      %w(greeting age happy created_at updated_at color),
+      500,
+      false,
+      false,
+      true
+    )
+    worker.add greeting: "first"
+    worker.add greeting: "second"
+    worker.save!
+    assert_equal 1, worker.result_sets.count
+    assert_equal 2, worker.result_sets.map(&:to_a).flatten.count
+
+    worker.add greeting: "third"
+    worker.add greeting: "fourth"
+    worker.save!
+    assert_equal 2, worker.result_sets.count
+    assert_equal 4, worker.result_sets.map(&:to_a).flatten.count
+  end
+
+  test "save! does not change worker result sets if there are no pending rows" do
+    worker = BulkInsert::Worker.new(
+      Testing.connection,
+      Testing.table_name,
+      'id',
+      %w(greeting age happy created_at updated_at color),
+      500,
+      false,
+      false,
+      true
+    )
+    assert_no_difference -> { worker.result_sets.count } do
+      worker.save!
+    end
+  end
+
+  test "results in the same order as the records appear in the insert statement" do
+    worker = BulkInsert::Worker.new(
+      Testing.connection,
+      Testing.table_name,
+      'id',
+      %w(greeting age happy created_at updated_at color),
+      500,
+      false,
+      false,
+      true
+    )
+
+    attributes_for_insertion = (0..20).map { |i| { age: i } }
+    worker.add_all attributes_for_insertion
+    results = worker.result_sets.map(&:to_a).flatten
+
+    returned_ids = results.map {|result| result.fetch("id").to_i }
+    expected_age_for_id_hash = {}
+    returned_ids.map.with_index do |id, index|
+      expected_age_for_id_hash[id] = index
+    end
+
+    new_saved_records = Testing.find(returned_ids)
+    new_saved_records.each do |record|
+      assert_same(expected_age_for_id_hash[record.id], record.age)
+    end
+  end
+
+  test "initialized with empty result_sets array" do
+    new_worker = BulkInsert::Worker.new(
+      Testing.connection,
+      Testing.table_name,
+      'id',
+      %w(greeting age happy created_at updated_at color)
+    )
+    assert_instance_of(Array, new_worker.result_sets)
+    assert_empty new_worker.result_sets
+  end
+
   test "save! calls the after_save handler" do
     x = 41
 
@@ -214,7 +312,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
   end
 
   test "adapter dependent default methods" do
-    assert_equal @insert.adapter_name, 'SQLite'
+    assert_equal @insert.adapter_name, 'PostgreSQL'
     assert_equal @insert.insert_sql_statement, "INSERT INTO \"testings\" (\"greeting\",\"age\",\"happy\",\"created_at\",\"updated_at\",\"color\") VALUES "
 
     @insert.add ["Yo", 15, false, nil, nil]
@@ -225,6 +323,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
     mysql_worker = BulkInsert::Worker.new(
       Testing.connection,
       Testing.table_name,
+      'id',
       %w(greeting age happy created_at updated_at color),
       500, # batch size
       true) # ignore
@@ -244,6 +343,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
     mysql_worker = BulkInsert::Worker.new(
       Testing.connection,
       Testing.table_name,
+      'id',
       %w(greeting age happy created_at updated_at color),
       500, # batch size
       true, # ignore
@@ -262,6 +362,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
     mysql_worker = BulkInsert::Worker.new(
       Testing.connection,
       Testing.table_name,
+      'id',
       %w(greeting age happy created_at updated_at color),
       500, # batch size
       true) # ignore
@@ -278,32 +379,41 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
     pgsql_worker = BulkInsert::Worker.new(
       Testing.connection,
       Testing.table_name,
+      'id',
       %w(greeting age happy created_at updated_at color),
       500, # batch size
-      true) # ignore
+      true, # ignore
+      false, # update duplicates
+      true # return primary key
+    )
     pgsql_worker.adapter_name = 'PostgreSQL'
     pgsql_worker.add ["Yo", 15, false, nil, nil]
 
-    assert_equal pgsql_worker.compose_insert_query, "INSERT INTO \"testings\" (\"greeting\",\"age\",\"happy\",\"created_at\",\"updated_at\",\"color\") VALUES ('Yo',15,'f',NULL,NULL,'chartreuse') ON CONFLICT DO NOTHING"
+    assert_equal pgsql_worker.compose_insert_query, "INSERT INTO \"testings\" (\"greeting\",\"age\",\"happy\",\"created_at\",\"updated_at\",\"color\") VALUES ('Yo',15,'f',NULL,NULL,'chartreuse') ON CONFLICT DO NOTHING RETURNING id"
   end
 
   test "adapter dependent PostGIS methods" do
     pgsql_worker = BulkInsert::Worker.new(
       Testing.connection,
       Testing.table_name,
+      'id',
       %w(greeting age happy created_at updated_at color),
       500, # batch size
-      true) # ignore
+      true, # ignore
+      false, # update duplicates
+      true # return primary key
+    ) # ignore
    pgsql_worker.adapter_name = 'PostGIS'
     pgsql_worker.add ["Yo", 15, false, nil, nil]
 
-    assert_equal pgsql_worker.compose_insert_query, "INSERT INTO \"testings\" (\"greeting\",\"age\",\"happy\",\"created_at\",\"updated_at\",\"color\") VALUES ('Yo',15,'f',NULL,NULL,'chartreuse') ON CONFLICT DO NOTHING"
+    assert_equal pgsql_worker.compose_insert_query, "INSERT INTO \"testings\" (\"greeting\",\"age\",\"happy\",\"created_at\",\"updated_at\",\"color\") VALUES ('Yo',15,'f',NULL,NULL,'chartreuse') ON CONFLICT DO NOTHING RETURNING id"
   end
 
   test "adapter dependent sqlite3 methods (with lowercase adapter name)" do
     sqlite_worker = BulkInsert::Worker.new(
       Testing.connection,
       Testing.table_name,
+      'id',
       %w(greeting age happy created_at updated_at color),
       500, # batch size
       true) # ignore
@@ -317,6 +427,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
     sqlite_worker = BulkInsert::Worker.new(
       Testing.connection,
       Testing.table_name,
+      'id',
       %w(greeting age happy created_at updated_at color),
       500, # batch size
       true) # ignore
@@ -330,6 +441,7 @@ class BulkInsertWorkerTest < ActiveSupport::TestCase
     mysql_worker = BulkInsert::Worker.new(
       Testing.connection,
       Testing.table_name,
+      'id',
       %w(greeting age happy created_at updated_at color),
       500, # batch size
       false, # ignore
diff --git a/test/bulk_insert_test.rb b/test/bulk_insert_test.rb
index 9b779a6..f033ebe 100644
--- a/test/bulk_insert_test.rb
+++ b/test/bulk_insert_test.rb
@@ -20,10 +20,24 @@ class BulkInsertTest < ActiveSupport::TestCase
     end
   end
 
+  test "worker should not have any result sets without option for returning primary keys" do
+    worker = Testing.bulk_insert
+    worker.add greeting: "hello"
+    worker.save!
+    assert_empty worker.result_sets
+  end
+
+  test "with option to return primary keys, worker should have result sets" do
+    worker = Testing.bulk_insert(return_primary_keys: true)
+    worker.add greeting: "yo"
+    worker.save!
+    assert_equal 1, worker.result_sets.count
+  end
+
   test "bulk_insert with array should save the array immediately" do
     assert_difference "Testing.count", 2 do
       Testing.bulk_insert values: [
-        [ "Hello", 15, true, "green" ],
+        [ "Hello", 15, true, Time.now, Time.now, "green" ],
         { greeting: "Hey", age: 20, happy: false }
       ]
     end
diff --git a/test/dummy/config/database.yml b/test/dummy/config/database.yml
index 1c1a37c..cfddb5a 100644
--- a/test/dummy/config/database.yml
+++ b/test/dummy/config/database.yml
@@ -1,25 +1,16 @@
-# SQLite version 3.x
-#   gem install sqlite3
-#
-#   Ensure the SQLite 3 gem is defined in your Gemfile
-#   gem 'sqlite3'
-#
 default: &default
-  adapter: sqlite3
+  adapter: postgresql
   pool: 5
   timeout: 5000
 
 development:
   <<: *default
-  database: db/development.sqlite3
+  database: bulk_insert_development
 
 # Warning: The database defined as "test" will be erased and
 # re-generated from your development database when you run "rake".
 # Do not set this db to the same as development or production.
 test:
   <<: *default
-  database: db/test.sqlite3
+  database: bulk_insert_test
 
-production:
-  <<: *default
-  database: db/production.sqlite3
diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb
index 5f3a71f..b806206 100644
--- a/test/dummy/db/schema.rb
+++ b/test/dummy/db/schema.rb
@@ -13,6 +13,9 @@
 
 ActiveRecord::Schema.define(version: 20151028194232) do
 
+  # These are extensions that must be enabled in order to support this database
+  enable_extension "plpgsql"
+
   create_table "testings", force: :cascade do |t|
     t.string "greeting"
     t.integer "age"
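For reviewers, a minimal usage sketch of the new option (not part of the patch itself; it assumes the dummy app's Testing model on a PostgreSQL connection and mirrors the new tests above):

# Sketch only: exercises return_primary_keys against the dummy app's Testing model.
worker = Testing.bulk_insert(return_primary_keys: true)
worker.add greeting: "hello", age: 20
worker.add greeting: "howdy", age: 30
worker.save!

# On PostgreSQL/PostGIS each save! appends one result set (the RETURNING rows);
# on other adapters result_sets stays empty. Flatten the sets to read the new ids
# in insert order.
new_ids = worker.result_sets.flat_map(&:to_a).map { |row| row["id"].to_i }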