2015-06-30 00:51:55 -05:00
|
|
|
#########################################################################
|
2015-01-22 09:56:23 -06:00
|
|
|
#
|
|
|
|
# pgAdmin 4 - PostgreSQL Tools
|
|
|
|
#
|
2024-01-01 02:43:48 -06:00
|
|
|
# Copyright (C) 2013 - 2024, The pgAdmin Development Team
|
2015-01-22 09:56:23 -06:00
|
|
|
# This software is released under the PostgreSQL Licence
|
|
|
|
#
|
|
|
|
##########################################################################
|
|
|
|
|
|
|
|
"""Perform the initial setup of the application, by creating the auth
|
|
|
|
and settings database."""
|
|
|
|
|
2016-06-21 08:12:14 -05:00
|
|
|
import os
|
|
|
|
import sys
|
2023-12-21 00:37:26 -06:00
|
|
|
import typer
|
|
|
|
from rich.console import Console
|
|
|
|
from rich.table import Table
|
|
|
|
from rich import box, print
|
|
|
|
import json as jsonlib
|
2024-03-14 07:42:28 -05:00
|
|
|
from functools import wraps
|
2023-12-21 00:37:26 -06:00
|
|
|
|
|
|
|
# Shared rich console used by the commands below to render tables and JSON.
console = Console()
# Provisional Typer application; `app` is assigned again further down, after
# the pgAdmin imports, with the options actually used by the commands.
app = typer.Typer()
|
2022-09-06 01:18:55 -05:00
|
|
|
|
|
|
|
# The directory holding this script must be the first entry on sys.path so
# that everything we need can be found when running in the standalone
# runtime.
root = os.path.dirname(os.path.realpath(__file__))
if root != sys.path[0]:
    sys.path.insert(0, root)
|
|
|
|
|
2020-04-30 06:52:48 -05:00
|
|
|
import builtins
|
2022-07-01 07:42:00 -05:00
|
|
|
import config
|
2015-10-20 02:03:18 -05:00
|
|
|
|
2017-08-25 05:56:44 -05:00
|
|
|
# Publish SERVER_MODE through builtins: use the value the runtime placed in
# this module's globals when there is one, otherwise record it as None.
builtins.SERVER_MODE = globals().get('SERVER_MODE')
|
2016-06-21 08:12:14 -05:00
|
|
|
|
2024-01-03 04:39:42 -06:00
|
|
|
from pgadmin.model import db, Version, User, \
|
2023-12-21 00:37:26 -06:00
|
|
|
SCHEMA_VERSION as CURRENT_SCHEMA_VERSION
|
2021-04-27 02:24:57 -05:00
|
|
|
from pgadmin import create_app
|
2024-01-03 04:39:42 -06:00
|
|
|
from pgadmin.utils import clear_database_servers, dump_database_servers, \
|
2024-02-15 03:28:31 -06:00
|
|
|
load_database_servers, _handle_error
|
2022-07-01 07:42:00 -05:00
|
|
|
from pgadmin.setup import db_upgrade, create_app_data_directory
|
2023-12-21 00:37:26 -06:00
|
|
|
from typing import Optional, List
|
|
|
|
from typing_extensions import Annotated
|
2024-02-15 03:28:31 -06:00
|
|
|
from pgadmin.utils.constants import INTERNAL, LDAP, OAUTH2, \
|
2023-12-21 00:37:26 -06:00
|
|
|
KERBEROS, WEBSERVER
|
2024-03-27 01:17:58 -05:00
|
|
|
from pgadmin.tools.user_management import create_user, delete_user, \
|
|
|
|
update_user as user_management_update_user
|
2023-12-21 00:37:26 -06:00
|
|
|
from enum import Enum
|
2024-02-15 03:28:31 -06:00
|
|
|
from flask_babel import gettext
|
|
|
|
|
2023-12-21 00:37:26 -06:00
|
|
|
|
|
|
|
# The Typer application every `@app.command()` below registers with. It
# replaces the provisional instance created near the top of the file and
# turns off the display of local variables in Typer's pretty exceptions.
app = typer.Typer(pretty_exceptions_show_locals=False)
|
|
|
|
|
|
|
|
|
2024-03-14 07:42:28 -05:00
|
|
|
def update_sqlite_path(f):
    """
    Decorator that applies a caller-supplied SQLite path before running f.

    When the wrapped function is called with a ``sqlite_path`` keyword
    argument whose value is not None, that value is stored in
    ``config.SQLITE_PATH`` first. The call is then forwarded to ``f`` with
    the arguments untouched, and the result of ``f`` is returned.
    """

    @wraps(f)
    def wrap(*args, **kwargs):
        custom_path = kwargs.get('sqlite_path')
        if custom_path is not None:
            # update the sqlite path
            config.SQLITE_PATH = custom_path
        return f(*args, **kwargs)

    return wrap
