Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 104 additions & 3 deletions ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,12 +782,15 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
TKikimrRunner kikimr(TKikimrSettings()
.SetUseRealThreads(false)
.SetEnableAddColumsWithDefaults(true)
.SetDisableMissingDefaultColumnsInBulkUpsert(true)
.SetWithSampleTables(false));

auto db = kikimr.RunCall([&] { return kikimr.GetQueryClient(); } );
auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } );
auto querySession = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); } );

auto tableClient = kikimr.RunCall([&] { return kikimr.GetTableClient(); } );

auto& runtime = *kikimr.GetTestServer().GetRuntime();

{
Expand Down Expand Up @@ -860,7 +863,7 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {

auto alterQuery = R"(
ALTER TABLE `/Root/AddNonColumnDoesnotReturnInternalError`
ADD COLUMN Value3 Int32 NOT NULL DEFAULT 7;
ADD COLUMN Value3 Int32 DEFAULT 7;
)";

auto alterFuture = kikimr.RunInThreadPool([&] { return session.ExecuteQuery(alterQuery, TTxControl::NoTx()).GetValueSync(); });
Expand Down Expand Up @@ -894,6 +897,29 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
UNIT_ASSERT_STRING_CONTAINS(result, R"([[1u;"Changed";"Updated"];[2u;"New";"text"]])");
}

{
auto rowsBuilder = NYdb::TValueBuilder();
rowsBuilder.BeginList();
for (ui32 i = 10; i <= 15; ++i) {
rowsBuilder.AddListItem()
.BeginStruct()
.AddMember("Key")
.Uint32(i)
.AddMember("Value")
.String("String")
.AddMember("Value2")
.String("String2")
.EndStruct();

}
rowsBuilder.EndList();
auto result = kikimr.RunCall([&] {
return tableClient.BulkUpsert("/Root/AddNonColumnDoesnotReturnInternalError", rowsBuilder.Build()).GetValueSync();
});
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SCHEME_ERROR, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Missing default columns: Value3");
}

{
TString result = fQuery(R"(
UPSERT INTO `/Root/AddNonColumnDoesnotReturnInternalError` (Key, Value, Value2, Value3) VALUES (1, "4", "four", 1);
Expand Down Expand Up @@ -933,8 +959,8 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {

auto result = runtime.WaitFuture(alterFuture);
fCompareTable(R"([
[1u;"Changed";"Updated";7];
[2u;"New";"text";7]
[1u;"Changed";"Updated";[7]];
[2u;"New";"text";[7]]
])");
}

Expand Down Expand Up @@ -1513,6 +1539,81 @@ Y_UNIT_TEST_SUITE(KqpConstraints) {
}
}

Y_UNIT_TEST_TWIN(DefaultColumnAndBulkUpsert, DisableMissingDefaultColumnsInBulkUpsert) {
TKikimrRunner kikimr(TKikimrSettings()
.SetEnableAddColumsWithDefaults(true)
.SetDisableMissingDefaultColumnsInBulkUpsert(DisableMissingDefaultColumnsInBulkUpsert)
.SetWithSampleTables(false));

auto queryClient = kikimr.GetQueryClient();
auto tableClient = kikimr.GetTableClient();

{
auto query = R"(
CREATE TABLE `/Root/DefaultColumnAndBulkUpsert` (
Key Uint32 NOT NULL,
Value1 String DEFAULT "Default value",
Value2 Int64 DEFAULT 123,
PRIMARY KEY (Key),
);
)";

auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto query = R"(
UPSERT INTO `/Root/DefaultColumnAndBulkUpsert` (Key) VALUES (1), (2);
)";

auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto query = R"(
UPSERT INTO `/Root/DefaultColumnAndBulkUpsert` (Key, Value1) VALUES (3, "Value1"), (4, "Value2");
)";

auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto query = R"(
ALTER TABLE `/Root/DefaultColumnAndBulkUpsert` ADD COLUMN Value3 Utf8 DEFAULT "Value3"u;
)";

auto result = queryClient.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto rowsBuilder = NYdb::TValueBuilder();
rowsBuilder.BeginList();
for (ui32 i = 10; i <= 15; ++i) {
rowsBuilder.AddListItem()
.BeginStruct()
.AddMember("Key")
.Uint32(i)
.AddMember("Value2")
.OptionalInt64(0)
.EndStruct();

}
rowsBuilder.EndList();

