Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions src/Weaviate.Client.Tests/Unit/TestCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -582,4 +582,102 @@ public void ToCollectionConfigCreateParams_Succeeds_WhenNoLegacySettings()
Assert.Equal(export.References, result.References);
Assert.Equal(export.VectorConfig, result.VectorConfig);
}

/// <summary>
/// Tests that ReplicationConfig with AsyncConfig maps all fields to the DTO.
/// </summary>
[Fact]
public void ReplicationConfig_WithAsyncConfig_MapsToDto()
{
var asyncConfig = new ReplicationAsyncConfig
{
MaxWorkers = 4,
HashtreeHeight = 16,
Frequency = 1000,
FrequencyWhilePropagating = 500,
AliveNodesCheckingFrequency = 30000,
LoggingFrequency = 60,
DiffBatchSize = 100,
DiffPerNodeTimeout = 30,
PrePropagationTimeout = 120,
PropagationTimeout = 60,
PropagationLimit = 10000,
PropagationDelay = 5000,
PropagationConcurrency = 2,
PropagationBatchSize = 50,
};

var collection = new CollectionConfig
{
Name = "TestCollection",
ReplicationConfig = new ReplicationConfig { AsyncConfig = asyncConfig },
};

var dto = collection.ToDto();

Assert.NotNull(dto.ReplicationConfig?.AsyncConfig);
var ac = dto.ReplicationConfig!.AsyncConfig!;
Assert.Equal(4, ac.MaxWorkers);
Assert.Equal(16, ac.HashtreeHeight);
Assert.Equal(1000, ac.Frequency);
Assert.Equal(500, ac.FrequencyWhilePropagating);
Assert.Equal(30000, ac.AliveNodesCheckingFrequency);
Assert.Equal(60, ac.LoggingFrequency);
Assert.Equal(100, ac.DiffBatchSize);
Assert.Equal(30, ac.DiffPerNodeTimeout);
Assert.Equal(120, ac.PrePropagationTimeout);
Assert.Equal(60, ac.PropagationTimeout);
Assert.Equal(10000, ac.PropagationLimit);
Assert.Equal(5000, ac.PropagationDelay);
Assert.Equal(2, ac.PropagationConcurrency);
Assert.Equal(50, ac.PropagationBatchSize);
}

/// <summary>
/// Tests that DTO with AsyncConfig round-trips back to the model correctly.
/// </summary>
[Fact]
public void ReplicationConfig_WithAsyncConfig_RoundTripsFromDto()
{
var dtoAsyncConfig = new Rest.Dto.ReplicationAsyncConfig
{
MaxWorkers = 8,
HashtreeHeight = 12,
PropagationLimit = 5000,
};

var dto = new Rest.Dto.Class
{
Class1 = "TestCollection",
ReplicationConfig = new Rest.Dto.ReplicationConfig { AsyncConfig = dtoAsyncConfig },
};

var model = dto.ToModel();

Assert.NotNull(model.ReplicationConfig?.AsyncConfig);
var ac = model.ReplicationConfig!.AsyncConfig!;
Assert.Equal(8, ac.MaxWorkers);
Assert.Equal(12, ac.HashtreeHeight);
Assert.Equal(5000, ac.PropagationLimit);
// Unset fields are null
Assert.Null(ac.Frequency);
Assert.Null(ac.PropagationBatchSize);
}

/// <summary>
/// Tests that ReplicationConfig without AsyncConfig does not produce an asyncConfig in the DTO.
/// </summary>
[Fact]
public void ReplicationConfig_WithoutAsyncConfig_ProducesNullAsyncConfigInDto()
{
var collection = new CollectionConfig
{
Name = "TestCollection",
ReplicationConfig = new ReplicationConfig { Factor = 2 },
};

var dto = collection.ToDto();

Assert.Null(dto.ReplicationConfig?.AsyncConfig);
}
}
39 changes: 39 additions & 0 deletions src/Weaviate.Client/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,25 @@ internal static Rest.Dto.Class ToDto(this CollectionConfig collection)
AsyncEnabled = rc.AsyncEnabled,
DeletionStrategy = (Rest.Dto.ReplicationConfigDeletionStrategy?)rc.DeletionStrategy,
Factor = rc.Factor,
AsyncConfig = rc.AsyncConfig is ReplicationAsyncConfig ac
? new Rest.Dto.ReplicationAsyncConfig
{
MaxWorkers = ac.MaxWorkers,
HashtreeHeight = ac.HashtreeHeight,
Frequency = ac.Frequency,
FrequencyWhilePropagating = ac.FrequencyWhilePropagating,
AliveNodesCheckingFrequency = ac.AliveNodesCheckingFrequency,
LoggingFrequency = ac.LoggingFrequency,
DiffBatchSize = ac.DiffBatchSize,
DiffPerNodeTimeout = ac.DiffPerNodeTimeout,
PrePropagationTimeout = ac.PrePropagationTimeout,
PropagationTimeout = ac.PropagationTimeout,
PropagationLimit = ac.PropagationLimit,
PropagationDelay = ac.PropagationDelay,
PropagationConcurrency = ac.PropagationConcurrency,
PropagationBatchSize = ac.PropagationBatchSize,
}
: null,
};
}

