Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEPHRA-263 Fix test cases and TTL enforcement #64

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 33 additions & 9 deletions tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.tephra.util;

import com.google.common.primitives.Longs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionType;
Expand Down Expand Up @@ -61,9 +63,7 @@ public class TxUtils {
* @return The oldest timestamp that will be visible for the given transaction and TTL configuration
*/
public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx) {
long maxTTL = getMaxTTL(ttlByFamily);
// we know that data will not be cleaned up while this tx is running up to this point as janitor uses it
return maxTTL < Long.MAX_VALUE ? tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS : 0;
return getOldestVisibleTimestamp(ttlByFamily, tx, false);
}

/**
Expand All @@ -75,12 +75,33 @@ public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Tran
* @return The oldest timestamp that will be visible for the given transaction and TTL configuration
*/
public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx, boolean readNonTxnData) {
if (readNonTxnData) {
long maxTTL = getMaxTTL(ttlByFamily);
return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL : 0;
long maxTTL = getMaxTTL(ttlByFamily);
if (maxTTL == Long.MAX_VALUE) {
return 0;
}
return getOldestVisibleTimestamp(maxTTL, tx, readNonTxnData);
}

return getOldestVisibleTimestamp(ttlByFamily, tx);
/**
* Returns the oldest visible timestamp for the given transaction, based on the TTL configured. If the TTL is
* negative or zero, the oldest visible timestamp will be {@code 0}.
* @param ttl TTL value (in milliseconds)
* @param tx The current transaction
* @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data
* @return The oldest timestamp that will be visible for the given transaction and TTL configuration
*/
public static long getOldestVisibleTimestamp(long ttl, Transaction tx, boolean readNonTxnData) {
if (ttl <= 0) {
return 0;
}
long oldestVisibleTimestamp;
if (readNonTxnData) {
oldestVisibleTimestamp = getTimestamp(tx.getTransactionId()) - ttl;
} else {
oldestVisibleTimestamp = tx.getTransactionId() - ttl * TxConstants.MAX_TX_PER_MS;
}
// if the computed ttl is negative, return 0 because timestamps can not be negative
return Math.max(0, oldestVisibleTimestamp);
}

/**
Expand All @@ -103,10 +124,13 @@ public static long getMaxVisibleTimestamp(Transaction tx) {
* as being written by this transaction (and therefore visible).
*/
public static Transaction createDummyTransaction(TransactionVisibilityState txVisibilityState) {
return new Transaction(txVisibilityState.getReadPointer(), Long.MAX_VALUE,
return new Transaction(txVisibilityState.getReadPointer(),
txVisibilityState.getVisibilityUpperBound(),
Long.MAX_VALUE,
Longs.toArray(txVisibilityState.getInvalid()),
Longs.toArray(txVisibilityState.getInProgress().keySet()),
TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT);
TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT,
new long[0], Transaction.VisibilityLevel.SNAPSHOT);
}

/**
Expand Down
35 changes: 35 additions & 0 deletions tephra-core/src/test/java/org/apache/tephra/util/TxUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@
package org.apache.tephra.util;

import org.apache.tephra.Transaction;
import org.apache.tephra.TxConstants;
import org.junit.Assert;
import org.junit.Test;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;

/**
Expand Down Expand Up @@ -54,4 +59,34 @@ public void testPruneUpperBound() {
tx = new Transaction(100, 110, new long[] {}, new long[] {}, Transaction.NO_TX_IN_PROGRESS);
Assert.assertEquals(99, TxUtils.getPruneUpperBound(tx));
}

@Test
public void testTTL() {
long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS;
byte[] family = new byte[] { 'd' };
long currentTxTimeSeconds = 100;
Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond,
new long[] {10 * txIdsPerSecond, 30 * txIdsPerSecond},
new long[] {80 * txIdsPerSecond, 90 * txIdsPerSecond},
80 * txIdsPerSecond);
int ttlSeconds = 60;
Map<byte[], Long> ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L);
// ttl should only be impacted by the current transaction's id, and not by any older, in-progress transactions
Assert.assertEquals((currentTxTimeSeconds - ttlSeconds) * txIdsPerSecond,
TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx));
}

@Test
public void testLargeTTL() {
long txIdsPerSecond = 1000 * TxConstants.MAX_TX_PER_MS;
byte[] family = new byte[] { 'd' };
long currentTxTimeSeconds = 100;
Transaction tx = new Transaction(100 * txIdsPerSecond, currentTxTimeSeconds * txIdsPerSecond,
new long[] { }, new long[] { }, 100);
// ~100 years, so that the computed start timestamp is prior to 0 (epoch)
long ttlSeconds = TimeUnit.DAYS.toSeconds(365 * 100);
Map<byte[], Long> ttlByFamily = Collections.singletonMap(family, ttlSeconds * 1000L);
// oldest visible timestamp should be 0, not negative, because HBase timestamps can not be negative
Assert.assertEquals(0, TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,16 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType, @Nullable Filter cellFilter) {
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(ttlEntry.getKey(),
familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
// we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp
// in #filterKeyValue, using TxUtils#getTimestampForTTL(long)
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false));
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
public static TemporaryFolder tmpFolder = new TemporaryFolder();

private static MiniDFSCluster dfsCluster;

private static final class TestBytes {
private static final byte[] table = Bytes.toBytes("testtable");
private static final byte[] family = Bytes.toBytes("f1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,9 @@ public void testTTLFiltering() throws Exception {
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);

Transaction tx = txManager.startShort();
long now = tx.getVisibilityUpperBound();
long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
// we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,15 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType, @Nullable Filter cellFilter) {
ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(ttlEntry.getKey(),
familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
// we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp
// in #filterKeyValue, using TxUtils#getTimestampForTTL(long)
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false));
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,9 @@ public void testTTLFiltering() throws Exception {
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);

Transaction tx = txManager.startShort();
long now = tx.getVisibilityUpperBound();
long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
// we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,15 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType, @Nullable Filter cellFilter) {
ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap();
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
// we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp
// in #filterKeyValue, using TxUtils#getTimestampForTTL(long)
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false));
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,9 @@ public void testTTLFiltering() throws Exception {
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);

Transaction tx = txManager.startShort();
long now = tx.getVisibilityUpperBound();
long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
// we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,15 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType, @Nullable Filter cellFilter) {
ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap();
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
// we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp
// in #filterKeyValue, using TxUtils#getTimestampForTTL(long)
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false));
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
public static TemporaryFolder tmpFolder = new TemporaryFolder();

private static MiniDFSCluster dfsCluster;

private static final class TestBytes {
private static final byte[] table = Bytes.toBytes("testtable");
private static final byte[] family = Bytes.toBytes("f1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,9 @@ public void testTTLFiltering() throws Exception {
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);

Transaction tx = txManager.startShort();
long now = tx.getVisibilityUpperBound();
long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
// we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase {
* @param scanType the type of scan operation being performed
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
ScanType scanType) {
this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}

Expand All @@ -86,14 +86,16 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType, @Nullable Filter cellFilter) {
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap();
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
// we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp
// in #filterKeyValue, using TxUtils#getTimestampForTTL(long)
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false));
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
public static TemporaryFolder tmpFolder = new TemporaryFolder();

private static MiniDFSCluster dfsCluster;

public static void tearDownAfterClass() throws Exception {
dfsCluster.shutdown();
}

private static final class TestBytes {
private static final byte[] table = Bytes.toBytes("testtable");
Expand Down Expand Up @@ -190,6 +186,9 @@ public static void shutdownAfterClass() throws Exception {
if (txManager != null) {
txManager.stopAndWait();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,9 @@ public void testTTLFiltering() throws Exception {
ttls.put(FAM2, 30L);
ttls.put(FAM3, 0L);

Transaction tx = txManager.startShort();
long now = tx.getVisibilityUpperBound();
long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
// we explicitly set the readPointer to 'now', because if you set it to an older value, it can filter values out
Transaction tx = new Transaction(now, now, new long[0], new long[0], now);
Filter filter = new TransactionVisibilityFilter(tx, ttls, false, ScanType.USER_SCAN);
assertEquals(Filter.ReturnCode.INCLUDE_AND_NEXT_COL,
filter.filterKeyValue(newKeyValue("row1", FAM, "val1", now)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class TransactionVisibilityFilter extends FilterBase {
* @param scanType the type of scan operation being performed
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType) {
ScanType scanType) {
this(tx, ttlByFamily, allowEmptyValues, scanType, null);
}

Expand All @@ -86,14 +86,16 @@ public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily
* calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then
* {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead.
*/
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType, @Nullable Filter cellFilter) {
public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues,
ScanType scanType, @Nullable Filter cellFilter) {
this.tx = tx;
this.oldestTsByFamily = Maps.newTreeMap();
for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) {
long familyTTL = ttlEntry.getValue();
oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()),
familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS);
// we pass false for 'readNonTxData'. For non transactional data, we scale the cell timestamp
// in #filterKeyValue, using TxUtils#getTimestampForTTL(long)
TxUtils.getOldestVisibleTimestamp(familyTTL, tx, false));
}
this.allowEmptyValues = allowEmptyValues;
this.clearDeletes =
Expand Down
Loading