Skip to content

Commit 28d47c7

Browse files
committed
TEPHRA-263 Enforce TTL, regardless of any in-progress transactions. Also Handle the case where TTL is longer than the duration from beginning of epoch to now.
This closes #61 Signed-off-by: Ali Anwar <[email protected]> Signed-off-by: poorna <[email protected]>
1 parent cd7bd2b commit 28d47c7

File tree

26 files changed

+177
-96
lines changed

26 files changed

+177
-96
lines changed

tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java

+23-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.tephra.util;
2020

2121
import com.google.common.primitives.Longs;
22+
import org.apache.commons.logging.Log;
23+
import org.apache.commons.logging.LogFactory;
2224
import org.apache.tephra.Transaction;
2325
import org.apache.tephra.TransactionManager;
2426
import org.apache.tephra.TransactionType;
@@ -61,9 +63,7 @@ public class TxUtils {
6163
* @return The oldest timestamp that will be visible for the given transaction and TTL configuration
6264
*/
6365
public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx) {
64-
long maxTTL = getMaxTTL(ttlByFamily);
65-
// we know that data will not be cleaned up while this tx is running up to this point as janitor uses it
66-
return maxTTL < Long.MAX_VALUE ? tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS : 0;
66+
return getOldestVisibleTimestamp(ttlByFamily, tx, false);
6767
}
6868

6969
/**
@@ -75,12 +75,28 @@ public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Tran
7575
* @return The oldest timestamp that will be visible for the given transaction and TTL configuration
7676
*/
7777
public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx, boolean readNonTxnData) {
78-
if (readNonTxnData) {
79-
long maxTTL = getMaxTTL(ttlByFamily);
80-
return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL : 0;
78+
long maxTTL = getMaxTTL(ttlByFamily);
79+
if (maxTTL == Long.MAX_VALUE) {
80+
return 0;
8181
}
82+
return getOldestVisibleTimestamp(maxTTL, tx, readNonTxnData);
83+
}
8284

83-
return getOldestVisibleTimestamp(ttlByFamily, tx);
85+
/**
86+
* Returns the oldest visible timestamp for the given transaction, based on the TTL configured. If the TTL is
87+
* negative or zero, the oldest visible timestamp will be {@code 0}.
88+
* @param ttl TTL value (in milliseconds)
89+
* @param tx The current transaction
90+
* @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data
91+
* @return The oldest timestamp that will be visible for the given transaction and TTL configuration
92+
*/
93+
public static long getOldestVisibleTimestamp(long ttl, Transaction tx, boolean readNonTxnData) {
94+
if (ttl <= 0) {
95+
return 0;
96+
}
97+
long ttlFactor = readNonTxnData ? 1 : TxConstants.MAX_TX_PER_MS;
98+
// if the computed ttl is negative, return 0 because timestamps can not be negative
99+
return Math.max(0, tx.getTransactionId() - ttl * ttlFactor);
84100
}
85101

86102
/**

tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java

+35
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@
1919
package org.apache.tephra.util;
2020

2121
import org.apache.tephra.Transaction;
22+
import org.apache.tephra.TxConstants;
2223
import org.junit.Assert;
2324
import org.junit.Test;
2425

26+
import java.util.Collections;
27+
import java.util.Map;
28+
import java.util.concurrent.TimeUnit;
29+
2530
import static org.junit.Assert.assertEquals;
2631

2732
/**
@@ -54,4 +59,34 @@ public void testPruneUpperBound() {
5459
tx = new Transaction(100, 110, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS);
5560
Assert.assertEquals(99, TxUtils.getPruneUpperBound(tx));
5661
}
62+
63+
@Test
64+
public void testTTL() {
65+
long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS;
66+
byte[] family = new byte[] { 'd' };
67+
long currentTxTimeSeconds = 100;
68+
Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond,
69+
new long[] {10 * txIdsPerSecond, 30 * txIdsPerSecond},
70+
new long[] {80 * txIdsPerSecond, 90 * txIdsPerSecond},
71+
80 * txIdsPerSecond);
72+
int ttlSeconds = 60;
73+
Map<byte[], Long> ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L);
74+
// ttl should only be impacted by the current transaction's id, and not by any older, in-progress transactions
75+
Assert.assertEquals((currentTxTimeSeconds - ttlSeconds) * txIdsPerSecond,
76+
TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx));
77+
}
78+
79+
@Test
80+
public void testLargeTTL() {
81+
long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS;
82+
byte[] family = new byte[] { 'd' };
83+
long currentTxTimeSeconds = 100;
84+
Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond,
85+
new long[] { }, new long[] { }, 100);
86+
// ~100 years, so that the computed start timestamp is prior to 0 (epoch)
87+
long ttlSeconds = TimeUnit.DAYS.toSeconds(365 * 100);
88+
Map<byte[], Long> ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L);
89+
// oldest visible timestamp should be 0, not negative, because HBase timestamps can not be negative
90+
Assert.assertEquals(0, TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx));
91+
}
5792
}

tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public class TransactionFilters {
4040
*/
4141
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
4242
ScanType scanType) {
43-
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
43+
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
44+
scanType, null));
4445
}
4546

4647
/**
@@ -50,13 +51,15 @@ public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlBy
5051
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
5152
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
5253
* these will be interpreted as "delete" markers and the column will be filtered out
54+
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
5355
* @param scanType the type of scan operation being performed
5456
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
5557
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
5658
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
5759
*/
5860
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
59-
ScanType scanType, @Nullable Filter cellFilter) {
60-
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
61+
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
62+
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
63+
scanType, cellFilter));
6164
}
6265
}

tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti
446446
* @param type the type of scan operation being performed
447447
*/
448448
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
449-
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
449+
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
450450
}
451451

452452
/**

tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase {
7070
*/
7171
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
7272
ScanType scanType) {
73-
this(tx, ttlByFamily, allowEmptyValues, scanType, null);
73+
this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
7474
}
7575

7676
/**
@@ -80,19 +80,20 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
8080
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
8181
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
8282
* these will be interpreted as "delete" markers and the column will be filtered out
83+
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
8384
* @param scanType the type of scan operation being performed
8485
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
8586
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
8687
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
8788
*/
88-
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
89-
ScanType scanType, @Nullable Filter cellFilter) {
89+
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
90+
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
9091
this.tx = tx;
9192
this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
9293
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
9394
long familyTTL = ttlEntry.getValue();
9495
oldestTsByFamily.put(ttlEntry.getKey(),
95-
familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
96+
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
9697
}
9798
this.allowEmptyValues = allowEmptyValues;
9899
this.clearDeletes =

tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
7474
TxFilterFactory txFilterFactory = new TxFilterFactory() {
7575
@Override
7676
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
77-
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeFilter);
77+
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeFilter);
7878
}
7979
};
8080
runFilteringTest(txFilterFactory,
@@ -94,7 +94,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
9494
txFilterFactory = new TxFilterFactory() {
9595
@Override
9696
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
97-
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, skipFilter);
97+
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, skipFilter);
9898
}
9999
};
100100
runFilteringTest(txFilterFactory,
@@ -114,7 +114,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
114114
txFilterFactory = new TxFilterFactory() {
115115
@Override
116116
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
117-
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, includeNextFilter);
117+
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, includeNextFilter);
118118
}
119119
};
120120
runFilteringTest(txFilterFactory,
@@ -134,7 +134,7 @@ public ReturnCode filterKeyValue(Cell ignored) throws IOException {
134134
txFilterFactory = new TxFilterFactory() {
135135
@Override
136136
public Filter getTxFilter(Transaction tx, Map<byte[], Long> familyTTLs) {
137-
return new TransactionVisibilityFilter(tx, familyTTLs, false, ScanType.USER_SCAN, nextColFilter);
137+
return new TransactionVisibilityFilter(tx, familyTTLs, false, false, ScanType.USER_SCAN, nextColFilter);
138138
}
139139
};
140140
runFilteringTest(txFilterFactory,
@@ -294,8 +294,9 @@ public void testTTLFiltering() throws Exception {
294294
ttls.put(FAM2, 30L);
295295
ttls.put(FAM3, 0L);
296296

297-
Transaction tx = txManager.startShort();
298-
long now = tx.getVisibilityUpperBound();
297+
long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
298+
// we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
299+
Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
299300
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
300301
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
301302
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
@@ -353,7 +354,7 @@ private interface TxFilterFactory {
353354
private class CustomTxFilter extends TransactionVisibilityFilter {
354355
public CustomTxFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, ScanType scanType,
355356
@Nullable Filter cellFilter) {
356-
super(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter);
357+
super(tx, ttlByFamily, allowEmptyValues, false, scanType, cellFilter);
357358
}
358359

359360
@Override

tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionFilters.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public class TransactionFilters {
4040
*/
4141
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
4242
ScanType scanType) {
43-
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null));
43+
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, false,
44+
scanType, null));
4445
}
4546

4647
/**
@@ -50,13 +51,15 @@ public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlBy
5051
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
5152
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
5253
* these will be interpreted as "delete" markers and the column will be filtered out
54+
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
5355
* @param scanType the type of scan operation being performed
5456
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
5557
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
5658
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
5759
*/
5860
public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
59-
ScanType scanType, @Nullable Filter cellFilter) {
60-
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter));
61+
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
62+
return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData,
63+
scanType, cellFilter));
6164
}
6265
}

tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ private boolean isRollbackOperation(OperationWithAttributes op) throws IOExcepti
446446
* @param type the type of scan being performed
447447
*/
448448
protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
449-
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
449+
return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, readNonTxnData, type, filter);
450450
}
451451

452452
/**

tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase {
7070
*/
7171
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
7272
ScanType scanType) {
73-
this(tx, ttlByFamily, allowEmptyValues, scanType, null);
73+
this(tx, ttlByFamily, allowEmptyValues, false, scanType, null);
7474
}
7575

7676
/**
@@ -80,19 +80,20 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
8080
* @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name
8181
* @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false}
8282
* these will be interpreted as "delete" markers and the column will be filtered out
83+
* @param readNonTxnData whether data written before Tephra was enabled on a table should be readable
8384
* @param scanType the type of scan operation being performed
8485
* @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by
8586
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
8687
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
8788
*/
8889
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
89-
ScanType scanType, @Nullable Filter cellFilter) {
90+
boolean readNonTxnData, ScanType scanType, @Nullable Filter cellFilter) {
9091
this.tx = tx;
9192
this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
9293
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
9394
long familyTTL = ttlEntry.getValue();
9495
oldestTsByFamily.put(ttlEntry.getKey(),
95-
familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
96+
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, readNonTxnData));
9697
}
9798
this.allowEmptyValues = allowEmptyValues;
9899
this.clearDeletes =

0 commit comments

Comments
 (0)