Expand Down Expand Up @@ -389,6 +408,26 @@ internal static CollectionConfigExport ToModel(this Rest.Dto.Class collection)
rc.Factor ?? Weaviate.Client.Models.ReplicationConfig.Default.Factor,

DeletionStrategy = (DeletionStrategy?)rc.DeletionStrategy,

AsyncConfig = rc.AsyncConfig is Rest.Dto.ReplicationAsyncConfig ac
? new ReplicationAsyncConfig
{
MaxWorkers = ac.MaxWorkers,
HashtreeHeight = ac.HashtreeHeight,
Frequency = ac.Frequency,
FrequencyWhilePropagating = ac.FrequencyWhilePropagating,
AliveNodesCheckingFrequency = ac.AliveNodesCheckingFrequency,
LoggingFrequency = ac.LoggingFrequency,
DiffBatchSize = ac.DiffBatchSize,
DiffPerNodeTimeout = ac.DiffPerNodeTimeout,
PrePropagationTimeout = ac.PrePropagationTimeout,
PropagationTimeout = ac.PropagationTimeout,
PropagationLimit = ac.PropagationLimit,
PropagationDelay = ac.PropagationDelay,
PropagationConcurrency = ac.PropagationConcurrency,
PropagationBatchSize = ac.PropagationBatchSize,
}
: null,
}
: null,
ShardingConfig = shardingConfig,
Expand Down
10 changes: 10 additions & 0 deletions src/Weaviate.Client/Models/Collection.Update.cs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,16 @@ public int Factor
get => WrappedReplicationConfig.Factor;
set => WrappedReplicationConfig.Factor = value;
}

/// <summary>
/// Gets or sets fine-grained parameters for asynchronous replication.
/// Requires Weaviate 1.36 or later.
/// </summary>
public ReplicationAsyncConfig? AsyncConfig
{
get => WrappedReplicationConfig.AsyncConfig;
set => WrappedReplicationConfig.AsyncConfig = value;
}
}

/// <summary>
Expand Down
56 changes: 56 additions & 0 deletions src/Weaviate.Client/Models/ReplicationConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,56 @@ public enum DeletionStrategy
TimeBasedResolution,
}

/// <summary>
/// Fine-grained configuration parameters for asynchronous replication.
/// All fields are optional; omitted fields use server defaults.
/// Requires Weaviate 1.36 or later.
/// </summary>
public record ReplicationAsyncConfig
{
/// <summary>Maximum number of async replication workers.</summary>
public long? MaxWorkers { get; set; }

/// <summary>Height of the hashtree used for diffing.</summary>
public long? HashtreeHeight { get; set; }

/// <summary>Base frequency in milliseconds at which async replication runs diff calculations.</summary>
public long? Frequency { get; set; }

/// <summary>Frequency in milliseconds at which async replication runs while propagation is active.</summary>
public long? FrequencyWhilePropagating { get; set; }

/// <summary>Interval in milliseconds at which liveness of target nodes is checked.</summary>
public long? AliveNodesCheckingFrequency { get; set; }

/// <summary>Interval in seconds at which async replication logs its status.</summary>
public long? LoggingFrequency { get; set; }

/// <summary>Maximum number of object keys included in a single diff batch.</summary>
public long? DiffBatchSize { get; set; }

/// <summary>Timeout in seconds for computing a diff against a single node.</summary>
public long? DiffPerNodeTimeout { get; set; }

/// <summary>Overall timeout in seconds for the pre-propagation phase.</summary>
public long? PrePropagationTimeout { get; set; }

/// <summary>Timeout in seconds for propagating a batch of changes to a node.</summary>
public long? PropagationTimeout { get; set; }

/// <summary>Maximum number of objects to propagate in a single async replication run.</summary>
public long? PropagationLimit { get; set; }

/// <summary>Delay in milliseconds before newly added or updated objects are propagated.</summary>
public long? PropagationDelay { get; set; }

/// <summary>Maximum number of concurrent propagation workers.</summary>
public long? PropagationConcurrency { get; set; }

/// <summary>Number of objects to include in a single propagation batch.</summary>
public long? PropagationBatchSize { get; set; }
}