auto result = tableClient.BulkUpsert("/Root/DefaultColumnAndBulkUpsert", rowsBuilder.Build()).ExtractValueSync();
if (DisableMissingDefaultColumnsInBulkUpsert) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SCHEME_ERROR, result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Missing default columns: Value3, Value1");
} else {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
}
}

// Y_UNIT_TEST(SetNotNull) {
// struct TValue {
// private:
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,5 @@ message TFeatureFlags {
optional bool EnableStreamingQueries = 206 [default = false];
optional bool EnableTinyDisks = 207 [default = false];
optional bool EnableMetricsLevel = 208 [default = false];
optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = false];
}
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableRealSystemViewPaths)
FEATURE_FLAG_SETTER(EnableDataShardWriteAlwaysVolatile)
FEATURE_FLAG_SETTER(EnableStreamingQueries)
FEATURE_FLAG_SETTER(DisableMissingDefaultColumnsInBulkUpsert)

#undef FEATURE_FLAG_SETTER
};
Expand Down
24 changes: 24 additions & 0 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
THashMap<TString, ui32> columnByName;
THashSet<TString> keyColumnsLeft;
THashSet<TString> notNullColumnsLeft = entry.NotNullColumns;
THashSet<TString> defaultColumnsLeft;
SrcColumns.reserve(entry.Columns.size());
THashSet<TString> HasInternalConversion;

Expand All @@ -400,6 +401,10 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
keyColumnIds[keyOrder] = id;
keyColumnsLeft.insert(name);
}

if (colInfo.IsDefaultFromLiteral()) {
defaultColumnsLeft.insert(name);
}
}

if (entry.ColumnTableInfo) {
Expand Down Expand Up @@ -498,6 +503,10 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
NotNullColumns.emplace(ci.Name);
}

if (defaultColumnsLeft.contains(ci.Name)) {
defaultColumnsLeft.erase(ci.Name);
}

if (ci.KeyOrder != -1) {
KeyColumnPositions[ci.KeyOrder] = TFieldDescription{ci.Id, ci.Name, (ui32)pos, ci.PType, pgTypeMod, notNull};
keyColumnsLeft.erase(ci.Name);
Expand Down Expand Up @@ -582,6 +591,21 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
return TConclusionStatus::Fail(Sprintf("Missing not null columns: %s", JoinSeq(", ", notNullColumnsLeft).c_str()));
}

if (!defaultColumnsLeft.empty() && UpsertIfExists) {
// some default columns are not specified in the request, but upsert will only update existing rows
// and only the columns specified in the request will be updated; unspecified default columns will not be changed.
defaultColumnsLeft.clear();
}

if (!defaultColumnsLeft.empty()) {
if (AppData(ctx)->FeatureFlags.GetDisableMissingDefaultColumnsInBulkUpsert()) {
return TConclusionStatus::Fail(Sprintf("Missing default columns: %s", JoinSeq(", ", defaultColumnsLeft).c_str()));
}

UploadCounters.OnMissingDefaultColumns();
LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Missing default columns: " << JoinSeq(", ", defaultColumnsLeft).c_str());
}

TConclusionStatus res = TConclusionStatus::Success();
if (isColumnTable && HasAppData() && AppDataVerified().ColumnShardConfig.GetBulkUpsertRequireAllColumns()) {
res = CheckRequiredColumns(entry, *reqColumns);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/tx_proxy/upload_rows_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,6 @@ TUploadCounters::TUploadCounters()
WrittenBytes = TBase::GetDeriviative("Replies/WrittenBytes");
FailedBytes = TBase::GetDeriviative("Replies/FailedBytes");
RequestsBytes = TBase::GetDeriviative("Requests/Bytes");
MissingDefaultColumnsCount = TBase::GetDeriviative("MissingDefaultColumns/Count");
}
}
6 changes: 6 additions & 0 deletions ydb/core/tx/tx_proxy/upload_rows_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr FailedBytes;
NMonitoring::TDynamicCounters::TCounterPtr RequestsBytes;

NMonitoring::TDynamicCounters::TCounterPtr MissingDefaultColumnsCount;

THashMap<TUploadStatus, NMonitoring::TDynamicCounters::TCounterPtr, TUploadStatus::THasher> CodesCount;

NMonitoring::TDynamicCounters::TCounterPtr GetCodeCounter(const TUploadStatus& status);
Expand Down Expand Up @@ -142,6 +144,10 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
PackageSizeCountByRecords->Collect(rowsCount);
RequestsBytes->Add(requestBytes);
}

void OnMissingDefaultColumns() {
MissingDefaultColumnsCount->Inc();
}
};

} // namespace NKikimr
Loading