/
ExampleBasicConfiguration.java
116 lines (101 loc) · 5.59 KB
/
ExampleBasicConfiguration.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package ru.yoomoney.tech.dbqueue.test;
import org.junit.Assert;
import org.junit.Test;
import ru.yoomoney.tech.dbqueue.api.EnqueueParams;
import ru.yoomoney.tech.dbqueue.api.QueueProducer;
import ru.yoomoney.tech.dbqueue.api.impl.MonitoringQueueProducer;
import ru.yoomoney.tech.dbqueue.api.impl.NoopPayloadTransformer;
import ru.yoomoney.tech.dbqueue.api.impl.ShardingQueueProducer;
import ru.yoomoney.tech.dbqueue.api.impl.SingleQueueShardRouter;
import ru.yoomoney.tech.dbqueue.config.DatabaseDialect;
import ru.yoomoney.tech.dbqueue.config.QueueService;
import ru.yoomoney.tech.dbqueue.config.QueueShard;
import ru.yoomoney.tech.dbqueue.config.QueueShardId;
import ru.yoomoney.tech.dbqueue.config.QueueTableSchema;
import ru.yoomoney.tech.dbqueue.config.impl.LoggingTaskLifecycleListener;
import ru.yoomoney.tech.dbqueue.config.impl.LoggingThreadLifecycleListener;
import ru.yoomoney.tech.dbqueue.settings.ExtSettings;
import ru.yoomoney.tech.dbqueue.settings.FailRetryType;
import ru.yoomoney.tech.dbqueue.settings.FailureSettings;
import ru.yoomoney.tech.dbqueue.settings.PollSettings;
import ru.yoomoney.tech.dbqueue.settings.ProcessingMode;
import ru.yoomoney.tech.dbqueue.settings.ProcessingSettings;
import ru.yoomoney.tech.dbqueue.settings.QueueConfig;
import ru.yoomoney.tech.dbqueue.settings.QueueId;
import ru.yoomoney.tech.dbqueue.settings.QueueLocation;
import ru.yoomoney.tech.dbqueue.settings.QueueSettings;
import ru.yoomoney.tech.dbqueue.settings.ReenqueueRetryType;
import ru.yoomoney.tech.dbqueue.settings.ReenqueueSettings;
import ru.yoomoney.tech.dbqueue.spring.dao.SpringDatabaseAccessLayer;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.Thread.sleep;
import static java.util.Collections.singletonList;
import static org.hamcrest.CoreMatchers.equalTo;
/**
* @author Oleg Kandaurov
* @since 14.08.2017
*/
public class ExampleBasicConfiguration {
public static final String PG_DEFAULT_TABLE_DDL = "CREATE TABLE %s (\n" +
" id BIGSERIAL PRIMARY KEY,\n" +
" queue_name TEXT NOT NULL,\n" +
" payload TEXT,\n" +
" created_at TIMESTAMP WITH TIME ZONE DEFAULT now(),\n" +
" next_process_at TIMESTAMP WITH TIME ZONE DEFAULT now(),\n" +
" attempt INTEGER DEFAULT 0,\n" +
" reenqueue_attempt INTEGER DEFAULT 0,\n" +
" total_attempt INTEGER DEFAULT 0\n" +
");" +
"CREATE INDEX %s_name_time_desc_idx\n" +
" ON %s (queue_name, next_process_at, id DESC);\n" +
"\n";
@Test
public void example_config() throws InterruptedException {
AtomicInteger taskConsumedCount = new AtomicInteger(0);
DefaultDatabaseInitializer.createTable(PG_DEFAULT_TABLE_DDL, "example_task_table");
SpringDatabaseAccessLayer databaseAccessLayer = new SpringDatabaseAccessLayer(
DatabaseDialect.POSTGRESQL, QueueTableSchema.builder().build(),
DefaultDatabaseInitializer.getJdbcTemplate(),
DefaultDatabaseInitializer.getTransactionTemplate());
QueueShard<SpringDatabaseAccessLayer> shard = new QueueShard<>(new QueueShardId("main"), databaseAccessLayer);
QueueId queueId = new QueueId("example_queue");
QueueSettings queueSettings = QueueSettings.builder()
.withProcessingSettings(ProcessingSettings.builder()
.withProcessingMode(ProcessingMode.SEPARATE_TRANSACTIONS)
.withThreadCount(1).build())
.withPollSettings(PollSettings.builder()
.withBetweenTaskTimeout(Duration.ofMillis(100))
.withNoTaskTimeout(Duration.ofMillis(100))
.withFatalCrashTimeout(Duration.ofSeconds(1)).build())
.withFailureSettings(FailureSettings.builder()
.withRetryType(FailRetryType.GEOMETRIC_BACKOFF)
.withRetryInterval(Duration.ofMinutes(1)).build())
.withReenqueueSettings(ReenqueueSettings.builder()
.withRetryType(ReenqueueRetryType.MANUAL).build())
.withExtSettings(ExtSettings.builder().withSettings(new LinkedHashMap<>()).build())
.build();
QueueConfig config = new QueueConfig(QueueLocation.builder().withTableName("example_task_table")
.withQueueId(queueId).build(), queueSettings);
ShardingQueueProducer<String, SpringDatabaseAccessLayer> shardingQueueProducer = new ShardingQueueProducer<>(
config, NoopPayloadTransformer.getInstance(), new SingleQueueShardRouter<>(shard));
QueueProducer<String> producer = new MonitoringQueueProducer<>(shardingQueueProducer, queueId);
StringQueueConsumer consumer = new StringQueueConsumer(config, taskConsumedCount);
QueueService queueService = new QueueService(singletonList(shard),
new LoggingThreadLifecycleListener(),
new LoggingTaskLifecycleListener());
queueService.registerQueue(consumer);
queueService.start();
producer.enqueue(EnqueueParams.create("example task"));
sleep(500);
queueService.pause();
producer.enqueue(EnqueueParams.create("example task"));
sleep(500);
queueService.unpause();
sleep(500);
queueService.shutdown();
queueService.awaitTermination(Duration.ofSeconds(10));
Assert.assertThat(taskConsumedCount.get(), equalTo(2));
}
}