|
|
|
|
|
|
|
|
|
2023-12-21 00:37:26 -06:00
|
|
|
class ManageServers:
    # Namespace for the server import/export commands. The functions take no
    # `self`; they are registered with the Typer application by the
    # `@app.command()` decorator when the class body is executed.

    @app.command()
    @update_sqlite_path
    def dump_servers(output_file: str, user: Optional[str] = None,
                     auth_source: Optional[str] = INTERNAL,
                     sqlite_path: Optional[str] = None,
                     server: List[int] = None):
        """Dump the server groups and servers. """

        # What user? Fall back to the desktop user when none was given.
        if user is not None:
            dump_user = user
        else:
            dump_user = config.DESKTOP_USER

        print('----------')
        print('Dumping servers with:')
        print('User:', dump_user)
        print('SQLite pgAdmin config:', config.SQLITE_PATH)
        print('----------')

        try:
            cli_app = create_app(config.APP_NAME + '-cli')
            with cli_app.test_request_context():
                dump_database_servers(output_file, server, dump_user, True,
                                      auth_source)
        except Exception as err:
            # Any failure is reported as a message rather than a traceback.
            print(str(err))

    @app.command()
    @update_sqlite_path
    def load_servers(input_file: str, user: Optional[str] = None,
                     auth_source: Optional[str] = INTERNAL,
                     sqlite_path: Optional[str] = None,
                     replace: Optional[bool] = False
                     ):

        """Load server groups and servers."""

        # What user? Fall back to the desktop user when none was given.
        if user is not None:
            load_user = user
        else:
            load_user = config.DESKTOP_USER

        print('----------')
        print('Loading servers with:')
        print('User:', load_user)
        print('SQLite pgAdmin config:', config.SQLITE_PATH)
        print('----------')

        try:
            cli_app = create_app(config.APP_NAME + '-cli')
            with cli_app.test_request_context():
                # With --replace the user's existing servers are cleared
                # before the file is loaded.
                if replace:
                    clear_database_servers(load_user, True, auth_source)
                load_database_servers(input_file, None, load_user, True,
                                      auth_source)
        except Exception as err:
            # Any failure is reported as a message rather than a traceback.
            print(str(err))
|
2023-12-21 00:37:26 -06:00
|
|
|
|
|
|
|
|
|
|
|
class AuthExtTypes(str, Enum):
    """Authentication sources accepted by the external-user commands.

    Mixing in ``str`` makes every member compare equal to the constant it
    wraps. The internal source is deliberately absent from this enum.
    """

    oauth2 = OAUTH2
    ldap = LDAP
    kerberos = KERBEROS
    webserver = WEBSERVER
|
|
|
|
|
|
|
|
|
|
|
|
# An Enum class that already defines members cannot be subclassed, so the
# external sources are listed again here next to the internal one.
class AuthType(str, Enum):
    """Every authentication source, external ones plus the internal one."""

    oauth2 = OAUTH2
    ldap = LDAP
    kerberos = KERBEROS
    webserver = WEBSERVER
    internal = INTERNAL
