-
Notifications
You must be signed in to change notification settings - Fork 240
/
Copy pathClientWarningsTests.cs
188 lines (170 loc) · 8.1 KB
/
ClientWarningsTests.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
//
// Copyright (C) DataStax Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using Cassandra.IntegrationTests.TestBase;
using Cassandra.IntegrationTests.TestClusterManagement;
using Cassandra.Tests;
using NUnit.Framework;
namespace Cassandra.IntegrationTests.Core
{
    /// <summary>
    /// Integration tests covering the warnings, query trace and custom payload exposed through
    /// the execution info (<c>rs.Info</c>) of a query result.
    /// </summary>
    /// <remarks>
    /// The fixture starts a single-node test cluster configured with low batch size thresholds
    /// and the payload-mirroring query handler, so oversized batches and payload round-trips
    /// can be provoked on demand.
    /// </remarks>
    [TestFixture, Category(TestCategory.Short), Category(TestCategory.RealCluster), Category(TestCategory.ServerApi)]
    public class ClientWarningsTests : TestGlobals
    {
        /// <summary>Session connected to the test cluster; assigned in <see cref="SetupFixture"/>.</summary>
        public ISession Session { get; set; }

        private const string Keyspace = "ks_client_warnings";
        private const string Table = Keyspace + ".tbl1";

        // Value lengths used to build the batches. They are intended to sit just above the
        // 5 KB warn / 50 KB fail thresholds written to cassandra.yaml in SetupFixture.
        private const int WarningBatchLength = 5 * 1025;
        private const int FailingBatchLength = 50 * 1025;

        private ITestCluster _testCluster;

        // Trace level that was active before this fixture raised it; null when setup never got that far.
        private TraceLevel? _previousTraceLevel;

        /// <summary>
        /// Creates the test cluster, connects the session and creates the keyspace and table
        /// used by the tests. The whole fixture is ignored on Cassandra versions below 2.2.
        /// </summary>
        [OneTimeSetUp]
        public void SetupFixture()
        {
            if (TestClusterManager.CheckCassandraVersion(false, Version.Parse("2.2"), Comparison.LessThan))
            {
                // Assert.Ignore throws, so nothing below runs for unsupported versions.
                Assert.Ignore("Requires Cassandra version >= 2.2");
            }

            // The trace switch is process-wide: remember the current level so teardown can restore it.
            _previousTraceLevel = Diagnostics.CassandraTraceSwitch.Level;
            Diagnostics.CassandraTraceSwitch.Level = TraceLevel.Info;

            _testCluster = TestClusterManager.CreateNew(1, new TestClusterOptions
            {
                CassandraYaml = new[]
                {
                    "batch_size_warn_threshold_in_kb:5",
                    "batch_size_fail_threshold_in_kb:50"
                },
                //Using a mirroring handler, the server will reply providing the same payload that was sent
                JvmArgs = new[] { "-Dcassandra.custom_query_handler_class=org.apache.cassandra.cql3.CustomPayloadMirroringQueryHandler" }
            });
            _testCluster.InitClient();
            Session = _testCluster.Session;
            Session.Execute(string.Format(TestUtils.CreateKeyspaceSimpleFormat, Keyspace, 1));
            Session.Execute(string.Format(TestUtils.CreateTableSimpleFormat, Table));
        }

        /// <summary>
        /// Restores the driver trace level changed by <see cref="SetupFixture"/> so that the
        /// setting does not leak into fixtures that run afterwards.
        /// </summary>
        [OneTimeTearDown]
        public void TeardownFixture()
        {
            if (_previousTraceLevel.HasValue)
            {
                Diagnostics.CassandraTraceSwitch.Level = _previousTraceLevel.Value;
                _previousTraceLevel = null;
            }
        }

        /// <summary>
        /// A traced statement must expose a query trace with a coordinator and events; the client
        /// address is expected only from protocol version 4 onwards.
        /// </summary>
        [Test]
        public void Should_QueryTrace_When_Enabled()
        {
            var rs = Session.Execute(new SimpleStatement("SELECT * from system.local").EnableTracing());
            Assert.NotNull(rs.Info.QueryTrace);

            var hosts = Session.Cluster.Metadata.AllHosts();
            Assert.NotNull(hosts);
            // The fixture cluster is created with a single node, so the first host is taken as the coordinator.
            var coordinator = hosts.FirstOrDefault();
            Assert.NotNull(coordinator);

            Assert.AreEqual(coordinator.Address.Address, rs.Info.QueryTrace.Coordinator);
            Assert.Greater(rs.Info.QueryTrace.Events.Count, 0);
            if (Session.Cluster.Metadata.GetClusterDescription().ProtocolVersion >= ProtocolVersion.V4)
            {
                Assert.NotNull(rs.Info.QueryTrace.ClientAddress);
            }
            else
            {
                Assert.Null(rs.Info.QueryTrace.ClientAddress);
            }
        }

        /// <summary>Without <c>EnableTracing()</c> no query trace is attached to the result.</summary>
        [Test]
        public void Should_NotGetQueryTrace_When_NotEnabledXDefaultX()
        {
            var rs = Session.Execute(new SimpleStatement("SELECT * from system.local"));
            Assert.Null(rs.Info.QueryTrace);
        }

        /// <summary>A regular query must not carry any warnings.</summary>
        [Test, TestCassandraVersion(2, 2)]
        public void Should_NotGenerateWarning_When_RegularBehavior()
        {
            var rs = Session.Execute("SELECT * FROM system.local");
            //It should be null for queries that do not generate warnings
            Assert.Null(rs.Info.Warnings);
        }

