Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ public final class ClientCacheConfiguration implements Serializable {
/** @serial Expiry policy. */
private ExpiryPolicy expiryPlc;

/** @serial Partitions count. */
private int partitions = -1;

/**
* Root directories where partition files are stored.
* @see DataStorageConfiguration#setStoragePath(String)
Expand Down Expand Up @@ -193,6 +196,7 @@ public ClientCacheConfiguration(ClientCacheConfiguration ccfg) {
writeSynchronizationMode = ccfg.getWriteSynchronizationMode();
storagePaths = ccfg.getStoragePaths();
idxPath = ccfg.getIndexPath();
partitions = ccfg.getPartitions();
}

/**
Expand Down Expand Up @@ -793,6 +797,27 @@ public ClientCacheConfiguration setExpiryPolicy(ExpiryPolicy expiryPlc) {
return this;
}

/**
* Gets partitions count.
*
* @return Partitions count.
*/
public int getPartitions() {
return partitions;
}

/**
* Sets partitions count.
*
* @param partitions Partitions count.
* @return {@code this} for chaining.
*/
public ClientCacheConfiguration setPartitions(int partitions) {
Copy link
Member

@timoninmaxim timoninmaxim Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the idea of separating partitions from affinity function on thin clients. I think we should discuss API would allow full configuring affinity function on thin clients. Smth like:

ClientCacheConfiguration#setAffinityFunction(Class<?> affinityFunction, Object... args).

this.partitions = partitions;

return this;
}

/**
* @return A path to the root directory where the Persistent Store for cache group will persist data and indexes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.ignite.internal.util.typedef.T2;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.CACHE_CFG_PARTITIONS;
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.CACHE_STORAGES;
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.QRY_INITIATOR_ID;
import static org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature.QRY_PARTITIONS_BATCH_SIZE;
Expand Down Expand Up @@ -382,6 +383,11 @@ else if (cfg.getExpiryPolicy() != null) {
else if (!F.isEmpty(cfg.getStoragePaths()) || !F.isEmpty(cfg.getIndexPath()))
throw new ClientFeatureNotSupportedByServerException("Cache storages are not supported by the server");

if (protocolCtx.isFeatureSupported(CACHE_CFG_PARTITIONS))
itemWriter.accept(CfgItem.PARTITIONS, w -> w.writeInt(cfg.getPartitions()));
else if (cfg.getPartitions() > 0)
throw new ClientProtocolError("Partitions configuration by thin client is not supported by the server");

writer.writeInt(origPos, out.position() - origPos - 4); // configuration length
writer.writeInt(origPos + 4, propCnt.get()); // properties count
}
Expand Down Expand Up @@ -518,7 +524,8 @@ ClientCacheConfiguration cacheConfiguration(BinaryInputStream in, ProtocolContex
: reader.readStringArray())
.setIndexPath(!protocolCtx.isFeatureSupported(CACHE_STORAGES)
? null
: reader.readString());
: reader.readString())
.setPartitions(!protocolCtx.isFeatureSupported(CACHE_CFG_PARTITIONS) ? null : reader.readInt());
}
}

Expand Down Expand Up @@ -798,7 +805,10 @@ private enum CfgItem {
STORAGE_PATH(408),

/** Index path. */
IDX_PATH(409);
IDX_PATH(409),

/** Partitions count. */
PARTITIONS(410);

/** Code. */
private final short code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ public enum ProtocolBitmaskFeature {
DC_AWARE(22),

/** SqlFieldsQuery initiatorId property. */
QRY_INITIATOR_ID(23);
QRY_INITIATOR_ID(23),

/** Partitions count in cache configuration. */
CACHE_CFG_PARTITIONS(24);

/** */
private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.ignite.cache.CachePartialUpdateException;
import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.store.CacheStoreSessionListener;
Expand Down Expand Up @@ -1634,6 +1635,21 @@ public static void validateKeyConfigiration(
}
}

/**
* @return Default affinity function.
*/
public static AffinityFunction createDefaultAffinity() {
return new RendezvousAffinityFunction();
}

/**
* @param partitions Partitions count.
* @return Default affinity function with predefined partitions count.
*/
public static AffinityFunction createDefaultAffinity(int partitions) {
return new RendezvousAffinityFunction(false, partitions);
}

/**
* @param log Logger.
* @param cfg Initializes cache configuration with proper defaults.
Expand All @@ -1650,15 +1666,10 @@ public static void initializeConfigDefaults(IgniteLogger log, CacheConfiguration
cfg.setNodeFilter(CacheConfiguration.ALL_NODES);

if (cfg.getAffinity() == null) {
if (cfg.getCacheMode() == PARTITIONED) {
RendezvousAffinityFunction aff = new RendezvousAffinityFunction();

cfg.setAffinity(aff);
}
if (cfg.getCacheMode() == PARTITIONED)
cfg.setAffinity(createDefaultAffinity());
else {
RendezvousAffinityFunction aff = new RendezvousAffinityFunction(false, 512);

cfg.setAffinity(aff);
cfg.setAffinity(createDefaultAffinity(512));

cfg.setBackups(Integer.MAX_VALUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ public enum ClientBitmaskFeature implements ThinProtocolFeature {
DC_AWARE(22),

/** SqlFieldsQuery initiatorId property. */
QRY_INITIATOR_ID(23);
QRY_INITIATOR_ID(23),

/** Partitions count in cache configuration. */
CACHE_CFG_PARTITIONS(24);

/** */
private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.binary.BinaryWriterEx;
Expand All @@ -41,6 +42,7 @@
import org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature;
import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;

import static java.util.Optional.ofNullable;
import static org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature.QUERY_ENTITY_PRECISION_AND_SCALE;
Expand Down Expand Up @@ -148,6 +150,9 @@ public class ClientCacheConfigurationSerializer {
/** */
private static final short IDX_PATH = 409;

/** */
private static final short PARTITIONS = 410;

/**
* Writes the cache configuration.
* @param writer Writer.
Expand Down Expand Up @@ -226,6 +231,11 @@ static void write(BinaryWriterEx writer, CacheConfiguration cfg, ClientProtocolC
writer.writeString(cfg.getIndexPath());
}

if (protocolCtx.isFeatureSupported(ClientBitmaskFeature.CACHE_CFG_PARTITIONS)) {
AffinityFunction aff = cfg.getAffinity();
writer.writeInt(aff == null ? -1 : aff.partitions());
}

// Write length (so that part of the config can be skipped).
writer.writeInt(pos, writer.out().position() - pos - 4);
}
Expand Down Expand Up @@ -471,6 +481,14 @@ static T2<CacheConfiguration, Boolean> read(BinaryRawReader reader, ClientProtoc
case IDX_PATH:
cfg.setIndexPath(reader.readString());
break;

case PARTITIONS:
int partitions = reader.readInt();

if (partitions > 0)
cfg.setAffinity(CU.createDefaultAffinity(partitions));

break;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.client.thin;

import java.util.AbstractMap;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.Comparers;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.junit.Test;

/**
* Thin client cache configuration tests.
*/
public class CacheConfigurationTest extends AbstractThinClientTest {
/**
* Tested API:
* <ul>
* <li>{@link ClientCache#getName()}</li>
* <li>{@link ClientCache#getConfiguration()}</li>
* </ul>
*/
@Test
public void testCacheConfiguration() throws Exception {
final String dataRegionName = "functional-test-data-region";

IgniteConfiguration cfg = getConfiguration()
.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setName(dataRegionName)));

try (Ignite ignite = startGrid(cfg); IgniteClient client = startClient(ignite)) {
final String CACHE_NAME = "testCacheConfiguration";

ClientCacheConfiguration cacheCfgTemplate = new ClientCacheConfiguration().setName(CACHE_NAME)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setBackups(3)
.setCacheMode(CacheMode.PARTITIONED)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
.setEagerTtl(false)
.setGroupName("FunctionalTest")
.setDefaultLockTimeout(12345)
.setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE)
.setReadFromBackup(true)
.setRebalanceBatchSize(67890)
.setRebalanceBatchesPrefetchCount(102938)
.setRebalanceDelay(54321)
.setRebalanceMode(CacheRebalanceMode.SYNC)
.setRebalanceOrder(2)
.setRebalanceThrottle(564738)
.setRebalanceTimeout(142536)
.setKeyConfiguration(new CacheKeyConfiguration("Employee", "orgId"))
.setQueryEntities(new QueryEntity(int.class.getName(), "Employee")
.setTableName("EMPLOYEE")
.setFields(
Stream.of(
new AbstractMap.SimpleEntry<>("id", Integer.class.getName()),
new AbstractMap.SimpleEntry<>("orgId", Integer.class.getName())
).collect(Collectors.toMap(
AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue, (a, b) -> a, LinkedHashMap::new
))
)
// During query normalization null keyFields become empty set.
// Set empty collection for comparator.
.setKeyFields(Collections.emptySet())
.setKeyFieldName("id")
.setNotNullFields(Collections.singleton("id"))
.setDefaultFieldValues(Collections.singletonMap("id", 0))
.setIndexes(Collections.singletonList(new QueryIndex("id", true, "IDX_EMPLOYEE_ID")))
.setAliases(Stream.of("id", "orgId").collect(Collectors.toMap(f -> f, String::toUpperCase)))
)
.setExpiryPolicy(new PlatformExpiryPolicy(10, 20, 30))
.setCopyOnRead(!CacheConfiguration.DFLT_COPY_ON_READ)
.setDataRegionName(dataRegionName)
.setMaxConcurrentAsyncOperations(4)
.setMaxQueryIteratorsCount(4)
.setOnheapCacheEnabled(true)
.setQueryDetailMetricsSize(1024)
.setQueryParallelism(4)
.setSqlEscapeAll(true)
.setSqlIndexMaxInlineSize(1024)
.setSqlSchema("functional-test-schema")
.setStatisticsEnabled(true);

ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration(cacheCfgTemplate);

ClientCache<Object, Object> cache = client.createCache(cacheCfg);

assertEquals(CACHE_NAME, cache.getName());

assertTrue(Comparers.equal(cacheCfgTemplate, cache.getConfiguration()));
}
}

/** Tests cache partitions configuration. */
@Test
public void testCachePartitionsConfiguration() throws Exception {
try (Ignite ignite = startGrid(0); IgniteClient client = startClient(ignite)) {
// Explicit partitions count test.
String cacheName = "test";

// Client to server propagation.
client.createCache(new ClientCacheConfiguration().setName(cacheName).setPartitions(100));

assertEquals(100, ignite.cache(cacheName)
.getConfiguration(CacheConfiguration.class).getAffinity().partitions());

// Server to client propagation.
assertEquals(100, client.cache(cacheName).getConfiguration().getPartitions());

// Implicit partitions count test.
cacheName = "test2";

client.createCache(new ClientCacheConfiguration().setName(cacheName));

assertEquals(ignite.cache(cacheName).getConfiguration(CacheConfiguration.class)
.getAffinity().partitions(), client.cache(cacheName).getConfiguration().getPartitions());
}
}
}
Loading
Loading