|
|
|
|
|
|
|
|
|
|
|
|
class ManageUsers:
    # Namespace for the user-management commands and their helpers. None of
    # the functions take `self`: those decorated with `@app.command()` are
    # registered as Typer commands, the others are called as
    # `ManageUsers.<name>(...)`.
    #
    # Inside the function bodies the bare names `create_user` and
    # `delete_user` resolve to the module-level imports from
    # pgadmin.tools.user_management, not to the same-named functions of this
    # class (class attributes are not in scope inside a function body).
    #
    # Role ids used throughout: 1 is Admin, 2 is Non-admin.

    @app.command()
    @update_sqlite_path
    def add_user(email: str, password: str,
                 role: Annotated[Optional[bool], typer.Option(
                     "--admin/--nonadmin")] = False,
                 active: Annotated[Optional[bool],
                                   typer.Option("--active/--inactive")] = True,
                 console: Optional[bool] = True,
                 json: Optional[bool] = False,
                 sqlite_path: Optional[str] = None,
                 ):
        """Add Internal user. """

        # The password is supplied under both keys, mirroring how
        # update_user passes a new password on for validation.
        data = {
            'email': email,
            'role': 1 if role else 2,
            'active': active,
            'auth_source': INTERNAL,
            'newPassword': password,
            'confirmPassword': password,
        }
        # NOTE(review): with --json, display_user dumps `data` as-is, which
        # includes the two password fields above - confirm this is intended.
        ManageUsers.create_user(data, console, json)

    @app.command()
    @update_sqlite_path
    def add_external_user(username: str,
                          auth_source: AuthExtTypes = AuthExtTypes.oauth2,
                          email: Optional[str] = None,
                          role: Annotated[Optional[bool],
                                          typer.Option(
                                              "--admin/--nonadmin")] = False,
                          active: Annotated[Optional[bool],
                                            typer.Option(
                                                "--active/--inactive")] = True,
                          console: Optional[bool] = True,
                          json: Optional[bool] = False,
                          sqlite_path: Optional[str] = None,
                          ):
        """Add external user, other than Internal like
        Ldap, OAuth2, Kerberos, Webserver. """

        data = {
            'username': username,
            'email': email,
            'role': 1 if role else 2,
            'active': active,
            'auth_source': auth_source
        }
        ManageUsers.create_user(data, console, json)

    @app.command()
    @update_sqlite_path
    def delete_user(username: str,
                    auth_source: AuthType = AuthType.internal,
                    auto_confirm: Annotated[Optional[bool],
                                            typer.Option(
                                                "--yes")] = False,
                    sqlite_path: Optional[str] = None,
                    ):
        """Delete the user. """

        confirm_msg = "Are you sure you want to delete it?"

        # --yes skips the interactive confirmation prompt.
        if auto_confirm or typer.confirm(confirm_msg):
            app = create_app(config.APP_NAME + '-cli')
            with app.test_request_context():
                uid = ManageUsers.get_user(username=username,
                                           auth_source=auth_source)
                if not uid:
                    print("User not found")
                else:
                    status, msg = delete_user(uid)
                    if status:
                        print('User deleted successfully.')
                    else:
                        print('Something went wrong. ' + str(msg))

    @app.command()
    @update_sqlite_path
    def update_user(email: str,
                    password: Optional[str] = None,
                    role: Annotated[Optional[bool],
                                    typer.Option("--admin/--nonadmin"
                                                 )] = None,
                    active: Annotated[Optional[bool],
                                      typer.Option("--active/--inactive"
                                                   )] = None,
                    console: Optional[bool] = True,
                    json: Optional[bool] = False,
                    sqlite_path: Optional[str] = None,
                    ):
        """Update internal user."""

        # Only the options that were actually supplied end up in `data`.
        data = dict()
        if password:
            if len(password) < 6:
                print("Password must be at least 6 characters long.")
                # sys.exit() instead of the site-provided exit() helper,
                # which is only meant for the interactive interpreter. The
                # exit status is unchanged.
                sys.exit()
            # validate_password relies on the new password being present as
            # `newPassword` and `confirmPassword` in the data
            data['newPassword'] = password
            data['confirmPassword'] = password

        if role is not None:
            data['role'] = 1 if role else 2
        if active is not None:
            data['active'] = active

        app = create_app(config.APP_NAME + '-cli')
        with app.test_request_context():
            uid = ManageUsers.get_user(username=email,
                                       auth_source=INTERNAL)
            if not uid:
                print("User not found")
            else:
                status, msg = user_management_update_user(uid, data)
                if status:
                    # Read the user back so the stored state is displayed.
                    _user = ManageUsers.get_users_from_db(username=email,
                                                          auth_source=INTERNAL,
                                                          console=False)
                    ManageUsers.display_user(_user[0], console, json)
                else:
                    print('Something went wrong. ' + str(msg))

    @app.command()
    @update_sqlite_path
    def get_users(username: Optional[str] = None,
                  auth_source: AuthType = None,
                  json: Optional[bool] = False,
                  sqlite_path: Optional[str] = None,
                  ):
        """Get user(s) details."""

        ManageUsers.get_users_from_db(username, auth_source, True, json)

    def get_users_from_db(username: Optional[str] = None,
                          auth_source: AuthType = None,
                          console: Optional[bool] = True,
                          json: Optional[bool] = False,
                          ):
        """Get user(s) details.

        Users are filtered by `username` and/or `auth_source` when those
        are given; with neither, every user is selected. When `console` is
        true the users are displayed and nothing is returned, otherwise the
        list of user dictionaries is returned.
        """

        app = create_app(config.APP_NAME + '-cli')
        with app.test_request_context():
            # Filter only on the criteria that were supplied; filter_by()
            # without criteria selects every user.
            filters = {}
            if username:
                filters['username'] = username
            if auth_source:
                filters['auth_source'] = auth_source
            users = User.query.filter_by(**filters).all()

            users_data = [
                {'id': u.id,
                 'username': u.username,
                 'email': u.email,
                 'active': u.active,
                 'role': u.roles[0].id,
                 'auth_source': u.auth_source,
                 'locked': u.locked
                 }
                for u in users
            ]
            if console:
                ManageUsers.display_user(users_data, console, json)
            else:
                return users_data

    @app.command()
    @update_sqlite_path
    def update_external_user(username: str,
                             auth_source: AuthExtTypes = AuthExtTypes.oauth2,
                             email: Optional[str] = None,
                             role: Annotated[Optional[bool],
                                             typer.Option("--admin/--nonadmin"
                                                          )] = None,
                             active: Annotated[
                                 Optional[bool],
                                 typer.Option("--active/--inactive")] = None,
                             console: Optional[bool] = True,
                             json: Optional[bool] = False,
                             sqlite_path: Optional[str] = None,
                             ):
        """Update external users other than Internal like
        Ldap, OAuth2, Kerberos, Webserver."""

        # Only the options that were actually supplied end up in `data`.
        data = dict()
        if email:
            data['email'] = email
        if role is not None:
            data['role'] = 1 if role else 2
        if active is not None:
            data['active'] = active

        app = create_app(config.APP_NAME + '-cli')
        with app.test_request_context():
            uid = ManageUsers.get_user(username=username,
                                       auth_source=auth_source)
            if not uid:
                print("User not found")
            else:
                status, msg = user_management_update_user(uid, data)
                if status:
                    # Read the user back so the stored state is displayed.
                    _user = ManageUsers.get_users_from_db(
                        username=username,
                        auth_source=auth_source,
                        console=False
                    )
                    ManageUsers.display_user(_user[0], console, json)
                else:
                    print('Something went wrong. ' + str(msg))

    def create_user(data, console, json):
        """Create the user described by `data` and display it on success.

        The process exits early when the user already exists or when a
        supplied password is shorter than 6 characters.
        """
        app = create_app(config.APP_NAME + '-cli')
        with app.test_request_context():
            # Internal users carry no 'username'; their email is used.
            username = data['username'] if 'username' in data else \
                data['email']
            uid = ManageUsers.get_user(username=username,
                                       auth_source=data['auth_source'])
            if uid:
                print("User already exists.")
                # sys.exit() rather than the interactive-only exit() helper.
                sys.exit()

            if 'newPassword' in data and len(data['newPassword']) < 6:
                print("Password must be at least 6 characters long.")
                sys.exit()

            status, msg = create_user(data)
            if status:
                ManageUsers.display_user(data, console, json)
            else:
                print('Something went wrong. ' + str(msg))

    def get_user(username=None, auth_source=INTERNAL):
        """Return the id of the matching user, or None when there is none."""
        app = create_app(config.APP_NAME + '-cli')
        usr = None
        with app.test_request_context():
            usr = User.query.filter_by(username=username,
                                       auth_source=auth_source).first()

        if not usr:
            return None
        return usr.id

    def display_user(data, _console, _json):
        """Print one user dict, or a list of them, as JSON or as tables.

        Nothing is printed when `_console` is false. The parameters carry a
        leading underscore so they do not hide the module-level `console`
        object that does the printing.
        """
        if _console:
            if _json:
                json_formatted_str = jsonlib.dumps(data, indent=0)
                console.print(json_formatted_str)
            else:
                # Accept a single user as well as a list of users.
                if isinstance(data, dict):
                    data = [data]
                for _data in data:
                    table = Table(title="User Details", box=box.ASCII)
                    table.add_column("Field", style="green")
                    table.add_column("Value", style="green")

                    if 'username' in _data:
                        table.add_row("Username", _data['username'])
                    if 'email' in _data:
                        table.add_row("Email", _data['email'])
                    table.add_row("auth_source", _data['auth_source'])
                    table.add_row("role",
                                  "Admin" if _data['role'] and
                                  _data['role'] != 2 else
                                  "Non-admin")
                    table.add_row("active",
                                  'True' if _data['active'] else 'False')
                    console.print(table)
