Files
pgadmin4/web/pgadmin/misc/workspaces/__init__.py
Akshay Joshi 1607875a08 Fixed issues for Workspaces:
1) Unable to add ad-hoc server with External Database.
2) Server list not updating when password is saved on the Welcome page.
3) Icons for Query Tool and PSQL missing on the Welcome page.
4) Error when removing connection parameters.
2025-01-29 17:14:25 +05:30

243 lines
8.5 KiB
Python

##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2025, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""A blueprint module implementing the workspace."""
import json
import config
from config import PG_DEFAULT_DRIVER
from flask import request, current_app
from pgadmin.user_login_check import pga_login_required
from flask_babel import gettext
from flask_security import current_user
from pgadmin.utils import PgAdminModule
from pgadmin.model import db, Server
from pgadmin.utils.driver import get_driver
from pgadmin.utils.ajax import bad_request, make_json_response
from pgadmin.browser.server_groups.servers.utils import (
is_valid_ipaddress, convert_connection_parameter, check_ssl_fields)
from pgadmin.tools.schema_diff.node_registry import SchemaDiffRegistry
from pgadmin.browser.server_groups.servers.utils import (
disconnect_from_all_servers, delete_adhoc_servers)
from pgadmin.tools.psql import get_open_psql_connections
MODULE_NAME = 'workspace'
class WorkspaceModule(PgAdminModule):
def get_exposed_url_endpoints(self):
"""
Returns:
list: URL endpoints for Workspace module
"""
return [
'workspace.adhoc_connect_server',
'workspace.layout_changed'
]
blueprint = WorkspaceModule(MODULE_NAME, __name__,
url_prefix='/misc/workspace')
@blueprint.route("/")
@pga_login_required
def index():
return bad_request(
errormsg=gettext('This URL cannot be requested directly.')
)
@blueprint.route(
'/adhoc_connect_server',
methods=["POST"],
endpoint="adhoc_connect_server"
)
@pga_login_required
def adhoc_connect_server():
required_args = ['server_name', 'did']
data = request.form if request.form else json.loads(
request.data
)
# Loop through data and if found any value is blank string then
# convert it to None as after porting into React, from frontend
# '' blank string is coming as a value instead of null.
for item in data:
if data[item] == '':
data[item] = None
# Some fields can be provided with service file so they are optional
if 'service' in data and not data['service']:
required_args.extend([
'host',
'port',
'user'
])
for arg in required_args:
if arg not in data:
return make_json_response(
status=410,
success=0,
errormsg=gettext(
"Could not find the required parameter ({})."
).format(arg)
)
connection_params = convert_connection_parameter(
data.get('connection_params', []))
if connection_params is not None:
if 'hostaddr' in connection_params and \
not is_valid_ipaddress(connection_params['hostaddr']):
return make_json_response(
success=0,
status=400,
errormsg=gettext('Not a valid Host address')
)
# To check ssl configuration
_, connection_params = check_ssl_fields(connection_params)
# set the connection params again in the data
if 'connection_params' in data:
data['connection_params'] = connection_params
# Fetch all the new data in case of non-existing servers
new_host = data.get('host', None)
new_port = int(data.get('port', 0))
new_db = data.get('database_name', None)
if new_db is None:
new_db = data.get('did')
new_username = data.get('user')
new_role = data.get('role', None)
new_server_name = data.get('server_name', None)
new_service = data.get('service', None)
try:
server = None
if config.CONFIG_DATABASE_URI is not None and \
len(config.CONFIG_DATABASE_URI) > 0:
# Filter out all the servers with the below combination.
servers = Server.query.filter_by(host=new_host,
port=new_port,
maintenance_db=new_db,
username=new_username,
name=new_server_name,
role=new_role,
service=new_service
).all()
# If found matching servers then compare the connection_params as
# with external database (PostgreSQL) comparing two json objects
# are not supported.
for existing_server in servers:
if existing_server.connection_params == connection_params:
server = existing_server
break
else:
server = Server.query.filter_by(host=new_host,
port=new_port,
maintenance_db=new_db,
username=new_username,
name=new_server_name,
role=new_role,
service=new_service,
connection_params=connection_params
).first()
# If server is none then no server with the above combination is found.
if server is None:
# Check if sid is present in data if it is then used that sid.
if ('sid' in data and data['sid'] is not None and
int(data['sid']) > 0):
server = Server.query.filter_by(id=data['sid']).first()
# Clone the server object
server = server.clone()
# Replace the following with the new/changed value.
server.maintenance_db = new_db
server.username = new_username
server.role = new_role
server.connection_params = connection_params
server.is_adhoc = 1
db.session.add(server)
db.session.commit()
else:
server = Server(
user_id=current_user.id,
servergroup_id=data.get('gid', 1),
name=new_server_name,
host=new_host,
port=new_port,
maintenance_db=new_db,
username=new_username,
role=new_role,
service=new_service,
connection_params=connection_params,
is_adhoc=1
)
db.session.add(server)
db.session.commit()
view = SchemaDiffRegistry.get_node_view('server')
return view.connect(1, server.id, is_qt=False, server=server)
except Exception as e:
current_app.logger.exception(e)
return make_json_response(
status=410,
success=0,
errormsg=str(e)
)
@blueprint.route(
'/layout_changed',
methods=["DELETE"],
endpoint="layout_changed"
)
@pga_login_required
def layout_changed():
# if layout is changed from 'Workspace' to 'Classic', disconnect all
# servers.
disconnect_from_all_servers()
delete_adhoc_servers()
return make_json_response(status=200)
def check_and_delete_adhoc_server(sid):
"""
This function is used to check for adhoc server and if all Query Tool
and PSQL connections are closed then delete that server.
"""
server = Server.query.filter_by(id=sid).first()
if server.is_adhoc:
# Check PSQL connections. If more connections are open for
# the given sid return from the function.
psql_connections = get_open_psql_connections()
if sid in psql_connections.values():
return
# Check Query Tool connections for the given sid
manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(sid)
for key, value in manager.connections.items():
if key.startswith('CONN') and value.connected():
return
# Assumption at this point all the Query Tool and PSQL connections
# is closed, so now we can release the manager
manager.release()
# Delete the adhoc server from the pgadmin database
delete_adhoc_servers(sid)