From 28d47c7152438e8967826846a6b941b7dad679df Mon Sep 17 00:00:00 2001 From: Ali Anwar Date: Fri, 15 Sep 2017 17:06:07 -0700 Subject: [PATCH 1/3] TEPHRA-263 Enforce TTL, regardless of any in-progress transactions. Also Handle the case where TTL is longer than the duration from beginning of epoch to now. This closes #61 Signed-off-by: Ali Anwar Signed-off-by: poorna --- .../java/org/apache/tephra/util/TxUtils.java | 30 ++++++++++++---- .../org/apache/tephra/util/TxUtilsTest.java | 35 +++++++++++++++++++ .../hbase/coprocessor/TransactionFilters.java | 9 +++-- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 9 ++--- .../TransactionVisibilityFilterTest.java | 15 ++++---- .../hbase/coprocessor/TransactionFilters.java | 9 +++-- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 7 ++-- .../TransactionVisibilityFilterTest.java | 15 ++++---- .../hbase/coprocessor/TransactionFilters.java | 9 +++-- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 7 ++-- .../TransactionVisibilityFilterTest.java | 15 ++++---- .../hbase/coprocessor/TransactionFilters.java | 9 +++-- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 7 ++-- .../TransactionVisibilityFilterTest.java | 15 ++++---- .../hbase/coprocessor/TransactionFilters.java | 9 +++-- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 11 +++--- .../TransactionVisibilityFilterTest.java | 15 ++++---- .../hbase/coprocessor/TransactionFilters.java | 9 +++-- .../coprocessor/TransactionProcessor.java | 2 +- .../TransactionVisibilityFilter.java | 11 +++--- .../TransactionVisibilityFilterTest.java | 15 ++++---- 26 files changed, 177 insertions(+), 96 deletions(-) diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java index aaca23d1..5eb2b3ec 100644 --- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java +++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java @@ -19,6 +19,8 @@ package org.apache.tephra.util; import com.google.common.primitives.Longs; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionType; @@ -61,9 +63,7 @@ public class TxUtils { * @return The oldest timestamp that will be visible for the given transaction and TTL configuration */ public static long getOldestVisibleTimestamp(Map ttlByFamily, Transaction tx) { - long maxTTL = getMaxTTL(ttlByFamily); - // we know that data will not be cleaned up while this tx is running up to this point as janitor uses it - return maxTTL < Long.MAX_VALUE ? tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS : 0; + return getOldestVisibleTimestamp(ttlByFamily, tx, false); } /** @@ -75,12 +75,28 @@ public static long getOldestVisibleTimestamp(Map ttlByFamily, Tran * @return The oldest timestamp that will be visible for the given transaction and TTL configuration */ public static long getOldestVisibleTimestamp(Map ttlByFamily, Transaction tx, boolean readNonTxnData) { - if (readNonTxnData) { - long maxTTL = getMaxTTL(ttlByFamily); - return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL : 0; + long maxTTL = getMaxTTL(ttlByFamily); + if (maxTTL == Long.MAX_VALUE) { + return 0; } + return getOldestVisibleTimestamp(maxTTL, tx, readNonTxnData); + } - return getOldestVisibleTimestamp(ttlByFamily, tx); + /** + * Returns the oldest visible timestamp for the given transaction, based on the TTL configured. If the TTL is + * negative or zero, the oldest visible timestamp will be {@code 0}. + * @param ttl TTL value (in milliseconds) + * @param tx The current transaction + * @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data + * @return The oldest timestamp that will be visible for the given transaction and TTL configuration + */ + public static long getOldestVisibleTimestamp(long ttl, Transaction tx, boolean readNonTxnData) { + if (ttl <= 0) { + return 0; + } + long ttlFactor = readNonTxnData ? 1 : TxConstants.MAX_TX_PER_MS; + // if the computed ttl is negative, return 0 because timestamps can not be negative + return Math.max(0, tx.getTransactionId() - ttl * ttlFactor); } /** diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java index db687fe7..b172e89e 100644 --- a/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java @@ -19,9 +19,14 @@ package org.apache.tephra.util; import org.apache.tephra.Transaction; +import org.apache.tephra.TxConstants; import org.junit.Assert; import org.junit.Test; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; /** @@ -54,4 +59,34 @@ public void testPruneUpperBound() { tx = new Transaction(100, 110, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS); Assert.assertEquals(99, TxUtils.getPruneUpperBound(tx)); } + + @Test + public void testTTL() { + long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS; + byte[] family = new byte[] { 'd' }; + long currentTxTimeSeconds = 100; + Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond, + new long[] {10 * txIdsPerSecond, 30 * txIdsPerSecond}, + new long[] {80 * txIdsPerSecond, 90 * txIdsPerSecond}, + 80 * txIdsPerSecond); + int ttlSeconds = 60; + Map ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L); + // ttl should only be impacted by the current transaction's id, and not by any older, in-progress transactions + Assert.assertEquals((currentTxTimeSeconds - ttlSeconds) * txIdsPerSecond, + TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx)); + } + + @Test + public void testLargeTTL() { + long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS; + byte[] family = new byte[] { 'd' }; + long currentTxTimeSeconds = 100; + Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond, + new long[] { }, new long[] { }, 100); + // ~100 years, so that the computed start timestamp is prior to 0 (epoch) + long ttlSeconds = TimeUnit.DAYS.toSeconds(365 * 100); + Map ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L); + // oldest visible timestamp should be 0, not negative, because HBase timestamps can not be negative + Assert.assertEquals(0, TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx)); + } } diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index 0ca9f9c2..c46082fc 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,7 +40,8 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, + scanType, null)); } /** @@ -50,13 +51,15 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, + scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 10ecfa40..d78ccea9 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan operation being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); } /** diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 36752686..d24f5042 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); } /** @@ -80,19 +80,20 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ - public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(ttlEntry.getKey(), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 4d34ed9b..7286a2a3 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -294,8 +294,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -353,7 +354,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index 0ca9f9c2..c46082fc 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,7 +40,8 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, + scanType, null)); } /** @@ -50,13 +51,15 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, + scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 17d55a4e..435ae02a 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); } /** diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 9a617a97..3e3db595 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); } /** @@ -80,19 +80,20 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(ttlEntry.getKey(), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 3352eef4..49de0eba 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -293,8 +293,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -352,7 +353,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index 0ca9f9c2..c46082fc 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,7 +40,8 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, + scanType, null)); } /** @@ -50,13 +51,15 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, + scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index ca960526..ab2ac8d4 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); } /** diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 9825deda..9e324787 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -73,7 +73,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); } /** @@ -83,19 +83,20 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index c27a10d2..d4e4ed16 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -293,8 +293,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -352,7 +353,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index 0ca9f9c2..c46082fc 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,7 +40,8 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, + scanType, null)); } /** @@ -50,13 +51,15 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, + scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 263ee98e..7325a7a1 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); } /** diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 9825deda..9e324787 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -73,7 +73,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); } /** @@ -83,19 +83,20 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 4b2b40c3..28dfba83 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -293,8 +293,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -352,7 +353,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index 0ca9f9c2..c46082fc 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,7 +40,8 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, + scanType, null)); } /** @@ -50,13 +51,15 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, + scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 553f5989..848cb1fb 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); } /** diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 5ad7c29c..bd3c719e 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,8 +70,8 @@ public class TransactionVisibilityFilter extends FilterBase { * @param scanType the type of scan operation being performed */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, scanType, null); + ScanType scanType) { + this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); } /** @@ -81,19 +81,20 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ - public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 1b02609f..b6650f61 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -294,8 +294,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -353,7 +354,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index 0ca9f9c2..c46082fc 100644 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,7 +40,8 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, + scanType, null)); } /** @@ -50,13 +51,15 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, + scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 553f5989..848cb1fb 100644 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); } /** diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 40e2c37a..81eb604c 100644 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,8 +70,8 @@ public class TransactionVisibilityFilter extends FilterBase { * @param scanType the type of scan operation being performed */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, scanType, null); + ScanType scanType) { + this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); } /** @@ -81,19 +81,20 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out + * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ - public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { + public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, + boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 1b02609f..b6650f61 100644 --- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -294,8 +294,9 @@ public void testTTLFiltering() throws Exception { ttls.put(FAM2, 30L); ttls.put(FAM3, 0L); - Transaction tx = txManager.startShort(); - long now = tx.getVisibilityUpperBound(); + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + // we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out + Transaction tx = new Transaction(now, now, new long[0], new long[0], now); Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN); assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL, filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now))); @@ -353,7 +354,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); } @Override From b15a8f94c3ddd1f15c9ca9f956ba6d43e5850d81 Mon Sep 17 00:00:00 2001 From: Ali Anwar Date: Thu, 21 Sep 2017 17:08:02 -0700 Subject: [PATCH 2/3] TEPHRA-263 Fix test cases and TTL enforcement for tables configured to read non transactional data. --- .../src/main/java/org/apache/tephra/util/TxUtils.java | 11 ++++++++--- .../tephra/hbase/coprocessor/TransactionFilters.java | 9 +++------ .../hbase/coprocessor/TransactionProcessor.java | 2 +- .../coprocessor/TransactionVisibilityFilter.java | 9 +++++---- .../tephra/hbase/TransactionAwareHTableTest.java | 2 +- .../coprocessor/TransactionVisibilityFilterTest.java | 10 +++++----- .../tephra/hbase/coprocessor/TransactionFilters.java | 9 +++------ .../hbase/coprocessor/TransactionProcessor.java | 2 +- .../coprocessor/TransactionVisibilityFilter.java | 9 +++++---- .../coprocessor/TransactionVisibilityFilterTest.java | 10 +++++----- .../tephra/hbase/coprocessor/TransactionFilters.java | 9 +++------ .../hbase/coprocessor/TransactionProcessor.java | 2 +- .../coprocessor/TransactionVisibilityFilter.java | 9 +++++---- .../coprocessor/TransactionVisibilityFilterTest.java | 10 +++++----- .../tephra/hbase/coprocessor/TransactionFilters.java | 9 +++------ .../hbase/coprocessor/TransactionProcessor.java | 2 +- .../coprocessor/TransactionVisibilityFilter.java | 9 +++++---- .../tephra/hbase/TransactionAwareHTableTest.java | 2 +- .../coprocessor/TransactionVisibilityFilterTest.java | 10 +++++----- .../tephra/hbase/coprocessor/TransactionFilters.java | 9 +++------ .../hbase/coprocessor/TransactionProcessor.java | 2 +- .../coprocessor/TransactionVisibilityFilter.java | 9 +++++---- .../tephra/hbase/TransactionAwareHTableTest.java | 7 +++---- .../coprocessor/TransactionVisibilityFilterTest.java | 10 +++++----- .../tephra/hbase/coprocessor/TransactionFilters.java | 9 +++------ .../hbase/coprocessor/TransactionProcessor.java | 2 +- .../coprocessor/TransactionVisibilityFilter.java | 9 +++++---- .../tephra/hbase/TransactionAwareHTableTest.java | 7 +++---- .../coprocessor/TransactionVisibilityFilterTest.java | 10 +++++----- 29 files changed, 100 insertions(+), 109 deletions(-) diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java index 5eb2b3ec..9619fc49 100644 --- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java +++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java @@ -94,9 +94,14 @@ public static long getOldestVisibleTimestamp(long ttl, Transaction tx, boolean r if (ttl <= 0) { return 0; } - long ttlFactor = readNonTxnData ? 1 : TxConstants.MAX_TX_PER_MS; + long oldestVisibleTimestamp; + if (readNonTxnData) { + oldestVisibleTimestamp = getTimestamp(tx.getTransactionId()) - ttl; + } else { + oldestVisibleTimestamp = tx.getTransactionId() - ttl * TxConstants.MAX_TX_PER_MS; + } // if the computed ttl is negative, return 0 because timestamps can not be negative - return Math.max(0, tx.getTransactionId() - ttl * ttlFactor); + return Math.max(0, oldestVisibleTimestamp); } /** @@ -119,7 +124,7 @@ public static long getMaxVisibleTimestamp(Transaction tx) { * as being written by this transaction (and therefore visible). */ public static Transaction createDummyTransaction(TransactionVisibilityState txVisibilityState) { - return new Transaction(txVisibilityState.getReadPointer(), Long.MAX_VALUE, + return new Transaction(txVisibilityState.getReadPointer(), txVisibilityState.getWritePointer(), Longs.toArray(txVisibilityState.getInvalid()), Longs.toArray(txVisibilityState.getInProgress().keySet()), TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT); diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index d78ccea9..10ecfa40 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan operation being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index d24f5042..88a06134 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -80,20 +80,21 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(ttlEntry.getKey(), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 9c5dca24..7fe511a2 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -107,7 +107,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { public static TemporaryFolder tmpFolder = new TemporaryFolder(); private static MiniDFSCluster dfsCluster; - + private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); private static final byte[] family = Bytes.toBytes("f1"); diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 7286a2a3..2a417a4a 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -354,7 +354,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 435ae02a..17d55a4e 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 3e3db595..feb1b353 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -80,20 +80,21 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(ttlEntry.getKey(), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 49de0eba..9054fed7 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -353,7 +353,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index ab2ac8d4..ca960526 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 9e324787..b901e3c6 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -73,7 +73,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -83,20 +83,21 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index d4e4ed16..111a4d91 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -353,7 +353,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 7325a7a1..263ee98e 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 9e324787..b901e3c6 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -73,7 +73,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -83,20 +83,21 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 30fcd8a7..edcbf226 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -107,7 +107,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { public static TemporaryFolder tmpFolder = new TemporaryFolder(); private static MiniDFSCluster dfsCluster; - + private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); private static final byte[] family = Bytes.toBytes("f1"); diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index 28dfba83..7e184fc0 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -353,7 +353,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 848cb1fb..553f5989 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index bd3c719e..5f48b5d2 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -71,7 +71,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -81,20 +81,21 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 11ffd1aa..5e676677 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -105,10 +105,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { public static TemporaryFolder tmpFolder = new TemporaryFolder(); private static MiniDFSCluster dfsCluster; - - public static void tearDownAfterClass() throws Exception { - dfsCluster.shutdown(); - } private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); @@ -190,6 +186,9 @@ public static void shutdownAfterClass() throws Exception { if (txManager != null) { txManager.stopAndWait(); } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } } @Before diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index b6650f61..822b5ff4 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -354,7 +354,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java index c46082fc..0ca9f9c2 100644 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java @@ -40,8 +40,7 @@ public class TransactionFilters { */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false, - scanType, null)); + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); } /** @@ -51,15 +50,13 @@ public static Filter getVisibilityFilter(Transaction tx, Map ttlBy * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public static Filter getVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, - scanType, cellFilter)); + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); } } diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 848cb1fb..553f5989 100644 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti * @param type the type of scan being performed */ protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter); + return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); } /** diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java index 81eb604c..df866e2f 100644 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -71,7 +71,7 @@ public class TransactionVisibilityFilter extends FilterBase { */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, false, scanType, null); + this(tx, ttlByFamily, allowEmptyValues, scanType, null); } /** @@ -81,20 +81,21 @@ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} * these will be interpreted as "delete" markers and the column will be filtered out - * @param readNonTxnData whether data written before Tephra was enabled on a table should be readable * @param scanType the type of scan operation being performed * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. */ public TransactionVisibilityFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, - boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) { + ScanType scanType, @Nullable Filter cellFilter) { this.tx = tx; this.oldestTsByFamily = Maps.newTreeMap(); for (Map.Entry ttlEntry : ttlByFamily.entrySet()) { long familyTTL = ttlEntry.getValue(); oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), - TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData)); + // we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp + // in #filterKeyValue, using TxUtils#getTimestampForTTL(long) + TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false)); } this.allowEmptyValues = allowEmptyValues; this.clearDeletes = diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index 11ffd1aa..5e676677 100644 --- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -105,10 +105,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest { public static TemporaryFolder tmpFolder = new TemporaryFolder(); private static MiniDFSCluster dfsCluster; - - public static void tearDownAfterClass() throws Exception { - dfsCluster.shutdown(); - } private static final class TestBytes { private static final byte[] table = Bytes.toBytes("testtable"); @@ -190,6 +186,9 @@ public static void shutdownAfterClass() throws Exception { if (txManager != null) { txManager.stopAndWait(); } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } } @Before diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java index b6650f61..822b5ff4 100644 --- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java @@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { TxFilterFactory txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter); } }; runFilteringTest(txFilterFactory, @@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter); } }; runFilteringTest(txFilterFactory, @@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter); } }; runFilteringTest(txFilterFactory, @@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException { txFilterFactory = new TxFilterFactory() { @Override public Filter getTxFilter(Transaction tx, Map familyTTLs) { - return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter); + return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter); } }; runFilteringTest(txFilterFactory, @@ -354,7 +354,7 @@ private interface TxFilterFactory { private class CustomTxFilter extends TransactionVisibilityFilter { public CustomTxFilter(Transaction tx, Map ttlByFamily, boolean allowEmptyValues, ScanType scanType, @Nullable Filter cellFilter) { - super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter); + super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter); } @Override From f50a467d7cd7d067c95f9eff8f50288d6e7c5776 Mon Sep 17 00:00:00 2001 From: Ali Anwar Date: Thu, 21 Sep 2017 17:40:20 -0700 Subject: [PATCH 3/3] Update TxUtils#createDummyTransaction. --- .../src/main/java/org/apache/tephra/util/TxUtils.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java index 9619fc49..f6df3c94 100644 --- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java +++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java @@ -124,10 +124,13 @@ public static long getMaxVisibleTimestamp(Transaction tx) { * as being written by this transaction (and therefore visible). */ public static Transaction createDummyTransaction(TransactionVisibilityState txVisibilityState) { - return new Transaction(txVisibilityState.getReadPointer(), txVisibilityState.getWritePointer(), + return new Transaction(txVisibilityState.getReadPointer(), + txVisibilityState.getVisibilityUpperBound(), + Long.MAX_VALUE, Longs.toArray(txVisibilityState.getInvalid()), Longs.toArray(txVisibilityState.getInProgress().keySet()), - TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT); + TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT, + new long[0], Transaction.VisibilityLevel.SNAPSHOT); } /**