|
2023-12-21 00:37:26 -06:00
|
|
|
|
|
|
|
|
|
|
|
class ManagePreferences:
    # Namespace for the preference commands and their helpers. None of the
    # functions take `self`: those decorated with `@app.command()` are
    # registered as Typer commands, the others are called as
    # `ManagePreferences.<name>(...)`.

    def get_user(username=None, auth_source=INTERNAL):
        """Return the id of the matching user, or None when there is none."""
        app = create_app(config.APP_NAME + '-cli')
        usr = None
        with app.test_request_context():
            usr = User.query.filter_by(username=username,
                                       auth_source=auth_source).first()
        if not usr:
            return None
        return usr.id

    @app.command()
    @update_sqlite_path
    def get_prefs(json: Optional[bool] = False,
                  sqlite_path: Optional[str] = None,
                  ):
        """Get Preferences List."""

        # Forward the --json flag; it used to be accepted and then ignored.
        return ManagePreferences.fetch_prefs(json=json)

    def fetch_prefs(id: Optional[bool] = None, json: Optional[bool] = False):
        """Get Preferences List.

        With a true `id`, returns a dict that maps each
        "module:category:preference" name to the matching "mid:cid:pid"
        string of ids and prints nothing. Otherwise the names are printed,
        as JSON when `json` is true and as a table when it is not, and
        nothing is returned.
        """
        app = create_app(config.APP_NAME + '-cli')
        table = Table(title="Pref Details", box=box.ASCII)
        table.add_column("Preference", style="green")
        with app.app_context():
            from pgadmin.model import Preferences as PrefTable, \
                ModulePreference as ModulePrefTable, \
                PreferenceCategory as PrefCategoryTbl

            module_prefs = ModulePrefTable.query.all()
            cat_prefs = PrefCategoryTbl.query.all()
            prefs = PrefTable.query.all()
            all_preferences = {} if id else []
            # Walk module -> category -> preference, keeping the order in
            # which the rows were returned.
            for i in module_prefs:
                for j in cat_prefs:
                    if i.id != j.mid:
                        continue
                    for k in prefs:
                        if k.cid != j.id:
                            continue
                        label = "{0}:{1}:{2}".format(i.name, j.name, k.name)
                        if id:
                            all_preferences[label] = \
                                "{0}:{1}:{2}".format(i.id, j.id, k.id)
                        else:
                            table.add_row(label)
                            all_preferences.append(label)
            if id:
                return all_preferences

            if json:
                json_formatted_str = jsonlib.dumps(
                    {"Preferences": all_preferences},
                    indent=0)
                print(json_formatted_str)
            else:
                print(table)

    @app.command()
    @update_sqlite_path
    def set_prefs(username,
                  pref_options: Annotated[Optional[List[str]],
                                          typer.Argument()] = None,
                  auth_source: AuthType = AuthType.internal,
                  console: Optional[bool] = True,
                  json: Optional[bool] = False,
                  input_file: Optional[str] = None,
                  sqlite_path: Optional[str] = None,
                  ):
        """Set User preferences."""

        # `pref_options` is None when no positional preference was given;
        # work on a private list so that appending the entries of an input
        # file and iterating below are always safe.
        pref_options = list(pref_options) if pref_options else []

        if input_file:
            from urllib.parse import unquote
            # generate full path of file
            try:
                file_path = unquote(input_file)
            except Exception as e:
                print(str(e))
                return _handle_error(str(e), True)

            # `jsonlib` is the module-level json import; the plain name
            # `json` is this command's boolean option.
            try:
                with open(file_path) as f:
                    data = jsonlib.load(f)
            except jsonlib.JSONDecodeError as e:
                return _handle_error(gettext("Error parsing input file %s: %s"
                                             % (file_path, e)), True)
            except OSError as e:
                return _handle_error(
                    gettext("Error reading input file %s: [%s] %s" %
                            (file_path, e.errno, e.strerror)), True)
            except Exception as e:
                # Anything else (a decoding error, for example) carries no
                # errno/strerror, so report the exception text itself.
                return _handle_error(
                    gettext("Error reading input file %s: %s" %
                            (file_path, e)), True)

            # The file must hold an object with a "preferences" mapping.
            pref_data = data.get('preferences') \
                if isinstance(data, dict) else None
            if not isinstance(pref_data, dict):
                return _handle_error(
                    gettext("Error parsing input file %s: %s"
                            % (file_path,
                               "no 'preferences' object found")), True)

            for k, v in pref_data.items():
                pref_options.append(k + "=" + str(v))

        user_id = ManagePreferences.get_user(username, auth_source)
        table = Table(title="Updated Pref Details", box=box.ASCII)
        table.add_column("Preference", style="green")
        if not user_id:
            print("User not found.")
            return

        # name -> "mid:cid:pid" lookup for every known preference.
        prefs = ManagePreferences.fetch_prefs(True)
        app = create_app(config.APP_NAME + '-cli')
        invalid_prefs = []
        valid_prefs = []
        with app.app_context():
            from pgadmin.preferences import save_pref
            for opt in pref_options:
                # Split on the first '=' only, so a value that itself
                # contains '=' is kept whole.
                val = opt.split("=", 1)
                if len(val) <= 1:
                    print('Preference key=value is required, example: '
                          '[green]sqleditor:editor:comma_first=true[/green]')
                    return
                final_opt = val[0].split(":")
                val = val[1]
                f = ":".join(final_opt)
                if f in prefs:
                    ids = prefs[f].split(":")
                    _row = {
                        'mid': ids[0],
                        'category_id': ids[1],
                        'id': ids[2],
                        'name': final_opt[2],
                        'user_id': user_id,
                        'value': val}
                    save_pref(_row)
                    valid_prefs.append(_row)

                    if not json:
                        table.add_row(jsonlib.dumps(_row))
                else:
                    invalid_prefs.append(f)

            if len(invalid_prefs) >= 1:
                print("Preference(s) [red]{0}[/red] not found.".format(
                    (', ').join(
                        invalid_prefs)))

            if not json and console:
                print(table)
            elif json and console:
                print(jsonlib.dumps(valid_prefs, indent=2))
