mirror of
https://github.com/pgadmin-org/pgadmin4.git
synced 2025-02-25 18:55:31 -06:00
191 lines
6.5 KiB
Python
191 lines
6.5 KiB
Python
##########################################################################
|
|
#
|
|
# pgAdmin 4 - PostgreSQL Tools
|
|
#
|
|
# Copyright (C) 2013 - 2024, The pgAdmin Development Team
|
|
# This software is released under the PostgreSQL Licence
|
|
#
|
|
##########################################################################
|
|
|
|
"""A blueprint module implementing the workspace."""
|
|
import json
|
|
import config
|
|
from flask import request, current_app
|
|
from pgadmin.user_login_check import pga_login_required
|
|
from flask_babel import gettext
|
|
from flask_security import current_user
|
|
from pgadmin.utils import PgAdminModule
|
|
from pgadmin.model import db, Server
|
|
from pgadmin.utils.ajax import bad_request, make_json_response
|
|
from pgadmin.browser.server_groups.servers.utils import (
|
|
is_valid_ipaddress, convert_connection_parameter, check_ssl_fields)
|
|
from pgadmin.tools.schema_diff.node_registry import SchemaDiffRegistry
|
|
from pgadmin.browser.server_groups.servers.utils import (
|
|
disconnect_from_all_servers, delete_adhoc_servers)
|
|
|
|
# Name given to the workspace blueprint created below; it is also the prefix
# of the endpoint names this module exposes (e.g. 'workspace.<endpoint>').
MODULE_NAME = 'workspace'
|
|
|
|
|
|
class WorkspaceModule(PgAdminModule):
    """PgAdminModule subclass backing the workspace blueprint."""

    def get_exposed_url_endpoints(self):
        """
        Returns:
            list: URL endpoints for Workspace module
        """
        exposed_endpoints = ['workspace.adhoc_connect_server']
        return exposed_endpoints
|
|
|
|
|
|
# Blueprint instance for this module; every route declared on it is served
# under the '/misc/workspace' URL prefix.
blueprint = WorkspaceModule(
    MODULE_NAME,
    __name__,
    url_prefix='/misc/workspace'
)
|
|
|
|
|
|
@blueprint.route("/")
@pga_login_required
def index():
    """Answer requests to the blueprint root with a 'bad request' response.

    Returns:
        The response built by bad_request() carrying a translated message
        stating that this URL cannot be requested directly.
    """
    message = gettext('This URL cannot be requested directly.')
    return bad_request(errormsg=message)
