Skip to content

Commit 669f25e

Browse files
feat update
1 parent 8604d5c commit 669f25e

File tree

2 files changed

+50
-19
lines changed

2 files changed

+50
-19
lines changed

shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbJDBCLockProvider.java

+31-14
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,17 @@ public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
3131
try {
3232
connection.setAutoCommit(false);
3333

34-
var selectPS = connection.prepareStatement("SELECT lock_until, locked_by FROM shedlock " +
35-
"WHERE name = ? AND lock_until > CurrentUtcTimestamp() + ?");
34+
var selectPS = connection.prepareStatement("SELECT locked_by, lock_until FROM shedlock " +
35+
"WHERE name = ? AND lock_until > CurrentUtcTimestamp()");
3636

3737
selectPS.setString(1, lockConfiguration.getName());
38-
selectPS.setObject(2, lockConfiguration.getLockAtMostFor());
3938

40-
var haveLeader = false;
4139
try (var rs = selectPS.executeQuery()) {
42-
haveLeader = rs.next();
43-
}
44-
45-
if (haveLeader) {
46-
return Optional.empty();
40+
if (rs.next()) {
41+
LOGGER.debug("Instance[{}] acquire lock is failed. Leader is {}, lock_until = {}",
42+
LOCKED_BY, rs.getString(1), rs.getString(2));
43+
return Optional.empty();
44+
}
4745
}
4846

4947
var upsertPS = connection.prepareStatement("" +
@@ -65,31 +63,50 @@ public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
6563
connection.setAutoCommit(autoCommit);
6664
}
6765
} catch (SQLException e) {
68-
LOGGER.debug(String.format("Instance[{%s}] acquire lock is failed", LOCKED_BY), e);
66+
LOGGER.debug("Instance[{}] acquire lock is failed", LOCKED_BY);
6967

7068
return Optional.empty();
7169
}
7270
}
7371

7472
private record YdbJDBCLock(String name, DataSource dataSource) implements SimpleLock {
73+
private static final int ATTEMPT_RELEASE_LOCK = 10;
74+
7575
@Override
7676
public void unlock() {
77+
for (int i = 0; i < ATTEMPT_RELEASE_LOCK; i++) {
78+
try {
79+
LOGGER.debug("Instance[{}] trying unlock..", LOCKED_BY);
80+
81+
doUnlock();
82+
83+
return;
84+
} catch (SQLException e) {
85+
if (i == ATTEMPT_RELEASE_LOCK - 1) {
86+
throw new RuntimeException(e);
87+
}
88+
}
89+
}
90+
}
91+
92+
private void doUnlock() throws SQLException {
7793
try (var connection = dataSource.getConnection()) {
7894
var autoCommit = connection.getAutoCommit();
7995

8096
try {
8197
connection.setAutoCommit(true);
8298
var ps = connection.prepareStatement(
83-
"UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ?");
84-
ps.setObject(1, name);
99+
"UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ? and locked_by = ?");
100+
ps.setString(1, name);
101+
ps.setString(2, LOCKED_BY);
85102
ps.execute();
86103
} finally {
87104
connection.setAutoCommit(autoCommit);
88105
}
89106
} catch (SQLException e) {
90-
LOGGER.error(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e);
107+
LOGGER.debug(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e);
91108

92-
throw new RuntimeException(e);
109+
throw e;
93110
}
94111
}
95112
}

shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java

+19-5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.util.concurrent.ExecutionException;
99
import java.util.concurrent.Executors;
1010
import java.util.concurrent.Future;
11+
import java.util.concurrent.ThreadLocalRandom;
1112
import java.util.concurrent.atomic.AtomicBoolean;
1213
import java.util.concurrent.atomic.AtomicInteger;
1314
import javax.sql.DataSource;
@@ -16,6 +17,8 @@
1617
import org.junit.jupiter.api.Assertions;
1718
import org.junit.jupiter.api.Test;
1819
import org.junit.jupiter.api.extension.RegisterExtension;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
1922
import org.springframework.beans.factory.annotation.Autowired;
2023
import org.springframework.boot.test.context.SpringBootTest;
2124
import org.springframework.test.context.DynamicPropertyRegistry;
@@ -27,6 +30,7 @@
2730
*/
2831
@SpringBootTest(classes = TestApp.class)
2932
public class YdbLockProviderTest {
33+
private static final Logger logger = LoggerFactory.getLogger(YdbLockProviderTest.class);
3034

3135
@RegisterExtension
3236
private static final YdbHelperExtension ydb = new YdbHelperExtension();
@@ -74,19 +78,20 @@ public void integrationTest() throws ExecutionException, InterruptedException, S
7478
var atomicInt = new AtomicInteger();
7579
var locked = new AtomicBoolean();
7680

77-
for (int i = 0; i < 10; i++) {
81+
for (int i = 0; i < 100; i++) {
7882
lockFutures.add(executorServer.submit(() -> {
7983
Optional<SimpleLock> optinal = Optional.empty();
8084

8185
while (optinal.isEmpty()) {
8286
optinal = lockProvider.lock(new LockConfiguration(
83-
Instant.now(), "semaphore", Duration.ofSeconds(10), Duration.ZERO));
87+
Instant.now(), "semaphore", Duration.ofSeconds(100), Duration.ZERO));
8488

8589
optinal.ifPresent(simpleLock -> {
86-
if (locked.get()) {
90+
if (locked.compareAndExchange(false, true)) {
91+
logger.debug("Failed test! System has two leaders");
92+
8793
throw new RuntimeException();
8894
}
89-
locked.set(true);
9095

9196
try {
9297
Thread.sleep(100);
@@ -96,8 +101,17 @@ public void integrationTest() throws ExecutionException, InterruptedException, S
96101

97102
atomicInt.addAndGet(50);
98103
locked.set(false);
104+
105+
logger.info("Leader does UNLOCK!");
106+
99107
simpleLock.unlock();
100108
});
109+
110+
try {
111+
Thread.sleep(ThreadLocalRandom.current().nextInt(3_000));
112+
} catch (InterruptedException e) {
113+
throw new RuntimeException(e);
114+
}
101115
}
102116
}));
103117
}
@@ -106,6 +120,6 @@ public void integrationTest() throws ExecutionException, InterruptedException, S
106120
future.get();
107121
}
108122

109-
Assertions.assertEquals(4950, atomicInt.get());
123+
Assertions.assertEquals(5000, atomicInt.get());
110124
}
111125
}

0 commit comments

Comments
 (0)