|
2023-12-21 00:37:26 -06:00
|
|
|
|
|
|
|
|
|
|
|
@app.command()
def setup_db(app: Annotated[str, typer.Argument(
        help="This argument doesn't require in CLI mode.")] = None):
    """Setup the configuration database."""

    # `app` is the application to migrate; when the caller does not supply
    # one (the CLI case) a fresh application is created.
    app = app or create_app()
    create_app_data_directory(config)

    print("pgAdmin 4 - Application Initialisation")
    print("======================================\n")

    def upgrade_existing_database():
        # Bring an already existing configuration database up to date.
        # Must be called inside an application context.
        version = Version.query.filter_by(name='ConfigDB').first()
        if version is None:
            # first() yields a row or None (never -1): with no ConfigDB
            # row there is no stored version to compare against, so just
            # run the migrations.
            db_upgrade(app)
            return

        schema_version = version.value

        # Run migration if current schema version is greater than or equal
        # to the schema version stored in version table
        if CURRENT_SCHEMA_VERSION >= schema_version:
            db_upgrade(app)

        # Update schema version to the latest
        if CURRENT_SCHEMA_VERSION > schema_version:
            version = Version.query.filter_by(name='ConfigDB').first()
            version.value = CURRENT_SCHEMA_VERSION
            db.session.commit()

    def run_migration_for_sqlite():
        with app.app_context():
            # Run migration for the first time i.e. create database
            from config import SQLITE_PATH
            if not os.path.exists(SQLITE_PATH):
                db_upgrade(app)
            else:
                upgrade_existing_database()

            # Keep the database file private to its owner (not on Windows).
            if os.name != 'nt':
                os.chmod(config.SQLITE_PATH, 0o600)

    def run_migration_for_others():
        with app.app_context():
            upgrade_existing_database()

    # Run the migration as per specified by the user: a non-empty
    # CONFIG_DATABASE_URI selects the external database, otherwise the
    # SQLite file is used.
    if config.CONFIG_DATABASE_URI is not None and \
            len(config.CONFIG_DATABASE_URI) > 0:
        run_migration_for_others()
    else:
        run_migration_for_sqlite()
|
2019-04-17 10:57:34 -05:00
|
|
|
|
2018-11-21 10:09:05 -06:00
|
|
|
|
2024-03-14 07:42:28 -05:00
|
|
|
def main():
    """Run the Typer application, dispatching to the requested command."""
    app()
|
2024-03-14 07:42:28 -05:00
|
|
|
|
|
|
|
|
|
|
|
# Start the command-line interface only when run as a script.
if __name__ == "__main__":
    main()
|