From a98a89ce64cfdcfafb10c9567de50477c1d95acb Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Tue, 27 May 2025 18:33:20 +0000 Subject: [PATCH 1/3] feat(dev): ds return backport; - Return records capability. - Improved ds upsert sql. --- ckanext/datastore/backend/postgres.py | 99 ++++++++++++++++++--------- ckanext/datastore/logic/action.py | 25 ++++++- ckanext/datastore/logic/schema.py | 8 +++ 3 files changed, 99 insertions(+), 33 deletions(-) diff --git a/ckanext/datastore/backend/postgres.py b/ckanext/datastore/backend/postgres.py index e18b18c076c..07853b21d3a 100644 --- a/ckanext/datastore/backend/postgres.py +++ b/ckanext/datastore/backend/postgres.py @@ -1486,6 +1486,8 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): records = data_dict['records'] sql_columns = ", ".join( identifier(name) for name in field_names) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + return_columns = "_id, " + sql_columns num = -1 if method == _INSERT: @@ -1505,16 +1507,23 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): rows.append(row) sql_string = '''INSERT INTO {res_id} ({columns}) - VALUES ({values});'''.format( + VALUES ({values}) {return_statement};'''.format( res_id=identifier(data_dict['resource_id']), columns=sql_columns, + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + return_statement='RETURNING {return_columns}'.format( + return_columns=return_columns) if data_dict[ + 'include_records'] else '', values=', '.join([ f":val_{idx}" for idx in range(0, len(field_names)) ]) ) try: - context['connection'].execute(sa.text(sql_string), rows) + results = context['connection'].execute(sa.text(sql_string), rows) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if data_dict['include_records']: + data_dict['records'] = [dict(r) for r in results.mappings().all()] except (DatabaseError, DataError) as err: # (canada fork only): parse constraint sql errors # TODO: upstream contrib!! 
@@ -1535,7 +1544,8 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): elif method in [_UPDATE, _UPSERT]: unique_keys = _get_unique_key(context, data_dict) - + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + updated_records = {} for num, record in enumerate(records): if not unique_keys and '_id' not in record: raise ValidationError({ @@ -1606,12 +1616,17 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): sql_string = u''' UPDATE {res_id} SET ({columns}, "_full_text") = ({values}, NULL) - WHERE ({primary_key}) = ({primary_value}); + WHERE ({primary_key}) = ({primary_value}) + {return_statement}; '''.format( res_id=identifier(data_dict['resource_id']), columns=u', '.join( [identifier(field) for field in used_field_names]), + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + return_statement='RETURNING {return_columns}'.format( + return_columns=return_columns) if data_dict[ + 'include_records'] else '', values=u', '.join(values), primary_key=pk_sql, primary_value=pk_values_sql, @@ -1620,6 +1635,10 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): results = context['connection'].execute( sa.text(sql_string), {**used_values, **unique_values}) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if data_dict['include_records']: + for r in results.mappings().all(): + updated_records[str(r._id)] = dict(r) except DatabaseError as err: # (canada fork only): parse constraint sql errors # TODO: upstream contrib!! 
@@ -1647,37 +1666,39 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): elif method == _UPSERT: format_params = dict( res_id=identifier(data_dict['resource_id']), - columns=u', '.join( + columns=('_id, ' if pk_sql == '"_id"' else '') + ', '.join( [identifier(field) for field in used_field_names]), - values=u', '.join([ - f'cast(:{p} as nested)' - if field['type'] == 'nested' else ":" + p - for p, field in zip(value_placeholders, used_fields) - ]), - primary_key=pk_sql, - primary_value=pk_values_sql, + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + set_statement=', '.join( + ['{col}=EXCLUDED.{col}'.format(col=identifier(field)) + for field in used_field_names]), + return_statement='RETURNING {return_columns}'.format( + return_columns=return_columns) if data_dict[ + 'include_records'] else '', + values=('%s, ' % pk_values_sql if pk_sql == '"_id"' else '') + + ', '.join([f'cast(:{p} as nested)' if field['type'] == 'nested' + else ":" + p + for p, field in zip(value_placeholders, used_fields)]), + primary_key=pk_sql ) - update_string = """ - UPDATE {res_id} - SET ({columns}, "_full_text") = ({values}, NULL) - WHERE ({primary_key}) = ({primary_value}) - """.format(**format_params) - - insert_string = """ - INSERT INTO {res_id} ({columns}) - SELECT {values} - WHERE NOT EXISTS (SELECT 1 FROM {res_id} - WHERE ({primary_key}) = ({primary_value})) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + sql_string = """ + INSERT INTO {res_id} ({columns}) VALUES ({values}) + ON CONFLICT ({primary_key}) DO UPDATE + SET {set_statement} + {return_statement} """.format(**format_params) values = {**used_values, **unique_values} try: - context['connection'].execute( - sa.text(update_string), values) - context['connection'].execute( - sa.text(insert_string), values) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + results = context['connection'].execute( + sa.text(sql_string), values) + if data_dict['include_records']: + for r in 
results.mappings().all(): + updated_records[str(r._id)] = dict(r) except DatabaseError as err: # (canada fork only): parse constraint sql errors # TODO: upstream contrib!! @@ -1695,6 +1716,9 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): 'records': [errmsg], 'records_row': num, }) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if updated_records: + data_dict['records'] = list(updated_records.values()) def validate(context: Context, data_dict: dict[str, Any]): @@ -1724,6 +1748,8 @@ def validate(context: Context, data_dict: dict[str, Any]): data_dict_copy.pop('include_total', None) data_dict_copy.pop('total_estimation_threshold', None) data_dict_copy.pop('records_format', None) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + data_dict_copy.pop('include_records', None) data_dict_copy.pop('calculate_record_count', None) for key, values in data_dict_copy.items(): @@ -1959,6 +1985,11 @@ def delete_data(context: Context, data_dict: dict[str, Any]): validate(context, data_dict) fields_types = _get_fields_types( context['connection'], data_dict['resource_id']) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + fields = _get_fields(context['connection'], data_dict['resource_id']) + sql_columns = ", ".join( + identifier(f['id']) for f in fields) + return_columns = "_id, " + sql_columns query_dict: dict[str, Any] = { 'where': [] @@ -1969,13 +2000,19 @@ def delete_data(context: Context, data_dict: dict[str, Any]): fields_types, query_dict) where_clause, where_values = _where(query_dict['where']) - sql_string = u'DELETE FROM "{0}" {1}'.format( - data_dict['resource_id'], - where_clause + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + sql_string = u'DELETE FROM {0} {1} {2}'.format( + identifier(data_dict['resource_id']), + where_clause, + 'RETURNING {return_columns}'.format( + return_columns=return_columns) if data_dict['include_records'] else '' ) try: - _execute_single_statement(context, 
sql_string, where_values) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + results =_execute_single_statement(context, sql_string, where_values) + if data_dict['include_records']: + data_dict['deleted_records'] = [dict(r) for r in results.mappings().all()] except ProgrammingError as pe: raise ValidationError({'filters': [_programming_error_summary(pe)]}) diff --git a/ckanext/datastore/logic/action.py b/ckanext/datastore/logic/action.py index 1f60e287120..48dbc50c8de 100644 --- a/ckanext/datastore/logic/action.py +++ b/ckanext/datastore/logic/action.py @@ -66,6 +66,9 @@ def datastore_create(context: Context, data_dict: dict[str, Any]): :param records: the data, eg: [{"dob": "2005", "some_stuff": ["a", "b"]}] (optional) :type records: list of dictionaries + :param include_records: return the full values of inserted records + (optional, default: False) + :type include_records: bool :param primary_key: fields that represent a unique key (optional) :type primary_key: list or comma separated string :param foreign_keys: tables and fields that represent foreign keys (optional) @@ -184,7 +187,9 @@ def datastore_create(context: Context, data_dict: dict[str, Any]): result.pop('id', None) result.pop('connection_url', None) - result.pop('records', None) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if not data_dict.pop('include_records', False): + result.pop('records', None) return result @@ -251,6 +256,9 @@ def datastore_upsert(context: Context, data_dict: dict[str, Any]): :param records: the data, eg: [{"dob": "2005", "some_stuff": ["a","b"]}] (optional) :type records: list of dictionaries + :param include_records: return the full values of inserted records + (optional, default: False) + :type include_records: bool :param method: the method to use to put the data into the datastore. 
Possible options are: upsert, insert, update (optional, default: upsert) @@ -299,6 +307,10 @@ def datastore_upsert(context: Context, data_dict: dict[str, Any]): result.pop('id', None) result.pop('connection_url', None) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if not data_dict.pop('include_records', False): + result.pop('records', None) + if data_dict.get('calculate_record_count', False): backend.calculate_record_count(data_dict['resource_id']) # type: ignore @@ -388,6 +400,9 @@ def datastore_delete(context: Context, data_dict: dict[str, Any]): If missing delete whole table and all dependent views. (optional) :type filters: dictionary + :param include_records: return the full values of deleted records + (optional, default: False) + :type include_records: bool :param calculate_record_count: updates the stored count of records, used to optimize datastore_search in combination with the `total_estimation_threshold` parameter. If doing a series of requests @@ -397,7 +412,7 @@ def datastore_delete(context: Context, data_dict: dict[str, Any]): **Results:** - :returns: Original filters sent. + :returns: Original filters sent and list of deleted_records :rtype: dictionary ''' @@ -455,6 +470,9 @@ def datastore_delete(context: Context, data_dict: dict[str, Any]): result.pop('id', None) result.pop('connection_url', None) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if not data_dict.pop('include_records', False): + result.pop('deleted_records', None) return result @@ -470,6 +488,9 @@ def datastore_records_delete(context: Context, data_dict: dict[str, Any]): If {} delete all records. (required) :type filters: dictionary + :param include_records: return the full values of deleted records + (optional, default: False) + :type include_records: bool :param calculate_record_count: updates the stored count of records, used to optimize datastore_search in combination with the `total_estimation_threshold` parameter. 
If doing a series of requests diff --git a/ckanext/datastore/logic/schema.py b/ckanext/datastore/logic/schema.py index a22a107eea1..49ea98189b3 100644 --- a/ckanext/datastore/logic/schema.py +++ b/ckanext/datastore/logic/schema.py @@ -140,6 +140,8 @@ def datastore_create_schema() -> Schema: one_of([u'row'])], 'function': [not_empty, unicode_only], }, + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + 'include_records': [default(False), boolean_validator], 'calculate_record_count': [ignore_missing, default(False), boolean_validator], '__junk': [empty], @@ -155,6 +157,8 @@ def datastore_upsert_schema() -> Schema: 'id': [ignore_missing], 'method': [ignore_missing, unicode_safe, one_of( ['upsert', 'insert', 'update'])], + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + 'include_records': [default(False), boolean_validator], 'calculate_record_count': [ignore_missing, default(False), boolean_validator], 'dry_run': [ignore_missing, boolean_validator], @@ -169,6 +173,8 @@ def datastore_delete_schema() -> Schema: 'resource_id': [not_missing, not_empty, unicode_safe], 'force': [ignore_missing, boolean_validator], 'id': [ignore_missing], + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + 'include_records': [default(False), boolean_validator], 'calculate_record_count': [ignore_missing, default(False), boolean_validator], '__junk': [empty], @@ -183,6 +189,8 @@ def datastore_records_delete_schema() -> Schema: 'force': [ignore_missing, boolean_validator], 'filters': [not_missing, dict_only], 'id': [ignore_missing], + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + 'include_records': [default(False), boolean_validator], 'calculate_record_count': [ignore_missing, default(False), boolean_validator], '__junk': [empty], From 145487ada9c9e256fd639a88885371e32e575bfa Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Tue, 27 May 2025 18:36:27 +0000 Subject: [PATCH 2/3] feat(misc): changelog; - Added change log file. 
--- changes/200.canada.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/200.canada.feature diff --git a/changes/200.canada.feature b/changes/200.canada.feature new file mode 100644 index 00000000000..56d180c03be --- /dev/null +++ b/changes/200.canada.feature @@ -0,0 +1 @@ +Adds the capability to return the inserted/updated records (via the `include_records` DataDict key) and the deleted records (via the `include_deleted_records` DataDict key) from the DataStore. \ No newline at end of file From eaa82b835a6a84efb1ce3ae02950f81ed664ce35 Mon Sep 17 00:00:00 2001 From: Jesse Vickery Date: Tue, 3 Jun 2025 16:09:06 +0000 Subject: [PATCH 3/3] fix(dev): changes; - Upstream PR changes. --- ckanext/datastore/backend/postgres.py | 23 ++++++++++++++++------- ckanext/datastore/logic/action.py | 14 +++++++------- ckanext/datastore/logic/schema.py | 4 ++-- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/ckanext/datastore/backend/postgres.py b/ckanext/datastore/backend/postgres.py index 07853b21d3a..92c615f55ff 100644 --- a/ckanext/datastore/backend/postgres.py +++ b/ckanext/datastore/backend/postgres.py @@ -2001,17 +2001,26 @@ def delete_data(context: Context, data_dict: dict[str, Any]): where_clause, where_values = _where(query_dict['where']) # (canada fork only): https://github.com/ckan/ckan/pull/8684 - sql_string = u'DELETE FROM {0} {1} {2}'.format( - identifier(data_dict['resource_id']), - where_clause, - 'RETURNING {return_columns}'.format( - return_columns=return_columns) if data_dict['include_records'] else '' - ) + if data_dict['include_deleted_records']: + rows_max = config.get('ckan.datastore.search.rows_max') + sql_string = '''WITH deleted AS ( + DELETE FROM {0} {1} RETURNING {2} + ) SELECT d.* FROM deleted as d LIMIT {3} + '''.format( + identifier(data_dict['resource_id']), + where_clause, + return_columns, + rows_max + ) + else: + sql_string = u'DELETE FROM {0} {1}'.format( + identifier(data_dict['resource_id']), + where_clause) try: # (canada fork only): https://github.com/ckan/ckan/pull/8684 
results =_execute_single_statement(context, sql_string, where_values) - if data_dict['include_records']: + if data_dict['include_deleted_records']: data_dict['deleted_records'] = [dict(r) for r in results.mappings().all()] except ProgrammingError as pe: raise ValidationError({'filters': [_programming_error_summary(pe)]}) diff --git a/ckanext/datastore/logic/action.py b/ckanext/datastore/logic/action.py index 48dbc50c8de..7870bc4352c 100644 --- a/ckanext/datastore/logic/action.py +++ b/ckanext/datastore/logic/action.py @@ -400,9 +400,9 @@ def datastore_delete(context: Context, data_dict: dict[str, Any]): If missing delete whole table and all dependent views. (optional) :type filters: dictionary - :param include_records: return the full values of deleted records - (optional, default: False) - :type include_records: bool + :param include_deleted_records: return the full values of deleted records + (optional, default: False) + :type include_deleted_records: bool :param calculate_record_count: updates the stored count of records, used to optimize datastore_search in combination with the `total_estimation_threshold` parameter. If doing a series of requests @@ -471,7 +471,7 @@ def datastore_delete(context: Context, data_dict: dict[str, Any]): result.pop('id', None) result.pop('connection_url', None) # (canada fork only): https://github.com/ckan/ckan/pull/8684 - if not data_dict.pop('include_records', False): + if not data_dict.pop('include_deleted_records', False): result.pop('deleted_records', None) return result @@ -488,9 +488,9 @@ def datastore_records_delete(context: Context, data_dict: dict[str, Any]): If {} delete all records. 
(required) :type filters: dictionary - :param include_records: return the full values of deleted records - (optional, default: False) - :type include_records: bool + :param include_deleted_records: return the full values of deleted records + (optional, default: False) + :type include_deleted_records: bool :param calculate_record_count: updates the stored count of records, used to optimize datastore_search in combination with the `total_estimation_threshold` parameter. If doing a series of requests diff --git a/ckanext/datastore/logic/schema.py b/ckanext/datastore/logic/schema.py index 49ea98189b3..712a5109faa 100644 --- a/ckanext/datastore/logic/schema.py +++ b/ckanext/datastore/logic/schema.py @@ -174,7 +174,7 @@ def datastore_delete_schema() -> Schema: 'force': [ignore_missing, boolean_validator], 'id': [ignore_missing], # (canada fork only): https://github.com/ckan/ckan/pull/8684 - 'include_records': [default(False), boolean_validator], + 'include_deleted_records': [default(False), boolean_validator], 'calculate_record_count': [ignore_missing, default(False), boolean_validator], '__junk': [empty], @@ -190,7 +190,7 @@ def datastore_records_delete_schema() -> Schema: 'filters': [not_missing, dict_only], 'id': [ignore_missing], # (canada fork only): https://github.com/ckan/ckan/pull/8684 - 'include_records': [default(False), boolean_validator], + 'include_deleted_records': [default(False), boolean_validator], 'calculate_record_count': [ignore_missing, default(False), boolean_validator], '__junk': [empty],