diff --git a/app/models/global_setting.rb b/app/models/global_setting.rb index 10866f8e8bd..b1bf927f2f0 100644 --- a/app/models/global_setting.rb +++ b/app/models/global_setting.rb @@ -18,11 +18,14 @@ class GlobalSetting def self.database_config hash = {"adapter" => "postgresql"} - %w{pool timeout socket host port username password}.each do |s| + %w{pool timeout socket host port username password replica_host replica_port}.each do |s| if val = self.send("db_#{s}") hash[s] = val end end + + hash["adapter"] = "postgresql_fallback" if hash["replica_host"] + hostnames = [ hostname ] hostnames << backup_hostname if backup_hostname.present? diff --git a/config/database.yml b/config/database.yml index fb6d923aea2..19da362a0dd 100644 --- a/config/database.yml +++ b/config/database.yml @@ -1,7 +1,13 @@ development: prepared_statements: false - adapter: postgresql + adapter: postgresql_fallback + host: 172.17.0.2 + port: 6432 database: discourse_development + username: tgxworld + password: test + replica_host: 172.17.0.3 + replica_port: 6432 min_messages: warning pool: 5 timeout: 5000 diff --git a/config/discourse_defaults.conf b/config/discourse_defaults.conf index 19c1ce63d1b..0ff3b8af814 100644 --- a/config/discourse_defaults.conf +++ b/config/discourse_defaults.conf @@ -43,6 +43,12 @@ db_password = # see: https://github.com/rails/rails/issues/21992 db_prepared_statements = false +# host address for db replica server +db_replica_host = + +# port running replica db server, defaults to 5432 if not set +db_replica_port = + # hostname running the forum hostname = "www.example.com" diff --git a/lib/active_record/connection_adapters/postgresql_fallback_adapter.rb b/lib/active_record/connection_adapters/postgresql_fallback_adapter.rb new file mode 100644 index 00000000000..18f76d6ec57 --- /dev/null +++ b/lib/active_record/connection_adapters/postgresql_fallback_adapter.rb @@ -0,0 +1,136 @@ +require 'active_record/connection_adapters/abstract_adapter' +require 'active_record/connection_adapters/postgresql_adapter' +require 'discourse' +require 'concurrent' + +class TaskObserver + def update(time, result, ex) + if result + logger.info { "PG connection heartbeat successfully returned #{result}" } + elsif ex.is_a?(Concurrent::TimeoutError) + logger.warning { "PG connection heartbeat timed out".freeze } + else + if ex.message.include?("PG::UnableToSend") + logger.info { "PG connection heartbeat: Master connection is not active.".freeze } + else + logger.error { "PG connection heartbeat failed with error: \"#{ex}\"" } + end + end + end + + private + + def logger + Rails.logger + end +end + +module ActiveRecord + module ConnectionHandling + def postgresql_fallback_connection(config) + master_connection = postgresql_connection(config) + + replica_connection = postgresql_connection(config.dup.merge({ + host: config[:replica_host], port: config[:replica_port] + })) + verify_replica(replica_connection) + + klass = ConnectionAdapters::PostgreSQLFallbackAdapter.proxy_pass(master_connection.class) + klass.new(master_connection, replica_connection, logger, config) + end + + private + + def verify_replica(connection) + value = connection.raw_connection.exec("SELECT pg_is_in_recovery()").values[0][0] + raise "Replica database server is not in recovery mode." if value == 'f' + end + end + + module ConnectionAdapters + class PostgreSQLFallbackAdapter < AbstractAdapter + ADAPTER_NAME = "PostgreSQLFallback".freeze + MAX_FAILURE = 5 + HEARTBEAT_INTERVAL = 5 + + attr_reader :main_connection + + def self.all_methods(klass) + methods = [] + + (klass.ancestors - AbstractAdapter.ancestors).each do |_klass| + %w(public protected private).map do |level| + methods << _klass.send("#{level}_instance_methods", false) + end + end + + methods.flatten.uniq.sort + end + + def self.proxy_pass(klass) + Class.new(self) do + (self.all_methods(klass) - self.all_methods(self)).each do |method| + self.class_eval <<-EOF + def #{method}(*args, &block) + proxy_method(:#{method}, *args, &block) + end + EOF + end + end + end + + def initialize(master_connection, replica_connection, logger, config) + super(nil, logger, config) + + @master_connection = master_connection + @main_connection = @master_connection + @replica_connection = replica_connection + @failure_count = 0 + load! + end + + def proxy_method(method, *args, &block) + @main_connection.send(method, *args, &block) + rescue ActiveRecord::StatementInvalid => e + if e.message.include?("PG::UnableToSend") && @main_connection == @master_connection + @failure_count += 1 + + if @failure_count == MAX_FAILURE + Discourse.enable_readonly_mode if !Discourse.readonly_mode? + @main_connection = @replica_connection + load! + connection_heartbeart(@master_connection) + @failure_count = 0 + else + proxy_method(method, *args, &block) + end + end + + raise e + end + + private + + def load! + @visitor = @main_connection.visitor + @connection = @main_connection.raw_connection + end + + def connection_heartbeart(connection, interval = HEARTBEAT_INTERVAL) + timer_task = Concurrent::TimerTask.new(execution_interval: interval) do |task| + connection.reconnect! + + if connection.active? + @main_connection = connection + load! + Discourse.disable_readonly_mode if Discourse.readonly_mode? + task.shutdown + end + end + + timer_task.add_observer(TaskObserver.new) + timer_task.execute + end + end + end +end diff --git a/spec/components/active_record/connection_adapters/postgresql_fallback_adapter_spec.rb b/spec/components/active_record/connection_adapters/postgresql_fallback_adapter_spec.rb new file mode 100644 index 00000000000..e0195e7d821 --- /dev/null +++ b/spec/components/active_record/connection_adapters/postgresql_fallback_adapter_spec.rb @@ -0,0 +1,55 @@ +require 'rails_helper' +require_dependency 'active_record/connection_adapters/postgresql_fallback_adapter' + +describe ActiveRecord::ConnectionAdapters::PostgreSQLFallbackAdapter do + let(:master_connection) { ActiveRecord::Base.connection } + let(:replica_connection) { master_connection.dup } + let(:adapter) { described_class.new(master_connection, replica_connection, nil, nil) } + + before :each do + ActiveRecord::Base.clear_all_connections! + end + + describe "proxy_method" do + context "when master connection is not active" do + before do + replica_connection.stubs(:send) + master_connection.stubs(:send).raises(ActiveRecord::StatementInvalid.new('PG::UnableToSend')) + master_connection.stubs(:reconnect!) + master_connection.stubs(:active?).returns(false) + + @old_const = described_class::HEARTBEAT_INTERVAL + described_class.const_set("HEARTBEAT_INTERVAL", 0.1) + end + + after do + Discourse.disable_readonly_mode + described_class.const_set("HEARTBEAT_INTERVAL", @old_const) + end + + it "should set site to readonly mode and carry out failover and switch back procedures" do + expect(adapter.main_connection).to eq(master_connection) + adapter.proxy_method('some method') + expect(Discourse.readonly_mode?).to eq(true) + expect(adapter.main_connection).to eq(replica_connection) + + master_connection.stubs(:active?).returns(true) + sleep 0.15 + + expect(Discourse.readonly_mode?).to eq(false) + expect(adapter.main_connection).to eq(master_connection) + end + end + + it 'should raise errors not related to the database connection' do + master_connection.stubs(:send).raises(StandardError.new) + expect { adapter.proxy_method('some method') }.to raise_error(StandardError) + end + + it 'should proxy methods successfully' do + expect(adapter.proxy_method(:execute, 'SELECT 1').values[0][0]).to eq("1") + expect(adapter.proxy_method(:active?)).to eq(true) + expect(adapter.proxy_method(:raw_connection)).to eq(master_connection.raw_connection) + end + end +end