Improve the logic for Bad handling of missing connection database server RM #1387

This commit is contained in:
Ashesh Vashi 2016-09-06 15:35:20 +05:30 committed by Akshay Joshi
parent 8b61aa49d0
commit 8ac65070bc
3 changed files with 243 additions and 113 deletions

View File

@ -818,7 +818,7 @@ class ServerNode(PGChildNodeView):
'servers/password.html', 'servers/password.html',
server_label=server.name, server_label=server.name,
username=server.username, username=server.username,
errmsg=e.message if e.message else str(e), errmsg=getattr(e, 'message', str(e)),
_=gettext _=gettext
) )
) )

View File

@ -8,7 +8,7 @@ define('pgadmin.browser',
'pgadmin.browser.node', 'pgadmin.browser.collection' 'pgadmin.browser.node', 'pgadmin.browser.collection'
], ],
function(require, $, _, S, Bootstrap, pgAdmin, alertify, CodeMirror) { function(require, $, _, S, Bootstrap, pgAdmin, Alertify, CodeMirror) {
// Some scripts do export their object in the window only. // Some scripts do export their object in the window only.
// Generally the one, which do no have AMD support. // Generally the one, which do no have AMD support.
@ -40,6 +40,7 @@ function(require, $, _, S, Bootstrap, pgAdmin, alertify, CodeMirror) {
_.each(data, function(d){ _.each(data, function(d){
d._label = d.label; d._label = d.label;
d.label = _.escape(d.label); d.label = _.escape(d.label);
data._inode = data.inode;
}) })
return data; return data;
}; };
@ -991,6 +992,7 @@ function(require, $, _, S, Bootstrap, pgAdmin, alertify, CodeMirror) {
} }
_data._label = _data.label; _data._label = _data.label;
_data.label = _.escape(_data.label); _data.label = _.escape(_data.label);
_data._inode = _data.inode;
traversePath(); traversePath();
}, },
@ -1321,6 +1323,7 @@ function(require, $, _, S, Bootstrap, pgAdmin, alertify, CodeMirror) {
ctx.pI.push(_old); ctx.pI.push(_old);
_new._label = _new.label; _new._label = _new.label;
_new.label = _.escape(_new.label); _new.label = _.escape(_new.label);
_new._inode = _new.inode;
if (_old._pid != _new._pid) { if (_old._pid != _new._pid) {
ctx.op = 'RECREATE'; ctx.op = 'RECREATE';
@ -1351,7 +1354,7 @@ function(require, $, _, S, Bootstrap, pgAdmin, alertify, CodeMirror) {
ctx.i = null; ctx.i = null;
ctx.d = null; ctx.d = null;
} else { } else {
isOpen = this.tree.isInode(_i) && this.tree.isOpen(_i); isOpen = (this.tree.isInode(_i) && this.tree.isOpen(_i));
} }
ctx.branch = ctx.t.serialize( ctx.branch = ctx.t.serialize(
@ -1383,7 +1386,8 @@ function(require, $, _, S, Bootstrap, pgAdmin, alertify, CodeMirror) {
return; return;
} }
var fetchNodeInfo = function(_i, _d, _n) { var fetchNodeInfo = function(_i, _d, _n) {
var url = _n.generate_url(_i, 'nodes', _d, true); var info = _n.getTreeNodeHierarchy(_i),
url = _n.generate_url(_i, 'nodes', _d, true);
$.ajax({ $.ajax({
url: url, url: url,
@ -1396,6 +1400,7 @@ function(require, $, _, S, Bootstrap, pgAdmin, alertify, CodeMirror) {
data._label = data.label; data._label = data.label;
data.label = _.escape(data.label); data.label = _.escape(data.label);
data._inode = data.inode;
var d = ctx.t.itemData(ctx.i); var d = ctx.t.itemData(ctx.i);
_.extend(d, data); _.extend(d, data);
ctx.t.setLabel(ctx.i, {label: _d.label}); ctx.t.setLabel(ctx.i, {label: _d.label});
@ -1417,21 +1422,44 @@ function(require, $, _, S, Bootstrap, pgAdmin, alertify, CodeMirror) {
} }
}, },
error: function(jqx, error, status) { error: function(jqx, error, status) {
var p = ctx.t.parent(ctx.i); if (
!Alertify.pgHandleItemError(
xhr, error, message, {item: _i, info: info}
)
) {
var msg = xhr.responseText,
contentType = xhr.getResponseHeader('Content-Type'),
msg = xhr.responseText,
jsonResp = (
contentType &&
contentType.indexOf('application/json') == 0 &&
$.parseJSON(xhr.responseText)
) || {};
if (!p) if (xhr.status == 410 && jsonResp.success == 0) {
return; var p = ctx.t.parent(ctx.i);
ctx.t.remove(ctx.i, { ctx.t.remove(ctx.i, {
success: function() { success: function() {
// Try to refresh the parent on error if (p) {
try { // Try to refresh the parent on error
pgBrowser.Events.trigger( try {
'pgadmin:browser:tree:refresh', p pgBrowser.Events.trigger(
); 'pgadmin:browser:tree:refresh', p
} catch(e) {} );
} catch(e) {}
}
}
});
} }
});
Alertify.pgNotifier(
error, xhr, "{{ _("Error retrieving details for the node.") }}",
function() {
console.log(arguments);
}
);
}
} }
}); });
}.bind(this); }.bind(this);
@ -1466,6 +1494,13 @@ function(require, $, _, S, Bootstrap, pgAdmin, alertify, CodeMirror) {
console.log(arguments); console.log(arguments);
} }
}); });
} else if (!this.tree.isInode(_i) && d._inode) {
this.tree.setInode(_i, {
success: fetchNodeInfo.bind(this, _i, d, n),
fail: function() {
console.log(arguments);
}
});
} else { } else {
fetchNodeInfo(_i, d, n); fetchNodeInfo(_i, d, n);
} }

