Skip to content

Change session/cluster initialization to happen in the background CSHARP-916 #528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 31 commits into
base: 4.x
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
04f7b2a
add lazy initialization; change API to facilitate it
joao-r-reis Jun 23, 2020
cc0ebde
change LBP and SEP Initialize to be async
joao-r-reis Jun 24, 2020
37c35f9
fix deadlock in session shutdown
joao-r-reis Jun 24, 2020
3d9c256
add Session.ConnectAsync()
joao-r-reis Jun 24, 2020
57cc9a3
add upgrade guide section
joao-r-reis Jun 24, 2020
81d9da0
moved control connection creation to Metadata
joao-r-reis Jun 26, 2020
4baf9dc
add ShutdownAsync to speculative exec policy
joao-r-reis Jun 26, 2020
5dc87e5
introduce metadata internal and make metadata lazy (wip)
joao-r-reis Jun 29, 2020
0bd3a03
revamp Metadata
joao-r-reis Jun 30, 2020
5af864c
add Metadata parameter to LBP.Distance
joao-r-reis Jul 1, 2020
8e07076
wip on tests
joao-r-reis Jul 1, 2020
f7ba714
change metrics initialization to session constructor
joao-r-reis Jul 2, 2020
01f77c6
add faster initialized check
joao-r-reis Jul 2, 2020
f0725c1
move protocoleventdebouncer and serializermanager to configuration
joao-r-reis Jul 2, 2020
08a6abf
add event forwarding between internalmetadata and metadata
joao-r-reis Jul 2, 2020
f90006c
fix tests compilation errors
joao-r-reis Jul 2, 2020
aa6b759
fix cod analysis errors
joao-r-reis Jul 2, 2020
2ac8d45
implement init timeouts and reconnection; change LBP init (wip)
joao-r-reis Jul 6, 2020
ecf4b7e
fixed build errors and init bug
joao-r-reis Jul 7, 2020
301ff05
fix init bug
joao-r-reis Jul 7, 2020
a571479
moved init logic to single component
joao-r-reis Jul 7, 2020
6891087
replace .Result with GetAwaiter
joao-r-reis Jul 7, 2020
9b9054c
add inner exception to init error
joao-r-reis Jul 7, 2020
30a715c
change LBP API to remove deadlock
joao-r-reis Jul 7, 2020
de70171
fix init bugs and couple of tests
joao-r-reis Jul 7, 2020
ec0ffe8
fix couple of tests and fix memory leak
joao-r-reis Jul 8, 2020
75d498a
fix leak related to Task.Delay with cancellation
joao-r-reis Jul 8, 2020
b709c43
add session connect to backpressure tests
joao-r-reis Jul 8, 2020
8ab4fe2
fix net452 build
joao-r-reis Jul 8, 2020
e27af05
update upgrade guide
joao-r-reis Jul 8, 2020
bbf5181
remove unnecessary code
joao-r-reis Jul 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 90 additions & 1 deletion doc/upgrade-guide/upgrade-to-4x/README.md
Original file line number Diff line number Diff line change
@@ -23,4 +23,93 @@ The `usedHostsPerRemoteDc` parameter on the `DCAwareRoundRobinPolicy` gave the i

A good write up on datacenter failover describing some of these considerations can be found [here][dc-failover-post].

[dc-failover-post]: https://medium.com/@foundev/cassandra-local-quorum-should-stay-local-c174d555cc57
## Creation of `ISession` instances returns immediately

In C# driver 4.0, when you create an `ISession` instance, the initialization task will be started in the background. When a request is executed, the driver will wait for the initialization task to finish before sending the request.

If you want to explicitely wait for the initialization to be finished, you can use one of these new methods: `ISession.ConnectAsync()` / `ISession.Connect()`.

## Session initialization retries

When the control connection initialization fails because it wasn't able to reach any of the contact points, the driver will retry the initialization according to the configured reconnection policy.

While an initialization attempt is in progress, the session methods that require initialization (e.g. `session.Execute()`) block until it is finished. If the initialization fails, the methods that were blocked will throw an exception. Until a new initialization attempt begins, any call to a method that requires initialization will throw the same exception.

## Addition of several async methods (e.g. `Metadata.AllHostsAsync()`)

The `ICluster.Metadata` property has always blocked until the initialization task is finished but until now there weren't many issues with this behavior because the user would create a session right away which would initialize the `ICluster` instance in a blocking manner.

Now that the initialization happens in the background, the `ICluster.Metadata` property no longer blocks but the methods on `IMetadata` block until the initialization is finished. For users who use `async/await` we added `async` variants for all methods (e.g. `Metadata.AllHostsAsync()`).

Some methods were added that return a snapshot of the current metadata cache (e.g. `Metadata.AllHostsSnapshot()`). These methods do not block but will return empty collections if the initialization is not done:

