Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: non-blocking sockets #1562

Draft
wants to merge 29 commits into
base: postgresql-dialect
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e182a20
perf: use non-blocking sockets
olavloite Mar 17, 2024
06e2903
chore: add missing copyright headers
olavloite Mar 17, 2024
c730315
chore: further support for non-blocking sockets
olavloite Mar 19, 2024
a0ba3e4
Merge branch 'postgresql-dialect' into non-blocking-socket
olavloite Mar 24, 2024
d260585
chore: more fixes for non-blocking sockets
olavloite Mar 27, 2024
720dc4e
Merge branch 'postgresql-dialect' into non-blocking-socket
olavloite Mar 30, 2024
020057e
test: fix tests + stop reader thread
olavloite Mar 30, 2024
dccadf6
refactor: listen to all localhost addresses
olavloite Mar 30, 2024
4114b12
chore: use wildcard address
olavloite Mar 30, 2024
6bb83ce
fix: handle copy messages
olavloite Mar 30, 2024
5a0247f
fix: create parent directory
olavloite Mar 30, 2024
c6305d8
chore: accept copy messages in all modes
olavloite Mar 30, 2024
b4ea859
test: skip copy tests + skip uds for SQLAlchemy
olavloite Mar 30, 2024
41e2c0f
test: skip uds for SQLAlchemy
olavloite Mar 30, 2024
c88d5a6
chore: log zero bytes read
olavloite Mar 30, 2024
b9f4f23
test: skip more uds tests
olavloite Mar 30, 2024
ca46056
chore: keep track of time between reads
olavloite Apr 1, 2024
f20f7cf
chore: log more warnings
olavloite Apr 1, 2024
feb8912
chore: more logging
olavloite Apr 1, 2024
e2b1cb7
chore: more logging
olavloite Apr 1, 2024
2495839
test: always loop through selected keys
olavloite Apr 1, 2024
6763a65
chore: more logging
olavloite Apr 1, 2024
7a32a1a
fix: start time before end time
olavloite Apr 1, 2024
df7c41d
fix: only log each 10 seconds
olavloite Apr 1, 2024
5018275
test: wait at most 60 seconds for a read
olavloite Apr 1, 2024
315c59e
test: always select and iterate over keys
olavloite Apr 1, 2024
4a1fd32
chore: go back to conditional select
olavloite Apr 1, 2024
43b63a1
test: bind to loopback
olavloite Apr 1, 2024
d65455a
chore: go back to wildcard address
olavloite Apr 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/latency-comparison/golang/runners/pgx_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func RunPgx(database, sql string, readWrite bool, numOperations, numClients, wai
// Connect to Cloud Spanner through PGAdapter.
var connString string
if useUnixSocket {
connString = fmt.Sprintf("host=%s port=%d database=%s", host, port, url.QueryEscape(database))
connString = fmt.Sprintf("host=%s port=%d database=%s", host, port, database)
} else {
connString = fmt.Sprintf("postgres://uid:pwd@%s:%d/%s?sslmode=disable", host, port, url.QueryEscape(database))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.spanner.Instance;
import com.google.cloud.spanner.InstanceAdminClient;
import com.google.cloud.spanner.InstanceNotFoundException;
import com.google.cloud.spanner.SessionPoolOptions;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
Expand Down Expand Up @@ -56,6 +57,7 @@
import com.google.cloud.spanner.pgadapter.wireoutput.ReadyResponse;
import com.google.cloud.spanner.pgadapter.wireoutput.TerminateResponse;
import com.google.cloud.spanner.pgadapter.wireprotocol.BootstrapMessage;
import com.google.cloud.spanner.pgadapter.wireprotocol.ControlMessage;
import com.google.cloud.spanner.pgadapter.wireprotocol.ParseMessage;
import com.google.cloud.spanner.pgadapter.wireprotocol.SSLMessage;
import com.google.cloud.spanner.pgadapter.wireprotocol.WireMessage;
Expand Down Expand Up @@ -108,7 +110,7 @@ public class ConnectionHandler implements Runnable {
private static final String CHANNEL_PROVIDER_PROPERTY = "CHANNEL_PROVIDER";

private final ProxyServer server;
private Socket socket;
protected Socket socket;
private final Map<String, IntermediatePreparedStatement> statementsMap = new HashMap<>();
private final Cache<String, Future<DescribeResult>> autoDescribedStatementsCache =
CacheBuilder.newBuilder()
Expand All @@ -120,6 +122,7 @@ public class ConnectionHandler implements Runnable {
private static final Map<Integer, ConnectionHandler> CONNECTION_HANDLERS =
new ConcurrentHashMap<>();
private volatile ConnectionStatus status = ConnectionStatus.UNAUTHENTICATED;
private Map<String, String> connectionParameters;
private Thread thread;
private final int connectionId;
private final int secret;
Expand All @@ -145,7 +148,7 @@ public class ConnectionHandler implements Runnable {
private final LinkedList<ParseMessage> skippedAutoDetectParseMessages = new LinkedList<>();

private ExtendedQueryProtocolHandler extendedQueryProtocolHandler;
private CopyStatement activeCopyStatement;
private volatile CopyStatement activeCopyStatement;

ConnectionHandler(ProxyServer server, Socket socket) {
this(server, socket, null);
Expand Down Expand Up @@ -208,6 +211,9 @@ public void connectToSpanner(String database, @Nullable Credentials credentials)
if (options.getSessionPoolOptions() != null) {
connectionOptionsBuilder =
connectionOptionsBuilder.setSessionPoolOptions(options.getSessionPoolOptions());
} else {
connectionOptionsBuilder.setSessionPoolOptions(
SessionPoolOptions.newBuilder().setTrackStackTraceOfSessionCheckout(false).build());
}
if (options.isEnableOpenTelemetryMetrics()) {
SpannerOptions.enableOpenTelemetryMetrics();
Expand Down Expand Up @@ -365,18 +371,29 @@ enum RunConnectionState {
TERMINATED
}

public BootstrapMessage readBootstrapMessage() throws Exception {
return BootstrapMessage.create(this);
}

public ControlMessage readControlMessage() throws Exception {
return ControlMessage.create(this);
}

protected ConnectionMetadata createConnectionMetadata() throws IOException {
return new ConnectionMetadata(this.socket.getInputStream(), this.socket.getOutputStream());
}

/**
* Starts listening for incoming messages on the network socket. Returns RESTART_WITH_SSL if the
* listening process should be restarted with SSL.
*/
private RunConnectionState runConnection(boolean ssl) {
RunConnectionState result = RunConnectionState.TERMINATED;
try (ConnectionMetadata connectionMetadata =
new ConnectionMetadata(this.socket.getInputStream(), this.socket.getOutputStream())) {
try (ConnectionMetadata connectionMetadata = createConnectionMetadata()) {
this.connectionMetadata = connectionMetadata;

try {
this.message = this.server.recordMessage(BootstrapMessage.create(this));
this.message = this.server.recordMessage(readBootstrapMessage());
if (!ssl
&& getServer().getOptions().getSslMode().isSslEnabled()
&& this.message instanceof SSLMessage) {
Expand Down Expand Up @@ -435,7 +452,7 @@ && getServer().getOptions().getSslMode().isSslEnabled()
if (this.spannerConnection != null) {
this.spannerConnection.close();
}
this.socket.close();
closeSocket();
} catch (SpannerException | IOException e) {
logger.log(
Level.WARNING,
Expand All @@ -457,6 +474,10 @@ && getServer().getOptions().getSslMode().isSslEnabled()
return result;
}

protected void closeSocket() throws IOException {
this.socket.close();
}

boolean checkValidConnection(boolean ssl) throws Exception {
// Allow SSL connections from non-localhost even if the localhost check has not explicitly
// been disabled.
Expand Down Expand Up @@ -546,7 +567,7 @@ void terminate() {
* @param exception The exception to be related.
* @throws IOException if there is some issue in the sending of the error messages.
*/
void handleError(PGException exception) throws Exception {
void handleError(PGException exception) throws IOException {
logger.log(
Level.WARNING,
exception,
Expand Down Expand Up @@ -771,22 +792,39 @@ public void clearInvalidMessageCount() {
this.invalidMessagesCount = 0;
}

public boolean supportsPeekNextByte() {
return true;
}

public ConnectionMetadata getConnectionMetadata() {
return connectionMetadata;
}

protected void setConnectionMetadata(ConnectionMetadata connectionMetadata) {
this.connectionMetadata = connectionMetadata;
}

public ExtendedQueryProtocolHandler getExtendedQueryProtocolHandler() {
return extendedQueryProtocolHandler;
}

public synchronized ConnectionStatus getStatus() {
public ConnectionStatus getStatus() {
return status;
}

public synchronized void setStatus(ConnectionStatus status) {
public void setStatus(ConnectionStatus status) {
this.status = status;
}

public void setStatus(ConnectionStatus status, Map<String, String> connectionParameters) {
this.status = status;
this.connectionParameters = connectionParameters;
}

public Map<String, String> getConnectionParameters() {
return this.connectionParameters;
}

public WellKnownClient getWellKnownClient() {
return wellKnownClient;
}
Expand Down Expand Up @@ -894,6 +932,7 @@ boolean isHasDeterminedClientUsingQuery() {
/** Status of a {@link ConnectionHandler} */
public enum ConnectionStatus {
UNAUTHENTICATED,
AUTHENTICATING,
AUTHENTICATED,
COPY_IN,
COPY_DONE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.cloud.spanner.pgadapter;

import com.google.cloud.spanner.pgadapter.metadata.ChannelOutputStream;
import com.google.cloud.spanner.pgadapter.metadata.ConnectionMetadata;
import com.google.cloud.spanner.pgadapter.metadata.ForwardingInputStream;
import com.google.cloud.spanner.pgadapter.wireprotocol.BootstrapMessage;
import com.google.cloud.spanner.pgadapter.wireprotocol.ControlMessage;
import com.google.cloud.spanner.pgadapter.wireprotocol.FlushMessage;
import com.google.cloud.spanner.pgadapter.wireprotocol.SyncMessage;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;

public class NonBlockingConnectionHandler extends ConnectionHandler {
private static final int DEFAULT_BUFFER_CAPACITY = 1 << 13;

private static final Logger logger =
Logger.getLogger(NonBlockingConnectionHandler.class.getName());

private final ByteBuffer headerBuffer = ByteBuffer.allocateDirect(5);

private ByteBuffer messageBuffer = ByteBuffer.allocateDirect(DEFAULT_BUFFER_CAPACITY);

private final BlockingQueue<BootstrapMessage> bootstrapMessages = new LinkedBlockingQueue<>();

private final BlockingQueue<ControlMessage> controlMessages = new LinkedBlockingQueue<>();

private final ForwardingInputStream forwardingInputStream = new ForwardingInputStream();

private final SocketChannel channel;

NonBlockingConnectionHandler(ProxyServer server, SocketChannel channel) {
super(server, channel.socket());
this.channel = channel;
setConnectionMetadata(
new ConnectionMetadata(forwardingInputStream, new ChannelOutputStream(channel)));
}

@Override
void createSSLSocket() throws IOException {
throw new IOException("SSL is not supported for non-blocking connection handlers");

Check warning on line 59 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java#L59

Added line #L59 was not covered by tests
}

ByteBuffer getHeaderBuffer() {
this.headerBuffer.rewind();
return this.headerBuffer;
}

ByteBuffer getMessageBuffer(int length) {
if (this.messageBuffer.capacity() < length) {
this.messageBuffer = ByteBuffer.allocateDirect(length);
} else {
this.messageBuffer.rewind();
this.messageBuffer.limit(length);
}
return this.messageBuffer;
}

void setRawInputStream(InputStream inputStream) {
this.forwardingInputStream.setDelegate(inputStream);
}

@Override
public boolean supportsPeekNextByte() {
return false;
}

@Override
protected ConnectionMetadata createConnectionMetadata() {
return getConnectionMetadata();
}

@Override
protected void closeSocket() throws IOException {
try {
this.channel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
throw ioException;
} catch (Throwable t) {
t.printStackTrace();

Check warning on line 99 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java#L95-L99

Added lines #L95 - L99 were not covered by tests
}
}

void addBootstrapMessage(BootstrapMessage bootstrapMessage) {
this.bootstrapMessages.add(bootstrapMessage);
}

@Override
public BootstrapMessage readBootstrapMessage() throws Exception {
return this.bootstrapMessages.take();
}

void addControlMessage(ControlMessage controlMessage) {
this.controlMessages.add(controlMessage);
}

@Override
public ControlMessage readControlMessage() throws Exception {
if (getStatus() == ConnectionStatus.COPY_IN) {
while (true) {
ControlMessage message = this.controlMessages.take();
if (message instanceof FlushMessage || message instanceof SyncMessage) {
continue;

Check warning on line 122 in src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/google/cloud/spanner/pgadapter/NonBlockingConnectionHandler.java#L122

Added line #L122 was not covered by tests
}
return message;
}
} else {
return this.controlMessages.take();
}
}
}
Loading
Loading