View File

@ -131,6 +131,9 @@ class Connection(BaseConnection):
* _release() * _release()
- Release the connection object of psycopg2 - Release the connection object of psycopg2
* _reconnect()
- Attempt to reconnect to the database
* _wait(conn) * _wait(conn)
- This method is used to wait for asynchronous connection. This is a - This method is used to wait for asynchronous connection. This is a
blocking call. blocking call.
@ -181,6 +184,10 @@ class Connection(BaseConnection):
self.row_count = 0 self.row_count = 0
self.__notices = None self.__notices = None
self.password = None self.password = None
# This flag indicates the connection status (connected/disconnected).
self.wasConnected = False
# This flag indicates the connection reconnecting status.
self.reconnecting = False
super(Connection, self).__init__() super(Connection, self).__init__()
@ -233,7 +240,8 @@ class Connection(BaseConnection):
encpass = self.password or getattr(mgr, 'password', None) encpass = self.password or getattr(mgr, 'password', None)
# Reset the existing connection password # Reset the existing connection password
self.password = None if self.reconnecting is not False:
self.password = None
if encpass: if encpass:
# Fetch Logged in User Details. # Fetch Logged in User Details.
@ -301,8 +309,44 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id}
return False, msg return False, msg
self.conn = pg_conn self.conn = pg_conn
self.__backend_pid = pg_conn.get_backend_pid() self.wasConnected = True
try:
status, msg = self._initialize(conn_id, **kwargs)
except Exception as e:
current_app.logger.exception(e)
self.conn = None
if not self.reconnecting:
self.wasConnected = False
raise e
if status:
mgr._update_password(encpass)
else:
if not self.reconnecting:
self.wasConnected = False
return status, msg
def _initialize(self, conn_id, **kwargs):
self.execution_aborted = False self.execution_aborted = False
self.__backend_pid = self.conn.get_backend_pid()
setattr(g, "{0}#{1}".format(
self.manager.sid,
self.conn_id.encode('utf-8')
), None)
status, cur = self.__cursor()
formatted_exception_msg = self._formatted_exception_msg
mgr = self.manager
def _execute(cur, query, params=None):
try:
self.__internal_blocking_execute(cur, query, params)
except psycopg2.Error as pe:
cur.close()
return formatted_exception_msg(pe, False)
return None
# autocommit flag does not work with asynchronous connections. # autocommit flag does not work with asynchronous connections.
# By default asynchronous connection runs in autocommit mode. # By default asynchronous connection runs in autocommit mode.
@ -313,22 +357,22 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id}
self.conn.autocommit = True self.conn.autocommit = True
register_date_typecasters(self.conn) register_date_typecasters(self.conn)
status, res = self.execute_scalar(""" status = _execute(cur, """
SET DateStyle=ISO; SET DateStyle=ISO;
SET client_min_messages=notice; SET client_min_messages=notice;
SET bytea_output=escape; SET bytea_output=escape;
SET client_encoding='UNICODE';""") SET client_encoding='UNICODE';""")
if not status: if status is not None:
self.conn.close() self.conn.close()
self.conn = None self.conn = None
return False, res return False, status
if mgr.role: if mgr.role:
status, res = self.execute_scalar(u"SET ROLE TO %s", [mgr.role]) status = _execute(cur, u"SET ROLE TO %s", [mgr.role])
if not status: if status is not None:
self.conn.close() self.conn.close()
self.conn = None self.conn = None
current_app.logger.error(""" current_app.logger.error("""
@ -337,35 +381,38 @@ Connect to the database server (#{server_id}) for connection ({conn_id}), but -
""".format( """.format(
server_id=self.manager.sid, server_id=self.manager.sid,
conn_id=conn_id, conn_id=conn_id,
msg=res msg=status
) )
) )
return False, \ return False, \
_("Failed to setup the role with error message:\n{0}").format( _("Failed to setup the role with error message:\n{0}").format(
res status
) )
if mgr.ver is None: if mgr.ver is None:
status, res = self.execute_scalar("SELECT version()") status = _execute(cur, "SELECT version()")
if status: if status is not None:
mgr.ver = res
mgr.sversion = pg_conn.server_version
else:
self.conn.close() self.conn.close()
self.conn = None self.conn = None
self.wasConneted = False
current_app.logger.error(""" current_app.logger.error("""
Failed to fetch the version information on the established connection to the database server (#{server_id}) for '{conn_id}' with below error message: Failed to fetch the version information on the established connection to the database server (#{server_id}) for '{conn_id}' with below error message:
{msg} {msg}
""".format( """.format(
server_id=self.manager.sid, server_id=self.manager.sid,
conn_id=conn_id, conn_id=conn_id,
msg=res msg=status
) )
) )
return False, res return False, status
status, res = self.execute_dict(""" if cur.rowcount > 0:
row = cur.fetchmany(1)[0]
mgr.ver = row['version']
mgr.sversion = self.conn.server_version
status = _execute(cur, """
SELECT SELECT
db.oid as did, db.datname, db.datallowconn, pg_encoding_to_char(db.encoding) AS serverencoding, db.oid as did, db.datname, db.datallowconn, pg_encoding_to_char(db.encoding) AS serverencoding,
has_database_privilege(db.oid, 'CREATE') as cancreate, datlastsysoid has_database_privilege(db.oid, 'CREATE') as cancreate, datlastsysoid
@ -373,16 +420,17 @@ FROM
pg_database db pg_database db
WHERE db.datname = current_database()""") WHERE db.datname = current_database()""")
if status: if status is None:
mgr.db_info = mgr.db_info or dict() mgr.db_info = mgr.db_info or dict()
f_row = res['rows'][0] if cur.rowcount > 0:
mgr.db_info[f_row['did']] = f_row.copy() res = cur.fetchmany(1)[0]
mgr.db_info[res['did']] = res.copy()
# We do not have database oid for the maintenance database. # We do not have database oid for the maintenance database.
if len(mgr.db_info) == 1: if len(mgr.db_info) == 1:
mgr.did = f_row['did'] mgr.did = res['did']
status, res = self.execute_dict(""" status = _execute(cur, """
SELECT SELECT
oid as id, rolname as name, rolsuper as is_superuser, oid as id, rolname as name, rolsuper as is_superuser,
rolcreaterole as can_create_role, rolcreatedb as can_create_db rolcreaterole as can_create_role, rolcreatedb as can_create_db
@ -391,28 +439,34 @@ FROM
WHERE WHERE
rolname = current_user""") rolname = current_user""")
if status: if status is None:
mgr.user_info = dict() mgr.user_info = dict()
f_row = res['rows'][0] if cur.rowcount > 0:
mgr.user_info = f_row.copy() mgr.user_info = cur.fetchmany(1)[0]
if 'password' in kwargs: if 'password' in kwargs:
mgr.password = kwargs['password'] mgr.password = kwargs['password']
server_types = None
if 'server_types' in kwargs and isinstance(kwargs['server_types'], list): if 'server_types' in kwargs and isinstance(kwargs['server_types'], list):
for st in kwargs['server_types']: server_types = mgr.server_types = kwargs['server_types']
if st.instanceOf(mgr.ver):
mgr.server_type = st.stype if server_types is None:
mgr.server_cls = st from pgadmin.browser.server_groups.servers.types import ServerType
break server_types = ServerType.types()
for st in server_types:
if st.instanceOf(mgr.ver):
mgr.server_type = st.stype
mgr.server_cls = st
break
mgr._update_password(encpass)
mgr.update_session() mgr.update_session()
return True, None return True, None
def __cursor(self, server_cursor=False): def __cursor(self, server_cursor=False):
if not self.conn: if self.wasConnected is False:
raise ConnectionLost( raise ConnectionLost(
self.manager.sid, self.manager.sid,
self.db, self.db,
@ -428,73 +482,68 @@ WHERE
return True, cur return True, cur
if not self.connected(): if not self.connected():
status = False
errmsg = "" errmsg = ""
current_app.logger.warning(""" current_app.logger.warning(
Connection to database server (#{server_id}) for the connection - '{conn_id}' has been lost. "Connection to database server (#{server_id}) for the connection - '{conn_id}' has been lost.".format(
""".format( server_id=self.manager.sid,
server_id=self.manager.sid, conn_id=self.conn_id
conn_id=self.conn_id )
)
) )
if self.auto_reconnect: if self.auto_reconnect and not self.reconnecting:
status, errmsg = self.connect() self.__attempt_execution_reconnect(None)
else:
if not status: raise ConnectionLost(
errmsg = gettext( self.manager.sid,
""" self.db,
Attempt to reconnect failed with the error: None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:]
{0}""".format(errmsg) )
)
if not status:
msg = gettext("Connection lost.\n{0}").format(errmsg)
current_app.logger.error(errmsg)
return False, msg
try: try:
if server_cursor: if server_cursor:
# Providing name to cursor will create server side cursor. # Providing name to cursor will create server side cursor.
cursor_name = "CURSOR:{0}".format(self.conn_id) cursor_name = "CURSOR:{0}".format(self.conn_id)
cur = self.conn.cursor(name=cursor_name, cur = self.conn.cursor(
cursor_factory=DictCursor) name=cursor_name, cursor_factory=DictCursor
)
else: else:
cur = self.conn.cursor(cursor_factory=DictCursor) cur = self.conn.cursor(cursor_factory=DictCursor)
except psycopg2.Error as pe: except psycopg2.Error as pe:
errmsg = gettext(""" current_app.logger.exception(pe)
Failed to create cursor for psycopg2 connection with error message for the \ errmsg = gettext(
server#{1}:{2}: "Failed to create cursor for psycopg2 connection with error message for the server#{1}:{2}:\n{0}"
{0}""").format(str(pe), self.manager.sid, self.db) ).format(
str(pe), self.manager.sid, self.db
)
current_app.logger.error(errmsg) current_app.logger.error(errmsg)
self.conn.close() if self.conn.closed:
self.conn = None self.conn = None
if self.auto_reconnect and not self.reconnecting:
current_app.logger.info(
gettext(
"Attempting to reconnect to the database server (#{server_id}) for the connection - '{conn_id}'."
).format(
server_id=self.manager.sid,
conn_id=self.conn_id
)
)
return self.__attempt_execution_reconnect(
self.__cursor, server_cursor
)
else:
raise ConnectionLost(
self.manager.sid,
self.db,
None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:]
)
if self.auto_reconnect: setattr(
current_app.logger.debug(""" g, "{0}#{1}".format(
Attempting to reconnect to the database server (#{server_id}) for the connection - '{conn_id}'. self.manager.sid, self.conn_id.encode('utf-8')
""".format( ), cur
server_id=self.manager.sid, )
conn_id=self.conn_id
)
)
status, cur = self.connect()
if not status:
msg = gettext(
u"""
Connection for server#{0} with database "{1}" was lost.
Attempt to reconnect it failed with the error:
{2}"""
).format(self.driver.server_id, self.db, cur)
current_app.logger.error(msg)
return False, cur
else:
return False, errmsg
setattr(g, "{0}#{1}".format(self.manager.sid, self.conn_id.encode('utf-8')), cur)
return True, cur return True, cur
@ -537,7 +586,7 @@ Attempt to reconnect it failed with the error:
cur.close() cur.close()
errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
current_app.logger.error( current_app.logger.error(
u"Failed to execute query ((with server cursor) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format( u"failed to execute query ((with server cursor) for the server #{server_id} - {conn_id} (query-id: {query_id}):\nerror message:{errmsg}".format(
server_id=self.manager.sid, server_id=self.manager.sid,
conn_id=self.conn_id, conn_id=self.conn_id,
query=query, query=query,
@ -606,6 +655,10 @@ Attempt to reconnect it failed with the error:
except psycopg2.Error as pe: except psycopg2.Error as pe:
cur.close() cur.close()
if not self.connected(): if not self.connected():
if self.auto_reconnect and not self.reconnecting:
return self.__attempt_execution_reconnect(
self.execute_dict, query, params, formatted_exception_msg
)
raise ConnectionLost( raise ConnectionLost(
self.manager.sid, self.manager.sid,
self.db, self.db,
@ -713,6 +766,10 @@ Failed to execute query (execute_async) for the server #{server_id} - {conn_id}
except psycopg2.Error as pe: except psycopg2.Error as pe:
cur.close() cur.close()
if not self.connected(): if not self.connected():
if self.auto_reconnect and not self.reconnecting:
return self.__attempt_execution_reconnect(
self.execute_void, query, params, formatted_exception_msg
)
raise ConnectionLost( raise ConnectionLost(
self.manager.sid, self.manager.sid,
self.db, self.db,
@ -736,6 +793,36 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id}
return True, None return True, None
def __attempt_execution_reconnect(self, fn, *args, **kwargs):
self.reconnecting = True
setattr(g, "{0}#{1}".format(
self.manager.sid,
self.conn_id.encode('utf-8')
), None)
try:
status, res = self.connect()
if status:
if fn:
status, res = fn(*args, **kwargs)
self.reconnecting = False
return status, res
except Exception as e:
current_app.logger.exception(e)
self.reconnecting = False
current_app.warning(
"Failed to reconnect the database server (#{server_id})".format(
server_id=self.manager.sid,
conn_id=self.conn_id
)
)
self.reconnecting = False
raise ConnectionLost(
self.manager.sid,
self.db,
None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:]
)
def execute_2darray(self, query, params=None, formatted_exception_msg=False): def execute_2darray(self, query, params=None, formatted_exception_msg=False):
status, cur = self.__cursor() status, cur = self.__cursor()
self.row_count = 0 self.row_count = 0
@ -758,11 +845,11 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id}
except psycopg2.Error as pe: except psycopg2.Error as pe:
cur.close() cur.close()
if not self.connected(): if not self.connected():
raise ConnectionLost( if self.auto_reconnect and \
self.manager.sid, not self.reconnecting:
self.db, return self.__attempt_execution_reconnect(
None if self.conn_id[0:3] == u'DB:' else self.conn_id[5:] self.execute_2darray, query, params, formatted_exception_msg
) )
errmsg = self._formatted_exception_msg(pe, formatted_exception_msg) errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
current_app.logger.error( current_app.logger.error(
u"Failed to execute query (execute_2darray) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format( u"Failed to execute query (execute_2darray) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format(
@ -809,6 +896,11 @@ Failed to execute query (execute_void) for the server #{server_id} - {conn_id}
except psycopg2.Error as pe: except psycopg2.Error as pe:
cur.close() cur.close()
if not self.connected(): if not self.connected():
if self.auto_reconnect and not self.reconnecting:
return self.__attempt_execution_reconnect(
self.execute_dict, query, params,
formatted_exception_msg
)
raise ConnectionLost( raise ConnectionLost(
self.manager.sid, self.manager.sid,
self.db, self.db,
@ -900,10 +992,12 @@ Failed to reset the connection to the server due to following error:
return self.execute_scalar('SELECT 1') return self.execute_scalar('SELECT 1')
def _release(self): def _release(self):
if self.conn: if self.wasConneted:
self.conn.close() if self.conn:
self.conn = None self.conn.close()
self.conn = None
self.password = None self.password = None
self.wasConnected = False
def _wait(self, conn): def _wait(self, conn):
""" """
@ -1227,6 +1321,7 @@ class ServerManager(object):
self.ssl_mode = server.ssl_mode self.ssl_mode = server.ssl_mode
self.pinged = datetime.datetime.now() self.pinged = datetime.datetime.now()
self.db_info = dict() self.db_info = dict()
self.server_types = None
for con in self.connections: for con in self.connections:
self.connections[con]._release() self.connections[con]._release()
@ -1430,7 +1525,7 @@ WHERE db.oid = {0}""".format(did))
self.password = passwd self.password = passwd
for conn_id in self.connections: for conn_id in self.connections:
conn = self.connections[conn_id] conn = self.connections[conn_id]
if conn.conn is not None: if conn.conn is not None or conn.wasConnected is True:
conn.password = passwd conn.password = passwd
def update_session(self): def update_session(self):