From a08496bb1a952702a259ffe19deb3d8cef1b41a3 Mon Sep 17 00:00:00 2001 From: Guo Xiang Tan Date: Tue, 26 Jan 2016 18:03:49 +0800 Subject: [PATCH] Remove Concurrent::TimerTask which spawns a long lasting Thread. --- .../postgresql_fallback_adapter.rb | 108 ++++++++++++------ .../postgresql_fallback_adapter_spec.rb | 61 ++++++---- 2 files changed, 110 insertions(+), 59 deletions(-) diff --git a/lib/active_record/connection_adapters/postgresql_fallback_adapter.rb b/lib/active_record/connection_adapters/postgresql_fallback_adapter.rb index df5a23b9e9b..a56f187b977 100644 --- a/lib/active_record/connection_adapters/postgresql_fallback_adapter.rb +++ b/lib/active_record/connection_adapters/postgresql_fallback_adapter.rb @@ -1,45 +1,86 @@ require 'active_record/connection_adapters/abstract_adapter' require 'active_record/connection_adapters/postgresql_adapter' require 'discourse' -require 'concurrent' -class TaskObserver - def update(time, result, ex) - if result - logger.info { "PG connection heartbeat successfully returned #{result}" } - elsif ex.is_a?(Concurrent::TimeoutError) - logger.warning { "PG connection heartbeat timed out".freeze } - else - if ex.message.include?("PG::UnableToSend") - logger.info { "PG connection heartbeat: Master connection is not active.".freeze } - else - logger.error { "PG connection heartbeat failed with error: \"#{ex}\"" } - end +class PostgreSQLFallbackHandler + include Singleton + + attr_reader :running + attr_accessor :master + + def initialize + @master = true + @running = false + end + + def verify_master + return if @running && recently_checked? + @running = true + + Thread.new do + begin + logger.info "#{self.class}: Checking master server..." + connection = ActiveRecord::Base.postgresql_connection(config) + + if connection.active? + logger.info "#{self.class}: Master server is active. Reconnecting..." + ActiveRecord::Base.remove_connection + ActiveRecord::Base.establish_connection(config) + Discourse.disable_readonly_mode + @master = true + end + rescue => e + if e.message.include?("could not connect to server") + logger.warn "#{self.class}: Connection to master PostgreSQL server failed with '#{e.message}'" + else + raise e + end + ensure + @last_check = Time.zone.now + @running = false + end end end private + def config + ActiveRecord::Base.configurations[Rails.env] + end + def logger Rails.logger end + + def recently_checked? + if @last_check + Time.zone.now <= @last_check + 5.seconds + else + false + end + end end module ActiveRecord module ConnectionHandling def postgresql_fallback_connection(config) - begin - connection = postgresql_connection(config) - rescue PG::ConnectionBad => e + fallback_handler = ::PostgreSQLFallbackHandler.instance + config = config.symbolize_keys + + if !fallback_handler.master connection = postgresql_connection(config.dup.merge({ - "host" => config["replica_host"], "port" => config["replica_port"] + host: config[:replica_host], port: config[:replica_port] })) verify_replica(connection) - - Discourse.enable_readonly_mode if !Discourse.readonly_mode? - - start_connection_heartbeart(connection, config) + Discourse.enable_readonly_mode + else + begin + connection = postgresql_connection(config) + rescue PG::ConnectionBad => e + fallback_handler.master = false + raise e + end end connection @@ -51,24 +92,23 @@ module ActiveRecord value = connection.raw_connection.exec("SELECT pg_is_in_recovery()").values[0][0] raise "Replica database server is not in recovery mode." if value == 'f' end + end - def interval - 5 - end + module ConnectionAdapters + class PostgreSQLAdapter + set_callback :checkout, :before, :switch_back? - def start_connection_heartbeart(existing_connection, config) - timer_task = Concurrent::TimerTask.new(execution_interval: interval) do |task| - connection = postgresql_connection(config) + private - if connection.active? - existing_connection.disconnect! - Discourse.disable_readonly_mode if Discourse.readonly_mode? - task.shutdown - end + def fallback_handler + @fallback_handler ||= ::PostgreSQLFallbackHandler.instance end - timer_task.add_observer(TaskObserver.new) - timer_task.execute + def switch_back? + if !fallback_handler.master && !fallback_handler.running + fallback_handler.verify_master + end + end end end end diff --git a/spec/components/active_record/connection_adapters/postgresql_fallback_adapter_spec.rb b/spec/components/active_record/connection_adapters/postgresql_fallback_adapter_spec.rb index 66dd0707c0a..05013407d6c 100644 --- a/spec/components/active_record/connection_adapters/postgresql_fallback_adapter_spec.rb +++ b/spec/components/active_record/connection_adapters/postgresql_fallback_adapter_spec.rb @@ -7,12 +7,12 @@ describe ActiveRecord::ConnectionHandling do "adapter" => "postgresql_fallback", "replica_host" => "localhost", "replica_port" => "6432" - }) + }).symbolize_keys! end after do - ActiveRecord::Base.clear_all_connections! Discourse.disable_readonly_mode + ::PostgreSQLFallbackHandler.instance.master = true end describe "#postgresql_fallback_connection" do @@ -24,34 +24,43 @@ describe ActiveRecord::ConnectionHandling do context 'when master server is down' do before do @replica_connection = mock('replica_connection') - - ActiveRecord::Base.expects(:postgresql_connection).with(config).raises(PG::ConnectionBad) - - ActiveRecord::Base.expects(:postgresql_connection).with(config.merge({ - "host" => "localhost", "port" => "6432" - })).returns(@replica_connection) - - ActiveRecord::Base.expects(:verify_replica).with(@replica_connection) - - @replica_connection.expects(:disconnect!) - - ActiveRecord::Base.stubs(:interval).returns(0.1) - - Concurrent::TimerTask.any_instance.expects(:shutdown) end it 'should failover to a replica server' do - ActiveRecord::Base.postgresql_fallback_connection(config) + begin + ActiveRecord::Base.expects(:postgresql_connection).with(config).raises(PG::ConnectionBad) + ActiveRecord::Base.expects(:verify_replica).with(@replica_connection) - expect(Discourse.readonly_mode?).to eq(true) + ActiveRecord::Base.expects(:postgresql_connection).with(config.merge({ + host: "localhost", port: "6432" + })).returns(@replica_connection) - ActiveRecord::Base.unstub(:postgresql_connection) - sleep 0.15 + expect { ActiveRecord::Base.postgresql_fallback_connection(config) } + .to raise_error(PG::ConnectionBad) - expect(Discourse.readonly_mode?).to eq(false) + expect{ ActiveRecord::Base.postgresql_fallback_connection(config) } + .to change{ Discourse.readonly_mode? }.from(false).to(true) - expect(ActiveRecord::Base.connection) - .to be_an_instance_of(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter) + ActiveRecord::Base.unstub(:postgresql_connection) + + current_threads = Thread.list + + expect{ ActiveRecord::Base.connection_pool.checkout } + .to change{ Thread.list.size }.by(1) + + # Wait for the thread to finish execution + threads = (Thread.list - current_threads).each(&:join) + + expect(Discourse.readonly_mode?).to eq(false) + + expect(ActiveRecord::Base.connection_pool.connections.count).to eq(0) + + expect(ActiveRecord::Base.connection) + .to be_an_instance_of(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter) + ensure + # threads.each { |t| Thread.kill(t) } if threads + ActiveRecord::Base.establish_connection(:test) + end end end @@ -59,8 +68,10 @@ describe ActiveRecord::ConnectionHandling do it 'should raise the right error' do ActiveRecord::Base.expects(:postgresql_connection).raises(PG::ConnectionBad).twice - expect { ActiveRecord::Base.postgresql_fallback_connection(config) } - .to raise_error(PG::ConnectionBad) + 2.times do + expect { ActiveRecord::Base.postgresql_fallback_connection(config) } + .to raise_error(PG::ConnectionBad) + end end end end