|
|
|
|
|
|
@blueprint.route(
    '/adhoc_connect_server',
    methods=["POST"],
    endpoint="adhoc_connect_server"
)
@pga_login_required
def adhoc_connect_server():
    """
    Connect to the server described by the request, registering an ad-hoc
    server record first when no matching record exists.

    The payload is taken from the submitted form when there is one, otherwise
    from the JSON request body. 'host', 'port' and 'user' are mandatory.
    Optional keys read here are 'connection_params', 'database_name' (falling
    back to 'did'), 'role', 'server_name', 'sid', 'gid' and 'service'.

    Lookup / creation rules:
      * An existing Server row matching host, port, maintenance database,
        username, name, role and connection parameters is reused as is.
      * Otherwise, when a positive 'sid' is supplied, that server is cloned
        and the clone is updated with the requested database, user, role and
        connection parameters and flagged as ad-hoc.
      * Otherwise a brand new ad-hoc Server row owned by the current user is
        created.

    Returns:
        The result of the server node view's connect() call on success.
        A JSON error response otherwise:
          * status 410 when a mandatory parameter is missing,
          * status 400 when 'hostaddr' is not a valid IP address,
          * status 410 when 'sid' does not reference an existing server,
          * status 410 with the exception text for any other failure raised
            while looking up, storing or connecting to the server.
    """
    required_args = ['host', 'port', 'user']

    data = request.form if request.form else json.loads(request.data)

    # Report the first missing mandatory parameter, in declaration order.
    for arg in required_args:
        if arg not in data:
            return make_json_response(
                status=410,
                success=0,
                errormsg=gettext(
                    "Could not find the required parameter ({})."
                ).format(arg)
            )

    connection_params = convert_connection_parameter(
        data.get('connection_params', []))

    if 'hostaddr' in connection_params and \
            not is_valid_ipaddress(connection_params['hostaddr']):
        return make_json_response(
            success=0,
            status=400,
            errormsg=gettext('Not a valid Host address')
        )

    # To check ssl configuration. Only the second element of the returned
    # pair (the connection parameters) is used here.
    _, connection_params = check_ssl_fields(connection_params)
    # Set the connection params again in the data.
    if 'connection_params' in data:
        data['connection_params'] = connection_params

    # Values describing the requested connection.
    new_db = data.get('database_name', None)
    if new_db is None:
        new_db = data.get('did')
    new_username = data.get('user')
    new_role = data.get('role', None)
    new_server_name = data.get('server_name', None)

    try:
        # Column filters shared by both lookup strategies below.
        criteria = {
            'host': data['host'],
            'port': data['port'],
            'maintenance_db': new_db,
            'username': new_username,
            'name': new_server_name,
            'role': new_role,
        }

        if config.CONFIG_DATABASE_URI is not None and \
                len(config.CONFIG_DATABASE_URI) > 0:
            # With an external configuration database (PostgreSQL), comparing
            # two JSON objects inside the query is not supported, so fetch the
            # candidates and compare connection_params in Python instead.
            servers = Server.query.filter_by(**criteria).all()
            server = next(
                (candidate for candidate in servers
                 if candidate.connection_params == connection_params),
                None
            )
        else:
            server = Server.query.filter_by(
                connection_params=connection_params, **criteria
            ).first()

        # If server is None then no server with the above combination exists.
        if server is None:
            sid = data.get('sid')
            if sid is not None and int(sid) > 0:
                # A source server was named: use it as the template.
                template = Server.query.filter_by(id=sid).first()
                if template is None:
                    # first() yields None for an unknown id. Fail with a
                    # clear message instead of an AttributeError raised by
                    # calling clone() on None.
                    return make_json_response(
                        status=410,
                        success=0,
                        errormsg=gettext(
                            "Could not find the required server."
                        )
                    )

                # Clone the server object
                server = template.clone()

                # Replace the following with the new/changed value.
                server.maintenance_db = new_db
                server.username = new_username
                server.role = new_role
                server.connection_params = connection_params
                server.is_adhoc = 1
            else:
                server = Server(
                    user_id=current_user.id,
                    servergroup_id=data.get('gid', 1),
                    name=new_server_name,
                    host=data.get('host', None),
                    port=data.get('port'),
                    maintenance_db=new_db,
                    username=new_username,
                    role=new_role,
                    service=data.get('service', None),
                    connection_params=connection_params,
                    is_adhoc=1
                )

            # Persist the cloned / newly created ad-hoc server.
            db.session.add(server)
            db.session.commit()

        view = SchemaDiffRegistry.get_node_view('server')
        # NOTE(review): the literal 1 is presumably the server group id;
        # confirm against the server view's connect() signature.
        return view.connect(1, server.id, is_qt=False, server=server)
    except Exception as e:
        current_app.logger.exception(e)
        return make_json_response(
            status=410,
            success=0,
            errormsg=str(e)
        )
|
|
|
|
|
|
@blueprint.route(
    '/layout_changed',
    methods=["DELETE"],
    endpoint="layout_changed"
)
@pga_login_required
def layout_changed():
    """Clean up server state after the layout has been switched.

    Intended for the switch from the 'Workspace' layout to the 'Classic'
    one: all servers are disconnected and then the ad-hoc servers are
    deleted.

    Returns:
        A JSON response with status 200.
    """
    # Order matters: disconnect first, then remove the ad-hoc entries.
    for cleanup in (disconnect_from_all_servers, delete_adhoc_servers):
        cleanup()

    return make_json_response(status=200)
|