/// <summary>
/// ReplicationConfig Configure how replication is executed in a cluster.
/// </summary>
Expand All @@ -41,6 +91,12 @@ public record ReplicationConfig : IEquatable<ReplicationConfig>
/// </summary>
public bool AsyncEnabled { get; set; } = false;

/// <summary>
/// Fine-grained parameters for asynchronous replication.
/// Requires Weaviate 1.36 or later; ignored by older servers.
/// </summary>
public ReplicationAsyncConfig? AsyncConfig { get; set; }

/// <summary>
/// Conflict resolution strategy for deleted objects.
/// Enum: [NoAutomatedResolution DeleteOnConflict TimeBasedResolution]
Expand Down
44 changes: 44 additions & 0 deletions src/Weaviate.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6587,3 +6587,47 @@ Weaviate.Client.WeaviateUnprocessableEntityException.WeaviateUnprocessableEntity
~Weaviate.Client.Models.Typed.WeaviateGroup<T, TObject>.WeaviateGroup(Weaviate.Client.Models.Typed.WeaviateGroup<T, TObject>! original) -> void
~Weaviate.Client.Models.Typed.WeaviateGroup<T>.WeaviateGroup(Weaviate.Client.Models.Typed.WeaviateGroup<T>! original) -> void
~Weaviate.Client.Models.Typed.WeaviateObject<T>.WeaviateObject(Weaviate.Client.Models.Typed.WeaviateObject<T>! original) -> void
Weaviate.Client.Models.ReplicationAsyncConfig
Weaviate.Client.Models.ReplicationAsyncConfig.AliveNodesCheckingFrequency.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.AliveNodesCheckingFrequency.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.DiffBatchSize.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.DiffBatchSize.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.DiffPerNodeTimeout.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.DiffPerNodeTimeout.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.Frequency.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.Frequency.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.FrequencyWhilePropagating.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.FrequencyWhilePropagating.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.HashtreeHeight.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.HashtreeHeight.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.LoggingFrequency.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.LoggingFrequency.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.MaxWorkers.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.MaxWorkers.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.PrePropagationTimeout.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.PrePropagationTimeout.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.PropagationBatchSize.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.PropagationBatchSize.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.PropagationConcurrency.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.PropagationConcurrency.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.PropagationDelay.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.PropagationDelay.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.PropagationLimit.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.PropagationLimit.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.PropagationTimeout.get -> long?
Weaviate.Client.Models.ReplicationAsyncConfig.PropagationTimeout.set -> void
Weaviate.Client.Models.ReplicationAsyncConfig.ReplicationAsyncConfig() -> void
Weaviate.Client.Models.ReplicationAsyncConfig.ReplicationAsyncConfig(Weaviate.Client.Models.ReplicationAsyncConfig! original) -> void
Weaviate.Client.Models.ReplicationConfig.AsyncConfig.get -> Weaviate.Client.Models.ReplicationAsyncConfig?
Weaviate.Client.Models.ReplicationConfig.AsyncConfig.set -> void
Weaviate.Client.Models.ReplicationConfigUpdate.AsyncConfig.get -> Weaviate.Client.Models.ReplicationAsyncConfig?
Weaviate.Client.Models.ReplicationConfigUpdate.AsyncConfig.set -> void
override Weaviate.Client.Models.ReplicationAsyncConfig.Equals(object? obj) -> bool
override Weaviate.Client.Models.ReplicationAsyncConfig.GetHashCode() -> int
override Weaviate.Client.Models.ReplicationAsyncConfig.ToString() -> string!
static Weaviate.Client.Models.ReplicationAsyncConfig.operator !=(Weaviate.Client.Models.ReplicationAsyncConfig? left, Weaviate.Client.Models.ReplicationAsyncConfig? right) -> bool
static Weaviate.Client.Models.ReplicationAsyncConfig.operator ==(Weaviate.Client.Models.ReplicationAsyncConfig? left, Weaviate.Client.Models.ReplicationAsyncConfig? right) -> bool
virtual Weaviate.Client.Models.ReplicationAsyncConfig.<Clone>$() -> Weaviate.Client.Models.ReplicationAsyncConfig!
virtual Weaviate.Client.Models.ReplicationAsyncConfig.EqualityContract.get -> System.Type!
virtual Weaviate.Client.Models.ReplicationAsyncConfig.Equals(Weaviate.Client.Models.ReplicationAsyncConfig? other) -> bool
virtual Weaviate.Client.Models.ReplicationAsyncConfig.PrintMembers(System.Text.StringBuilder! builder) -> bool
Loading
Loading