mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-02-25 18:55:31 -06:00
Fix saving of query output as CSV data. Fixes #1405
This commit is contained in:
committed by
Dave Page
parent
a6024fc49a
commit
83a1535f89
@@ -17,6 +17,8 @@ import datetime
|
||||
import os
|
||||
import random
|
||||
import select
|
||||
import sys
|
||||
import csv
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
@@ -32,6 +34,11 @@ from .keywords import ScanKeyword
|
||||
from ..abstract import BaseDriver, BaseConnection
|
||||
from .cursor import DictCursor
|
||||
|
||||
if sys.version_info < (3,):
|
||||
from StringIO import StringIO
|
||||
else:
|
||||
from io import StringIO
|
||||
|
||||
_ = gettext
|
||||
|
||||
ASYNC_WAIT_TIMEOUT = 0.01 # in seconds or 10 milliseconds
|
||||
@@ -284,7 +291,10 @@ Failed to connect to the database server(#{server_id}) for connection ({conn_id}
|
||||
# autocommit flag does not work with asynchronous connections.
|
||||
# By default asynchronous connection runs in autocommit mode.
|
||||
if self.async == 0:
|
||||
self.conn.autocommit = True
|
||||
if 'autocommit' in kwargs and kwargs['autocommit'] == False:
|
||||
self.conn.autocommit = False
|
||||
else:
|
||||
self.conn.autocommit = True
|
||||
register_date_typecasters(self.conn)
|
||||
|
||||
status, res = self.execute_scalar("""
|
||||
@@ -384,11 +394,12 @@ WHERE
|
||||
|
||||
return True, None
|
||||
|
||||
def __cursor(self):
|
||||
def __cursor(self, server_cursor=False):
|
||||
cur = getattr(g, str(self.manager.sid) + '#' + self.conn_id, None)
|
||||
|
||||
if self.connected() and cur and not cur.closed:
|
||||
return True, cur
|
||||
if not server_cursor or (server_cursor and cur.name):
|
||||
return True, cur
|
||||
|
||||
if not self.connected():
|
||||
status = False
|
||||
@@ -419,7 +430,13 @@ Attempt to reconnect failed with the error:
|
||||
return False, msg
|
||||
|
||||
try:
|
||||
cur = self.conn.cursor(cursor_factory=DictCursor)
|
||||
if server_cursor:
|
||||
# Providing name to cursor will create server side cursor.
|
||||
cursor_name = "CURSOR:{0}".format(self.conn_id)
|
||||
cur = self.conn.cursor(name=cursor_name,
|
||||
cursor_factory=DictCursor)
|
||||
else:
|
||||
cur = self.conn.cursor(cursor_factory=DictCursor)
|
||||
except psycopg2.Error as pe:
|
||||
errmsg = gettext("""
|
||||
Failed to create cursor for psycopg2 connection with error message for the \
|
||||
@@ -471,6 +488,76 @@ Attempt to reconnect it failed with the error:
|
||||
if self.async == 1:
|
||||
self._wait(cur.connection)
|
||||
|
||||
|
||||
def execute_on_server_as_csv(self, query, params=None, formatted_exception_msg=False, records=2000):
|
||||
status, cur = self.__cursor(server_cursor=True)
|
||||
self.row_count = 0
|
||||
|
||||
if not status:
|
||||
return False, str(cur)
|
||||
query_id = random.randint(1, 9999999)
|
||||
|
||||
current_app.logger.log(25,
|
||||
"Execute (with server cursor) for server #{server_id} - {conn_id} (Query-id: {query_id}):\n{query}".format(
|
||||
server_id=self.manager.sid,
|
||||
conn_id=self.conn_id,
|
||||
query=query,
|
||||
query_id=query_id
|
||||
)
|
||||
)
|
||||
try:
|
||||
self.__internal_blocking_execute(cur, query, params)
|
||||
except psycopg2.Error as pe:
|
||||
cur.close()
|
||||
errmsg = self._formatted_exception_msg(pe, formatted_exception_msg)
|
||||
current_app.logger.error(
|
||||
"Failed to execute query ((with server cursor) for the server #{server_id} - {conn_id} (Query-id: {query_id}):\nError Message:{errmsg}".format(
|
||||
server_id=self.manager.sid,
|
||||
conn_id=self.conn_id,
|
||||
query=query,
|
||||
errmsg=errmsg,
|
||||
query_id=query_id
|
||||
)
|
||||
)
|
||||
return False, errmsg
|
||||
|
||||
def gen():
|
||||
|
||||
results = cur.fetchmany(records)
|
||||
if not results:
|
||||
if not cur.closed:
|
||||
cur.close()
|
||||
return
|
||||
|
||||
header = [c.to_dict()['name'] for c in cur.ordered_description()]
|
||||
|
||||
res_io = StringIO()
|
||||
|
||||
csv_writer = csv.DictWriter(
|
||||
res_io, fieldnames=header, delimiter=str(','), quoting=csv.QUOTE_NONNUMERIC
|
||||
)
|
||||
csv_writer.writeheader()
|
||||
csv_writer.writerows(results)
|
||||
|
||||
yield res_io.getvalue().strip(str('\r\n'))
|
||||
|
||||
while True:
|
||||
results = cur.fetchmany(records)
|
||||
|
||||
if not results:
|
||||
if not cur.closed:
|
||||
cur.close()
|
||||
break
|
||||
res_io = StringIO()
|
||||
|
||||
csv_writer = csv.DictWriter(
|
||||
res_io, fieldnames=header, delimiter=str(','), quoting=csv.QUOTE_NONNUMERIC
|
||||
)
|
||||
csv_writer.writerows(results)
|
||||
yield res_io.getvalue().strip(str('\r\n'))
|
||||
|
||||
return True, gen
|
||||
|
||||
def execute_scalar(self, query, params=None, formatted_exception_msg=False):
|
||||
status, cur = self.__cursor()
|
||||
self.row_count = 0
|
||||
@@ -1151,7 +1238,8 @@ class ServerManager(object):
|
||||
raise Exception("Information is not available.")
|
||||
|
||||
def connection(
|
||||
self, database=None, conn_id=None, auto_reconnect=True, did=None
|
||||
self, database=None, conn_id=None, auto_reconnect=True, did=None,
|
||||
async=None
|
||||
):
|
||||
msg_active_conn = gettext(
|
||||
"Server has no active connection. Please connect to the server."
|
||||
@@ -1197,7 +1285,10 @@ WHERE db.oid = {0}""".format(did))
|
||||
if my_id in self.connections:
|
||||
return self.connections[my_id]
|
||||
else:
|
||||
async = 1 if conn_id is not None else 0
|
||||
if async is None:
|
||||
async = 1 if conn_id is not None else 0
|
||||
else:
|
||||
async = 1 if async is True else 0
|
||||
self.connections[my_id] = Connection(
|
||||
self, my_id, database, auto_reconnect, async
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user