Skip to content

Commit 9ec55bb

Browse files
feat: shedlock over jdbc
1 parent 7682a1d commit 9ec55bb

File tree

5 files changed

+117
-152
lines changed

5 files changed

+117
-152
lines changed

shedlock-ydb/pom.xml

+3-13
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@
3838

3939
<junit5.version>5.9.3</junit5.version>
4040
<log4j2.version>2.17.2</log4j2.version>
41-
<ydb.sdk.version>2.2.6</ydb.sdk.version>
42-
<ydb.jdbc.version>2.2.2</ydb.jdbc.version>
41+
<ydb.sdk.version>2.3.6</ydb.sdk.version>
42+
<ydb.jdbc.version>2.3.5</ydb.jdbc.version>
4343
<spring.boot.version>3.2.3</spring.boot.version>
4444
<shedlock-spring.version>5.15.0</shedlock-spring.version>
4545
</properties>
@@ -53,13 +53,6 @@
5353

5454
<dependencyManagement>
5555
<dependencies>
56-
<dependency>
57-
<groupId>tech.ydb</groupId>
58-
<artifactId>ydb-sdk-bom</artifactId>
59-
<version>${ydb.sdk.version}</version>
60-
<type>pom</type>
61-
<scope>import</scope>
62-
</dependency>
6356
<dependency>
6457
<groupId>org.springframework.boot</groupId>
6558
<artifactId>spring-boot-dependencies</artifactId>
@@ -71,10 +64,6 @@
7164
</dependencyManagement>
7265

7366
<dependencies>
74-
<dependency>
75-
<groupId>tech.ydb</groupId>
76-
<artifactId>ydb-sdk-coordination</artifactId>
77-
</dependency>
7867
<dependency>
7968
<groupId>net.javacrumbs.shedlock</groupId>
8069
<artifactId>shedlock-spring</artifactId>
@@ -101,6 +90,7 @@
10190
<dependency>
10291
<groupId>tech.ydb.test</groupId>
10392
<artifactId>ydb-junit5-support</artifactId>
93+
<version>${ydb.sdk.version}</version>
10494
<scope>test</scope>
10595
</dependency>
10696
<dependency>

shedlock-ydb/src/main/java/tech/ydb/lock/provider/YdbCoordinationServiceLockProvider.java

