Second attempt which removes any kind proxying.

This commit is contained in:
Guo Xiang Tan
2016-01-26 15:46:51 +08:00
parent 46589a1a0c
commit 0058d09e35
3 changed files with 72 additions and 128 deletions

View File

@@ -14,7 +14,7 @@ class TaskObserver
logger.info { "PG connection heartbeat: Master connection is not active.".freeze }
else
logger.error { "PG connection heartbeat failed with error: \"#{ex}\"" }
end
end
end
end
@@ -28,15 +28,21 @@ end
module ActiveRecord
module ConnectionHandling
def postgresql_fallback_connection(config)
master_connection = postgresql_connection(config)
begin
connection = postgresql_connection(config)
rescue PG::ConnectionBad => e
connection = postgresql_connection(config.dup.merge({
"host" => config["replica_host"], "port" => config["replica_port"]
}))
replica_connection = postgresql_connection(config.dup.merge({
host: config[:replica_host], port: config[:replica_port]
}))
verify_replica(replica_connection)
verify_replica(connection)
klass = ConnectionAdapters::PostgreSQLFallbackAdapter.proxy_pass(master_connection.class)
klass.new(master_connection, replica_connection, logger, config)
Discourse.enable_readonly_mode if !Discourse.readonly_mode?
start_connection_heartbeart(connection, config)
end
connection
end
private
@@ -45,92 +51,24 @@ module ActiveRecord
value = connection.raw_connection.exec("SELECT pg_is_in_recovery()").values[0][0]
raise "Replica database server is not in recovery mode." if value == 'f'
end
end
module ConnectionAdapters
class PostgreSQLFallbackAdapter < AbstractAdapter
ADAPTER_NAME = "PostgreSQLFallback".freeze
MAX_FAILURE = 5
HEARTBEAT_INTERVAL = 5
def interval
5
end
attr_reader :main_connection
def start_connection_heartbeart(existing_connection, config)
timer_task = Concurrent::TimerTask.new(execution_interval: interval) do |task|
connection = postgresql_connection(config)
def self.all_methods(klass)
methods = []
(klass.ancestors - AbstractAdapter.ancestors).each do |_klass|
%w(public protected private).map do |level|
methods << _klass.send("#{level}_instance_methods", false)
end
end
methods.flatten.uniq.sort
end
def self.proxy_pass(klass)
Class.new(self) do
(self.all_methods(klass) - self.all_methods(self)).each do |method|
self.class_eval <<-EOF
def #{method}(*args, &block)
proxy_method(:#{method}, *args, &block)
end
EOF
end
if connection.active?
existing_connection.disconnect!
Discourse.disable_readonly_mode if Discourse.readonly_mode?
task.shutdown
end
end
def initialize(master_connection, replica_connection, logger, config)
super(nil, logger, config)
@master_connection = master_connection
@main_connection = @master_connection
@replica_connection = replica_connection
@failure_count = 0
load!
end
def proxy_method(method, *args, &block)
@main_connection.send(method, *args, &block)
rescue ActiveRecord::StatementInvalid => e
if e.message.include?("PG::UnableToSend") && @main_connection == @master_connection
@failure_count += 1
if @failure_count == MAX_FAILURE
Discourse.enable_readonly_mode if !Discourse.readonly_mode?
@main_connection = @replica_connection
load!
connection_heartbeart(@master_connection)
@failure_count = 0
else
proxy_method(method, *args, &block)
end
end
raise e
end
private
def load!
@visitor = @main_connection.visitor
@connection = @main_connection.raw_connection
end
def connection_heartbeart(connection, interval = HEARTBEAT_INTERVAL)
timer_task = Concurrent::TimerTask.new(execution_interval: interval) do |task|
connection.reconnect!
if connection.active?
@main_connection = connection
load!
Discourse.disable_readonly_mode if Discourse.readonly_mode?
task.shutdown
end
end
timer_task.add_observer(TaskObserver.new)
timer_task.execute
end
timer_task.add_observer(TaskObserver.new)
timer_task.execute
end
end
end