Skip to content

Commit 4f3dcc5

Browse files
authored
Merge pull request #538 from weaviate/feat/async-replication-config
Async replication config for collections
2 parents 67584cf + 65750f4 commit 4f3dcc5

3 files changed

Lines changed: 215 additions & 3 deletions

File tree

src/it/java/io/weaviate/integration/CollectionsITest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.weaviate.client6.v1.api.collections.Quantization;
1919
import io.weaviate.client6.v1.api.collections.ReferenceProperty;
2020
import io.weaviate.client6.v1.api.collections.Replication;
21+
import io.weaviate.client6.v1.api.collections.Replication.AsyncReplicationConfig;
2122
import io.weaviate.client6.v1.api.collections.VectorConfig;
2223
import io.weaviate.client6.v1.api.collections.config.Shard;
2324
import io.weaviate.client6.v1.api.collections.config.ShardStatus;
@@ -328,4 +329,53 @@ public void test_objectTtl() throws IOException {
328329
.extracting(CollectionConfig::objectTtl).isNotNull()
329330
.returns(false, ObjectTtl::enabled);
330331
}
332+
333+
@Test
334+
public void test_asyncReplicationConfig() throws IOException {
335+
Weaviate.Version.latest().orSkip();
336+
337+
// Arrange
338+
var nsThings = ns("Things");
339+
340+
// Act
341+
var things = client.collections.create(nsThings,
342+
c -> c.replication(Replication.of(
343+
repl -> repl
344+
.asyncEnabled(true)
345+
.asyncReplication(AsyncReplicationConfig.of(
346+
async -> async
347+
.hashTreeHeight(1)
348+
.maxWorkers(2)
349+
.frequencyMillis(3)
350+
.frequencyMillisWhilePropagating(4)
351+
.aliveNodesCheckingFrequencyMillis(5)
352+
.loggingFrequencySeconds(6)
353+
.diffBatchSize(7)
354+
.diffPerNodeTimeoutSeconds(8)
355+
.prePropagationTimeoutSeconds(9)
356+
.propagationTimeoutSeconds(10)
357+
.propagationDelayMillis(11)
358+
.propagationLimit(12)
359+
.propagationConcurrency(13)
360+
.propagationBatchSize(14))))));
361+
362+
// Assert
363+
Assertions.assertThat(things.config.get()).get()
364+
.extracting(CollectionConfig::replication)
365+
.extracting(Replication::asyncReplicationConfig)
366+
.returns(1, AsyncReplicationConfig::hashTreeHeight)
367+
.returns(2, AsyncReplicationConfig::maxWorkers)
368+
.returns(3, AsyncReplicationConfig::frequencyMillis)
369+
.returns(4, AsyncReplicationConfig::frequencyMillisWhilePropagating)
370+
.returns(5, AsyncReplicationConfig::aliveNodesCheckingFrequencyMillis)
371+
.returns(6, AsyncReplicationConfig::loggingFrequencySeconds)
372+
.returns(7, AsyncReplicationConfig::diffBatchSize)
373+
.returns(8, AsyncReplicationConfig::diffPerNodeTimeoutSeconds)
374+
.returns(9, AsyncReplicationConfig::prePropagationTimeoutSeconds)
375+
.returns(10, AsyncReplicationConfig::propagationTimeoutSeconds)
376+
.returns(11, AsyncReplicationConfig::propagationDelayMillis)
377+
.returns(12, AsyncReplicationConfig::propagationLimit)
378+
.returns(13, AsyncReplicationConfig::propagationConcurrency)
379+
.returns(14, AsyncReplicationConfig::propagationBatchSize);
380+
}
331381
}

src/main/java/io/weaviate/client6/v1/api/collections/InvertedIndex.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public Bm25 build() {
7474
}
7575
}
7676