-127
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package tech.ydb.lock.provider;
2+
3+
import java.sql.SQLException;
4+
import java.util.Optional;
5+
import javax.sql.DataSource;
6+
import net.javacrumbs.shedlock.core.LockConfiguration;
7+
import net.javacrumbs.shedlock.core.LockProvider;
8+
import net.javacrumbs.shedlock.core.SimpleLock;
9+
import net.javacrumbs.shedlock.support.Utils;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
/**
14+
* @author Kirill Kurdyukov
15+
*/
16+
public class YdbJDBCLockProvider implements LockProvider {
17+
private static final Logger LOGGER = LoggerFactory.getLogger(YdbJDBCLockProvider.class);
18+
private static final String LOCKED_BY = "Hostname=" + Utils.getHostname() + ", " +
19+
"Current PID=" + ProcessHandle.current().pid();
20+
21+
private final DataSource dataSource;
22+
23+
public YdbJDBCLockProvider(DataSource dataSource) {
24+
this.dataSource = dataSource;
25+
}
26+
27+
@Override
28+
public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
29+
try (var connection = dataSource.getConnection()) {
30+
var autoCommit = connection.getAutoCommit();
31+
try {
32+
connection.setAutoCommit(false);
33+
34+
var selectPS = connection.prepareStatement("SELECT lock_until, locked_by FROM shedlock " +
35+
"WHERE name = ? AND lock_until > CurrentUtcTimestamp() + ?");
36+
37+
selectPS.setString(1, lockConfiguration.getName());
38+
selectPS.setObject(2, lockConfiguration.getLockAtMostFor());
39+
40+
var haveLeader = false;
41+
try (var rs = selectPS.executeQuery()) {
42+
haveLeader = rs.next();
43+
}
44+
45+
if (haveLeader) {
46+
return Optional.empty();
47+
}
48+
49+
var upsertPS = connection.prepareStatement("" +
50+
"UPSERT INTO shedlock(name, lock_until, locked_at, locked_by) " +
51+
"VALUES (?, Unwrap(CurrentUtcTimestamp() + ?), CurrentUtcTimestamp(), ?)"
52+
);
53+
54+
upsertPS.setObject(1, lockConfiguration.getName());
55+
upsertPS.setObject(2, lockConfiguration.getLockAtMostFor());
56+
upsertPS.setObject(3, LOCKED_BY);
57+
upsertPS.execute();
58+
59+
connection.commit();
60+
61+
LOGGER.debug("Instance[{}] is leader", LOCKED_BY);
62+
63+
return Optional.of(new YdbJDBCLock(lockConfiguration.getName(), dataSource));
64+
} finally {
65+
connection.setAutoCommit(autoCommit);
66+
}
67+
} catch (SQLException e) {
68+
LOGGER.debug(String.format("Instance[{%s}] acquire lock is failed", LOCKED_BY), e);
69+
70+
return Optional.empty();
71+
}
72+
}
73+
74+
private record YdbJDBCLock(String name, DataSource dataSource) implements SimpleLock {
75+
@Override
76+
public void unlock() {
77+
try (var connection = dataSource.getConnection()) {
78+
var ps = connection.prepareStatement(
79+
"UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ?");
80+
ps.setObject(1, name);
81+
ps.execute();
82+
} catch (SQLException e) {
83+
LOGGER.error(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e);
84+
85+
throw new RuntimeException(e);
86+
}
87+
}
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package tech.ydb.lock.provider;
22

3-
import java.sql.SQLException;
43
import javax.sql.DataSource;
4+
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
55
import org.springframework.context.annotation.Bean;
66
import org.springframework.context.annotation.Configuration;
7-
import tech.ydb.jdbc.YdbConnection;
87

98
/**
109
* @author Kirill Kurdyukov
@@ -13,11 +12,8 @@
1312
@Configuration
1413
public class YdbLockProviderConfiguration {
1514
@Bean
16-
public YdbCoordinationServiceLockProvider ydbLockProvider(DataSource dataSource) throws SQLException {
17-
var provider = new YdbCoordinationServiceLockProvider(dataSource.getConnection().unwrap(YdbConnection.class));
18-
19-
provider.init();
20-
21-
return provider;
15+
@ConditionalOnBean(DataSource.class)
16+
public YdbJDBCLockProvider ydbLockProvider(DataSource dataSource) {
17+
return new YdbJDBCLockProvider(dataSource);
2218
}
2319
}

shedlock-ydb/src/test/java/tech/ydb/lock/provider/YdbLockProviderTest.java

+21-4
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package tech.ydb.lock.provider;
22

3+
import java.sql.SQLException;
34
import java.time.Duration;
45
import java.time.Instant;
56
import java.util.Optional;
67
import java.util.concurrent.ExecutionException;
78
import java.util.concurrent.Executors;
89
import java.util.concurrent.atomic.AtomicBoolean;
910
import java.util.concurrent.atomic.AtomicInteger;
11+
import javax.sql.DataSource;
1012
import net.javacrumbs.shedlock.core.LockConfiguration;
1113
import net.javacrumbs.shedlock.core.SimpleLock;
1214
import org.junit.jupiter.api.Assertions;
@@ -46,10 +48,25 @@ private static String jdbcUrl() {
4648
}
4749

4850
@Autowired
49-
private YdbCoordinationServiceLockProvider lockProvider;
51+
private YdbJDBCLockProvider lockProvider;
52+
53+
@Autowired
54+
private DataSource dataSource;
5055

5156
@Test
52-
public void integrationTest() throws ExecutionException, InterruptedException {
57+
public void integrationTest() throws ExecutionException, InterruptedException, SQLException {
58+
try (var connection = dataSource.getConnection()) {
59+
var statement = connection.createStatement();
60+
statement.execute(
61+
"CREATE TABLE shedlock(" +
62+
"name TEXT NOT NULL, " +
63+
"lock_until TIMESTAMP NOT NULL," +
64+
"locked_at TIMESTAMP NOT NULL," +
65+
"locked_by TEXT NOT NULL, " +
66+
"PRIMARY KEY (name)" +
67+
");");
68+
}
69+
5370
var executorServer = Executors.newFixedThreadPool(10);
5471
var atomicInt = new AtomicInteger();
5572
var locked = new AtomicBoolean();
@@ -59,8 +76,8 @@ public void integrationTest() throws ExecutionException, InterruptedException {
5976
Optional<SimpleLock> optinal = Optional.empty();
6077

6178
while (optinal.isEmpty()) {
62-
optinal = lockProvider.lock(
63-
new LockConfiguration(Instant.now(), "semaphore", Duration.ZERO, Duration.ZERO));
79+
optinal = lockProvider.lock(new LockConfiguration(
80+
Instant.now(), "semaphore", Duration.ofSeconds(10), Duration.ZERO));
6481

6582
optinal.ifPresent(simpleLock -> {
6683
if (locked.get()) {

0 commit comments

Comments
 (0)