        /// <summary>A batch above the warn threshold yields exactly one batch size warning.</summary>
        [Test, TestCassandraVersion(2, 2)]
        public void Should_Warning_When_BatchExceedsLength()
        {
            var rs = Session.Execute(GetBatchAsSimpleStatement(WarningBatchLength));

            AssertSingleBatchSizeWarning(rs.Info.Warnings);
        }

        /// <summary>Warnings and the query trace are both available when tracing is enabled.</summary>
        [Test, TestCassandraVersion(2, 2)]
        public void Should_WarningWithTrace_When_BatchExceedsLengthAndTraceEnabled()
        {
            var statement = GetBatchAsSimpleStatement(WarningBatchLength);

            var rs = Session.Execute(statement.EnableTracing());

            Assert.NotNull(rs.Info.QueryTrace);
            AssertSingleBatchSizeWarning(rs.Info.Warnings);
        }

        /// <summary>
        /// A batch above the fail threshold is rejected with <c>InvalidQueryException</c>,
        /// with or without tracing and an outgoing payload.
        /// </summary>
        [Test, TestCassandraVersion(2, 2)]
        public void Should_ThrowInvalidException_When_BatchIsTooBig()
        {
            Assert.Throws<InvalidQueryException>(() => Session.Execute(
                GetBatchAsSimpleStatement(FailingBatchLength)));
            Assert.Throws<InvalidQueryException>(() => Session.Execute(
                GetBatchAsSimpleStatement(FailingBatchLength).EnableTracing()));
            Assert.Throws<InvalidQueryException>(() => Session.Execute(
                GetBatchAsSimpleStatement(FailingBatchLength).EnableTracing().SetOutgoingPayload(GetPayload())));
        }

        /// <summary>The warning and the mirrored payload are both returned for an oversized batch.</summary>
        [Test, TestCassandraVersion(2, 2)]
        public void Should_WarningAndGetPayload_When_UsingMirrorPayload()
        {
            var statement = GetBatchAsSimpleStatement(WarningBatchLength);
            var outgoing = GetPayload();

            // Send the very instance the assertions below compare against.
            var rs = Session.Execute(statement.SetOutgoingPayload(outgoing));

            AssertSingleBatchSizeWarning(rs.Info.Warnings);
            CollectionAssert.AreEqual(outgoing["k1"], rs.Info.IncomingPayload["k1"]);
            CollectionAssert.AreEqual(outgoing["k2"], rs.Info.IncomingPayload["k2"]);
        }

        /// <summary>
        /// The query trace, the mirrored payload and the warning are all returned together when
        /// tracing is enabled on an oversized batch that carries a payload.
        /// </summary>
        [Test, TestCassandraVersion(2, 2)]
        public void Should_WarningAndTracingAndGetPayload_When_UsingMirrorPayloadAndEnableTracing()
        {
            var statement = GetBatchAsSimpleStatement(WarningBatchLength);
            var outgoing = GetPayload();

            var rs = Session.Execute(statement.SetOutgoingPayload(outgoing).EnableTracing());

            Assert.NotNull(rs.Info.QueryTrace);
            CollectionAssert.AreEqual(outgoing["k1"], rs.Info.IncomingPayload["k1"]);
            CollectionAssert.AreEqual(outgoing["k2"], rs.Info.IncomingPayload["k2"]);
            AssertSingleBatchSizeWarning(rs.Info.Warnings);
        }

        /// <summary>
        /// Asserts that exactly one warning was returned and that its text mentions both
        /// "batch" and "exceeding" (compared case-insensitively).
        /// </summary>
        /// <param name="warnings">The warnings taken from the execution info of the result.</param>
        private static void AssertSingleBatchSizeWarning(string[] warnings)
        {
            Assert.NotNull(warnings);
            Assert.AreEqual(1, warnings.Length);
            var warning = warnings[0].ToLowerInvariant();
            StringAssert.Contains("batch", warning);
            StringAssert.Contains("exceeding", warning);
        }

        /// <summary>
        /// Builds an unlogged batch, sent as a single simple statement, that inserts two rows
        /// into the test table; the second row holds a text value of <paramref name="length"/> characters.
        /// </summary>
        /// <param name="length">Number of <c>'a'</c> characters in the second row's value.</param>
        /// <returns>The statement, with its consistency level set to <c>LocalQuorum</c>.</returns>
        private static IStatement GetBatchAsSimpleStatement(int length)
        {
            const string query = "BEGIN UNLOGGED BATCH" +
                                 " INSERT INTO {0} (k, t) VALUES ('key0', 'value0');" +
                                 " INSERT INTO {0} (k, t) VALUES ('{1}', '{2}');" +
                                 "APPLY BATCH";
            var largeValue = new string('a', length);
            return new SimpleStatement(string.Format(query, Table, "key1", largeValue))
                .SetConsistencyLevel(ConsistencyLevel.LocalQuorum);
        }

        /// <summary>Creates a fresh custom payload with two UTF-8 encoded entries, "k1" and "k2".</summary>
        private static IDictionary<string, byte[]> GetPayload()
        {
            return new Dictionary<string, byte[]>
            {
                {"k1", Encoding.UTF8.GetBytes("value1")},
                {"k2", Encoding.UTF8.GetBytes("value2")}
            };
        }
    }
}