Skip to content

Commit a857fe4

Browse files
committed
Kafka to Postgres test
1 parent 33dd523 commit a857fe4

File tree

5 files changed

+72
-24
lines changed

5 files changed

+72
-24
lines changed

README.MD

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
# Prerequisite
2-
Install and run zookeeper and kafka server https://kafka.apache.org/quickstart
1+
This project involves Kafka consumer and producer, and Postgresql database and a rest-service
32

4-
# Run
5-
Go to http://localhost:9011/kafka/publish?message="hello" to produce and consume message
3+
The tests demonstrate:
4+
1. Calling a rest service that adds a message to a (embedded) Kafka topic, where the message consumer writes to a postgresql database
5+
2. Multithreaded usage of postgresql with Spring Data and JPA and locking of rows.
66

7-
8-
Based on https://www.confluent.io/blog/apache-kafka-spring-boot-application/
7+
# TODO:
8+
Simulate Kafka calls that result in errors in multithreaded environment to test different kinds of kafka settings and guarentees (e.g. exactly once vs at least once processing)

src/main/java/no/ainiq/kafkademo/app/HobbyNotificationService.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,26 @@ public HobbyNotificationService(HobbyUserRepository repository) {
2020
}
2121

2222
@Transactional
23-
public void notify(String name) {
23+
public void notifyUser(String name) {
2424
HobbyUser user = repository.findByNameAndStatus(name, HobbyUser.NotificationStatus.READY.name());
2525
if (user == null) {
2626
LOGGER.info("User with name {} already sent or failed", name);
2727
return;
2828
}
29+
notify(user);
30+
}
31+
32+
@Transactional
33+
public void notifyNextUser() {
34+
HobbyUser user = repository.findReady();
35+
if (user == null) {
36+
LOGGER.info("no users left to process");
37+
return;
38+
}
39+
notify(user);
40+
}
41+
42+
private void notify(HobbyUser user) {
2943
try {
3044
sendNotification(user);
3145
user.success();

src/main/java/no/ainiq/kafkademo/app/HobbyUser.java

+11
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,17 @@ public void failed() {
5959

6060
}
6161

62+
@Override
63+
public String toString() {
64+
return "HobbyUser{" +
65+
"id=" + id +
66+
", name='" + name + '\'' +
67+
", hobbies=" + hobbies +
68+
", status=" + status +
69+
", tries=" + tries +
70+
'}';
71+
}
72+
6273
public enum NotificationStatus {
6374
READY, SENT, FAILED
6475
}

src/main/java/no/ainiq/kafkademo/app/repository/HobbyUserRepository.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@
44
import org.springframework.data.jpa.repository.Query;
55
import org.springframework.data.repository.CrudRepository;
66

7+
import java.util.List;
8+
79
public interface HobbyUserRepository extends CrudRepository<HobbyUser, Long> {
810
HobbyUser findByName(String name);
911

10-
//@Lock(LockModeType.PESSIMISTIC_READ)
1112
@Query(value = "select * from hobbyuser where name=?1 and status = ?2 for update", nativeQuery = true)
1213
HobbyUser findByNameAndStatus(String name, String status);
14+
15+
@Query(value = "select * from hobbyuser where status = 'READY' for update skip locked limit 1", nativeQuery = true)
16+
HobbyUser findReady();
17+
18+
List<HobbyUser> findByStatus(HobbyUser.NotificationStatus status);
1319
}

src/test/java/no/ainiq/kafkademo/ConcurrentPostgresTest.java

+33-16
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import no.ainiq.kafkademo.app.HobbyNotificationService;
55
import no.ainiq.kafkademo.app.HobbyUser;
66
import no.ainiq.kafkademo.app.repository.HobbyUserRepository;
7+
import org.junit.jupiter.api.DisplayName;
78
import org.junit.jupiter.api.Test;
89
import org.springframework.beans.factory.annotation.Autowired;
910
import org.springframework.boot.test.context.SpringBootTest;
@@ -16,9 +17,8 @@
1617
import java.util.concurrent.Callable;
1718
import java.util.concurrent.ExecutorService;
1819
import java.util.concurrent.Executors;
19-
import java.util.concurrent.TimeUnit;
2020

21-
import static org.awaitility.Awaitility.await;
21+
import static org.assertj.core.api.Assertions.assertThat;
2222

2323
@SpringBootTest(value = {
2424
"kafka-enabled=false",
@@ -44,27 +44,44 @@ public class ConcurrentPostgresTest {
4444
@Autowired
4545
HobbyNotificationService service;
4646

47-
@Test
48-
void notification() throws InterruptedException {
47+
@Test()
48+
@DisplayName("4 threads try to notify each user simultaneously")
49+
void test_for_update() throws InterruptedException {
4950
List<String> names = names(10);
5051
names.forEach(n -> repository.save(HobbyUser.newInstance(n)));
5152
ExecutorService executorService = Executors.newFixedThreadPool(4);
5253

5354
List<Callable<Object>> callables = new ArrayList<>();
54-
callables.add(Executors.callable(() -> names.forEach(service::notify)));
55-
callables.add(Executors.callable(() -> names.forEach(service::notify)));
56-
callables.add(Executors.callable(() -> names.forEach(service::notify)));
57-
callables.add(Executors.callable(() -> names.forEach(service::notify)));
55+
callables.add(Executors.callable(() -> names.forEach(service::notifyUser)));
56+
callables.add(Executors.callable(() -> names.forEach(service::notifyUser)));
57+
callables.add(Executors.callable(() -> names.forEach(service::notifyUser)));
58+
callables.add(Executors.callable(() -> names.forEach(service::notifyUser)));
5859

5960
executorService.invokeAll(callables);
60-
61-
System.out.println("STARTED!!!");
62-
await().atMost(10, TimeUnit.SECONDS)
63-
.pollInterval(3, TimeUnit.SECONDS)
64-
.until(() -> {
65-
HobbyUser lastUser = repository.findByName(names.get(names.size() - 1));
66-
return lastUser.getTries() == 3 || lastUser.getStatus() == HobbyUser.NotificationStatus.SENT;
67-
});
61+
62+
System.out.println("RESULTS:");
63+
repository.findAll().forEach(System.out::println);
64+
65+
assertThat(repository.findByStatus(HobbyUser.NotificationStatus.READY)).isEmpty();
66+
}
67+
68+
@Test
69+
@DisplayName("4 threads notify 1 ready user at a time - skipping locked users by other threads")
70+
void test_skip_locked() throws InterruptedException {
71+
List<String> names = names(10);
72+
names.forEach(n -> repository.save(HobbyUser.newInstance(n)));
73+
ExecutorService executorService = Executors.newFixedThreadPool(4);
74+
75+
List<Callable<Object>> callables = new ArrayList<>();
76+
for (int i = 0; i < 40; i++) {
77+
callables.add(Executors.callable(() -> service.notifyNextUser()));
78+
}
79+
executorService.invokeAll(callables);
80+
81+
System.out.println("RESULTS:");
82+
repository.findAll().forEach(System.out::println);
83+
84+
assertThat(repository.findByStatus(HobbyUser.NotificationStatus.READY)).isEmpty();
6885

6986
}
7087

0 commit comments

Comments
 (0)