diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java index aaca23d1..f6df3c94 100644 --- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java +++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java @@ -19,6 +19,8 @@ package org.apache.tephra.util; import com.google.common.primitives.Longs; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionType; @@ -61,9 +63,7 @@ public class TxUtils { * @return The oldest timestamp that will be visible for the given transaction and TTL configuration */ public static long getOldestVisibleTimestamp(Map ttlByFamily, Transaction tx) { - long maxTTL = getMaxTTL(ttlByFamily); - // we know that data will not be cleaned up while this tx is running up to this point as janitor uses it - return maxTTL < Long.MAX_VALUE ? tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS : 0; + return getOldestVisibleTimestamp(ttlByFamily, tx, false); } /** @@ -75,12 +75,33 @@ public static long getOldestVisibleTimestamp(Map ttlByFamily, Tran * @return The oldest timestamp that will be visible for the given transaction and TTL configuration */ public static long getOldestVisibleTimestamp(Map ttlByFamily, Transaction tx, boolean readNonTxnData) { - if (readNonTxnData) { - long maxTTL = getMaxTTL(ttlByFamily); - return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL : 0; + long maxTTL = getMaxTTL(ttlByFamily); + if (maxTTL == Long.MAX_VALUE) { + return 0; } + return getOldestVisibleTimestamp(maxTTL, tx, readNonTxnData); + } - return getOldestVisibleTimestamp(ttlByFamily, tx); + /** + * Returns the oldest visible timestamp for the given transaction, based on the TTL configured. If the TTL is + * negative or zero, the oldest visible timestamp will be {@code 0}. + * @param ttl TTL value (in milliseconds) + * @param tx The current transaction + * @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data + * @return The oldest timestamp that will be visible for the given transaction and TTL configuration + */ + public static long getOldestVisibleTimestamp(long ttl, Transaction tx, boolean readNonTxnData) { + if (ttl <= 0) { + return 0; + } + long oldestVisibleTimestamp; + if (readNonTxnData) { + oldestVisibleTimestamp = getTimestamp(tx.getTransactionId()) - ttl; + } else { + oldestVisibleTimestamp = tx.getTransactionId() - ttl * TxConstants.MAX_TX_PER_MS; + } + // if the computed ttl is negative, return 0 because timestamps can not be negative + return Math.max(0, oldestVisibleTimestamp); } /** @@ -103,10 +124,13 @@ public static long getMaxVisibleTimestamp(Transaction tx) { * as being written by this transaction (and therefore visible). */ public static Transaction createDummyTransaction(TransactionVisibilityState txVisibilityState) { - return new Transaction(txVisibilityState.getReadPointer(), Long.MAX_VALUE, + return new Transaction(txVisibilityState.getReadPointer(), + txVisibilityState.getVisibilityUpperBound(), + Long.MAX_VALUE, Longs.toArray(txVisibilityState.getInvalid()), Longs.toArray(txVisibilityState.getInProgress().keySet()), - TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT); + TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT, + new long[0], Transaction.VisibilityLevel.SNAPSHOT); } /** diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java index db687fe7..b172e89e 100644 --- a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java @@ -19,9 +19,14 @@ package org.apache.tephra.util; import org.apache.tephra.Transaction; +import org.apache.tephra.TxConstants; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; /** @@ -54,4 +59,34 @@ public void testPruneUpperBound() { tx = new Transaction(100, 110, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS); Assert.assertEquals(99, TxUtils.getPruneUpperBound(tx)); } + + @Test + public void testTTL() { + long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS; + byte[] family = new byte[] { 'd' }; + long currentTxTimeSeconds = 100; + Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond, + new long[] {10 * txIdsPerSecond, 30 * txIdsPerSecond}, + new long[] {80 * txIdsPerSecond, 90 * txIdsPerSecond}, + 80 * txIdsPerSecond); + int ttlSeconds = 60; + Map ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L); + // ttl should only be impacted by the current transaction's id, and not by any older, in-progress transactions + Assert.assertEquals((currentTxTimeSeconds - ttlSeconds) * txIdsPerSecond, + TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx)); + } + + @Test + public void testLargeTTL() { + long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS; + byte[] family = new byte[] { 'd' }; + long currentTxTimeSeconds = 100; + Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond, + new long[] { }, new long[] { }, 100); + // ~100 years, so that the computed start timestamp is prior to 0 (epoch) + long ttlSeconds = TimeUnit.DAYS.toSeconds(365 * 100); + Map ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L); + // oldest visible timestamp should be 0, not negative, because HBase timestamps can not be negative + Assert.assertEquals(0, TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx)); + } } diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 36752686..88a06134 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -85,14 +85,16 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ - public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(ttlEntry.getKey(), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 9c5dca24..7fe511a2 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -107,7 +107,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { public static TemporaryFolder tmpFolder = new TemporaryFolder(); private static MiniDFSCluster dfsCluster; - + private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); private static final byte[] family = Bytes.toBytes("f1"); diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 4d34ed9b..2a417a4a 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -294,8 +294,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 9a617a97..feb1b353 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -86,13 +86,15 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(ttlEntry.getKey(), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 3352eef4..9054fed7 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -293,8 +293,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 9825deda..b901e3c6 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -89,13 +89,15 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index c27a10d2..111a4d91 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -293,8 +293,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 9825deda..b901e3c6 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -89,13 +89,15 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 30fcd8a7..edcbf226 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -107,7 +107,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { public static TemporaryFolder tmpFolder = new TemporaryFolder(); private static MiniDFSCluster dfsCluster; - + private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); private static final byte[] family = Bytes.toBytes("f1"); diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 4b2b40c3..7e184fc0 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -293,8 +293,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 5ad7c29c..5f48b5d2 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase { * @param scanType the type of scan operation being performed */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType) { + ScanType scanType) { this(tx, ttlByFamily, allowEmptyValues, scanType, null); } @@ -86,14 +86,16 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ - public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 11ffd1aa..5e676677 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -105,10 +105,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { public static TemporaryFolder tmpFolder = new TemporaryFolder(); private static MiniDFSCluster dfsCluster; - - public static void tearDownAfterClass() throws Exception { - dfsCluster.shutdown(); - } private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); @@ -190,6 +186,9 @@ public static void shutdownAfterClass() throws Exception { if (txManager != null) { txManager.stopAndWait(); } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } } @Before diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 1b02609f..822b5ff4 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -294,8 +294,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 40e2c37a..df866e2f 100644 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase { * @param scanType the type of scan operation being performed */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType) { + ScanType scanType) { this(tx, ttlByFamily, allowEmptyValues, scanType, null); } @@ -86,14 +86,16 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ - public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 11ffd1aa..5e676677 100644 --- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -105,10 +105,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { public static TemporaryFolder tmpFolder = new TemporaryFolder(); private static MiniDFSCluster dfsCluster; - - public static void tearDownAfterClass() throws Exception { - dfsCluster.shutdown(); - } private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); @@ -190,6 +186,9 @@ public static void shutdownAfterClass() throws Exception { if (txManager != null) { txManager.stopAndWait(); } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } } @Before diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 1b02609f..822b5ff4 100644 --- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -294,8 +294,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));