Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 107 additions & 18 deletions core/impl/collection.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -803,14 +803,14 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
}

void upsert(std::string document_key,
codec::encoded_value encoded,
std::variant<codec::encoded_value, std::function<codec::encoded_value()>> value,
upsert_options::built options,
upsert_handler&& handler) const
{
auto span = create_kv_span(
core::tracing::operation::mcbp_upsert, options.parent_span, options.durability_level);

auto value = std::move(encoded);
auto [data, flags] = get_encoded_value(std::move(value), span);
auto id = core::document_id{
bucket_name_,
scope_name_,
Expand All @@ -820,10 +820,10 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
if (options.persist_to == persist_to::none && options.replicate_to == replicate_to::none) {
core::operations::upsert_request request{
std::move(id),
std::move(value.data),
std::move(data),
{},
{},
value.flags,
flags,
options.expiry,
options.durability_level,
options.timeout,
Expand All @@ -845,10 +845,10 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>

core::operations::upsert_request request{
id,
std::move(value.data),
std::move(data),
{},
{},
value.flags,
flags,
options.expiry,
durability_level::none,
options.timeout,
Expand Down Expand Up @@ -893,14 +893,14 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
}

void insert(std::string document_key,
codec::encoded_value encoded,
std::variant<codec::encoded_value, std::function<codec::encoded_value()>> value,
insert_options::built options,
insert_handler&& handler) const
{
auto span = create_kv_span(
core::tracing::operation::mcbp_insert, options.parent_span, options.durability_level);

auto value = std::move(encoded);
auto [data, flags] = get_encoded_value(std::move(value), span);
auto id = core::document_id{
bucket_name_,
scope_name_,
Expand All @@ -910,10 +910,10 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
if (options.persist_to == persist_to::none && options.replicate_to == replicate_to::none) {
core::operations::insert_request request{
std::move(id),
std::move(value.data),
std::move(data),
{},
{},
value.flags,
flags,
options.expiry,
options.durability_level,
options.timeout,
Expand All @@ -937,10 +937,10 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>

core::operations::insert_request request{
id,
std::move(value.data),
std::move(data),
{},
{},
value.flags,
flags,
options.expiry,
durability_level::none,
options.timeout,
Expand Down Expand Up @@ -982,15 +982,17 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
});
});
}

void replace(std::string document_key,
codec::encoded_value encoded,
std::variant<codec::encoded_value, std::function<codec::encoded_value()>> value,
replace_options::built options,
replace_handler&& handler) const
{
auto span = create_kv_span(
core::tracing::operation::mcbp_replace, options.parent_span, options.durability_level);

auto value = std::move(encoded);
auto [data, flags] = get_encoded_value(std::move(value), span);

auto id = core::document_id{
bucket_name_,
scope_name_,
Expand All @@ -1000,10 +1002,10 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
if (options.persist_to == persist_to::none && options.replicate_to == replicate_to::none) {
core::operations::replace_request request{
std::move(id),
std::move(value.data),
std::move(data),
{},
{},
value.flags,
flags,
options.expiry,
options.cas,
options.durability_level,
Expand All @@ -1029,10 +1031,10 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>

core::operations::replace_request request{
id,
std::move(value.data),
std::move(data),
{},
{},
value.flags,
flags,
options.expiry,
options.cas,
durability_level::none,
Expand Down Expand Up @@ -1216,6 +1218,20 @@ class collection_impl : public std::enable_shared_from_this<collection_impl>
}

private:
auto get_encoded_value(
std::variant<codec::encoded_value, std::function<codec::encoded_value()>> value,
const std::shared_ptr<tracing::request_span>& operation_span) const -> codec::encoded_value
{
if (std::holds_alternative<codec::encoded_value>(value)) {
return std::get<codec::encoded_value>(value);
}
const auto request_encoding_span =
tracer()->create_span(core::tracing::operation::step_request_encoding, operation_span);
auto encoded = std::get<std::function<codec::encoded_value()>>(value)();
request_encoding_span->end();
return encoded;
}

auto create_kv_span(const std::string& operation_name,
const std::shared_ptr<tracing::request_span>& parent_span,
const std::optional<durability_level> durability = {}) const
Expand Down Expand Up @@ -1656,6 +1672,30 @@ collection::upsert(std::string document_id,
return future;
}

auto
collection::upsert(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const upsert_options& options,
upsert_handler&& handler) const -> void
{
impl_->upsert(
std::move(document_id), std::move(document_fn), options.build(), std::move(handler));
}

auto
collection::upsert(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const upsert_options& options) const
-> std::future<std::pair<error, mutation_result>>
{
auto barrier = std::make_shared<std::promise<std::pair<error, mutation_result>>>();
auto future = barrier->get_future();
upsert(std::move(document_id), std::move(document_fn), options, [barrier](auto err, auto result) {
barrier->set_value({ std::move(err), std::move(result) });
});
return future;
}

void
collection::insert(std::string document_id,
codec::encoded_value document,
Expand All @@ -1680,6 +1720,30 @@ collection::insert(std::string document_id,
return future;
}

auto
collection::insert(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const insert_options& options,
insert_handler&& handler) const -> void
{
impl_->insert(
std::move(document_id), std::move(document_fn), options.build(), std::move(handler));
}

auto
collection::insert(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const insert_options& options) const
-> std::future<std::pair<error, mutation_result>>
{
auto barrier = std::make_shared<std::promise<std::pair<error, mutation_result>>>();
auto future = barrier->get_future();
insert(std::move(document_id), std::move(document_fn), options, [barrier](auto err, auto result) {
barrier->set_value({ std::move(err), std::move(result) });
});
return future;
}

void
collection::replace(std::string document_id,
codec::encoded_value document,
Expand All @@ -1704,6 +1768,31 @@ collection::replace(std::string document_id,
return future;
}

void
collection::replace(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const replace_options& options,
replace_handler&& handler) const
{
impl_->replace(
std::move(document_id), std::move(document_fn), options.build(), std::move(handler));
}

auto
collection::replace(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const replace_options& options) const
-> std::future<std::pair<error, mutation_result>>
{
auto barrier = std::make_shared<std::promise<std::pair<error, mutation_result>>>();
auto future = barrier->get_future();
replace(
std::move(document_id), std::move(document_fn), options, [barrier](auto err, auto result) {
barrier->set_value({ std::move(err), std::move(result) });
});
return future;
}

void
collection::scan(const couchbase::scan_type& scan_type,
const couchbase::scan_options& options,
Expand Down
69 changes: 57 additions & 12 deletions couchbase/collection.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -463,8 +463,10 @@ public:
const upsert_options& options,
upsert_handler&& handler) const
{
return upsert(
std::move(document_id), encode_document<Transcoder>(document), options, std::move(handler));
return upsert(std::move(document_id),
create_encode_fn<Transcoder, Document>(std::move(document)),
options,
std::move(handler));
}

/**
Expand Down Expand Up @@ -510,7 +512,8 @@ public:
const upsert_options& options = {}) const
-> std::future<std::pair<error, mutation_result>>
{
return upsert(std::move(document_id), encode_document<Transcoder>(document), options);
return upsert(
std::move(document_id), create_encode_fn<Transcoder, Document>(std::move(document)), options);
}

/**
Expand Down Expand Up @@ -559,8 +562,10 @@ public:
const insert_options& options,
insert_handler&& handler) const
{
return insert(
std::move(document_id), encode_document<Transcoder>(document), options, std::move(handler));
return insert(std::move(document_id),
create_encode_fn<Transcoder, Document>(std::move(document)),
options,
std::move(handler));
}

/**
Expand Down Expand Up @@ -609,7 +614,8 @@ public:
const insert_options& options = {}) const
-> std::future<std::pair<error, mutation_result>>
{
return insert(std::move(document_id), encode_document<Transcoder>(document), options);
return insert(
std::move(document_id), create_encode_fn<Transcoder, Document>(std::move(document)), options);
}

/**
Expand Down Expand Up @@ -660,8 +666,10 @@ public:
const replace_options& options,
replace_handler&& handler) const
{
return replace(
std::move(document_id), encode_document<Transcoder>(document), options, std::move(handler));
return replace(std::move(document_id),
create_encode_fn<Transcoder, Document>(std::move(document)),
options,
std::move(handler));
}

/**
Expand Down Expand Up @@ -712,7 +720,8 @@ public:
const replace_options& options = {}) const
-> std::future<std::pair<error, mutation_result>>
{
return replace(std::move(document_id), encode_document<Transcoder>(document), options);
return replace(
std::move(document_id), create_encode_fn<Transcoder, Document>(std::move(document)), options);
}

/**
Expand Down Expand Up @@ -1090,15 +1099,51 @@ private:
[[nodiscard]] auto crypto_manager() const -> const std::shared_ptr<crypto::manager>&;

template<typename Transcoder, typename Document>
[[nodiscard]] auto encode_document(const Document& document) const -> codec::encoded_value
[[nodiscard]] auto create_encode_fn(Document document) const
-> std::function<codec::encoded_value()>
{
if constexpr (codec::is_crypto_transcoder_v<Transcoder>) {
return Transcoder::encode(document, crypto_manager());
return [crypto_manager = crypto_manager(),
document = std::move(document)]() -> codec::encoded_value {
return Transcoder::encode(document, crypto_manager);
};
} else {
return Transcoder::encode(document);
return [document = std::move(document)]() -> codec::encoded_value {
return Transcoder::encode(document);
};
}
}

void replace(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const replace_options& options,
replace_handler&& handler) const;

auto replace(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const replace_options& options) const
-> std::future<std::pair<error, mutation_result>>;

void upsert(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const upsert_options& options,
upsert_handler&& handler) const;

auto upsert(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const upsert_options& options) const
-> std::future<std::pair<error, mutation_result>>;

void insert(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const insert_options& options,
insert_handler&& handler) const;

auto insert(std::string document_id,
std::function<codec::encoded_value()> document_fn,
const insert_options& options) const
-> std::future<std::pair<error, mutation_result>>;

collection(core::cluster core,
std::string_view bucket_name,
std::string_view scope_name,
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ unit_test(orphan_reporter)
target_link_libraries(test_unit_jsonsl PRIVATE jsonsl)

integration_benchmark(get)
integration_benchmark(replace)

transaction_test(context)
transaction_test(simple)
Expand Down
Loading
Loading