diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index e30853967bb82..fe0a323461a7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -33,10 +33,16 @@ import org.apache.ignite.internal.processors.authentication.UserSerializer; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessageSerializer; +import org.apache.ignite.internal.processors.cache.CacheStatisticsClearMessage; +import org.apache.ignite.internal.processors.cache.CacheStatisticsClearMessageSerializer; import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage; import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessageSerializer; import org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessage; import org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessageSerializer; +import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage; +import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessageMarshallableSerializer; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatchMarshallableSerializer; import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessage; import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessageSerializer; import org.apache.ignite.internal.processors.cache.WalStateFinishMessage; @@ -85,6 +91,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionSerializer; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessageSerializer; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessageMarshallableSerializer; import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessageSerializer; import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage; @@ -330,5 +338,12 @@ public DiscoveryMessageFactory(Marshaller marsh, ClassLoader clsLdr) { factory.register((short)529, SnapshotCheckHandlersNodeResponse::new, new SnapshotCheckHandlersNodeResponseSerializer()); factory.register((short)530, SnapshotPartitionsVerifyHandlerResponse::new, new SnapshotPartitionsVerifyHandlerResponseMarshallableSerializer(marsh, clsLdr)); + factory.register((short)531, CacheStatisticsClearMessage::new, new CacheStatisticsClearMessageSerializer()); + factory.register((short)532, ChangeGlobalStateMessage::new, + new ChangeGlobalStateMessageMarshallableSerializer(marsh, clsLdr)); + factory.register((short)533, ClientCacheChangeDummyDiscoveryMessage::new, + new ClientCacheChangeDummyDiscoveryMessageMarshallableSerializer(marsh, clsLdr)); + factory.register((short)534, DynamicCacheChangeBatch::new, + new DynamicCacheChangeBatchMarshallableSerializer(marsh, clsLdr)); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsClearMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsClearMessage.java index f360a6acc70d5..a30edd8fc27b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsClearMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheStatisticsClearMessage.java @@ -19,15 +19,17 @@ import java.util.Collection; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * Cache statistics clear discovery message. */ -public class CacheStatisticsClearMessage implements DiscoveryCustomMessage { +public class CacheStatisticsClearMessage implements DiscoveryCustomMessage, Message { /** */ private static final long serialVersionUID = 0L; @@ -35,16 +37,27 @@ public class CacheStatisticsClearMessage implements DiscoveryCustomMessage { private static final byte INITIAL_MSG_MASK = 0x01; /** Custom message ID. */ - private final IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + IgniteUuid id; /** Request id. */ - private final UUID reqId; + @Order(1) + UUID reqId; /** Cache names. */ - private final Collection caches; + @Order(2) + Collection caches; /** Flags. */ - private final byte flags; + @Order(3) + byte flags; + + /** + * Default constructor. + */ + public CacheStatisticsClearMessage() { + // No-op. + } /** * Constructor for request. @@ -54,7 +67,9 @@ public class CacheStatisticsClearMessage implements DiscoveryCustomMessage { public CacheStatisticsClearMessage(UUID reqId, Collection caches) { this.reqId = reqId; this.caches = caches; - this.flags = INITIAL_MSG_MASK; + + id = IgniteUuid.randomUuid(); + flags = INITIAL_MSG_MASK; } /** @@ -63,14 +78,15 @@ public CacheStatisticsClearMessage(UUID reqId, Collection caches) { * @param msg Request message. */ private CacheStatisticsClearMessage(CacheStatisticsClearMessage msg) { - this.reqId = msg.reqId; - this.caches = null; - this.flags = 0; + id = IgniteUuid.randomUuid(); + reqId = msg.reqId; + caches = null; + flags = 0; } /** {@inheritDoc} */ @Override public IgniteUuid id() { - return this.id; + return id; } /** {@inheritDoc} */ @@ -78,11 +94,16 @@ private CacheStatisticsClearMessage(CacheStatisticsClearMessage msg) { return initial() ? new CacheStatisticsClearMessage(this) : null; } + /** {@inheritDoc} */ + @Override public short directType() { + return 531; + } + /** * @return Cache names. */ public Collection caches() { - return this.caches; + return caches; } /** @@ -96,7 +117,7 @@ public boolean initial() { * @return Request id. */ public UUID requestId() { - return this.reqId; + return reqId; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java index 5a31a26a8ba2f..3317a44166beb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java @@ -20,30 +20,46 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.jetbrains.annotations.Nullable; /** * Dummy discovery message which is not really sent via ring, it is just added in local discovery worker queue. */ public class ClientCacheChangeDummyDiscoveryMessage extends AbstractCachePartitionExchangeWorkerTask - implements DiscoveryCustomMessage { + implements DiscoveryCustomMessage, MarshallableMessage { /** */ private static final long serialVersionUID = 0L; /** */ - private final UUID reqId; + @Order(0) + UUID reqId; /** */ - private final Map startReqs; + Map startReqs; + + /** JDK Serialized version of startReqs. */ + @Order(1) + byte[] startRequestsBytes; /** */ @GridToStringInclude - private final Set cachesToClose; + @Order(2) + Set cachesToClose; + + /** */ + public ClientCacheChangeDummyDiscoveryMessage() { + super(null); + } /** * @param secCtx Security context in which current task must be executed. @@ -103,6 +119,23 @@ Set cachesToClose() { throw new UnsupportedOperationException(); } + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (startReqs != null) + startRequestsBytes = U.marshal(marsh, startReqs); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (startRequestsBytes != null) + startReqs = U.unmarshal(marsh, startRequestsBytes, clsLdr); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 533; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ClientCacheChangeDummyDiscoveryMessage.class, this, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index 2e70e956f07f0..a0f913e3bc07f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -19,6 +19,8 @@ import java.util.Collection; import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; @@ -28,35 +30,50 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.jetbrains.annotations.Nullable; /** * Cache change batch. */ -public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { +public class DynamicCacheChangeBatch implements DiscoveryCustomMessage, MarshallableMessage { /** */ private static final long serialVersionUID = 0L; /** Discovery custom message ID. */ - private IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + IgniteUuid id; /** Change requests. */ @GridToStringInclude - private Collection reqs; + Collection reqs; + + /** JDK Serialized version of reqs. */ + @Order(1) + byte[] requestsBytes; /** Cache updates to be executed on exchange. */ - private transient ExchangeActions exchangeActions; + private ExchangeActions exchangeActions; /** */ - private boolean startCaches; + @Order(2) + boolean startCaches; /** Restarting caches. */ - private Set restartingCaches; + @Order(3) + Set restartingCaches; /** Affinity (cache related) services updates to be processed on services deployment process. */ @GridToStringExclude - @Nullable private transient ServiceDeploymentActions serviceDeploymentActions; + @Nullable private ServiceDeploymentActions serviceDeploymentActions; + + /** */ + public DynamicCacheChangeBatch() { + // No-op. + } /** * @param reqs Requests. @@ -64,6 +81,7 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { public DynamicCacheChangeBatch(Collection reqs) { assert !F.isEmpty(reqs) : reqs; + id = IgniteUuid.randomUuid(); this.reqs = reqs; } @@ -157,6 +175,23 @@ public void startCaches(boolean startCaches) { this.startCaches = startCaches; } + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (reqs != null) + requestsBytes = U.marshal(marsh, reqs); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (requestsBytes != null) + reqs = U.unmarshal(marsh, requestsBytes, clsLdr); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 534; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DynamicCacheChangeBatch.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java index be4e628b18cdb..d5763cbba05b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java @@ -19,7 +19,9 @@ import java.util.List; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; @@ -29,47 +31,68 @@ import org.apache.ignite.internal.processors.service.ServiceDeploymentActions; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.MarshallableMessage; import org.jetbrains.annotations.Nullable; /** * Message represent request for change cluster global state. */ -public class ChangeGlobalStateMessage implements DiscoveryCustomMessage { +public class ChangeGlobalStateMessage implements DiscoveryCustomMessage, MarshallableMessage { /** */ private static final long serialVersionUID = 0L; /** Custom message ID. */ - private IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + IgniteUuid id; /** Request ID */ - private UUID reqId; + @Order(1) + UUID reqId; /** Initiator node ID. */ - private UUID initiatingNodeId; + @Order(2) + UUID initiatingNodeId; /** Cluster state */ - private ClusterState state; + @Order(3) + ClusterState state; /** Configurations read from persistent store. */ private List storedCfgs; + /** JDK Serialized version of storedCfgs. */ + @Order(4) + byte[] storedCfgsBytes; + /** */ @Nullable private BaselineTopology baselineTopology; + /** JDK Serialized version of baselineTopology. */ + @Order(5) + byte[] baselineTopologyBytes; + /** */ - private boolean forceChangeBaselineTopology; + @Order(6) + boolean forceChangeBaselineTopology; /** */ @GridToStringExclude - private transient ExchangeActions exchangeActions; + private ExchangeActions exchangeActions; /** Services deployment actions to be processed on services deployment process. */ @GridToStringExclude - @Nullable private transient ServiceDeploymentActions serviceDeploymentActions; + @Nullable private ServiceDeploymentActions serviceDeploymentActions; /** If {@code true}, cluster deactivation will be forced. */ - private boolean forceDeactivation; + @Order(7) + boolean forceDeactivation; + + /** No-arg constructor for deserialization. */ + public ChangeGlobalStateMessage() { + } /** * @param reqId State change request ID. @@ -94,6 +117,7 @@ public ChangeGlobalStateMessage( assert reqId != null; assert initiatingNodeId != null; + id = IgniteUuid.randomUuid(); this.reqId = reqId; this.initiatingNodeId = initiatingNodeId; this.storedCfgs = storedCfgs; @@ -104,7 +128,7 @@ public ChangeGlobalStateMessage( } /** - * @return Configurations read from persistent store.. + * @return Configurations read from persistent store. */ @Nullable public List storedCacheConfigurations() { return storedCfgs; @@ -211,6 +235,29 @@ public UUID requestId() { return reqId; } + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + if (storedCfgs != null) + storedCfgsBytes = U.marshal(marsh, storedCfgs); + + if (baselineTopology != null) + baselineTopologyBytes = U.marshal(marsh, baselineTopology); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + if (storedCfgsBytes != null) + storedCfgs = U.unmarshal(marsh, storedCfgsBytes, clsLdr); + + if (baselineTopologyBytes != null) + baselineTopology = U.unmarshal(marsh, baselineTopologyBytes, clsLdr); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 532; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(ChangeGlobalStateMessage.class, this);