Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
build/
pom.xml
target/
.idea/
/bin/
.classpath
.project
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ test_LIBADD := \
package_dir := $(subst .,/,$(package))
AM_JAVACFLAGS := -Xlint -source 6 -target 6
JAVAC := javac
JVM_ARGS :=
JVM_ARGS := -Dio.netty.noPreferDirect=true -Dio.netty.noUnsafe=true
PROTOC := protoc
classes := $(asynchbase_SOURCES:src/%.java=$(top_builddir)/$(package_dir)/%.class) \
$(protobuf_SOURCES:src/%.java=$(top_builddir)/com/google/%.class) \
Expand Down Expand Up @@ -290,7 +290,7 @@ $(jar): $(top_builddir)/manifest $(top_builddir)/.javac-stamp $(classes)
doc: $(top_builddir)/api/index.html

JDK_JAVADOC := http://download.oracle.com/javase/6/docs/api
NETTY_JAVADOC := http://docs.jboss.org/netty/3.2/api
NETTY_JAVADOC := http://netty.io/4.0/api/index.html
SUASYNC_JAVADOC := http://tsunanet.net/~tsuna/async/api
GUAVA_JAVADOC := http://docs.guava-libraries.googlecode.com/git/javadoc
JAVADOCS = $(JDK_JAVADOC) $(NETTY_JAVADOC) $(SUASYNC_JAVADOC) $(GUAVA_JAVADOC)
Expand Down
4 changes: 3 additions & 1 deletion asynchbase.conf
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,6 @@ hbase.zookeeper.quorum=localhost

# The context name to use in the jaas.conf file you supplied to
# java.security.login.auth.config when starting the JVM
#hbase.sasl.clientconfig=Client
#hbase.sasl.clientconfig=Client

io.netty.noPreferDirect=true
2 changes: 1 addition & 1 deletion logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</encoder>
</appender>

<root level="info">
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
10 changes: 8 additions & 2 deletions pom.xml.in
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>@NETTY_VERSION@</version>
</dependency>

Expand Down Expand Up @@ -232,6 +232,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
</dependency>

<!-- test dependencies -->

<dependency>
Expand Down
21 changes: 10 additions & 11 deletions src/AppendRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,15 @@
*/
package org.hbase.async;

import java.util.ArrayList;

import com.google.protobuf.ByteString;

import io.netty.buffer.ByteBuf;
import org.hbase.async.generated.ClientPB;
import org.jboss.netty.buffer.ChannelBuffer;
import org.hbase.async.generated.ClientPB.MutateRequest;
import org.hbase.async.generated.ClientPB.MutationProto;
import org.hbase.async.generated.HBasePB;

import java.util.ArrayList;

