Skip to content

Proposal for read back-pressure #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ ext {
// Composition Libraries
rxjava1Version = '1.0.8'
rxjavaRsVersion = '0.5.0'
reactorVersion = '2.0.3.RELEASE'
reactorVersion = '2.1.0.BUILD-SNAPSHOT'

// Testing
mockitoVersion = '1.10.19'
Expand Down Expand Up @@ -113,6 +113,7 @@ project('ripc-transport-netty4-examples') {
dependencies {
// ripc-core
compile project(":ripc-transport-netty4"),
"com.fasterxml.jackson.core:jackson-databind:2.5.4",
"io.reactivex:rxjava:$rxjava1Version",
"io.reactivex:rxjava-reactive-streams:$rxjavaRsVersion"
runtime "ch.qos.logback:logback-classic:$logbackVersion"
Expand Down Expand Up @@ -163,8 +164,8 @@ project('ripc-reactor-examples') {
dependencies {
// ripc-tcp
compile project(":ripc-reactor"),
project(":ripc-transport-netty4")

project(":ripc-transport-netty4"),
"com.fasterxml.jackson.core:jackson-databind:2.5.4"
runtime "ch.qos.logback:logback-classic:$logbackVersion"
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.netty.handler.codec.json;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.channel.ChannelPipeline;

import java.util.List;

/**
* NOTE: This class was copied from Netty 4.1 sources.
*
* Splits a byte stream of JSON objects and arrays into individual objects/arrays and passes them up the
* {@link ChannelPipeline}.
*
* This class does not do any real parsing or validation. A sequence of bytes is considered a JSON object/array
* if it contains a matching number of opening and closing braces/brackets. It's up to a subsequent
* {@link ChannelHandler} to parse the JSON text into a more usable form i.e. a POJO.
*/
public class JsonObjectDecoder extends ByteToMessageDecoder {

private static final int ST_CORRUPTED = -1;
private static final int ST_INIT = 0;
private static final int ST_DECODING_NORMAL = 1;
private static final int ST_DECODING_ARRAY_STREAM = 2;

private int openBraces;
private int idx;

private int state;
private boolean insideString;

private final int maxObjectLength;
private final boolean streamArrayElements;

public JsonObjectDecoder() {
// 1 MB
this(1024 * 1024);
}

public JsonObjectDecoder(int maxObjectLength) {
this(maxObjectLength, false);
}

public JsonObjectDecoder(boolean streamArrayElements) {
this(1024 * 1024, streamArrayElements);
}

/**
* @param maxObjectLength maximum number of bytes a JSON object/array may use (including braces and all).
* Objects exceeding this length are dropped and an {@link TooLongFrameException}
* is thrown.
* @param streamArrayElements if set to true and the "top level" JSON object is an array, each of its entries
* is passed through the pipeline individually and immediately after it was fully
* received, allowing for arrays with "infinitely" many elements.
*
*/
public JsonObjectDecoder(int maxObjectLength, boolean streamArrayElements) {
if (maxObjectLength < 1) {
throw new IllegalArgumentException("maxObjectLength must be a positive int");
}
this.maxObjectLength = maxObjectLength;
this.streamArrayElements = streamArrayElements;
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (state == ST_CORRUPTED) {
in.skipBytes(in.readableBytes());
return;
}

// index of next byte to process.
int idx = this.idx;
int wrtIdx = in.writerIndex();

if (wrtIdx > maxObjectLength) {
// buffer size exceeded maxObjectLength; discarding the complete buffer.
in.skipBytes(in.readableBytes());
reset();
throw new TooLongFrameException(
"object length exceeds " + maxObjectLength + ": " + wrtIdx + " bytes discarded");
}

for (/* use current idx */; idx < wrtIdx; idx++) {
byte c = in.getByte(idx);
if (state == ST_DECODING_NORMAL) {
decodeByte(c, in, idx);

// All opening braces/brackets have been closed. That's enough to conclude
// that the JSON object/array is complete.
if (openBraces == 0) {
ByteBuf json = extractObject(ctx, in, in.readerIndex(), idx + 1 - in.readerIndex());
if (json != null) {
out.add(json);
}

// The JSON object/array was extracted => discard the bytes from
// the input buffer.
in.readerIndex(idx + 1);
// Reset the object state to get ready for the next JSON object/text
// coming along the byte stream.
reset();
}
} else if (state == ST_DECODING_ARRAY_STREAM) {
decodeByte(c, in, idx);

if (!insideString && (openBraces == 1 && c == ',' || openBraces == 0 && c == ']')) {
// skip leading spaces. No range check is needed and the loop will terminate
// because the byte at position idx is not a whitespace.
for (int i = in.readerIndex(); Character.isWhitespace(in.getByte(i)); i++) {
in.skipBytes(1);
}

// skip trailing spaces.
int idxNoSpaces = idx - 1;
while (idxNoSpaces >= in.readerIndex() && Character.isWhitespace(in.getByte(idxNoSpaces))) {
idxNoSpaces--;
}

ByteBuf json = extractObject(ctx, in, in.readerIndex(), idxNoSpaces + 1 - in.readerIndex());
if (json != null) {
out.add(json);
}

in.readerIndex(idx + 1);

if (c == ']') {
reset();
}
}
// JSON object/array detected. Accumulate bytes until all braces/brackets are closed.
} else if (c == '{' || c == '[') {
initDecoding(c);

if (state == ST_DECODING_ARRAY_STREAM) {
// Discard the array bracket
in.skipBytes(1);
}
// Discard leading spaces in front of a JSON object/array.
} else if (Character.isWhitespace(c)) {
in.skipBytes(1);
} else {
state = ST_CORRUPTED;
throw new CorruptedFrameException(
"invalid JSON received at byte position " + idx + ": " + ByteBufUtil.hexDump(in));
}
}

if (in.readableBytes() == 0) {
this.idx = 0;
} else {
this.idx = idx;
}
}

/**
* Override this method if you want to filter the json objects/arrays that get passed through the pipeline.
*/
@SuppressWarnings("UnusedParameters")
protected ByteBuf extractObject(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
return buffer.slice(index, length).retain();
}

private void decodeByte(byte c, ByteBuf in, int idx) {
if ((c == '{' || c == '[') && !insideString) {
openBraces++;
} else if ((c == '}' || c == ']') && !insideString) {
openBraces--;
} else if (c == '"') {
// start of a new JSON string. It's necessary to detect strings as they may
// also contain braces/brackets and that could lead to incorrect results.
if (!insideString) {
insideString = true;
// If the double quote wasn't escaped then this is the end of a string.
} else if (in.getByte(idx - 1) != '\\') {
insideString = false;
}
}
}

private void initDecoding(byte openingBrace) {
openBraces = 1;
if (openingBrace == '[' && streamArrayElements) {
state = ST_DECODING_ARRAY_STREAM;
} else {
state = ST_DECODING_NORMAL;
}
}

private void reset() {
insideString = false;
state = ST_INIT;
openBraces = 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package io.ripc.reactor.protocol.tcp;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.json.JsonObjectDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import io.ripc.protocol.tcp.TcpServer;
import io.ripc.transport.netty4.tcp.Netty4TcpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.rx.Streams;

import java.nio.charset.Charset;

public class CodecSample {

private static final Logger LOG = LoggerFactory.getLogger(CodecSample.class);


public static void main(String... args) throws InterruptedException {
//runLineBasedFrameDecoder();
echoJsonStreamDecoding();
}

private static void runLineBasedFrameDecoder() {

TcpServer<String, String> transport = Netty4TcpServer.<String, String>create(
0,
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
int bufferSize = 1;
ChannelConfig config = channel.config();
config.setOption(ChannelOption.SO_RCVBUF, bufferSize);
config.setOption(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize));
channel.pipeline().addFirst(
new LineBasedFrameDecoder(256),
new StringDecoder(CharsetUtil.UTF_8),
new StringEncoder(CharsetUtil.UTF_8));
}
});

ReactorTcpServer.create(transport).start(connection -> {
connection.log("input")
.observeComplete(v -> LOG.info("Connection input complete"))
.capacity(1)
.consume(line -> {
String response = "Hello " + line + "\n";
Streams.wrap(connection.writeWith(Streams.just(response))).consume();
});
return Streams.never();
});
}

private static void echoJsonStreamDecoding() {

TcpServer<Person, Person> transport = Netty4TcpServer.<Person, Person>create(
0,
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addFirst(
new JsonObjectDecoder(),
new JacksonJsonCodec());
}
});

ReactorTcpServer.create(transport)
.start(connection -> {
connection.log("input")
.observeComplete(v -> LOG.info("Connection input complete"))
.capacity(1)
.consume(person -> {
person = new Person(person.getLastName(), person.getFirstName());
Streams.wrap(connection.writeWith(Streams.just(person))).consume();
});
return Streams.never();
});

}

private static class JacksonJsonCodec extends ChannelDuplexHandler {

private final ObjectMapper mapper = new ObjectMapper();

@Override
public void channelRead(ChannelHandlerContext context, Object message) throws Exception {
if (message instanceof ByteBuf) {
Charset charset = Charset.defaultCharset();
message = this.mapper.readValue(((ByteBuf) message).toString(charset), Person.class);
}
super.channelRead(context, message);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof Person) {
byte[] buff = mapper.writeValueAsBytes(msg);
ByteBuf bb = ctx.alloc().buffer(buff.length);
bb.writeBytes(buff);
msg = bb;
}
super.write(ctx, msg, promise);
}
}

private static class Person {

private String firstName;

private String lastName;

public Person() {
}

public Person(String firstName, String lastName) {
this.firstName = firstName;
this.lastName = lastName;
}

public String getFirstName() {
return firstName;
}

public Person setFirstName(String firstName) {
this.firstName = firstName;
return this;
}

public String getLastName() {
return lastName;
}

public Person setLastName(String lastName) {
this.lastName = lastName;
return this;
}
}

}
Loading