77-
public record Stopwords(
77+
public static record Stopwords(
7878
/** Selected preset. */
7979
@SerializedName("preset") String preset,
8080
/** Custom words added to the selected preset. */

src/main/java/io/weaviate/client6/v1/api/collections/Replication.java

Lines changed: 164 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
public record Replication(
1010
@SerializedName("factor") Integer replicationFactor,
1111
@SerializedName("asyncEnabled") Boolean asyncEnabled,
12-
@SerializedName("deletionStrategy") DeletionStrategy deletionStrategy) {
12+
@SerializedName("deletionStrategy") DeletionStrategy deletionStrategy,
13+
@SerializedName("asyncConfig") AsyncReplicationConfig asyncReplicationConfig) {
1314

1415
public static enum DeletionStrategy {
1516
@SerializedName("NoAutomatedResolution")
@@ -28,13 +29,168 @@ public Replication(Builder builder) {
2829
this(
2930
builder.replicationFactor,
3031
builder.asyncEnabled,
31-
builder.deletionStrategy);
32+
builder.deletionStrategy,
33+
builder.asyncReplicationConfig);
34+
}
35+
36+
public static record AsyncReplicationConfig(
37+
@SerializedName("hashtreeHeight") Integer hashTreeHeight,
38+
@SerializedName("maxWorkers") Integer maxWorkers,
39+
@SerializedName("frequency") Integer frequencyMillis,
40+
@SerializedName("frequencyWhilePropagating") Integer frequencyMillisWhilePropagating,
41+
@SerializedName("aliveNodesCheckingFrequency") Integer aliveNodesCheckingFrequencyMillis,
42+
@SerializedName("loggingFrequency") Integer loggingFrequencySeconds,
43+
@SerializedName("diffBatchSize") Integer diffBatchSize,
44+
@SerializedName("diffPerNodeTimeout") Integer diffPerNodeTimeoutSeconds,
45+
@SerializedName("prePropagationTimeout") Integer prePropagationTimeoutSeconds,
46+
@SerializedName("propagationTimeout") Integer propagationTimeoutSeconds,
47+
@SerializedName("propagationDelay") Integer propagationDelayMillis,
48+
@SerializedName("propagationLimit") Integer propagationLimit,
49+
@SerializedName("propagationConcurrency") Integer propagationConcurrency,
50+
@SerializedName("propagationBatchSize") Integer propagationBatchSize) {
51+
52+
public AsyncReplicationConfig(Builder builder) {
53+
this(
54+
builder.hashTreeHeight,
55+
builder.maxWorkers,
56+
builder.frequencyMillis,
57+
builder.frequencyMillisWhilePropagating,
58+
builder.aliveNodesCheckingFrequencyMillis,
59+
builder.loggingFrequencySeconds,
60+
builder.diffBatchSize,
61+
builder.diffPerNodeTimeoutSeconds,
62+
builder.prePropagationTimeoutSeconds,
63+
builder.propagationTimeoutSeconds,
64+
builder.propagationDelayMillis,
65+
builder.propagationLimit,
66+
builder.propagationConcurrency,
67+
builder.propagationBatchSize);
68+
}
69+
70+
public static AsyncReplicationConfig of(Function<Builder, ObjectBuilder<AsyncReplicationConfig>> fn) {
71+
return fn.apply(new Builder()).build();
72+
}
73+
74+
public static class Builder implements ObjectBuilder<AsyncReplicationConfig> {
75+
private Integer hashTreeHeight;
76+
private Integer maxWorkers;
77+
private Integer frequencyMillis;
78+
private Integer frequencyMillisWhilePropagating;
79+
private Integer aliveNodesCheckingFrequencyMillis;
80+
private Integer loggingFrequencySeconds;
81+
private Integer diffBatchSize;
82+
private Integer diffPerNodeTimeoutSeconds;
83+
private Integer prePropagationTimeoutSeconds;
84+
private Integer propagationTimeoutSeconds;
85+
private Integer propagationDelayMillis;
86+
private Integer propagationLimit;
87+
private Integer propagationConcurrency;
88+
private Integer propagationBatchSize;
89+
90+
/** Height of the hashtree used for diffing. */
91+
public Builder hashTreeHeight(int hashTreeHeight) {
92+
this.hashTreeHeight = hashTreeHeight;
93+
return this;
94+
}
95+
96+
/** Maximum number of async replication workers. */
97+
public Builder maxWorkers(int maxWorkers) {
98+
this.maxWorkers = maxWorkers;
99+
return this;
100+
}
101+
102+
/**
103+
* Base frequency in milliseconds at which async replication
104+
* runs diff calculations.
105+
*/
106+
public Builder frequencyMillis(int frequencyMillis) {
107+
this.frequencyMillis = frequencyMillis;
108+
return this;
109+
}
110+
111+
/**
112+
* Frequency in milliseconds at which async replication runs
113+
* while propagation is active.
114+
*/
115+
public Builder frequencyMillisWhilePropagating(int frequencyMillisWhilePropagating) {
116+
this.frequencyMillisWhilePropagating = frequencyMillisWhilePropagating;
117+
return this;
118+
}
119+
120+
/** Interval in milliseconds at which liveness of target nodes is checked." */
121+
public Builder aliveNodesCheckingFrequencyMillis(int aliveNodesCheckingFrequencyMillis) {
122+
this.aliveNodesCheckingFrequencyMillis = aliveNodesCheckingFrequencyMillis;
123+
return this;
124+
}
125+
126+
/** Interval in seconds at which async replication logs its status. */
127+
public Builder loggingFrequencySeconds(int loggingFrequencySeconds) {
128+
this.loggingFrequencySeconds = loggingFrequencySeconds;
129+
return this;
130+
}
131+
132+
/** Maximum number of object keys included in a single diff batch. */
133+
public Builder diffBatchSize(int diffBatchSize) {
134+
this.diffBatchSize = diffBatchSize;
135+
return this;
136+
}
137+
138+
/** Timeout in seconds for computing a diff against a single node. */
139+
public Builder diffPerNodeTimeoutSeconds(int diffPerNodeTimeoutSeconds) {
140+
this.diffPerNodeTimeoutSeconds = diffPerNodeTimeoutSeconds;
141+
return this;
142+
}
143+
144+
/** Overall timeout in seconds for the pre-propagation phase. */
145+
public Builder prePropagationTimeoutSeconds(int prePropagationTimeoutSeconds) {
146+
this.prePropagationTimeoutSeconds = prePropagationTimeoutSeconds;
147+
return this;
148+
}
149+
150+
/** Timeout in seconds for propagating batch of changes to a node. */
151+
public Builder propagationTimeoutSeconds(int propagationTimeoutSeconds) {
152+
this.propagationTimeoutSeconds = propagationTimeoutSeconds;
153+
return this;
154+
}
155+
156+
/**
157+
* Delay in milliseconds before newly added or updated objects are propagated.
158+
*/
159+
public Builder propagationDelayMillis(int propagationDelayMillis) {
160+
this.propagationDelayMillis = propagationDelayMillis;
161+
return this;
162+
}
163+
164+
/** Maximum number of objects to propagate in a single async replication run. */
165+
public Builder propagationLimit(int propagationLimit) {
166+
this.propagationLimit = propagationLimit;
167+
return this;
168+
}
169+
170+
/** Maximum number of concurrent propagation workers. */
171+
public Builder propagationConcurrency(int propagationConcurrency) {
172+
this.propagationConcurrency = propagationConcurrency;
173+
return this;
174+
}
175+
176+
/** Maximum number of objects to propagate in a single async replication run. */
177+
public Builder propagationBatchSize(int propagationBatchSize) {
178+
this.propagationBatchSize = propagationBatchSize;
179+
return this;
180+
}
181+
182+
@Override
183+
public AsyncReplicationConfig build() {
184+
return new AsyncReplicationConfig(this);
185+
}
186+
}
32187
}
33188

34189
public static class Builder implements ObjectBuilder<Replication> {
35190
private Integer replicationFactor;
36191
private Boolean asyncEnabled;
37192
private DeletionStrategy deletionStrategy;
193+
private AsyncReplicationConfig asyncReplicationConfig;
38194

39195
/** Set desired replication factor for this collection. */
40196
public Builder replicationFactor(int replicationFactor) {
@@ -57,6 +213,12 @@ public Builder deletionStrategy(DeletionStrategy deletionStrategy) {
57213
return this;
58214
}
59215

216+
/** Configuration parameters for asynchronous replication. */
217+
public Builder asyncReplication(AsyncReplicationConfig asyncReplicationConfig) {
218+
this.asyncReplicationConfig = asyncReplicationConfig;
219+
return this;
220+
}
221+
60222
@Override
61223
public Replication build() {
62224
return new Replication(this);

0 commit comments

Comments
 (0)