Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.ignite.internal.managers.discovery;

import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessage;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessageSerializer;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessageSerializer;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
Expand Down Expand Up @@ -113,5 +117,8 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)18, TcpDiscoveryStatusCheckMessage::new, new TcpDiscoveryStatusCheckMessageSerializer());
factory.register((short)19, TcpDiscoveryNodeAddFinishedMessage::new, new TcpDiscoveryNodeAddFinishedMessageSerializer());
factory.register((short)20, TcpDiscoveryJoinRequestMessage::new, new TcpDiscoveryJoinRequestMessageSerializer());
factory.register((short)21, DistributedMetaStorageUpdateAckMessage::new, new DistributedMetaStorageUpdateAckMessageSerializer());
factory.register((short)22, DistributedMetaStorageCasAckMessage::new, new DistributedMetaStorageCasAckMessageSerializer());

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,34 @@
package org.apache.ignite.internal.processors.metastorage.persistence;

import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.S;

/** */
class DistributedMetaStorageCasAckMessage extends DistributedMetaStorageUpdateAckMessage {
public class DistributedMetaStorageCasAckMessage extends DistributedMetaStorageUpdateAckMessage {
/** */
private static final long serialVersionUID = 0L;

/** */
private final boolean updated;
@Order(2)
boolean updated;

/** Empty constructor of {@link DiscoveryMessageFactory}. */
public DistributedMetaStorageCasAckMessage() {
// No-op.
}

/** */
public DistributedMetaStorageCasAckMessage(UUID reqId, String errorMsg, boolean updated) {
super(reqId, errorMsg);
public DistributedMetaStorageCasAckMessage(UUID reqId, boolean updated) {
super(reqId);

this.updated = updated;
}

/** */
public boolean updated() {
return updated;
/** {@inheritDoc} */
@Override public short directType() {
return 22;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public boolean matches() {

/** {@inheritDoc} */
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
return new DistributedMetaStorageCasAckMessage(requestId(), errorMessage(), matches);
return new DistributedMetaStorageCasAckMessage(requestId(), matches);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,9 +1128,6 @@ private void onUpdateMessage(
ClusterNode node,
DistributedMetaStorageUpdateMessage msg
) {
if (msg.errorMessage() != null)
return;

lock.writeLock().lock();

try {
Expand Down Expand Up @@ -1163,20 +1160,14 @@ private void onAckMessage(
ClusterNode node,
DistributedMetaStorageUpdateAckMessage msg
) {
GridFutureAdapter<Boolean> fut = updateFuts.remove(msg.requestId());
GridFutureAdapter<Boolean> fut = updateFuts.remove(msg.reqId);

if (fut != null) {
String errorMsg = msg.errorMessage();

if (errorMsg == null) {
Boolean res = msg instanceof DistributedMetaStorageCasAckMessage
? ((DistributedMetaStorageCasAckMessage)msg).updated()
: null;
Boolean res = msg instanceof DistributedMetaStorageCasAckMessage
? ((DistributedMetaStorageCasAckMessage)msg).updated
: null;

fut.onDone(res);
}
else
fut.onDone(new IllegalStateException(errorMsg));
fut.onDone(res);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,46 @@
package org.apache.ignite.internal.processors.metastorage.persistence;

import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

/** */
class DistributedMetaStorageUpdateAckMessage implements DiscoveryCustomMessage {
public class DistributedMetaStorageUpdateAckMessage implements DiscoveryCustomMessage, Message {
/** */
private static final long serialVersionUID = 0L;

/** */
private final IgniteUuid id = IgniteUuid.randomUuid();
@Order(0)
IgniteUuid id;

/** Request ID. */
private final UUID reqId;
@Order(1)
UUID reqId;

/** */
private final String errorMsg;
/** Empty constructor of {@link DiscoveryMessageFactory}. */
public DistributedMetaStorageUpdateAckMessage() {
// No-op.
}

/** */
public DistributedMetaStorageUpdateAckMessage(UUID reqId, String errorMsg) {
public DistributedMetaStorageUpdateAckMessage(UUID reqId) {
id = IgniteUuid.randomUuid();
this.reqId = reqId;
this.errorMsg = errorMsg;
}

/** {@inheritDoc} */
@Override public IgniteUuid id() {
return id;
}

/** */
public UUID requestId() {
return reqId;
}

/** */
public String errorMessage() {
return errorMsg;
}

/** {@inheritDoc} */
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
return null;
Expand All @@ -80,6 +77,11 @@ public String errorMessage() {
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override public short directType() {
return 21;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DistributedMetaStorageUpdateAckMessage.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage {
/** */
private final byte[] valBytes;

/** */
private String errorMsg;

/** */
public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) {
this.reqId = reqId;
Expand Down Expand Up @@ -81,19 +78,9 @@ public boolean isAckMessage() {
return false;
}

/** */
public void errorMessage(String errorMsg) {
this.errorMsg = errorMsg;
}

/** */
protected String errorMessage() {
return errorMsg;
}

/** {@inheritDoc} */
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
return new DistributedMetaStorageUpdateAckMessage(reqId, errorMsg);
return new DistributedMetaStorageUpdateAckMessage(reqId);
}

/** {@inheritDoc} */
Expand Down