- `IMetadata.AllHostsSnapshot()`
- `IMetadata.AllReplicasSnapshot()`
- `IMetadata.GetReplicasSnapshot()`

There are also some extension methods that require a session to be initialized so we have added async variants of those methods as well:

| Existing method | New async method | Namespace |
|------------------------------|-------------------------------|-----------------------|
| `CreateBatch(this ISession)` | `ISession.CreateBatchAsync()` | `Cassandra.Data.Linq` |
| `GetState(this ISession)` | `ISession.GetStateAsync()` | `Cassandra` |

There are also some properties that were moved to the `ClusterDescription` class. You can obtain a `ClusterDescription` instance via the `IMetadata.GetClusterDescriptionAsync()` method (or `IMetadata.GetClusterDescription()`).

| `Metadata.ClusterName` | `ClusterDescription.ClusterName` | `Cassandra` |
| `Metadata.IsDbaas` | `ClusterDescription.IsDbaas` | `Cassandra` |

## `Metadata` API changes

Several methods of the `ICluster` interface were actually wrappers around methods that are implemented in the `Metadata` class. We decided to move all metadata related elements from `ICluster`/`ISession` to the new `IMetadata` interface to simplify the driver's API, it makes more sense to have metadata related methods, properties and events on the `IMetadata` interface.

If you are using one of theses elements that were moved to the `IMetadata` interface, you can use `ICluster.Metadata` to access it. Note that some of these methods now block until the initialization is done so we added `async` variants for them (see previous section for an explanation about this).

These are the methods, properties and events that were affected:

| Old API | New API |
|------------------------------------|-----------------------------------|
| `ICluster.AllHosts()` | `IMetadata.AllHosts()` |
| `ICluster.GetHost()` | `IMetadata.GetHost()` |
| `ICluster.GetReplicas()` | `IMetadata.GetReplicas()` |
| `ICluster.RefreshSchema()` | `IMetadata.RefreshSchema()` |
| `ICluster.RefreshSchemaAsync()` | `IMetadata.RefreshSchemaAsync()` |
| `ICluster.HostAdded` | `IMetadata.HostAdded` |
| `ICluster.HostRemoved` | `IMetadata.HostRemoved` |
| `ISession.BinaryProtocolVersion` | `IMetadata.GetClusterDescription().ProtocolVersion` |

Note: `ClusterDescription.ProtocolVersion` returns an `enum` instead of `int`, you can cast this `enum` to `int` if you need it (`ISession.BinaryProtocolVersion` did this internally).

## Removal of `ISession.WaitForSchemaAgreement()`

When a DDL request is executed, the driver will wait for schema agreement before returning control to the user. See `ProtocolOptions.MaxSchemaAgreementWaitSeconds` for more info.

If you want to manually check for schema agreement you can use the `IMetadata.CheckSchemaAgreementAsync()` method.

## `Metadata` no longer implements `IDisposable`

The implementation of `IDisposable` was pretty much empty at this point so we decided to remove it.

We also removed `Metadata.ShutDown()` for the same reason.

## `ILoadBalancingPolicy` interface changes

You are only affected by these changes if you implemented a custom load balancing policy in your application instead of using one of those that are provided by the driver.

### `ILoadBalancingPolicy.Initialize()`

The `Initialize()` method is now `InitializeAsync()` and returns a `Task`. If the implementation is not async, we recommend returning `Task.CompletedTask` or `Task.FromResult(0)`.

`InitializeAsync()` now has a `IMetadataSnapshotProvider` parameter instead of `ICluster`. You can obtain the hosts collection and replicas using this instance (see the earlier section related to `Metadata` API changes for more information on these changes).

### `ILoadBalancingPolicy.NewQueryPlan()` and `ILoadBalancingPolicy.Distance()`

The `NewQueryPlan()` and `Distance()` methods now have a `ICluster` parameter.

This is to simplify the process of implementing a custom load balancing policy. Previously, all implementations had to be stateful and threadsafe, i.e., the `cluster` object that was provided in the `Initialize()` was necessary in order to implement the `NewQueryPlan()` method.

Now you can build a completely stateless load balancing policy (which is guaranteed to be threadsafe) by obtaining the hosts / replicas via the `ICluster` parameter in the `NewQueryPlan()` method. In this scenario you can have an implementation of the `InitializeAsync()` method that just returns `Task.CompletedTask` or `Task.FromResult(0)`.

You can still build more complex load balancing policies that access some kind of metadata service for example by implementing the `InitializeAsync()` method.

[dc-failover-post]: https://medium.com/@foundev/cassandra-local-quorum-should-stay-local-c174d555cc57
38 changes: 26 additions & 12 deletions src/Cassandra.IntegrationTests/Core/ClientTimeoutTests.cs
Original file line number Diff line number Diff line change
@@ -306,6 +306,7 @@ public void Should_Not_Leak_Connections_Test()
var clusters = Enumerable.Range(0, 100).Select(
b => ClusterBuilder()
.AddContactPoint(_testCluster.InitialContactPoint)
.WithReconnectionPolicy(new ConstantReconnectionPolicy(120 * 1000))
.WithSocketOptions(socketOptions)
.Build()).ToList();

