Fixed cognitive complexity issues reported by SonarQube.

This commit is contained in:
Nikhil Mohite 2020-08-19 13:41:53 +05:30 committed by Akshay Joshi
parent 0668a52c6b
commit 4b56962c1b
6 changed files with 514 additions and 262 deletions

View File

@ -609,6 +609,28 @@ class LanguageView(PGChildNodeView, SchemaDiffObjectCompare):
except Exception as e:
return internal_server_error(errormsg=str(e))
@staticmethod
def _parse_privileges(data):
"""
CHeck key in data adn parse privilege according.
:param data: Data.
:return:
"""
for key in ['lanacl']:
if key in data and data[key] is not None:
if 'added' in data[key]:
data[key]['added'] = parse_priv_to_db(
data[key]['added'], ["U"]
)
if 'changed' in data[key]:
data[key]['changed'] = parse_priv_to_db(
data[key]['changed'], ["U"]
)
if 'deleted' in data[key]:
data[key]['deleted'] = parse_priv_to_db(
data[key]['deleted'], ["U"]
)
def get_sql(self, data, lid=None):
"""
This function will generate sql from model data.
@ -634,20 +656,7 @@ class LanguageView(PGChildNodeView, SchemaDiffObjectCompare):
gettext("Could not find the language information.")
)
for key in ['lanacl']:
if key in data and data[key] is not None:
if 'added' in data[key]:
data[key]['added'] = parse_priv_to_db(
data[key]['added'], ["U"]
)
if 'changed' in data[key]:
data[key]['changed'] = parse_priv_to_db(
data[key]['changed'], ["U"]
)
if 'deleted' in data[key]:
data[key]['deleted'] = parse_priv_to_db(
data[key]['deleted'], ["U"]
)
LanguageView._parse_privileges(data)
old_data = res['rows'][0]
for arg in required_args:

View File

@ -909,6 +909,100 @@ class ForeignTableView(PGChildNodeView, DataTypeReader,
except Exception as e:
return internal_server_error(errormsg=str(e))
@staticmethod
def _parse_privileges(data):
"""
Parser privilege data as per type.
:param data: Data.
:return:
"""
if 'acl' in data and 'added' in data['acl']:
data['acl']['added'] = parse_priv_to_db(data['acl']['added'],
["a", "r", "w", "x"])
if 'acl' in data and 'changed' in data['acl']:
data['acl']['changed'] = parse_priv_to_db(
data['acl']['changed'], ["a", "r", "w", "x"])
if 'acl' in data and 'deleted' in data['acl']:
data['acl']['deleted'] = parse_priv_to_db(
data['acl']['deleted'], ["a", "r", "w", "x"])
@staticmethod
def _check_old_col_ops(old_col_frmt_options, option, col):
"""
check old column options.
:param old_col_frmt_options: old column option data.
:param option: option data.
:param col: column data.
:return:
"""
if (
option['option'] in old_col_frmt_options and
option['value'] != old_col_frmt_options[option['option']]
):
col['coloptions_updated']['changed'].append(option)
elif option['option'] not in old_col_frmt_options:
col['coloptions_updated']['added'].append(option)
if option['option'] in old_col_frmt_options:
del old_col_frmt_options[option['option']]
@staticmethod
def _parse_column_options(data):
"""
Parse columns data.
:param data: Data.
:return:
"""
for c in data['columns']['changed']:
old_col_options = c['attfdwoptions'] = []
if 'attfdwoptions' in c and c['attfdwoptions']:
old_col_options = c['attfdwoptions']
old_col_frmt_options = {}
for o in old_col_options:
col_opt = o.split("=")
old_col_frmt_options[col_opt[0]] = col_opt[1]
c['coloptions_updated'] = {'added': [],
'changed': [],
'deleted': []}
if 'coloptions' in c and len(c['coloptions']) > 0:
for o in c['coloptions']:
ForeignTableView._check_old_col_ops(old_col_frmt_options,
o, c)
for o in old_col_frmt_options:
c['coloptions_updated']['deleted'].append(
{'option': o})
def _format_columns_data(self, data, old_data):
"""
Format columns.
:param data: data.
:param old_data: old data for compare.
:return:
"""
col_data = {}
# Prepare dict of columns with key = column's attnum
# Will use this in the update template when any column is
# changed, to identify the columns.
for c in old_data['columns']:
col_data[c['attnum']] = c
old_data['columns'] = col_data
if 'columns' in data and 'added' in data['columns']:
data['columns']['added'] = self._format_columns(
data['columns']['added'])
if 'columns' in data and 'changed' in data['columns']:
data['columns']['changed'] = self._format_columns(
data['columns']['changed'])
# Parse Column Options
ForeignTableView._parse_column_options(data)
def get_sql(self, **kwargs):
"""
Generates the SQL statements to create/update the Foreign Table.
@ -929,7 +1023,7 @@ class ForeignTableView(PGChildNodeView, DataTypeReader,
foid, inherits=True)
if not status:
return old_data
if not old_data:
elif not old_data:
return gone(
gettext("The specified foreign table could not be found."))
@ -937,65 +1031,10 @@ class ForeignTableView(PGChildNodeView, DataTypeReader,
data['is_schema_diff'] = True
old_data['columns_for_schema_diff'] = old_data['columns']
# Prepare dict of columns with key = column's attnum
# Will use this in the update template when any column is
# changed, to identify the columns.
col_data = {}
for c in old_data['columns']:
col_data[c['attnum']] = c
old_data['columns'] = col_data
if 'columns' in data and 'added' in data['columns']:
data['columns']['added'] = self._format_columns(
data['columns']['added'])
if 'columns' in data and 'changed' in data['columns']:
data['columns']['changed'] = self._format_columns(
data['columns']['changed'])
# Parse Column Options
for c in data['columns']['changed']:
old_col_options = c['attfdwoptions'] = []
if 'attfdwoptions' in c and c['attfdwoptions']:
old_col_options = c['attfdwoptions']
old_col_frmt_options = {}
for o in old_col_options:
col_opt = o.split("=")
old_col_frmt_options[col_opt[0]] = col_opt[1]
c['coloptions_updated'] = {'added': [],
'changed': [],
'deleted': []}
if 'coloptions' in c and len(c['coloptions']) > 0:
for o in c['coloptions']:
if (
o['option'] in old_col_frmt_options and
o['value'] != old_col_frmt_options[o['option']]
):
c['coloptions_updated']['changed'].append(o)
elif o['option'] not in old_col_frmt_options:
c['coloptions_updated']['added'].append(o)
if o['option'] in old_col_frmt_options:
del old_col_frmt_options[o['option']]
for o in old_col_frmt_options:
c['coloptions_updated']['deleted'].append(
{'option': o})
self._format_columns_data(data, old_data)
# Parse Privileges
if 'acl' in data and 'added' in data['acl']:
data['acl']['added'] = parse_priv_to_db(data['acl']['added'],
["a", "r", "w", "x"])
if 'acl' in data and 'changed' in data['acl']:
data['acl']['changed'] = parse_priv_to_db(
data['acl']['changed'], ["a", "r", "w", "x"])
if 'acl' in data and 'deleted' in data['acl']:
data['acl']['deleted'] = parse_priv_to_db(
data['acl']['deleted'], ["a", "r", "w", "x"])
ForeignTableView._parse_privileges(data)
# If ftsrvname is changed while comparing two schemas
# then we need to drop foreign table and recreate it
@ -1003,16 +1042,16 @@ class ForeignTableView(PGChildNodeView, DataTypeReader,
# Modify the data required to recreate the foreign table.
self.modify_data_for_schema_diff(data, old_data)
SQL = render_template(
sql = render_template(
"/".join([self.template_path,
'foreign_table_schema_diff.sql']),
data=data, o_data=old_data)
else:
SQL = render_template(
sql = render_template(
"/".join([self.template_path, self._UPDATE_SQL]),
data=data, o_data=old_data
)
return SQL, data['name'] if 'name' in data else old_data['name']
return sql, data['name'] if 'name' in data else old_data['name']
else:
data['columns'] = self._format_columns(data['columns'])
@ -1021,9 +1060,9 @@ class ForeignTableView(PGChildNodeView, DataTypeReader,
data['acl'] = parse_priv_to_db(data['acl'],
["a", "r", "w", "x"])
SQL = render_template("/".join([self.template_path,
sql = render_template("/".join([self.template_path,
self._CREATE_SQL]), data=data)
return SQL, data['name']
return sql, data['name']
@check_precondition
def dependents(self, gid, sid, did, scid, foid):

View File

@ -293,47 +293,82 @@ class SchemaDiffTableCompare(SchemaDiffObjectCompare):
# Keys that are available in source and missing in target.
added = dict1_keys - dict2_keys
for item in added:
source_ddl = module_view.ddl_compare(
source_params=source_params,
target_params=target_params,
source=dict1[item],
target=None,
comp_status='source_only'
)
diff += '\n' + source_ddl
diff = SchemaDiffTableCompare._compare_source_only(
added, module_view, source_params, target_params,
dict1, diff)
# Keys that are available in target and missing in source.
removed = dict2_keys - dict1_keys
for item in removed:
target_ddl = module_view.ddl_compare(
source_params=source_params,
target_params=target_params,
source=None,
target=dict2[item],
comp_status='target_only'
)
diff += '\n' + target_ddl
diff = SchemaDiffTableCompare._compare_target_only(
removed, module_view, source_params, target_params,
dict2, diff)
# Keys that are available in both source and target.
for key in intersect_keys:
# Recursively Compare the two dictionary
if not are_dictionaries_identical(
dict1[key], dict2[key], ignore_whitespaces,
self.keys_to_ignore):
diff_ddl = module_view.ddl_compare(
source_params=source_params,
target_params=target_params,
source=dict1[key],
target=dict2[key],
comp_status='different',
parent_source_data=source,
parent_target_data=target
)
diff += '\n' + diff_ddl
other_param = {
"dict1": dict1,
"dict2": dict2,
"ignore_whitespaces": ignore_whitespaces,
"source": source,
"target": target
}
diff = self._compare_source_and_target(
intersect_keys, module_view, source_params,
target_params, diff, **other_param)
return diff
@staticmethod
def _compare_source_only(added, module_view, source_params, target_params,
dict1, diff):
for item in added:
source_ddl = module_view.ddl_compare(
source_params=source_params,
target_params=target_params,
source=dict1[item],
target=None,
comp_status='source_only'
)
diff += '\n' + source_ddl
return diff
@staticmethod
def _compare_target_only(removed, module_view, source_params,
target_params, dict2, diff):
for item in removed:
target_ddl = module_view.ddl_compare(
source_params=source_params,
target_params=target_params,
source=None,
target=dict2[item],
comp_status='target_only'
)
diff += '\n' + target_ddl
return diff
def _compare_source_and_target(self, intersect_keys, module_view,
source_params, target_params, diff,
**kwargs):
dict1 = kwargs['dict1']
dict2 = kwargs['dict2']
ignore_whitespaces = kwargs['ignore_whitespaces']
source = kwargs['source']
target = kwargs['target']
for key in intersect_keys:
# Recursively Compare the two dictionary
if not are_dictionaries_identical(
dict1[key], dict2[key], ignore_whitespaces,
self.keys_to_ignore):
diff_ddl = module_view.ddl_compare(
source_params=source_params,
target_params=target_params,
source=dict1[key],
target=dict2[key],
comp_status='different',
parent_source_data=source,
parent_target_data=target
)
diff += '\n' + diff_ddl
return diff

View File

@ -94,6 +94,56 @@ class DataTypeReader:
- Returns data-types on the basis of the condition provided.
"""
def _get_types_sql(self, conn, condition, add_serials, schema_oid):
"""
Get sql for types.
:param conn: connection
:param condition: Condition for sql
:param add_serials: add_serials flag
:param schema_oid: schema iod.
:return: sql for get type sql result, status and response.
"""
# Check if template path is already set or not
# if not then we will set the template path here
if not hasattr(self, 'data_type_template_path'):
self.data_type_template_path = 'datatype/sql/' + (
'#{0}#{1}#'.format(
self.manager.server_type,
self.manager.version
) if self.manager.server_type == 'gpdb' else
'#{0}#'.format(self.manager.version)
)
sql = render_template(
"/".join([self.data_type_template_path, 'get_types.sql']),
condition=condition,
add_serials=add_serials,
schema_oid=schema_oid
)
status, rset = conn.execute_2darray(sql)
return status, rset
@staticmethod
def _types_length_checks(length, typeval, precision):
min_val = 0
max_val = 0
if length:
min_val = 0 if typeval == 'D' else 1
if precision:
max_val = 1000
elif min_val:
# Max of integer value
max_val = 2147483647
else:
# Max value is 6 for data type like
# interval, timestamptz, etc..
if typeval == 'D':
max_val = 6
else:
max_val = 10
return min_val, max_val
def get_types(self, conn, condition, add_serials=False, schema_oid=''):
"""
Returns data-types including calculation for Length and Precision.
@ -106,23 +156,8 @@ class DataTypeReader:
"""
res = []
try:
# Check if template path is already set or not
# if not then we will set the template path here
if not hasattr(self, 'data_type_template_path'):
self.data_type_template_path = 'datatype/sql/' + (
'#{0}#{1}#'.format(
self.manager.server_type,
self.manager.version
) if self.manager.server_type == 'gpdb' else
'#{0}#'.format(self.manager.version)
)
SQL = render_template(
"/".join([self.data_type_template_path, 'get_types.sql']),
condition=condition,
add_serials=add_serials,
schema_oid=schema_oid
)
status, rset = conn.execute_2darray(SQL)
status, rset = self._get_types_sql(conn, condition, add_serials,
schema_oid)
if not status:
return status, rset
@ -131,28 +166,14 @@ class DataTypeReader:
# & length validation for current type
precision = False
length = False
min_val = 0
max_val = 0
# Check if the type will have length and precision or not
if row['elemoid']:
length, precision, typeval = self.get_length_precision(
row['elemoid'])
if length:
min_val = 0 if typeval == 'D' else 1
if precision:
max_val = 1000
elif min_val:
# Max of integer value
max_val = 2147483647
else:
# Max value is 6 for data type like
# interval, timestamptz, etc..
if typeval == 'D':
max_val = 6
else:
max_val = 10
min_val, max_val = DataTypeReader._types_length_checks(
length, typeval, precision)
res.append({
'label': row['typname'], 'value': row['typname'],
@ -214,6 +235,99 @@ class DataTypeReader:
return length, precision, typeval
@staticmethod
def _check_typmod(typmod, name):
"""
Check type mode ad return length as per type.
:param typmod:type mode.
:param name: name of type.
:return:
"""
length = '('
if name == 'numeric':
_len = (typmod - 4) >> 16
_prec = (typmod - 4) & 0xffff
length += str(_len)
if _prec is not None:
length += ',' + str(_prec)
elif (
name == 'time' or
name == 'timetz' or
name == 'time without time zone' or
name == 'time with time zone' or
name == 'timestamp' or
name == 'timestamptz' or
name == 'timestamp without time zone' or
name == 'timestamp with time zone' or
name == 'bit' or
name == 'bit varying' or
name == 'varbit'
):
_prec = 0
_len = typmod
length += str(_len)
elif name == 'interval':
_prec = 0
_len = typmod & 0xffff
# Max length for interval data type is 6
# If length is greater then 6 then set length to None
if _len > 6:
_len = ''
length += str(_len)
elif name == 'date':
# Clear length
length = ''
else:
_len = typmod - 4
_prec = 0
length += str(_len)
if len(length) > 0:
length += ')'
return length
@staticmethod
def _get_full_type_value(name, schema, length, array):
"""
Generate full type value as per req.
:param name: type name.
:param schema: schema name.
:param length: length.
:param array: array of types
:return: full type value
"""
if name == 'char' and schema == 'pg_catalog':
return '"char"' + array
elif name == 'time with time zone':
return 'time' + length + ' with time zone' + array
elif name == 'time without time zone':
return 'time' + length + ' without time zone' + array
elif name == 'timestamp with time zone':
return 'timestamp' + length + ' with time zone' + array
elif name == 'timestamp without time zone':
return 'timestamp' + length + ' without time zone' + array
else:
return name + length + array
@staticmethod
def _check_schema_in_name(typname, schema):
"""
Above 7.4, format_type also sends the schema name if it's not
included in the search_path, so we need to skip it in the typname
:param typename: typename for check.
:param schema: schema name for check.
:return: name
"""
if typname.find(schema + '".') >= 0:
name = typname[len(schema) + 3]
elif typname.find(schema + '.') >= 0:
name = typname[len(schema) + 1]
else:
name = typname
return name
@staticmethod
def get_full_type(nsp, typname, is_dup, numdims, typmod):
"""
@ -228,14 +342,7 @@ class DataTypeReader:
array = ''
length = ''
# Above 7.4, format_type also sends the schema name if it's not
# included in the search_path, so we need to skip it in the typname
if typname.find(schema + '".') >= 0:
name = typname[len(schema) + 3]
elif typname.find(schema + '.') >= 0:
name = typname[len(schema) + 1]
else:
name = typname
name = DataTypeReader._check_schema_in_name(typname, schema)
if name.startswith('_'):
if not numdims:
@ -256,60 +363,11 @@ class DataTypeReader:
numdims -= 1
if typmod != -1:
length = '('
if name == 'numeric':
_len = (typmod - 4) >> 16
_prec = (typmod - 4) & 0xffff
length += str(_len)
if _prec is not None:
length += ',' + str(_prec)
elif (
name == 'time' or
name == 'timetz' or
name == 'time without time zone' or
name == 'time with time zone' or
name == 'timestamp' or
name == 'timestamptz' or
name == 'timestamp without time zone' or
name == 'timestamp with time zone' or
name == 'bit' or
name == 'bit varying' or
name == 'varbit'
):
_prec = 0
_len = typmod
length += str(_len)
elif name == 'interval':
_prec = 0
_len = typmod & 0xffff
# Max length for interval data type is 6
# If length is greater then 6 then set length to None
if _len > 6:
_len = ''
length += str(_len)
elif name == 'date':
# Clear length
length = ''
else:
_len = typmod - 4
_prec = 0
length += str(_len)
length = DataTypeReader._check_typmod(typmod, name)
if len(length) > 0:
length += ')'
if name == 'char' and schema == 'pg_catalog':
return '"char"' + array
elif name == 'time with time zone':
return 'time' + length + ' with time zone' + array
elif name == 'time without time zone':
return 'time' + length + ' without time zone' + array
elif name == 'timestamp with time zone':
return 'timestamp' + length + ' with time zone' + array
elif name == 'timestamp without time zone':
return 'timestamp' + length + ' without time zone' + array
else:
return name + length + array
type_value = DataTypeReader._get_full_type_value(name, schema, length,
array)
return type_value
@classmethod
def parse_type_name(cls, type_name):

View File

@ -789,7 +789,7 @@ class ViewNode(PGChildNodeView, VacuumSettings, SchemaDiffObjectCompare):
status, res = self.conn.execute_dict(sql)
if not status:
return None, internal_server_error(errormsg=res)
if len(res['rows']) == 0:
elif len(res['rows']) == 0:
return None, gone(
gettext("Could not find the view on the server.")
)
@ -819,14 +819,7 @@ class ViewNode(PGChildNodeView, VacuumSettings, SchemaDiffObjectCompare):
self.view_schema = old_data['schema']
try:
sql = render_template("/".join(
[self.template_path,
self._SQL_PREFIX + self._UPDATE_SQL]), data=data,
o_data=old_data, conn=self.conn)
if 'definition' in data and data['definition']:
sql += self.get_columns_sql(did, vid)
sql = self._get_update_sql(did, vid, data, old_data)
except Exception as e:
current_app.logger.exception(e)
return None, internal_server_error(errormsg=str(e))
@ -837,6 +830,24 @@ class ViewNode(PGChildNodeView, VacuumSettings, SchemaDiffObjectCompare):
return sql, data['name'] if 'name' in data else old_data['name']
def _get_update_sql(self, did, vid, data, old_data):
"""
Get sql for update view.
:param did: Database Id.
:param vid: View Id.
:param data: data for get sql.
:param old_data: old view data for get sql.
:return: sql for update view.
"""
sql = render_template("/".join(
[self.template_path,
self._SQL_PREFIX + self._UPDATE_SQL]), data=data,
o_data=old_data, conn=self.conn)
if 'definition' in data and data['definition']:
sql += self.get_columns_sql(did, vid)
return sql
def _get_create_view_sql(self, data):
"""
Get create view sql with it's privileges.

View File

@ -20,40 +20,20 @@ list_keys_array = ['name', 'colname', 'argid', 'token', 'option', 'conname',
'fsrvoption', 'umoption']
def compare_dictionaries(**kwargs):
def _get_source_list(added, source_dict, node, source_params, view_object,
node_label, group_name):
"""
This function will compare the two dictionaries.
:param kwargs:
:return:
Get only source list.
:param added: added dict list.
:param source_dict: source dict.
:param node: node type.
:param source_params: source parameters.
:param view_object: view object for get sql.
:param node_label: node label.
:param group_name: group name
:return: list of source dict.
"""
view_object = kwargs.get('view_object')
source_params = kwargs.get('source_params')
target_params = kwargs.get('target_params')
group_name = kwargs.get('group_name')
source_dict = kwargs.get('source_dict')
target_dict = kwargs.get('target_dict')
node = kwargs.get('node')
node_label = kwargs.get('node_label')
ignore_whitespaces = kwargs.get('ignore_whitespaces')
ignore_keys = kwargs.get('ignore_keys', None)
dict1 = copy.deepcopy(source_dict)
dict2 = copy.deepcopy(target_dict)
# Find the duplicate keys in both the dictionaries
dict1_keys = set(dict1.keys())
dict2_keys = set(dict2.keys())
intersect_keys = dict1_keys.intersection(dict2_keys)
# Add gid to the params
source_params['gid'] = target_params['gid'] = 1
# Keys that are available in source and missing in target.
source_only = []
source_dependencies = []
added = dict1_keys - dict2_keys
global count
for item in added:
source_object_id = None
if 'oid' in source_dict[item]:
@ -100,9 +80,36 @@ def compare_dictionaries(**kwargs):
})
count += 1
return source_only
def _delete_keys(temp_tgt_params):
"""
Delete keys from temp target parameters.
:param temp_tgt_params:
:type temp_tgt_params:
:return:
"""
if 'gid' in temp_tgt_params:
del temp_tgt_params['gid']
if 'json_resp' in temp_tgt_params:
del temp_tgt_params['json_resp']
def _get_target_list(removed, target_dict, node, target_params, view_object,
node_label, group_name):
"""
Get only target list.
:param removed: removed list.
:param target_dict: target dict.
:param node: node type.
:param target_params: target parameters.
:param view_object: view object for get sql.
:param node_label: node label.
:param group_name: group name.
:return: list of target dict.
"""
target_only = []
# Keys that are available in target and missing in source.
removed = dict2_keys - dict1_keys
for item in removed:
target_object_id = None
if 'oid' in target_dict[item]:
@ -113,10 +120,7 @@ def compare_dictionaries(**kwargs):
temp_tgt_params['tid'] = target_object_id
temp_tgt_params['json_resp'] = False
target_ddl = view_object.get_sql_from_table_diff(**temp_tgt_params)
if 'gid' in temp_tgt_params:
del temp_tgt_params['gid']
if 'json_resp' in temp_tgt_params:
del temp_tgt_params['json_resp']
_delete_keys(temp_tgt_params)
diff_ddl = view_object.get_drop_sql(**temp_tgt_params)
else:
temp_tgt_params = copy.deepcopy(target_params)
@ -148,10 +152,32 @@ def compare_dictionaries(**kwargs):
})
count += 1
# Compare the values of duplicates keys.
return target_only
def _get_identical_and_different_list(intersect_keys, source_dict, target_dict,
node, node_label, view_object,
**kwargs):
"""
get lists of identical and different keys list.
:param intersect_keys:
:param source_dict:
:param target_dict:
:param node:
:param node_label:
:param view_object:
:param other_param:
:return: return list of identical and different dict.
"""
identical = []
different = []
diff_dependencies = []
dict1 = kwargs['dict1']
dict2 = kwargs['dict2']
ignore_whitespaces = kwargs['ignore_whitespaces']
ignore_keys = kwargs['ignore_keys']
source_params = kwargs['source_params']
target_params = kwargs['target_params']
group_name = kwargs['group_name']
for key in intersect_keys:
source_object_id = None
target_object_id = None
@ -257,6 +283,66 @@ def compare_dictionaries(**kwargs):
})
count += 1
return identical, different
def compare_dictionaries(**kwargs):
"""
This function will compare the two dictionaries.
:param kwargs:
:return:
"""
view_object = kwargs.get('view_object')
source_params = kwargs.get('source_params')
target_params = kwargs.get('target_params')
group_name = kwargs.get('group_name')
source_dict = kwargs.get('source_dict')
target_dict = kwargs.get('target_dict')
node = kwargs.get('node')
node_label = kwargs.get('node_label')
ignore_whitespaces = kwargs.get('ignore_whitespaces')
ignore_keys = kwargs.get('ignore_keys', None)
dict1 = copy.deepcopy(source_dict)
dict2 = copy.deepcopy(target_dict)
# Find the duplicate keys in both the dictionaries
dict1_keys = set(dict1.keys())
dict2_keys = set(dict2.keys())
intersect_keys = dict1_keys.intersection(dict2_keys)
# Add gid to the params
source_params['gid'] = target_params['gid'] = 1
# Keys that are available in source and missing in target.
added = dict1_keys - dict2_keys
global count
source_only = _get_source_list(added, source_dict, node, source_params,
view_object, node_label, group_name)
target_only = []
# Keys that are available in target and missing in source.
removed = dict2_keys - dict1_keys
target_only = _get_target_list(removed, target_dict, node, target_params,
view_object, node_label, group_name)
# Compare the values of duplicates keys.
other_param = {
"dict1": dict1,
"dict2": dict2,
"ignore_whitespaces": ignore_whitespaces,
"ignore_keys": ignore_keys,
"source_params": source_params,
"target_params": target_params,
"group_name": group_name
}
identical, different = _get_identical_and_different_list(
intersect_keys, source_dict, target_dict, node, node_label,
view_object, **other_param)
return source_only + target_only + different + identical
@ -491,6 +577,25 @@ def is_key_exists(key_list, target_dict):
return None
def _check_key_in_source_target(key, acl_keys, target, source):
"""
Check if key is present in source if not then check it's present in target.
:param key: key to be checked.
:param acl_keys: acl keys
:param target: target object.
:param source: source object.
:return: return key.
"""
if key is None:
key = is_key_exists(acl_keys, target)
if key is None:
key = 'acl'
elif key is not None and type(source[key]) != list:
key = 'acl'
return key
def parse_acl(source, target, diff_dict):
"""
This function is used to parse acl.
@ -504,12 +609,7 @@ def parse_acl(source, target, diff_dict):
# If key is not found in source then check the key is available
# in target.
if key is None:
key = is_key_exists(acl_keys, target)
if key is None:
key = 'acl'
elif key is not None and type(source[key]) != list:
key = 'acl'
key = _check_key_in_source_target(key, acl_keys, target, source)
tmp_source = source[key] if\
key in source and source[key] is not None else []