/**
* Appends data to one or more columns in HBase, creating the columns if they
* do not exist. The {@code value} is simply concatenated with whatever is
Expand Down Expand Up @@ -486,7 +485,7 @@ int payloadSize() {
}

@Override
void serializePayload(final ChannelBuffer buf) {
void serializePayload(final ByteBuf buf) {
for (int i = 0; i < qualifiers.length; i++) {
//HBASE KeyValue (org.apache.hadoop.hbase.KeyValue) doesn't have an Append Type
KeyValue.serialize(buf, KeyValue.PUT, timestamp, key, family,
Expand Down Expand Up @@ -578,7 +577,7 @@ MutationProto toMutationProto() {

/** Serializes this request. */
@Override
ChannelBuffer serialize(final byte server_version) {
ByteBuf serialize(final byte server_version) {
if (server_version < RegionClient.SERVER_VERSION_095_OR_ABOVE) {
return serializeOld(server_version);
}
Expand All @@ -587,12 +586,12 @@ ChannelBuffer serialize(final byte server_version) {
.setRegion(region.toProtobuf())
.setMutation(toMutationProto())
.build();
return toChannelBuffer(MUTATE, req);
return toByteBuf(MUTATE, req);
}

/** Serializes this request for HBase 0.94 and before. */
private ChannelBuffer serializeOld(final byte server_version) {
final ChannelBuffer buf = newBuffer(server_version,
private ByteBuf serializeOld(final byte server_version) {
final ByteBuf buf = newBuffer(server_version,
predictSerializedSize());
buf.writeInt(2); // Number of parameters.

Expand All @@ -606,7 +605,7 @@ private ChannelBuffer serializeOld(final byte server_version) {
}

/** Serialize the raw underlying `Append' into the given buffer. */
void serializeInto(final ChannelBuffer buf) {
void serializeInto(final ByteBuf buf) {
buf.writeByte(CODE); // Code for a `Append' parameter.
buf.writeByte(CODE); // Code again (see HBASE-2877).
buf.writeByte(1); // Append#APPENDT_VERSION. Stick to v1 here for now.
Expand All @@ -631,7 +630,7 @@ void serializeInto(final ChannelBuffer buf) {
}

@Override
Object deserialize(ChannelBuffer buf, int cell_size) {
Object deserialize(ByteBuf buf, int cell_size) {
if (!this.return_result) {
HBaseRpc.ensureNoCell(cell_size);
}
Expand Down
12 changes: 6 additions & 6 deletions src/AtomicIncrementRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import java.util.ArrayList;

import org.jboss.netty.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;

import org.hbase.async.generated.ClientPB.MutateRequest;
import org.hbase.async.generated.ClientPB.MutateResponse;
Expand Down Expand Up @@ -219,7 +219,7 @@ private int predictSerializedSize() {
}

/** Serializes this request. */
ChannelBuffer serialize(final byte server_version) {
ByteBuf serialize(final byte server_version) {
if (server_version < RegionClient.SERVER_VERSION_095_OR_ABOVE) {
return serializeOld(server_version);
}
Expand All @@ -244,12 +244,12 @@ ChannelBuffer serialize(final byte server_version) {
.setRegion(region.toProtobuf())
.setMutation(incr.build())
.build();
return toChannelBuffer(MUTATE, req);
return toByteBuf(MUTATE, req);
}

/** Serializes this request for HBase 0.94 and before. */
private ChannelBuffer serializeOld(final byte server_version) {
final ChannelBuffer buf = newBuffer(server_version,
private ByteBuf serializeOld(final byte server_version) {
final ByteBuf buf = newBuffer(server_version,
predictSerializedSize());
buf.writeInt(6); // Number of parameters.

Expand All @@ -264,7 +264,7 @@ private ChannelBuffer serializeOld(final byte server_version) {
}

@Override
Object deserialize(final ChannelBuffer buf, int cell_size) {
Object deserialize(final ByteBuf buf, int cell_size) {
final MutateResponse resp = readProtobuf(buf, MutateResponse.PARSER);
// An increment must always produce a result, so we shouldn't need to
// check whether the `result' field is set here.
Expand Down
5 changes: 2 additions & 3 deletions src/BatchableRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
package org.hbase.async;

import org.jboss.netty.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;

import org.hbase.async.generated.ClientPB.MutationProto;

Expand Down Expand Up @@ -71,7 +71,6 @@ abstract class BatchableRpc extends HBaseRpc
/**
* Package private constructor.
* @param table The name of the table this RPC is for.
* @param row The name of the row this RPC is for.
* @param family The column family to edit in that table. Subclass must
* validate, this class doesn't perform any validation on the family.
* @param timestamp The timestamp to use for {@link KeyValue}s of this RPC.
Expand Down Expand Up @@ -168,6 +167,6 @@ final boolean canBuffer() {
/**
* Serialize the part of this RPC for a {@link MultiAction}.
*/
abstract void serializePayload(final ChannelBuffer buf);
abstract void serializePayload(final ByteBuf buf);

}
4 changes: 2 additions & 2 deletions src/BinaryComparator.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
package org.hbase.async;

import com.google.protobuf.ByteString;
import org.jboss.netty.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;

import org.hbase.async.generated.ComparatorPB;

Expand Down Expand Up @@ -73,7 +73,7 @@ ComparatorPB.Comparator toProtobuf() {
}

@Override
void serializeOld(ChannelBuffer buf) {
void serializeOld(ByteBuf buf) {
super.serializeOld(buf); // super.predictSerializedSize()
// Write class code
buf.writeByte(CODE); // 1
Expand Down
4 changes: 2 additions & 2 deletions src/BinaryPrefixComparator.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
package org.hbase.async;

import com.google.protobuf.ByteString;
import org.jboss.netty.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;

import org.hbase.async.generated.ComparatorPB;

Expand Down Expand Up @@ -73,7 +73,7 @@ ComparatorPB.Comparator toProtobuf() {
}

@Override
void serializeOld(ChannelBuffer buf) {
void serializeOld(ByteBuf buf) {
super.serializeOld(buf); // super.predictSerializedSize()
// Write class code
buf.writeByte(CODE); // 1
Expand Down
4 changes: 2 additions & 2 deletions src/BitComparator.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
package org.hbase.async;

import com.google.protobuf.ByteString;
import org.jboss.netty.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;

import org.hbase.async.generated.ComparatorPB;

Expand Down Expand Up @@ -94,7 +94,7 @@ ComparatorPB.Comparator toProtobuf() {
}

@Override
void serializeOld(ChannelBuffer buf) {
void serializeOld(ByteBuf buf) {
super.serializeOld(buf); // super.predictSerializedSize()
// Write class code
buf.writeByte(CODE); // 1
Expand Down
27 changes: 13 additions & 14 deletions src/Bytes.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.ZeroCopyLiteralByteString;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.util.CharsetUtil;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

/**
* Helper functions to manipulate byte arrays.
Expand Down Expand Up @@ -429,11 +429,11 @@ public static String hex(long v) {
// Ugly stuff
// ----------
// Background: when using ReplayingDecoder (which makes it easy to deal with
// unframed RPC responses), the ChannelBuffer we manipulate is in fact a
// ReplayingDecoderBuffer, a package-private class that Netty uses. This
// unframed RPC responses), the ByteBuf we manipulate is in fact a
// ReplayingDecoderByteBuf, a package-private class that Netty uses. This
// class, for some reason, throws UnsupportedOperationException on its
// array() method. This method is unfortunately the only way to easily dump
// the contents of a ChannelBuffer, which is useful for debugging or logging
// the contents of a ByteBuf, which is useful for debugging or logging
// unexpected buffers. An issue (NETTY-346) has been filed to get access to
// the buffer, but the resolution was useless: instead of making the array()
// method work, a new internalBuffer() method was added on ReplayingDecoder,
Expand All @@ -445,16 +445,15 @@ public static String hex(long v) {
// a different hack. Yes this is horrible, but it's for the greater good as
// this is what allows us to debug unexpected buffers when deserializing RPCs
// and what's more important than being able to debug unexpected stuff?
private static final Class<?> ReplayingDecoderBuffer;
private static final Class<?> ReplayingDecoderByteBuf;
private static final Field RDB_buffer; // For Netty 3.5.0 and before.
private static final Method RDB_buf; // For Netty 3.5.1 and above.
static {
try {
ReplayingDecoderBuffer = Class.forName("org.jboss.netty.handler.codec."
+ "replay.ReplayingDecoderBuffer");
ReplayingDecoderByteBuf = Class.forName("io.netty.handler.codec.ReplayingDecoderByteBuf");
Field field = null;
try {
field = ReplayingDecoderBuffer.getDeclaredField("buffer");
field = ReplayingDecoderByteBuf.getDeclaredField("buffer");
field.setAccessible(true);
} catch (NoSuchFieldException e) {
// Ignore. Field has been removed in Netty 3.5.1.
Expand All @@ -463,7 +462,7 @@ public static String hex(long v) {
if (field != null) { // Netty 3.5.0 or before.
RDB_buf = null;
} else {
RDB_buf = ReplayingDecoderBuffer.getDeclaredMethod("buf");
RDB_buf = ReplayingDecoderByteBuf.getDeclaredMethod("buf");
RDB_buf.setAccessible(true);
}
} catch (Exception e) {
Expand All @@ -476,18 +475,18 @@ public static String hex(long v) {
* @param buf The (possibly {@code null}) buffer to pretty-print.
* @return The buffer in a pretty-printed string.
*/
public static String pretty(final ChannelBuffer buf) {
public static String pretty(final ByteBuf buf) {
if (buf == null) {
return "null";
}
byte[] array;
try {
if (buf.getClass() != ReplayingDecoderBuffer) {
if (buf.getClass() != ReplayingDecoderByteBuf) {
array = buf.array();
} else if (RDB_buf != null) { // Netty 3.5.1 and above.
array = ((ChannelBuffer) RDB_buf.invoke(buf)).array();
array = ((ByteBuf) RDB_buf.invoke(buf)).array();
} else { // Netty 3.5.0 and before.
final ChannelBuffer wrapped_buf = (ChannelBuffer) RDB_buffer.get(buf);
final ByteBuf wrapped_buf = (ByteBuf) RDB_buffer.get(buf);
array = wrapped_buf.array();
}
} catch (UnsupportedOperationException e) {
Expand Down
6 changes: 2 additions & 4 deletions src/ColumnPaginationFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
*/
package org.hbase.async;

import java.lang.UnsupportedOperationException;

import org.jboss.netty.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;
import org.hbase.async.generated.FilterPB;

/**
Expand Down Expand Up @@ -110,7 +108,7 @@ int predictSerializedSize() {
}

@Override
void serializeOld(final ChannelBuffer buf) {
void serializeOld(final ByteBuf buf) {
buf.writeByte((byte) NAME.length); // 1
buf.writeBytes(NAME); // 53
buf.writeInt(limit); // 4
Expand Down
4 changes: 2 additions & 2 deletions src/ColumnPrefixFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
package org.hbase.async;

import org.jboss.netty.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;

import org.hbase.async.generated.FilterPB;

Expand Down Expand Up @@ -84,7 +84,7 @@ int predictSerializedSize() {
}

@Override
void serializeOld(final ChannelBuffer buf) {
void serializeOld(final ByteBuf buf) {
buf.writeByte((byte) NAME.length); // 1
buf.writeBytes(NAME); // 49
HBaseRpc.writeByteArray(buf, prefix); // 3 + prefix.length
Expand Down
4 changes: 2 additions & 2 deletions src/ColumnRangeFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
package org.hbase.async;

import org.jboss.netty.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;

import org.hbase.async.generated.FilterPB;

Expand Down Expand Up @@ -128,7 +128,7 @@ int predictSerializedSize() {
}

@Override
void serializeOld(final ChannelBuffer buf) {
void serializeOld(final ByteBuf buf) {
buf.writeByte((byte) NAME.length); // 1
buf.writeBytes(NAME); // 48

Expand Down
Loading