diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index a225c1abec7bf..150813ca0f46a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -17,6 +17,10 @@ package org.apache.ignite.internal.managers.discovery; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessage; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessageSerializer; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessageSerializer; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; @@ -113,5 +117,8 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer()); factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer()); factory.register((short)20, TcpDiscoveryJoinRequestMessage::new, new TcpDiscoveryJoinRequestMessageSerializer()); + factory.register((short)21, DistributedMetaStorageUpdateAckMessage::new, new DistributedMetaStorageUpdateAckMessageSerializer()); + factory.register((short)22, DistributedMetaStorageCasAckMessage::new, new DistributedMetaStorageCasAckMessageSerializer()); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java index 30dda350ec66a..0f02e5a4c800d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java @@ -18,26 +18,34 @@ package org.apache.ignite.internal.processors.metastorage.persistence; import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; /** */ -class DistributedMetaStorageCasAckMessage extends DistributedMetaStorageUpdateAckMessage { +public class DistributedMetaStorageCasAckMessage extends DistributedMetaStorageUpdateAckMessage { /** */ private static final long serialVersionUID = 0L; /** */ - private final boolean updated; + @Order(2) + boolean updated; + + /** Empty constructor of {@link DiscoveryMessageFactory}. */ + public DistributedMetaStorageCasAckMessage() { + // No-op. + } /** */ - public DistributedMetaStorageCasAckMessage(UUID reqId, String errorMsg, boolean updated) { - super(reqId, errorMsg); + public DistributedMetaStorageCasAckMessage(UUID reqId, boolean updated) { + super(reqId); this.updated = updated; } - /** */ - public boolean updated() { - return updated; + /** {@inheritDoc} */ + @Override public short directType() { + return 22; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java index fa279596cba85..88f64122892d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java @@ -57,7 +57,7 @@ public boolean matches() { /** {@inheritDoc} */ @Override @Nullable public DiscoveryCustomMessage ackMessage() { - return new DistributedMetaStorageCasAckMessage(requestId(), errorMessage(), matches); + return new DistributedMetaStorageCasAckMessage(requestId(), matches); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 147b98ceead25..4907f42e2e066 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -1128,9 +1128,6 @@ private void onUpdateMessage( ClusterNode node, DistributedMetaStorageUpdateMessage msg ) { - if (msg.errorMessage() != null) - return; - lock.writeLock().lock(); try { @@ -1163,20 +1160,14 @@ private void onAckMessage( ClusterNode node, DistributedMetaStorageUpdateAckMessage msg ) { - GridFutureAdapter fut = updateFuts.remove(msg.requestId()); + GridFutureAdapter fut = updateFuts.remove(msg.reqId); if (fut != null) { - String errorMsg = msg.errorMessage(); - - if (errorMsg == null) { - Boolean res = msg instanceof DistributedMetaStorageCasAckMessage - ? ((DistributedMetaStorageCasAckMessage)msg).updated() - : null; + Boolean res = msg instanceof DistributedMetaStorageCasAckMessage + ? ((DistributedMetaStorageCasAckMessage)msg).updated + : null; - fut.onDone(res); - } - else - fut.onDone(new IllegalStateException(errorMsg)); + fut.onDone(res); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java index 9008f8b72579e..95943903d719c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java @@ -18,32 +18,39 @@ package org.apache.ignite.internal.processors.metastorage.persistence; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** */ -class DistributedMetaStorageUpdateAckMessage implements DiscoveryCustomMessage { +public class DistributedMetaStorageUpdateAckMessage implements DiscoveryCustomMessage, Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + IgniteUuid id; /** Request ID. */ - private final UUID reqId; + @Order(1) + UUID reqId; - /** */ - private final String errorMsg; + /** Empty constructor of {@link DiscoveryMessageFactory}. */ + public DistributedMetaStorageUpdateAckMessage() { + // No-op. + } /** */ - public DistributedMetaStorageUpdateAckMessage(UUID reqId, String errorMsg) { + public DistributedMetaStorageUpdateAckMessage(UUID reqId) { + id = IgniteUuid.randomUuid(); this.reqId = reqId; - this.errorMsg = errorMsg; } /** {@inheritDoc} */ @@ -51,16 +58,6 @@ public DistributedMetaStorageUpdateAckMessage(UUID reqId, String errorMsg) { return id; } - /** */ - public UUID requestId() { - return reqId; - } - - /** */ - public String errorMessage() { - return errorMsg; - } - /** {@inheritDoc} */ @Override @Nullable public DiscoveryCustomMessage ackMessage() { return null; @@ -80,6 +77,11 @@ public String errorMessage() { throw new UnsupportedOperationException(); } + /** {@inheritDoc} */ + @Override public short directType() { + return 21; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DistributedMetaStorageUpdateAckMessage.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java index 3b9e462200133..372a676530dac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java @@ -46,9 +46,6 @@ class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage { /** */ private final byte[] valBytes; - /** */ - private String errorMsg; - /** */ public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) { this.reqId = reqId; @@ -76,14 +73,9 @@ public byte[] value() { return valBytes; } - /** */ - protected String errorMessage() { - return errorMsg; - } - /** {@inheritDoc} */ @Override @Nullable public DiscoveryCustomMessage ackMessage() { - return new DistributedMetaStorageUpdateAckMessage(reqId, errorMsg); + return new DistributedMetaStorageUpdateAckMessage(reqId); } /** {@inheritDoc} */