diff --git a/changes/200.canada.feature b/changes/200.canada.feature new file mode 100644 index 00000000000..56d180c03be --- /dev/null +++ b/changes/200.canada.feature @@ -0,0 +1 @@ +Adds the capability to return the inserted/updated records from the DataStore via the `include_records` DataDict key, and the deleted records via the `include_deleted_records` DataDict key. \ No newline at end of file diff --git a/ckanext/datastore/backend/postgres.py b/ckanext/datastore/backend/postgres.py index e18b18c076c..92c615f55ff 100644 --- a/ckanext/datastore/backend/postgres.py +++ b/ckanext/datastore/backend/postgres.py @@ -1486,6 +1486,8 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): records = data_dict['records'] sql_columns = ", ".join( identifier(name) for name in field_names) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + return_columns = "_id, " + sql_columns num = -1 if method == _INSERT: @@ -1505,16 +1507,23 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): rows.append(row) sql_string = '''INSERT INTO {res_id} ({columns}) - VALUES ({values});'''.format( + VALUES ({values}) {return_statement};'''.format( res_id=identifier(data_dict['resource_id']), columns=sql_columns, + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + return_statement='RETURNING {return_columns}'.format( + return_columns=return_columns) if data_dict[ + 'include_records'] else '', values=', '.join([ f":val_{idx}" for idx in range(0, len(field_names)) ]) ) try: - context['connection'].execute(sa.text(sql_string), rows) + results = context['connection'].execute(sa.text(sql_string), rows) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if data_dict['include_records']: + data_dict['records'] = [dict(r) for r in results.mappings().all()] except (DatabaseError, DataError) as err: # (canada fork only): parse constraint sql errors # TODO: upstream contrib!! 
@@ -1535,7 +1544,8 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): elif method in [_UPDATE, _UPSERT]: unique_keys = _get_unique_key(context, data_dict) - + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + updated_records = {} for num, record in enumerate(records): if not unique_keys and '_id' not in record: raise ValidationError({ @@ -1606,12 +1616,17 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): sql_string = u''' UPDATE {res_id} SET ({columns}, "_full_text") = ({values}, NULL) - WHERE ({primary_key}) = ({primary_value}); + WHERE ({primary_key}) = ({primary_value}) + {return_statement}; '''.format( res_id=identifier(data_dict['resource_id']), columns=u', '.join( [identifier(field) for field in used_field_names]), + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + return_statement='RETURNING {return_columns}'.format( + return_columns=return_columns) if data_dict[ + 'include_records'] else '', values=u', '.join(values), primary_key=pk_sql, primary_value=pk_values_sql, @@ -1620,6 +1635,10 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): results = context['connection'].execute( sa.text(sql_string), {**used_values, **unique_values}) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if data_dict['include_records']: + for r in results.mappings().all(): + updated_records[str(r._id)] = dict(r) except DatabaseError as err: # (canada fork only): parse constraint sql errors # TODO: upstream contrib!! 
@@ -1647,37 +1666,39 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): elif method == _UPSERT: format_params = dict( res_id=identifier(data_dict['resource_id']), - columns=u', '.join( + columns=('_id, ' if pk_sql == '"_id"' else '') + ', '.join( [identifier(field) for field in used_field_names]), - values=u', '.join([ - f'cast(:{p} as nested)' - if field['type'] == 'nested' else ":" + p - for p, field in zip(value_placeholders, used_fields) - ]), - primary_key=pk_sql, - primary_value=pk_values_sql, + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + set_statement=', '.join( + ['{col}=EXCLUDED.{col}'.format(col=identifier(field)) + for field in used_field_names]), + return_statement='RETURNING {return_columns}'.format( + return_columns=return_columns) if data_dict[ + 'include_records'] else '', + values=('%s, ' % pk_values_sql if pk_sql == '"_id"' else '') + + ', '.join([f'cast(:{p} as nested)' if field['type'] == 'nested' + else ":" + p + for p, field in zip(value_placeholders, used_fields)]), + primary_key=pk_sql ) - update_string = """ - UPDATE {res_id} - SET ({columns}, "_full_text") = ({values}, NULL) - WHERE ({primary_key}) = ({primary_value}) - """.format(**format_params) - - insert_string = """ - INSERT INTO {res_id} ({columns}) - SELECT {values} - WHERE NOT EXISTS (SELECT 1 FROM {res_id} - WHERE ({primary_key}) = ({primary_value})) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + sql_string = """ + INSERT INTO {res_id} ({columns}) VALUES ({values}) + ON CONFLICT ({primary_key}) DO UPDATE + SET {set_statement} + {return_statement} """.format(**format_params) values = {**used_values, **unique_values} try: - context['connection'].execute( - sa.text(update_string), values) - context['connection'].execute( - sa.text(insert_string), values) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + results = context['connection'].execute( + sa.text(sql_string), values) + if data_dict['include_records']: + for r in 
results.mappings().all(): + updated_records[str(r._id)] = dict(r) except DatabaseError as err: # (canada fork only): parse constraint sql errors # TODO: upstream contrib!! @@ -1695,6 +1716,9 @@ def upsert_data(context: Context, data_dict: dict[str, Any]): 'records': [errmsg], 'records_row': num, }) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if updated_records: + data_dict['records'] = list(updated_records.values()) def validate(context: Context, data_dict: dict[str, Any]): @@ -1724,6 +1748,8 @@ def validate(context: Context, data_dict: dict[str, Any]): data_dict_copy.pop('include_total', None) data_dict_copy.pop('total_estimation_threshold', None) data_dict_copy.pop('records_format', None) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + data_dict_copy.pop('include_records', None) data_dict_copy.pop('calculate_record_count', None) for key, values in data_dict_copy.items(): @@ -1959,6 +1985,11 @@ def delete_data(context: Context, data_dict: dict[str, Any]): validate(context, data_dict) fields_types = _get_fields_types( context['connection'], data_dict['resource_id']) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + fields = _get_fields(context['connection'], data_dict['resource_id']) + sql_columns = ", ".join( + identifier(f['id']) for f in fields) + return_columns = "_id, " + sql_columns query_dict: dict[str, Any] = { 'where': [] @@ -1969,13 +2000,28 @@ def delete_data(context: Context, data_dict: dict[str, Any]): fields_types, query_dict) where_clause, where_values = _where(query_dict['where']) - sql_string = u'DELETE FROM "{0}" {1}'.format( - data_dict['resource_id'], - where_clause - ) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if data_dict['include_deleted_records']: + rows_max = config.get('ckan.datastore.search.rows_max') + sql_string = '''WITH deleted AS ( + DELETE FROM {0} {1} RETURNING {2} + ) SELECT d.* FROM deleted as d LIMIT {3} + '''.format( + 
identifier(data_dict['resource_id']), + where_clause, + return_columns, + rows_max + ) + else: + sql_string = u'DELETE FROM {0} {1}'.format( + identifier(data_dict['resource_id']), + where_clause) try: - _execute_single_statement(context, sql_string, where_values) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + results =_execute_single_statement(context, sql_string, where_values) + if data_dict['include_deleted_records']: + data_dict['deleted_records'] = [dict(r) for r in results.mappings().all()] except ProgrammingError as pe: raise ValidationError({'filters': [_programming_error_summary(pe)]}) diff --git a/ckanext/datastore/logic/action.py b/ckanext/datastore/logic/action.py index 1f60e287120..7870bc4352c 100644 --- a/ckanext/datastore/logic/action.py +++ b/ckanext/datastore/logic/action.py @@ -66,6 +66,9 @@ def datastore_create(context: Context, data_dict: dict[str, Any]): :param records: the data, eg: [{"dob": "2005", "some_stuff": ["a", "b"]}] (optional) :type records: list of dictionaries + :param include_records: return the full values of inserted records + (optional, default: False) + :type include_records: bool :param primary_key: fields that represent a unique key (optional) :type primary_key: list or comma separated string :param foreign_keys: tables and fields that represent foreign keys (optional) @@ -184,7 +187,9 @@ def datastore_create(context: Context, data_dict: dict[str, Any]): result.pop('id', None) result.pop('connection_url', None) - result.pop('records', None) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if not data_dict.pop('include_records', False): + result.pop('records', None) return result @@ -251,6 +256,9 @@ def datastore_upsert(context: Context, data_dict: dict[str, Any]): :param records: the data, eg: [{"dob": "2005", "some_stuff": ["a","b"]}] (optional) :type records: list of dictionaries + :param include_records: return the full values of inserted records + (optional, default: False) + :type 
include_records: bool :param method: the method to use to put the data into the datastore. Possible options are: upsert, insert, update (optional, default: upsert) @@ -299,6 +307,10 @@ def datastore_upsert(context: Context, data_dict: dict[str, Any]): result.pop('id', None) result.pop('connection_url', None) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if not data_dict.pop('include_records', False): + result.pop('records', None) + if data_dict.get('calculate_record_count', False): backend.calculate_record_count(data_dict['resource_id']) # type: ignore @@ -388,6 +400,9 @@ def datastore_delete(context: Context, data_dict: dict[str, Any]): If missing delete whole table and all dependent views. (optional) :type filters: dictionary + :param include_deleted_records: return the full values of deleted records + (optional, default: False) + :type include_deleted_records: bool :param calculate_record_count: updates the stored count of records, used to optimize datastore_search in combination with the `total_estimation_threshold` parameter. If doing a series of requests @@ -397,7 +412,7 @@ def datastore_delete(context: Context, data_dict: dict[str, Any]): **Results:** - :returns: Original filters sent. + :returns: Original filters sent and list of deleted_records :rtype: dictionary ''' @@ -455,6 +470,9 @@ def datastore_delete(context: Context, data_dict: dict[str, Any]): result.pop('id', None) result.pop('connection_url', None) + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + if not data_dict.pop('include_deleted_records', False): + result.pop('deleted_records', None) return result @@ -470,6 +488,9 @@ def datastore_records_delete(context: Context, data_dict: dict[str, Any]): If {} delete all records. 
(required) :type filters: dictionary + :param include_deleted_records: return the full values of deleted records + (optional, default: False) + :type include_deleted_records: bool :param calculate_record_count: updates the stored count of records, used to optimize datastore_search in combination with the `total_estimation_threshold` parameter. If doing a series of requests diff --git a/ckanext/datastore/logic/schema.py b/ckanext/datastore/logic/schema.py index a22a107eea1..712a5109faa 100644 --- a/ckanext/datastore/logic/schema.py +++ b/ckanext/datastore/logic/schema.py @@ -140,6 +140,8 @@ def datastore_create_schema() -> Schema: one_of([u'row'])], 'function': [not_empty, unicode_only], }, + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + 'include_records': [default(False), boolean_validator], 'calculate_record_count': [ignore_missing, default(False), boolean_validator], '__junk': [empty], @@ -155,6 +157,8 @@ def datastore_upsert_schema() -> Schema: 'id': [ignore_missing], 'method': [ignore_missing, unicode_safe, one_of( ['upsert', 'insert', 'update'])], + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + 'include_records': [default(False), boolean_validator], 'calculate_record_count': [ignore_missing, default(False), boolean_validator], 'dry_run': [ignore_missing, boolean_validator], @@ -169,6 +173,8 @@ def datastore_delete_schema() -> Schema: 'resource_id': [not_missing, not_empty, unicode_safe], 'force': [ignore_missing, boolean_validator], 'id': [ignore_missing], + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + 'include_deleted_records': [default(False), boolean_validator], 'calculate_record_count': [ignore_missing, default(False), boolean_validator], '__junk': [empty], @@ -183,6 +189,8 @@ def datastore_records_delete_schema() -> Schema: 'force': [ignore_missing, boolean_validator], 'filters': [not_missing, dict_only], 'id': [ignore_missing], + # (canada fork only): https://github.com/ckan/ckan/pull/8684 + 
'include_deleted_records': [default(False), boolean_validator], 'calculate_record_count': [ignore_missing, default(False), boolean_validator], '__junk': [empty],