diff --git a/Rakefile b/Rakefile index efc248d9..0594c795 100644 --- a/Rakefile +++ b/Rakefile @@ -110,6 +110,7 @@ task :cache_cxx_dependencies do "-DCOUCHBASE_CXX_CLIENT_BUILD_TOOLS=OFF", "-DCOUCHBASE_CXX_CLIENT_BUILD_DOCS=OFF", "-DCOUCHBASE_CXX_CLIENT_STATIC_BORINGSSL=ON", + "-DCOUCHBASE_CXX_CLIENT_BUILD_OPENTELEMETRY=OFF", "-DCPM_DOWNLOAD_ALL=ON", "-DCPM_USE_NAMED_CACHE_DIRECTORIES=ON", "-DCPM_USE_LOCAL_PACKAGES=OFF", @@ -226,6 +227,7 @@ task :cache_cxx_dependencies do "-DCPM_USE_LOCAL_PACKAGES=OFF", "-DCPM_SOURCE_CACHE=#{cpm_cache_dir}", "-DCOUCHBASE_CXX_CLIENT_EMBED_MOZILLA_CA_BUNDLE_ROOT=#{cpm_cache_dir}", + "-DCOUCHBASE_CXX_CLIENT_BUILD_OPENTELEMETRY=OFF", ] cmake_flags << "-DCMAKE_C_COMPILER=#{cc}" if cc cmake_flags << "-DCMAKE_CXX_COMPILER=#{cxx}" if cxx diff --git a/ext/couchbase b/ext/couchbase index 3a383490..a13bafb9 160000 --- a/ext/couchbase +++ b/ext/couchbase @@ -1 +1 @@ -Subproject commit 3a383490d3e61ddaf078316b6e28c4c948f91dd2 +Subproject commit a13bafb98931c9a3aff2722eafbb5379267dedf0 diff --git a/ext/extconf.rb b/ext/extconf.rb index 183a78a3..c57089ed 100644 --- a/ext/extconf.rb +++ b/ext/extconf.rb @@ -97,6 +97,7 @@ def sys(*cmd) "-DCOUCHBASE_CXX_CLIENT_BUILD_TOOLS=OFF", "-DCOUCHBASE_CXX_CLIENT_BUILD_EXAMPLES=OFF", "-DCOUCHBASE_CXX_CLIENT_INSTALL=OFF", + "-DCOUCHBASE_CXX_CLIENT_BUILD_OPENTELEMETRY=OFF", ] if version.start_with?("4") diff --git a/ext/rcb_crud.cxx b/ext/rcb_crud.cxx index 43347a28..6cc8a26d 100644 --- a/ext/rcb_crud.cxx +++ b/ext/rcb_crud.cxx @@ -17,6 +17,8 @@ #include #include +#include +#include #include #include #include @@ -24,11 +26,18 @@ #include #include #include +#include +#include #include #include #include +#include +#include +#include +#include #include #include +#include #include #include @@ -543,6 +552,7 @@ cb_Backend_document_unlock(VALUE self, auto cluster = cb_backend_to_core_api_cluster(self); Check_Type(bucket, T_STRING); + Check_Type(scope, T_STRING); Check_Type(collection, T_STRING); Check_Type(id, T_STRING); if (!NIL_P(options)) { @@ -555,7 +565,6 @@ cb_Backend_document_unlock(VALUE self, cb_string_new(scope), cb_string_new(collection), cb_string_new(id), - }; core::operations::unlock_request req{ doc_id }; @@ -594,7 +603,7 @@ cb_Backend_document_upsert(VALUE self, VALUE flags, VALUE options) { - auto cluster = cb_backend_to_public_api_cluster(self); + auto cluster = cb_backend_to_core_api_cluster(self); Check_Type(bucket, T_STRING); Check_Type(scope, T_STRING); @@ -607,25 +616,48 @@ cb_Backend_document_upsert(VALUE self, } try { - couchbase::upsert_options opts; - set_timeout(opts, options); - set_expiry(opts, options); - set_durability(opts, options); - set_preserve_expiry(opts, options); - - auto f = cluster.bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)) - .upsert(cb_string_new(id), - couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, - opts); - - auto [ctx, resp] = cb_wait_for_future(f); - if (ctx.ec()) { - cb_throw_error(ctx, "unable to upsert"); + core::operations::upsert_request req{ + core::document_id{ + cb_string_new(bucket), + cb_string_new(scope), + cb_string_new(collection), + cb_string_new(id), + }, + }; + cb_extract_content(req, content); + cb_extract_flags(req, flags); + cb_extract_timeout(req, options); + cb_extract_expiry(req, options); + cb_extract_durability_level(req, options); + cb_extract_preserve_expiry(req, options); + + std::promise promise; + auto f = promise.get_future(); + + if (const auto legacy_durability = extract_legacy_durability_constraints(options); + legacy_durability.has_value()) { + cluster.execute( + core::operations::upsert_request_with_legacy_durability{ + std::move(req), + legacy_durability.value().first, + legacy_durability.value().second, + }, + [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } else { + cluster.execute(std::move(req), [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); } - return to_mutation_result_value(resp); + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error(resp.ctx, "unable to upsert"); + } + + return cb_create_mutation_result(resp); + } catch (const std::system_error& se) { rb_exc_raise(cb_map_error_code( se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); @@ -644,7 +676,7 @@ cb_Backend_document_append(VALUE self, VALUE content, VALUE options) { - auto cluster = cb_backend_to_public_api_cluster(self); + auto cluster = cb_backend_to_core_api_cluster(self); Check_Type(bucket, T_STRING); Check_Type(scope, T_STRING); @@ -656,22 +688,45 @@ cb_Backend_document_append(VALUE self, } try { - couchbase::append_options opts; - set_timeout(opts, options); - set_durability(opts, options); - - auto f = cluster.bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)) - .binary() - .append(cb_string_new(id), cb_binary_new(content), opts); - - auto [ctx, resp] = cb_wait_for_future(f); - if (ctx.ec()) { - cb_throw_error(ctx, "unable to append"); + core::operations::append_request req{ + core::document_id{ + cb_string_new(bucket), + cb_string_new(scope), + cb_string_new(collection), + cb_string_new(id), + }, + }; + cb_extract_content(req, content); + cb_extract_timeout(req, options); + cb_extract_durability_level(req, options); + + std::promise promise; + auto f = promise.get_future(); + + if (const auto legacy_durability = extract_legacy_durability_constraints(options); + legacy_durability.has_value()) { + cluster.execute( + core::operations::append_request_with_legacy_durability{ + std::move(req), + legacy_durability.value().first, + legacy_durability.value().second, + }, + [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } else { + cluster.execute(std::move(req), [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } + + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error(resp.ctx, "unable to append"); } - return to_mutation_result_value(resp); + return cb_create_mutation_result(resp); + } catch (const std::system_error& se) { rb_exc_raise(cb_map_error_code( se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); @@ -690,7 +745,7 @@ cb_Backend_document_prepend(VALUE self, VALUE content, VALUE options) { - auto cluster = cb_backend_to_public_api_cluster(self); + auto cluster = cb_backend_to_core_api_cluster(self); Check_Type(bucket, T_STRING); Check_Type(scope, T_STRING); @@ -702,22 +757,44 @@ cb_Backend_document_prepend(VALUE self, } try { - couchbase::prepend_options opts; - set_timeout(opts, options); - set_durability(opts, options); - - auto f = cluster.bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)) - .binary() - .prepend(cb_string_new(id), cb_binary_new(content), opts); - - auto [ctx, resp] = cb_wait_for_future(f); - if (ctx.ec()) { - cb_throw_error(ctx, "unable to prepend"); + core::operations::prepend_request req{ + core::document_id{ + cb_string_new(bucket), + cb_string_new(scope), + cb_string_new(collection), + cb_string_new(id), + }, + }; + cb_extract_content(req, content); + cb_extract_timeout(req, options); + cb_extract_durability_level(req, options); + + std::promise promise; + auto f = promise.get_future(); + + if (const auto legacy_durability = extract_legacy_durability_constraints(options); + legacy_durability.has_value()) { + cluster.execute( + core::operations::prepend_request_with_legacy_durability{ + std::move(req), + legacy_durability.value().first, + legacy_durability.value().second, + }, + [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } else { + cluster.execute(std::move(req), [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); } - return to_mutation_result_value(resp); + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error(resp.ctx, "unable to prepend"); + } + + return cb_create_mutation_result(resp); } catch (const std::system_error& se) { rb_exc_raise(cb_map_error_code( se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); @@ -737,7 +814,7 @@ cb_Backend_document_replace(VALUE self, VALUE flags, VALUE options) { - auto cluster = cb_backend_to_public_api_cluster(self); + auto cluster = cb_backend_to_core_api_cluster(self); Check_Type(bucket, T_STRING); Check_Type(scope, T_STRING); @@ -750,26 +827,48 @@ cb_Backend_document_replace(VALUE self, } try { - couchbase::replace_options opts; - set_timeout(opts, options); - set_expiry(opts, options); - set_durability(opts, options); - set_preserve_expiry(opts, options); - set_cas(opts, options); - - auto f = cluster.bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)) - .replace(cb_string_new(id), - couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, - opts); - - auto [ctx, resp] = cb_wait_for_future(f); - if (ctx.ec()) { - cb_throw_error(ctx, "unable to replace"); + core::operations::replace_request req{ + core::document_id{ + cb_string_new(bucket), + cb_string_new(scope), + cb_string_new(collection), + cb_string_new(id), + }, + }; + cb_extract_content(req, content); + cb_extract_flags(req, flags); + cb_extract_timeout(req, options); + cb_extract_expiry(req, options); + cb_extract_durability_level(req, options); + cb_extract_preserve_expiry(req, options); + cb_extract_cas(req, options); + + std::promise promise; + auto f = promise.get_future(); + + if (const auto legacy_durability = extract_legacy_durability_constraints(options); + legacy_durability.has_value()) { + cluster.execute( + core::operations::replace_request_with_legacy_durability{ + std::move(req), + legacy_durability.value().first, + legacy_durability.value().second, + }, + [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } else { + cluster.execute(std::move(req), [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); } - return to_mutation_result_value(resp); + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error(resp.ctx, "unable to replace"); + } + return cb_create_mutation_result(resp); + } catch (const std::system_error& se) { rb_exc_raise(cb_map_error_code( se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); @@ -789,7 +888,7 @@ cb_Backend_document_insert(VALUE self, VALUE flags, VALUE options) { - auto cluster = cb_backend_to_public_api_cluster(self); + auto cluster = cb_backend_to_core_api_cluster(self); Check_Type(bucket, T_STRING); Check_Type(scope, T_STRING); @@ -802,24 +901,46 @@ cb_Backend_document_insert(VALUE self, } try { - couchbase::insert_options opts; - set_timeout(opts, options); - set_expiry(opts, options); - set_durability(opts, options); - - auto f = cluster.bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)) - .insert(cb_string_new(id), - couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }, - opts); - - auto [ctx, resp] = cb_wait_for_future(f); - if (ctx.ec()) { - cb_throw_error(ctx, "unable to insert"); + core::operations::insert_request req{ + core::document_id{ + cb_string_new(bucket), + cb_string_new(scope), + cb_string_new(collection), + cb_string_new(id), + }, + }; + cb_extract_content(req, content); + cb_extract_flags(req, flags); + cb_extract_timeout(req, options); + cb_extract_expiry(req, options); + cb_extract_durability_level(req, options); + + std::promise promise; + auto f = promise.get_future(); + + if (const auto legacy_durability = extract_legacy_durability_constraints(options); + legacy_durability.has_value()) { + cluster.execute( + core::operations::insert_request_with_legacy_durability{ + std::move(req), + legacy_durability.value().first, + legacy_durability.value().second, + }, + [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } else { + cluster.execute(std::move(req), [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); } - return to_mutation_result_value(resp); + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error(resp.ctx, "unable to insert"); + } + return cb_create_mutation_result(resp); + } catch (const std::system_error& se) { rb_exc_raise(cb_map_error_code( se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); @@ -837,7 +958,7 @@ cb_Backend_document_remove(VALUE self, VALUE id, VALUE options) { - auto cluster = cb_backend_to_public_api_cluster(self); + auto cluster = cb_backend_to_core_api_cluster(self); Check_Type(bucket, T_STRING); Check_Type(scope, T_STRING); @@ -848,22 +969,43 @@ cb_Backend_document_remove(VALUE self, } try { - couchbase::remove_options opts; - set_timeout(opts, options); - set_durability(opts, options); - set_cas(opts, options); - - auto f = cluster.bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)) - .remove(cb_string_new(id), opts); - - auto [ctx, resp] = cb_wait_for_future(f); - if (ctx.ec()) { - cb_throw_error(ctx, "unable to remove"); + core::operations::remove_request req{ + core::document_id{ + cb_string_new(bucket), + cb_string_new(scope), + cb_string_new(collection), + cb_string_new(id), + }, + }; + cb_extract_timeout(req, options); + cb_extract_durability_level(req, options); + cb_extract_cas(req, options); + + std::promise promise; + auto f = promise.get_future(); + + if (const auto legacy_durability = extract_legacy_durability_constraints(options); + legacy_durability.has_value()) { + cluster.execute( + core::operations::remove_request_with_legacy_durability{ + std::move(req), + legacy_durability.value().first, + legacy_durability.value().second, + }, + [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } else { + cluster.execute(std::move(req), [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); } - return to_mutation_result_value(resp); + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error(resp.ctx, "unable to remove"); + } + return cb_create_mutation_result(resp); } catch (const std::system_error& se) { rb_exc_raise(cb_map_error_code( se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); @@ -881,7 +1023,7 @@ cb_Backend_document_increment(VALUE self, VALUE id, VALUE options) { - auto cluster = cb_backend_to_public_api_cluster(self); + auto cluster = cb_backend_to_core_api_cluster(self); Check_Type(bucket, T_STRING); Check_Type(scope, T_STRING); @@ -892,27 +1034,50 @@ cb_Backend_document_increment(VALUE self, } try { - couchbase::increment_options opts; - set_timeout(opts, options); - set_durability(opts, options); - set_expiry(opts, options); - set_delta(opts, options); - set_initial_value(opts, options); - - auto f = cluster.bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)) - .binary() - .increment(cb_string_new(id), opts); - - auto [ctx, resp] = cb_wait_for_future(f); - if (ctx.ec()) { - cb_throw_error(ctx, "unable to increment"); + core::operations::increment_request req{ + core::document_id{ + cb_string_new(bucket), + cb_string_new(scope), + cb_string_new(collection), + cb_string_new(id), + }, + }; + + cb_extract_timeout(req, options); + cb_extract_expiry(req, options); + cb_extract_option_uint64(req.delta, options, "delta"); + cb_extract_option_uint64(req.initial_value, options, "initial_value"); + cb_extract_durability_level(req, options); + + std::promise promise; + auto f = promise.get_future(); + + if (const auto legacy_durability = extract_legacy_durability_constraints(options); + legacy_durability.has_value()) { + cluster.execute( + core::operations::increment_request_with_legacy_durability{ + std::move(req), + legacy_durability.value().first, + legacy_durability.value().second, + }, + [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } else { + cluster.execute(std::move(req), [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); } - VALUE res = to_mutation_result_value(resp); - rb_hash_aset(res, rb_id2sym(rb_intern("content")), ULL2NUM(resp.content())); + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error(resp.ctx, "unable to increment"); + } + + VALUE res = cb_create_mutation_result(resp); + rb_hash_aset(res, rb_id2sym(rb_intern("content")), ULL2NUM(resp.content)); return res; + } catch (const std::system_error& se) { rb_exc_raise(cb_map_error_code( se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); @@ -930,7 +1095,7 @@ cb_Backend_document_decrement(VALUE self, VALUE id, VALUE options) { - auto cluster = cb_backend_to_public_api_cluster(self); + auto cluster = cb_backend_to_core_api_cluster(self); Check_Type(bucket, T_STRING); Check_Type(scope, T_STRING); @@ -941,26 +1106,48 @@ cb_Backend_document_decrement(VALUE self, } try { - couchbase::decrement_options opts; - set_timeout(opts, options); - set_durability(opts, options); - set_expiry(opts, options); - set_delta(opts, options); - set_initial_value(opts, options); - - auto f = cluster.bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)) - .binary() - .decrement(cb_string_new(id), opts); - - auto [ctx, resp] = cb_wait_for_future(f); - if (ctx.ec()) { - cb_throw_error(ctx, "unable to decrement"); + core::operations::decrement_request req{ + core::document_id{ + cb_string_new(bucket), + cb_string_new(scope), + cb_string_new(collection), + cb_string_new(id), + }, + }; + + cb_extract_timeout(req, options); + cb_extract_expiry(req, options); + cb_extract_option_uint64(req.delta, options, "delta"); + cb_extract_option_uint64(req.initial_value, options, "initial_value"); + cb_extract_durability_level(req, options); + + std::promise promise; + auto f = promise.get_future(); + + if (const auto legacy_durability = extract_legacy_durability_constraints(options); + legacy_durability.has_value()) { + cluster.execute( + core::operations::decrement_request_with_legacy_durability{ + std::move(req), + legacy_durability.value().first, + legacy_durability.value().second, + }, + [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } else { + cluster.execute(std::move(req), [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); } - VALUE res = to_mutation_result_value(resp); - rb_hash_aset(res, rb_id2sym(rb_intern("content")), ULL2NUM(resp.content())); + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error(resp.ctx, "unable to decrement"); + } + + VALUE res = cb_create_mutation_result(resp); + rb_hash_aset(res, rb_id2sym(rb_intern("content")), ULL2NUM(resp.content)); return res; } catch (const std::system_error& se) { @@ -1374,7 +1561,7 @@ cb_Backend_document_mutate_in(VALUE self, VALUE specs, VALUE options) { - auto cluster = cb_backend_to_public_api_cluster(self); + auto cluster = cb_backend_to_core_api_cluster(self); Check_Type(bucket, T_STRING); Check_Type(scope, T_STRING); @@ -1391,15 +1578,22 @@ cb_Backend_document_mutate_in(VALUE self, } try { - couchbase::mutate_in_options opts; - set_timeout(opts, options); - set_durability(opts, options); - set_expiry(opts, options); - set_preserve_expiry(opts, options); - set_access_deleted(opts, options); - set_create_as_deleted(opts, options); - set_cas(opts, options); - set_store_semantics(opts, options); + core::operations::mutate_in_request req{ + core::document_id{ + cb_string_new(bucket), + cb_string_new(scope), + cb_string_new(collection), + cb_string_new(id), + }, + }; + cb_extract_timeout(req, options); + cb_extract_durability_level(req, options); + cb_extract_expiry(req, options); + cb_extract_preserve_expiry(req, options); + cb_extract_option_bool(req.access_deleted, options, "access_deleted"); + cb_extract_option_bool(req.create_as_deleted, options, "create_as_deleted"); + cb_extract_cas(req, options); + cb_extract_store_semantics(req, options); static VALUE xattr_property = rb_id2sym(rb_intern("xattr")); static VALUE create_path_property = rb_id2sym(rb_intern("create_path")); @@ -1409,130 +1603,144 @@ cb_Backend_document_mutate_in(VALUE self, static VALUE param_property = rb_id2sym(rb_intern("param")); couchbase::mutate_in_specs cxx_specs; - auto entries_size = static_cast(RARRAY_LEN(specs)); - for (std::size_t i = 0; i < entries_size; ++i) { - VALUE entry = rb_ary_entry(specs, static_cast(i)); - cb_check_type(entry, T_HASH); - bool xattr = RTEST(rb_hash_aref(entry, xattr_property)); - bool create_path = RTEST(rb_hash_aref(entry, create_path_property)); - bool expand_macros = RTEST(rb_hash_aref(entry, expand_macros_property)); - VALUE path = rb_hash_aref(entry, path_property); - cb_check_type(path, T_STRING); - VALUE operation = rb_hash_aref(entry, opcode_property); - cb_check_type(operation, T_SYMBOL); - VALUE param = rb_hash_aref(entry, param_property); - if (ID operation_id = rb_sym2id(operation); operation_id == rb_intern("dict_add")) { - cb_check_type(param, T_STRING); - cxx_specs.push_back(couchbase::mutate_in_specs::insert_raw( - cb_string_new(path), cb_binary_new(param), expand_macros) - .xattr(xattr) - .create_path(create_path)); - } else if (operation_id == rb_intern("dict_upsert")) { - cb_check_type(param, T_STRING); - - cxx_specs.push_back(couchbase::mutate_in_specs::upsert_raw( - cb_string_new(path), cb_binary_new(param), expand_macros) - .xattr(xattr) - .create_path(create_path)); - } else if (operation_id == rb_intern("remove")) { - cxx_specs.push_back(couchbase::mutate_in_specs::remove(cb_string_new(path)).xattr(xattr)); - } else if (operation_id == rb_intern("replace")) { - cb_check_type(param, T_STRING); - cxx_specs.push_back(couchbase::mutate_in_specs::replace_raw( - cb_string_new(path), cb_binary_new(param), expand_macros) - .xattr(xattr)); - } else if (operation_id == rb_intern("array_push_last")) { - cb_check_type(param, T_STRING); - cxx_specs.push_back( - couchbase::mutate_in_specs::array_append_raw(cb_string_new(path), cb_binary_new(param)) - .xattr(xattr) - .create_path(create_path)); - } else if (operation_id == rb_intern("array_push_first")) { - cb_check_type(param, T_STRING); - cxx_specs.push_back( - couchbase::mutate_in_specs::array_prepend_raw(cb_string_new(path), cb_binary_new(param)) - .xattr(xattr) - .create_path(create_path)); - } else if (operation_id == rb_intern("array_insert")) { - cb_check_type(param, T_STRING); - cxx_specs.push_back( - couchbase::mutate_in_specs::array_insert_raw(cb_string_new(path), cb_binary_new(param)) - .xattr(xattr) - .create_path(create_path)); - } else if (operation_id == rb_intern("array_add_unique")) { - cb_check_type(param, T_STRING); - cxx_specs.push_back(couchbase::mutate_in_specs::array_add_unique_raw( - cb_string_new(path), cb_binary_new(param), expand_macros) - .xattr(xattr) - .create_path(create_path)); - } else if (operation_id == rb_intern("counter")) { - if (TYPE(param) == T_FIXNUM || TYPE(param) == T_BIGNUM) { - if (std::int64_t num = NUM2LL(param); num < 0) { - cxx_specs.push_back(couchbase::mutate_in_specs::decrement(cb_string_new(path), -1 * num) - .xattr(xattr) - .create_path(create_path)); + { + auto entries_size = static_cast(RARRAY_LEN(specs)); + for (std::size_t i = 0; i < entries_size; ++i) { + VALUE entry = rb_ary_entry(specs, static_cast(i)); + cb_check_type(entry, T_HASH); + bool xattr = RTEST(rb_hash_aref(entry, xattr_property)); + bool create_path = RTEST(rb_hash_aref(entry, create_path_property)); + bool expand_macros = RTEST(rb_hash_aref(entry, expand_macros_property)); + VALUE path = rb_hash_aref(entry, path_property); + cb_check_type(path, T_STRING); + VALUE operation = rb_hash_aref(entry, opcode_property); + cb_check_type(operation, T_SYMBOL); + VALUE param = rb_hash_aref(entry, param_property); + if (ID operation_id = rb_sym2id(operation); operation_id == rb_intern("dict_add")) { + cb_check_type(param, T_STRING); + cxx_specs.push_back(couchbase::mutate_in_specs::insert_raw( + cb_string_new(path), cb_binary_new(param), expand_macros) + .xattr(xattr) + .create_path(create_path)); + } else if (operation_id == rb_intern("dict_upsert")) { + cb_check_type(param, T_STRING); + + cxx_specs.push_back(couchbase::mutate_in_specs::upsert_raw( + cb_string_new(path), cb_binary_new(param), expand_macros) + .xattr(xattr) + .create_path(create_path)); + } else if (operation_id == rb_intern("remove")) { + cxx_specs.push_back(couchbase::mutate_in_specs::remove(cb_string_new(path)).xattr(xattr)); + } else if (operation_id == rb_intern("replace")) { + cb_check_type(param, T_STRING); + cxx_specs.push_back(couchbase::mutate_in_specs::replace_raw( + cb_string_new(path), cb_binary_new(param), expand_macros) + .xattr(xattr)); + } else if (operation_id == rb_intern("array_push_last")) { + cb_check_type(param, T_STRING); + cxx_specs.push_back( + couchbase::mutate_in_specs::array_append_raw(cb_string_new(path), cb_binary_new(param)) + .xattr(xattr) + .create_path(create_path)); + } else if (operation_id == rb_intern("array_push_first")) { + cb_check_type(param, T_STRING); + cxx_specs.push_back( + couchbase::mutate_in_specs::array_prepend_raw(cb_string_new(path), cb_binary_new(param)) + .xattr(xattr) + .create_path(create_path)); + } else if (operation_id == rb_intern("array_insert")) { + cb_check_type(param, T_STRING); + cxx_specs.push_back( + couchbase::mutate_in_specs::array_insert_raw(cb_string_new(path), cb_binary_new(param)) + .xattr(xattr) + .create_path(create_path)); + } else if (operation_id == rb_intern("array_add_unique")) { + cb_check_type(param, T_STRING); + cxx_specs.push_back(couchbase::mutate_in_specs::array_add_unique_raw( + cb_string_new(path), cb_binary_new(param), expand_macros) + .xattr(xattr) + .create_path(create_path)); + } else if (operation_id == rb_intern("counter")) { + if (TYPE(param) == T_FIXNUM || TYPE(param) == T_BIGNUM) { + if (std::int64_t num = NUM2LL(param); num < 0) { + cxx_specs.push_back( + couchbase::mutate_in_specs::decrement(cb_string_new(path), -1 * num) + .xattr(xattr) + .create_path(create_path)); + } else { + cxx_specs.push_back(couchbase::mutate_in_specs::increment(cb_string_new(path), num) + .xattr(xattr) + .create_path(create_path)); + } } else { - cxx_specs.push_back(couchbase::mutate_in_specs::increment(cb_string_new(path), num) - .xattr(xattr) - .create_path(create_path)); + throw ruby_exception( + exc_invalid_argument(), + rb_sprintf("subdocument counter operation expects number, but given: %+" PRIsVALUE, + param)); } + } else if (operation_id == rb_intern("set_doc")) { + cb_check_type(param, T_STRING); + cxx_specs.push_back( + couchbase::mutate_in_specs::replace_raw("", cb_binary_new(param), expand_macros) + .xattr(xattr)); + } else if (operation_id == rb_intern("remove_doc")) { + cxx_specs.push_back(couchbase::mutate_in_specs::remove("").xattr(xattr)); } else { throw ruby_exception( exc_invalid_argument(), - rb_sprintf("subdocument counter operation expects number, but given: %+" PRIsVALUE, - param)); + rb_sprintf("unsupported operation for subdocument mutation: %+" PRIsVALUE, operation)); } - } else if (operation_id == rb_intern("set_doc")) { - cb_check_type(param, T_STRING); - cxx_specs.push_back( - couchbase::mutate_in_specs::replace_raw("", cb_binary_new(param), expand_macros) - .xattr(xattr)); - } else if (operation_id == rb_intern("remove_doc")) { - cxx_specs.push_back(couchbase::mutate_in_specs::remove("").xattr(xattr)); - } else { - throw ruby_exception( - exc_invalid_argument(), - rb_sprintf("unsupported operation for subdocument mutation: %+" PRIsVALUE, operation)); } } + req.specs = cxx_specs.specs(); + + std::promise promise; + auto f = promise.get_future(); - auto f = cluster.bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)) - .mutate_in(cb_string_new(id), cxx_specs, opts); + if (const auto legacy_durability = extract_legacy_durability_constraints(options); + legacy_durability.has_value()) { + cluster.execute( + core::operations::mutate_in_request_with_legacy_durability{ + std::move(req), + legacy_durability.value().first, + legacy_durability.value().second, + }, + [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } else { + cluster.execute(std::move(req), [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } - auto [ctx, resp] = cb_wait_for_future(f); - if (ctx.ec()) { - cb_throw_error(ctx, "unable to mutate_in"); + auto resp = cb_wait_for_future(f); + if (resp.ctx.ec()) { + cb_throw_error(resp.ctx, "unable to mutate_in"); } static VALUE deleted_property = rb_id2sym(rb_intern("deleted")); static VALUE fields_property = rb_id2sym(rb_intern("fields")); static VALUE index_property = rb_id2sym(rb_intern("index")); - static VALUE cas_property = rb_id2sym(rb_intern("cas")); static VALUE value_property = rb_id2sym(rb_intern("value")); - VALUE res = to_mutation_result_value(resp); - rb_hash_aset(res, deleted_property, resp.is_deleted() ? Qtrue : Qfalse); - if (!ctx.ec()) { - rb_hash_aset(res, cas_property, cb_cas_to_num(resp.cas())); - - VALUE fields = rb_ary_new_capa(static_cast(entries_size)); - rb_hash_aset(res, fields_property, fields); - for (std::size_t i = 0; i < entries_size; ++i) { - VALUE entry = rb_hash_new(); - rb_hash_aset(entry, index_property, ULL2NUM(i)); - rb_hash_aset(entry, - path_property, - rb_hash_aref(rb_ary_entry(specs, static_cast(i)), path_property)); - if (resp.has_value(i)) { - auto value = resp.content_as(i); - rb_hash_aset(entry, value_property, cb_str_new(core::utils::json::generate(value))); - } - rb_ary_store(fields, static_cast(i), entry); + VALUE res = cb_create_mutation_result(resp); + rb_hash_aset(res, deleted_property, resp.deleted ? Qtrue : Qfalse); + VALUE fields = rb_ary_new_capa(static_cast(resp.fields.size())); + rb_hash_aset(res, fields_property, fields); + for (std::size_t i = 0; i < resp.fields.size(); ++i) { + VALUE entry = rb_hash_new(); + rb_hash_aset(entry, index_property, ULL2NUM(i)); + rb_hash_aset(entry, + path_property, + rb_hash_aref(rb_ary_entry(specs, static_cast(i)), path_property)); + if (!resp.fields.at(i).value.empty()) { + rb_hash_aset(entry, value_property, cb_str_new(resp.fields.at(i).value)); } + rb_ary_store(fields, static_cast(i), entry); } return res; + } catch (const std::system_error& se) { rb_exc_raise(cb_map_error_code( se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); diff --git a/ext/rcb_multi.cxx b/ext/rcb_multi.cxx index fb0f7ea1..27b3507e 100644 --- a/ext/rcb_multi.cxx +++ b/ext/rcb_multi.cxx @@ -17,7 +17,20 @@ #include #include + +// TODO(DC): Compilation fails unless all operations that support legacy durability are included +// This is probably something we should address in the C++ core. Including them all for now. +#include +#include #include +#include +#include +#include +#include +#include +#include +#include + #include #include @@ -32,7 +45,6 @@ namespace couchbase::ruby { namespace { - void cb_extract_array_of_ids(std::vector& ids, VALUE arg) { @@ -82,21 +94,29 @@ cb_extract_array_of_ids(std::vector& ids, VALUE arg) void cb_extract_array_of_id_content( - std::vector>& id_content, - VALUE arg) + std::vector>& id_content, + VALUE bucket_name, + VALUE scope_name, + VALUE collection_name, + VALUE tuples) { - if (TYPE(arg) != T_ARRAY) { + cb_check_type(bucket_name, T_STRING); + cb_check_type(scope_name, T_STRING); + cb_check_type(collection_name, T_STRING); + + if (TYPE(tuples) != T_ARRAY) { throw ruby_exception( rb_eArgError, - rb_sprintf("Type of ID/content tuples must be an Array, but given %+" PRIsVALUE, arg)); + rb_sprintf("Type of ID/content tuples must be an Array, but given %+" PRIsVALUE, tuples)); } - auto num_of_tuples = static_cast(RARRAY_LEN(arg)); + + auto num_of_tuples = static_cast(RARRAY_LEN(tuples)); if (num_of_tuples < 1) { throw ruby_exception(rb_eArgError, "Array of ID/content tuples must not be empty"); } id_content.reserve(num_of_tuples); for (std::size_t i = 0; i < num_of_tuples; ++i) { - VALUE entry = rb_ary_entry(arg, static_cast(i)); + VALUE entry = rb_ary_entry(tuples, static_cast(i)); if (TYPE(entry) != T_ARRAY || RARRAY_LEN(entry) != 3) { throw ruby_exception(rb_eArgError, rb_sprintf("ID/content tuple must be represented as an Array[id, " @@ -118,27 +138,40 @@ cb_extract_array_of_id_content( throw ruby_exception(rb_eArgError, rb_sprintf("Flags must be an Integer, but given %+" PRIsVALUE, flags)); } - id_content.emplace_back( - cb_string_new(id), - couchbase::codec::encoded_value{ cb_binary_new(content), FIX2UINT(flags) }); + id_content.emplace_back(core::document_id{ cb_string_new(bucket_name), + cb_string_new(scope_name), + cb_string_new(collection_name), + cb_string_new(id) }, + couchbase::codec::encoded_value{ + cb_binary_new(content), + FIX2UINT(flags), + }); } } void -cb_extract_array_of_id_cas(std::vector>& id_cas, VALUE arg) +cb_extract_array_of_id_cas(std::vector>& id_cas, + VALUE bucket_name, + VALUE scope_name, + VALUE collection_name, + VALUE tuples) { - if (TYPE(arg) != T_ARRAY) { + cb_check_type(bucket_name, T_STRING); + cb_check_type(scope_name, T_STRING); + cb_check_type(collection_name, T_STRING); + + if (TYPE(tuples) != T_ARRAY) { throw ruby_exception( rb_eArgError, - rb_sprintf("Type of ID/CAS tuples must be an Array, but given %+" PRIsVALUE, arg)); + rb_sprintf("Type of ID/CAS tuples must be an Array, but given %+" PRIsVALUE, tuples)); } - auto num_of_tuples = static_cast(RARRAY_LEN(arg)); + auto num_of_tuples = static_cast(RARRAY_LEN(tuples)); if (num_of_tuples < 1) { rb_raise(rb_eArgError, "Array of ID/CAS tuples must not be empty"); } id_cas.reserve(num_of_tuples); for (std::size_t i = 0; i < num_of_tuples; ++i) { - VALUE entry = rb_ary_entry(arg, static_cast(i)); + VALUE entry = rb_ary_entry(tuples, static_cast(i)); if (TYPE(entry) != T_ARRAY || RARRAY_LEN(entry) != 2) { throw ruby_exception( rb_eArgError, @@ -156,7 +189,11 @@ cb_extract_array_of_id_cas(std::vector>& cb_extract_cas(cas_val, cas); } - id_cas.emplace_back(cb_string_new(id), cas_val); + id_cas.emplace_back(core::document_id{ cb_string_new(bucket_name), + cb_string_new(scope_name), + cb_string_new(collection_name), + cb_string_new(id) }, + cas_val); } } @@ -222,45 +259,72 @@ cb_Backend_document_upsert_multi(VALUE self, VALUE id_content, VALUE options) { - auto cluster = cb_backend_to_public_api_cluster(self); - - try { - couchbase::upsert_options opts; - set_timeout(opts, options); - set_expiry(opts, options); - set_durability(opts, options); - set_preserve_expiry(opts, options); + auto cluster = cb_backend_to_core_api_cluster(self); - auto c = cluster.bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)); + Check_Type(bucket, T_STRING); + Check_Type(scope, T_STRING); + Check_Type(collection, T_STRING); + if (!NIL_P(options)) { + Check_Type(options, T_HASH); + } - std::vector> tuples{}; - cb_extract_array_of_id_content(tuples, id_content); + try { + std::vector> tuples{}; + cb_extract_array_of_id_content(tuples, bucket, scope, collection, id_content); auto num_of_tuples = tuples.size(); - std::vector< - std::pair>>> - futures; + std::vector>> futures; futures.reserve(num_of_tuples); for (auto& [id, content] : tuples) { - futures.emplace_back(id, c.upsert(id, std::move(content), opts)); + core::operations::upsert_request req{ + id, + std::move(content.data), + }; + req.flags = content.flags; + cb_extract_timeout(req, options); + cb_extract_expiry(req, options); + cb_extract_durability_level(req, options); + cb_extract_preserve_expiry(req, options); + + std::promise promise; + futures.emplace_back(id.key(), promise.get_future()); + + if (const auto legacy_durability = extract_legacy_durability_constraints(options); + legacy_durability.has_value()) { + cluster.execute( + core::operations::upsert_request_with_legacy_durability{ + std::move(req), + legacy_durability.value().first, + legacy_durability.value().second, + }, + [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } else { + cluster.execute(std::move(req), [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } } VALUE res = rb_ary_new_capa(static_cast(num_of_tuples)); for (auto& [id, f] : futures) { - auto [err, resp] = f.get(); - VALUE entry = to_mutation_result_value(resp); - if (err.ec()) { + auto resp = f.get(); + VALUE entry; + if (resp.ctx.ec()) { + entry = rb_hash_new(); static const auto sym_error = rb_id2sym(rb_intern("error")); - rb_hash_aset(entry, sym_error, cb_map_error(err, "unable (multi)upsert")); + rb_hash_aset(entry, sym_error, cb_map_error(resp.ctx, "unable (multi)upsert")); + } else { + entry = cb_create_mutation_result(resp); } static const auto sym_id = rb_id2sym(rb_intern("id")); rb_hash_aset(entry, sym_id, cb_str_new(id)); rb_ary_push(res, entry); } return res; + } catch (const std::system_error& se) { rb_exc_raise(cb_map_error_code( se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); @@ -278,50 +342,69 @@ cb_Backend_document_remove_multi(VALUE self, VALUE id_cas, VALUE options) { - auto cluster = cb_backend_to_public_api_cluster(self); + auto cluster = cb_backend_to_core_api_cluster(self); + Check_Type(bucket, T_STRING); + Check_Type(scope, T_STRING); + Check_Type(collection, T_STRING); if (!NIL_P(options)) { Check_Type(options, T_HASH); } try { - couchbase::remove_options opts; - set_timeout(opts, options); - set_durability(opts, options); - - std::vector> tuples{}; - cb_extract_array_of_id_cas(tuples, id_cas); - - auto c = cluster.bucket(cb_string_new(bucket)) - .scope(cb_string_new(scope)) - .collection(cb_string_new(collection)); + std::vector> tuples{}; + cb_extract_array_of_id_cas(tuples, bucket, scope, collection, id_cas); auto num_of_tuples = tuples.size(); - std::vector< - std::pair>>> - futures; + std::vector>> futures; futures.reserve(num_of_tuples); for (const auto& [id, cas] : tuples) { - opts.cas(cas); - futures.emplace_back(id, c.remove(id, opts)); + core::operations::remove_request req{ + id, + }; + cb_extract_timeout(req, options); + cb_extract_durability_level(req, options); + req.cas = cas; + + std::promise promise; + futures.emplace_back(id.key(), promise.get_future()); + + if (const auto legacy_durability = extract_legacy_durability_constraints(options); + legacy_durability.has_value()) { + cluster.execute( + core::operations::remove_request_with_legacy_durability{ + std::move(req), + legacy_durability.value().first, + legacy_durability.value().second, + }, + [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } else { + cluster.execute(std::move(req), [promise = std::move(promise)](auto&& resp) mutable { + promise.set_value(std::forward(resp)); + }); + } } VALUE res = rb_ary_new_capa(static_cast(num_of_tuples)); for (auto& [id, f] : futures) { - auto [ctx, resp] = f.get(); - VALUE entry = to_mutation_result_value(resp); - if (ctx.ec()) { + auto resp = f.get(); + VALUE entry; + if (resp.ctx.ec()) { + entry = rb_hash_new(); static const auto sym_error = rb_id2sym(rb_intern("error")); - rb_hash_aset(entry, sym_error, cb_map_error(ctx, "unable (multi)remove")); + rb_hash_aset(entry, sym_error, cb_map_error(resp.ctx, "unable (multi)remove")); + } else { + entry = cb_create_mutation_result(resp); } static const auto sym_id = rb_id2sym(rb_intern("id")); rb_hash_aset(entry, sym_id, cb_str_new(id)); - rb_ary_push(res, entry); } - return res; + } catch (const std::system_error& se) { rb_exc_raise(cb_map_error_code( se.code(), fmt::format("failed to perform {}: {}", __func__, se.what()), false)); diff --git a/ext/rcb_utils.cxx b/ext/rcb_utils.cxx index c3aefc5a..8f035050 100644 --- a/ext/rcb_utils.cxx +++ b/ext/rcb_utils.cxx @@ -15,6 +15,8 @@ * limitations under the License. */ +#include + #include #include "rcb_exceptions.hxx" @@ -212,6 +214,20 @@ cb_str_new(const std::optional& str) return Qnil; } +void +cb_extract_content(std::vector& field, VALUE content) +{ + cb_check_type(content, T_STRING); + field = cb_binary_new(content); +} + +void +cb_extract_flags(std::uint32_t& field, VALUE flags) +{ + cb_check_type(flags, T_FIXNUM); + field = FIX2UINT(flags); +} + void cb_extract_timeout(std::chrono::milliseconds& field, VALUE options) { @@ -358,6 +374,62 @@ cb_extract_cas(couchbase::cas& field, VALUE cas) } } +void +cb_extract_expiry(std::uint32_t& field, VALUE options) +{ + if (NIL_P(options)) { + return; + } + Check_Type(options, T_HASH); + + static VALUE property_name = rb_id2sym(rb_intern("expiry")); + VALUE val = rb_hash_aref(options, property_name); + if (NIL_P(val)) { + return; + } + const auto [type, duration] = unpack_expiry(val); + switch (type) { + case expiry_type::relative: + field = couchbase::core::impl::expiry_relative(duration); + break; + case expiry_type::absolute: + field = + couchbase::core::impl::expiry_absolute(std::chrono::system_clock::time_point(duration)); + break; + case expiry_type::none: + field = couchbase::core::impl::expiry_none(); + break; + } +} + +void +cb_extract_expiry(std::optional& field, VALUE options) +{ + if (NIL_P(options)) { + return; + } + Check_Type(options, T_HASH); + + static VALUE property_name = rb_id2sym(rb_intern("expiry")); + VALUE val = rb_hash_aref(options, property_name); + if (NIL_P(val)) { + return; + } + const auto [type, duration] = unpack_expiry(val); + switch (type) { + case expiry_type::relative: + field = couchbase::core::impl::expiry_relative(duration); + break; + case expiry_type::absolute: + field = + couchbase::core::impl::expiry_absolute(std::chrono::system_clock::time_point(duration)); + break; + case expiry_type::none: + field = couchbase::core::impl::expiry_none(); + break; + } +} + std::pair unpack_expiry(VALUE val, bool allow_nil) { diff --git a/ext/rcb_utils.hxx b/ext/rcb_utils.hxx index a74474e8..b689284e 100644 --- a/ext/rcb_utils.hxx +++ b/ext/rcb_utils.hxx @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -132,6 +133,43 @@ cb_extract_timeout(Request& req, VALUE options) } } +template +inline void +cb_extract_durability_level(Request& req, VALUE options) +{ + if (NIL_P(options)) { + return; + } + + if (TYPE(options) != T_HASH) { + throw ruby_exception(rb_eArgError, + rb_sprintf("expected options to be Hash, given %+" PRIsVALUE, options)); + } + + static VALUE property_name = rb_id2sym(rb_intern("durability_level")); + + VALUE val = rb_hash_aref(options, property_name); + if (NIL_P(val)) { + return; + } + if (TYPE(val) != T_SYMBOL) { + throw ruby_exception( + rb_eArgError, rb_sprintf("durability_level must be a Symbol, but given %+" PRIsVALUE, val)); + } + if (ID level = rb_sym2id(val); level == rb_intern("none")) { + req.durability_level = couchbase::durability_level::none; + } else if (level == rb_intern("majority")) { + req.durability_level = couchbase::durability_level::majority; + } else if (level == rb_intern("majority_and_persist_to_active")) { + req.durability_level = couchbase::durability_level::majority_and_persist_to_active; + } else if (level == rb_intern("persist_to_majority")) { + req.durability_level = couchbase::durability_level::persist_to_majority; + } else { + throw ruby_exception(rb_eArgError, + rb_sprintf("unexpected durability_level, given %+" PRIsVALUE, val)); + } +} + template inline void cb_extract_read_preference(Request& req, VALUE options) @@ -182,6 +220,26 @@ cb_extract_duration(Field& field, VALUE options, const char* name) } } +void +cb_extract_content(std::vector& field, VALUE content); + +template +inline void +cb_extract_content(Request& req, VALUE options) +{ + cb_extract_content(req.value, options); +} + +void +cb_extract_flags(std::uint32_t& field, VALUE flags); + +template +inline void +cb_extract_flags(Request& req, VALUE options) +{ + cb_extract_flags(req.flags, options); +} + void cb_extract_timeout(std::chrono::milliseconds& field, VALUE options); @@ -266,6 +324,41 @@ cb_extract_option_array(VALUE& val, VALUE options, const char* name); void cb_extract_cas(couchbase::cas& field, VALUE cas); +template +inline void +cb_extract_cas(Request& req, VALUE options) +{ + if (NIL_P(options) || TYPE(options) != T_HASH) { + return; + } + static VALUE property_name = rb_id2sym(rb_intern("cas")); + VALUE cas_value = rb_hash_aref(options, property_name); + if (NIL_P(cas_value)) { + return; + } + cb_extract_cas(req.cas, cas_value); +} + +void +cb_extract_expiry(std::uint32_t& field, VALUE options); + +void +cb_extract_expiry(std::optional& field, VALUE options); + +template +inline void +cb_extract_expiry(Request& req, VALUE options) +{ + cb_extract_expiry(req.expiry, options); +} + +template +inline void +cb_extract_preserve_expiry(Request& req, VALUE options) +{ + cb_extract_option_bool(req.preserve_expiry, options, "preserve_expiry"); +} + VALUE cb_cas_to_num(const couchbase::cas& cas); @@ -275,6 +368,24 @@ cb_num_to_cas(VALUE num); VALUE to_cas_value(couchbase::cas cas); +template +inline VALUE +cb_create_mutation_result(Response resp) +{ + VALUE res = rb_hash_new(); + rb_hash_aset(res, rb_id2sym(rb_intern("cas")), to_cas_value(resp.cas)); + + VALUE token = rb_hash_new(); + rb_hash_aset(token, rb_id2sym(rb_intern("partition_uuid")), ULL2NUM(resp.token.partition_uuid())); + rb_hash_aset( + token, rb_id2sym(rb_intern("sequence_number")), ULL2NUM(resp.token.sequence_number())); + rb_hash_aset(token, rb_id2sym(rb_intern("partition_id")), UINT2NUM(resp.token.partition_id())); + rb_hash_aset(token, rb_id2sym(rb_intern("bucket_name")), cb_str_new(resp.token.bucket_name())); + rb_hash_aset(res, rb_id2sym(rb_intern("mutation_token")), token); + + return res; +} + template inline VALUE to_mutation_result_value(Response resp) @@ -364,66 +475,6 @@ set_expiry(CommandOptions& opts, VALUE options) } } -template -inline void -set_access_deleted(CommandOptions& opts, VALUE options) -{ - static VALUE property_name = rb_id2sym(rb_intern("access_deleted")); - if (!NIL_P(options)) { - if (TYPE(options) != T_HASH) { - throw ruby_exception(rb_eArgError, - rb_sprintf("expected options to be Hash, given %+" PRIsVALUE, options)); - } - VALUE val = rb_hash_aref(options, property_name); - if (NIL_P(val)) { - return; - } - switch (TYPE(val)) { - case T_TRUE: - opts.access_deleted(true); - break; - case T_FALSE: - opts.access_deleted(false); - break; - - default: - throw ruby_exception( - rb_eArgError, - rb_sprintf("access_deleted must be an Boolean, but given %+" PRIsVALUE, val)); - } - } -} - -template -inline void -set_create_as_deleted(CommandOptions& opts, VALUE options) -{ - static VALUE property_name = rb_id2sym(rb_intern("create_as_deleted")); - if (!NIL_P(options)) { - if (TYPE(options) != T_HASH) { - throw ruby_exception(rb_eArgError, - rb_sprintf("expected options to be Hash, given %+" PRIsVALUE, options)); - } - VALUE val = rb_hash_aref(options, property_name); - if (NIL_P(val)) { - return; - } - switch (TYPE(val)) { - case T_TRUE: - opts.create_as_deleted(true); - break; - case T_FALSE: - opts.create_as_deleted(false); - break; - - default: - throw ruby_exception( - rb_eArgError, - rb_sprintf("create_as_deleted must be an Boolean, but given %+" PRIsVALUE, val)); - } - } -} - template inline void set_preserve_expiry(CommandOptions& opts, VALUE options) @@ -453,88 +504,6 @@ set_preserve_expiry(CommandOptions& opts, VALUE options) } } -template -inline void -set_cas(CommandOptions& opts, VALUE options) -{ - static VALUE property_name = rb_id2sym(rb_intern("cas")); - if (!NIL_P(options)) { - if (TYPE(options) != T_HASH) { - throw ruby_exception(rb_eArgError, - rb_sprintf("expected options to be Hash, given %+" PRIsVALUE, options)); - } - VALUE val = rb_hash_aref(options, property_name); - if (NIL_P(val)) { - return; - } - switch (TYPE(val)) { - case T_FIXNUM: - case T_BIGNUM: - opts.cas(couchbase::cas{ static_cast(NUM2ULL(val)) }); - break; - - default: - throw ruby_exception(rb_eArgError, - rb_sprintf("cas must be an Integer, but given %+" PRIsVALUE, val)); - } - } -} - -template -inline void -set_delta(CommandOptions& opts, VALUE options) -{ - static VALUE property_name = rb_id2sym(rb_intern("delta")); - if (!NIL_P(options)) { - if (TYPE(options) != T_HASH) { - throw ruby_exception(rb_eArgError, - rb_sprintf("expected options to be Hash, given %+" PRIsVALUE, options)); - } - VALUE val = rb_hash_aref(options, property_name); - if (NIL_P(val)) { - return; - } - switch (TYPE(val)) { - case T_FIXNUM: - case T_BIGNUM: - opts.delta(NUM2ULL(val)); - break; - - default: - throw ruby_exception(rb_eArgError, - rb_sprintf("delta must be an Integer, but given %+" PRIsVALUE, val)); - } - } -} - -template -inline void -set_initial_value(CommandOptions& opts, VALUE options) -{ - static VALUE property_name = rb_id2sym(rb_intern("initial_value")); - if (!NIL_P(options)) { - if (TYPE(options) != T_HASH) { - throw ruby_exception(rb_eArgError, - rb_sprintf("expected options to be Hash, given %+" PRIsVALUE, options)); - } - VALUE val = rb_hash_aref(options, property_name); - if (NIL_P(val)) { - return; - } - switch (TYPE(val)) { - case T_FIXNUM: - case T_BIGNUM: - opts.initial(NUM2ULL(val)); - break; - - default: - throw ruby_exception( - rb_eArgError, - rb_sprintf("initial_value must be an Integer, but given %+" PRIsVALUE, val)); - } - } -} - std::optional extract_durability_level(VALUE options); @@ -561,39 +530,40 @@ set_durability(CommandOptions& opts, VALUE options) } } -template +template inline void -set_store_semantics(CommandOptions& opts, VALUE options) +cb_extract_store_semantics(Request& req, VALUE options) { static VALUE property_name = rb_id2sym(rb_intern("store_semantics")); - if (!NIL_P(options)) { - if (TYPE(options) != T_HASH) { - throw ruby_exception(rb_eArgError, - rb_sprintf("expected options to be Hash, given %+" PRIsVALUE, options)); - } - VALUE val = rb_hash_aref(options, property_name); - if (NIL_P(val)) { - return; - } - if (TYPE(val) != T_SYMBOL) { - throw ruby_exception( - rb_eArgError, rb_sprintf("store_semantics must be a Symbol, but given %+" PRIsVALUE, val)); - } + if (NIL_P(options)) { + return; + } + if (TYPE(options) != T_HASH) { + throw ruby_exception(rb_eArgError, + rb_sprintf("expected options to be Hash, given %+" PRIsVALUE, options)); + } - if (ID mode = rb_sym2id(val); mode == rb_intern("replace")) { - opts.store_semantics(store_semantics::replace); - } else if (mode == rb_intern("insert")) { - opts.store_semantics(store_semantics::insert); - } else if (mode == rb_intern("upsert")) { - opts.store_semantics(store_semantics::upsert); - } else { - throw ruby_exception(rb_eArgError, - rb_sprintf("unexpected store_semantics, given %+" PRIsVALUE, val)); - } + VALUE val = rb_hash_aref(options, property_name); + if (NIL_P(val)) { + return; + } + if (TYPE(val) != T_SYMBOL) { + throw ruby_exception( + rb_eArgError, rb_sprintf("store_semantics must be a Symbol, but given %+" PRIsVALUE, val)); } -} + if (ID mode = rb_sym2id(val); mode == rb_intern("replace")) { + req.store_semantics = couchbase::store_semantics::replace; + } else if (mode == rb_intern("insert")) { + req.store_semantics = couchbase::store_semantics::insert; + } else if (mode == rb_intern("upsert")) { + req.store_semantics = couchbase::store_semantics::upsert; + } else { + throw ruby_exception(rb_eArgError, + rb_sprintf("unexpected store_semantics, given %+" PRIsVALUE, val)); + } +} } // namespace couchbase::ruby #endif diff --git a/lib/couchbase/cluster.rb b/lib/couchbase/cluster.rb index d478eb64..e4fd142a 100644 --- a/lib/couchbase/cluster.rb +++ b/lib/couchbase/cluster.rb @@ -401,20 +401,22 @@ def initialize(connection_string, *args) end @observability = Observability::Wrapper.new do |w| - if !(open_options[:enable_tracing].nil? && !open_options[:enable_tracing]) - w.tracer = Tracing::NoopTracer.new - elsif tracer.nil? - w.tracer = Tracing::ThresholdLoggingTracer.new( - emit_interval: open_options[:threshold_emit_interval], - kv_threshold: open_options[:key_value_threshold], - query_threshold: open_options[:query_threshold], - views_threshold: open_options[:view_threshold], - search_threshold: open_options[:search_threshold], - analytics_threshold: open_options[:analytics_threshold], - management_threshold: open_options[:management_threshold], - sample_size: open_options[:threshold_sample_size], - ) - end + w.tracer = if !(open_options[:enable_tracing].nil? && !open_options[:enable_tracing]) + Tracing::NoopTracer.new + elsif tracer.nil? + Tracing::ThresholdLoggingTracer.new( + emit_interval: open_options[:threshold_emit_interval], + kv_threshold: open_options[:key_value_threshold], + query_threshold: open_options[:query_threshold], + views_threshold: open_options[:view_threshold], + search_threshold: open_options[:search_threshold], + analytics_threshold: open_options[:analytics_threshold], + management_threshold: open_options[:management_threshold], + sample_size: open_options[:threshold_sample_size], + ) + else + tracer + end end @backend = Backend.new diff --git a/test/utils/tracing/test_span.rb b/test/utils/tracing/test_span.rb index edb7e229..17a1c803 100644 --- a/test/utils/tracing/test_span.rb +++ b/test/utils/tracing/test_span.rb @@ -18,7 +18,7 @@ module Couchbase module TestUtilities - class TestSpan < RequestSpan + class TestSpan < Couchbase::Tracing::RequestSpan attr_accessor :name attr_accessor :start_time, :end_time attr_accessor :attributes @@ -26,7 +26,7 @@ class TestSpan < RequestSpan attr_accessor :children def initialize(name, parent: nil, start_timestamp: nil) - super + super() @name = name @start_time = start_timestamp.nil? ? Time.now : start_timestamp @parent = parent diff --git a/test/utils/tracing/test_tracer.rb b/test/utils/tracing/test_tracer.rb index 762f9527..13f5c75f 100644 --- a/test/utils/tracing/test_tracer.rb +++ b/test/utils/tracing/test_tracer.rb @@ -20,7 +20,7 @@ module Couchbase module TestUtilities - class TestTracer < RequestTracer + class TestTracer < Couchbase::Tracing::RequestTracer def initialize super @spans = []