@@ -315,7 +316,15 @@ public void Should_Not_Leak_Connections_Test()
{
try
{
await c.ConnectAsync().ConfigureAwait(false);
var session = await c.ConnectAsync().ConfigureAwait(false);
try
{
await session.ConnectAsync().ConfigureAwait(false);
}
finally
{
await session.ShutdownAsync().ConfigureAwait(false);
}
}
catch (NoHostAvailableException ex)
{
@@ -332,23 +341,31 @@ public void Should_Not_Leak_Connections_Test()
{
t.Dispose();
}

tasks = null;

GC.Collect();
Thread.Sleep(1000);
Thread.Sleep(3000);
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);

decimal initialMemory = GC.GetTotalMemory(true);

const int length = 100;
const int length = 200;

tasks = clusters.Select(c => Task.Run(async () =>
{
for (var i = 0; i < length; i++)
{
try
{
await c.ConnectAsync().ConfigureAwait(false);
var session = await c.ConnectAsync().ConfigureAwait(false);
try
{
await session.ConnectAsync().ConfigureAwait(false);
}
finally
{
await session.ShutdownAsync().ConfigureAwait(false);
}
}
catch (NoHostAvailableException ex)
{
@@ -369,18 +386,15 @@ public void Should_Not_Leak_Connections_Test()

tasks = null;

GC.Collect();
Thread.Sleep(1000);
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);

Assert.Less(GC.GetTotalMemory(true) / initialMemory, 1.5M,
"Should not exceed a 50% (1.5) more than was previously allocated");

}
finally
{
foreach (var c in clusters)
{
c.Dispose();
}
Task.WhenAll(clusters.Select(c => Task.Run(() => c.ShutdownAsync()))).GetAwaiter().GetResult();
}
}
}
4 changes: 2 additions & 2 deletions src/Cassandra.IntegrationTests/Core/ClientWarningsTests.cs
Original file line number Diff line number Diff line change
@@ -68,13 +68,13 @@ public void Should_QueryTrace_When_Enabled()
{
var rs = Session.Execute(new SimpleStatement("SELECT * from system.local").EnableTracing());
Assert.NotNull(rs.Info.QueryTrace);
var hosts = Session.Cluster.AllHosts();
var hosts = Session.Cluster.Metadata.AllHosts();
Assert.NotNull(hosts);
var coordinator = hosts.FirstOrDefault();
Assert.NotNull(coordinator);
Assert.AreEqual(coordinator.Address.Address, rs.Info.QueryTrace.Coordinator);
Assert.Greater(rs.Info.QueryTrace.Events.Count, 0);
if (Session.BinaryProtocolVersion >= 4)
if (Session.Cluster.Metadata.GetClusterDescription().ProtocolVersion >= ProtocolVersion.V4)
{
Assert.NotNull(rs.Info.QueryTrace.ClientAddress);
}
Original file line number Diff line number Diff line change
@@ -36,19 +36,19 @@ public void Should_SendRequestsToAllHosts_When_PeersOnSameAddress()
{
const string query = "SELECT * FROM ks.table";

Assert.AreEqual(3, Session.Cluster.AllHosts().Count);
Assert.IsTrue(Session.Cluster.AllHosts().All(h => h.IsUp));
Assert.AreEqual(1, Session.Cluster.AllHosts().Select(h => h.Address.Address).Distinct().Count());
Assert.AreEqual(3, Session.Cluster.AllHosts().Select(h => h.Address.Port).Distinct().Count());
Assert.AreEqual(3, Session.Cluster.Metadata.AllHosts().Count);
Assert.IsTrue(Session.Cluster.Metadata.AllHosts().All(h => h.IsUp));
Assert.AreEqual(1, Session.Cluster.Metadata.AllHosts().Select(h => h.Address.Address).Distinct().Count());
Assert.AreEqual(3, Session.Cluster.Metadata.AllHosts().Select(h => h.Address.Port).Distinct().Count());

foreach (var i in Enumerable.Range(0, 10))
{
// doesn't exist but doesn't matter
Session.Execute(query);
}

Assert.AreEqual(3, Session.Cluster.AllHosts().Count);
Assert.IsTrue(Session.Cluster.AllHosts().All(h => h.IsUp));
Assert.AreEqual(3, Session.Cluster.Metadata.AllHosts().Count);
Assert.IsTrue(Session.Cluster.Metadata.AllHosts().All(h => h.IsUp));

var queriesByNode = TestCluster.GetNodes().Select(n => n.GetQueries(query, QueryType.Query));
Assert.IsTrue(queriesByNode.All(queries => queries.Count >= 1));
Loading