Skip to content

Commit 2de8059

Browse files
committed
Use MarshallableMessage
1 parent 1026568 commit 2de8059

4 files changed

Lines changed: 72 additions & 145 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessage;
3737
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDiscoveryMessageSerializer;
3838
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage;
39-
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessageSerializer;
39+
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessageMarshallableSerializer;
4040
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
41-
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatchSerializer;
41+
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatchMarshallableSerializer;
4242
import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessage;
4343
import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessageSerializer;
4444
import org.apache.ignite.internal.processors.cache.WalStateFinishMessage;
@@ -54,7 +54,7 @@
5454
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
5555
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessageSerializer;
5656
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
57-
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessageSerializer;
57+
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessageMarshallableSerializer;
5858
import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
5959
import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessageSerializer;
6060
import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
@@ -238,8 +238,11 @@ public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMa
238238
factory.register((short)518, MappingProposedMessage::new, new MappingProposedMessageSerializer());
239239
factory.register((short)519, MarshallerMappingItem::new, new MarshallerMappingItemSerializer());
240240
factory.register((short)520, CacheStatisticsClearMessage::new, new CacheStatisticsClearMessageSerializer());
241-
factory.register((short)521, ChangeGlobalStateMessage::new, new ChangeGlobalStateMessageSerializer());
242-
factory.register((short)522, ClientCacheChangeDummyDiscoveryMessage::new, new ClientCacheChangeDummyDiscoveryMessageSerializer());
243-
factory.register((short)523, DynamicCacheChangeBatch::new, new DynamicCacheChangeBatchSerializer());
241+
factory.register((short)521, ChangeGlobalStateMessage::new,
242+
new ChangeGlobalStateMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
243+
factory.register((short)522, ClientCacheChangeDummyDiscoveryMessage::new,
244+
new ClientCacheChangeDummyDiscoveryMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
245+
factory.register((short)523, DynamicCacheChangeBatch::new,
246+
new DynamicCacheChangeBatchMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
244247
}
245248
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClientCacheChangeDummyDiscoveryMessage.java

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,22 @@
2121
import java.util.Set;
2222
import java.util.UUID;
2323
import org.apache.ignite.IgniteCheckedException;
24-
import org.apache.ignite.IgniteException;
2524
import org.apache.ignite.internal.Order;
2625
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2726
import org.apache.ignite.internal.processors.security.SecurityContext;
2827
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
2928
import org.apache.ignite.internal.util.typedef.internal.S;
3029
import org.apache.ignite.internal.util.typedef.internal.U;
3130
import org.apache.ignite.lang.IgniteUuid;
32-
import org.apache.ignite.marshaller.Marshallers;
33-
import org.apache.ignite.plugin.extensions.communication.Message;
31+
import org.apache.ignite.marshaller.Marshaller;
32+
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
3433
import org.jetbrains.annotations.Nullable;
3534

3635
/**
3736
* Dummy discovery message which is not really sent via ring, it is just added in local discovery worker queue.
3837
*/
3938
public class ClientCacheChangeDummyDiscoveryMessage extends AbstractCachePartitionExchangeWorkerTask
40-
implements DiscoveryCustomMessage, Message {
39+
implements DiscoveryCustomMessage, MarshallableMessage {
4140
/** */
4241
private static final long serialVersionUID = 0L;
4342

@@ -48,8 +47,8 @@ public class ClientCacheChangeDummyDiscoveryMessage extends AbstractCachePartiti
4847
/** */
4948
Map<String, DynamicCacheChangeRequest> startReqs;
5049

51-
/** */
52-
@Order(value = 1, method = "startRequestsBytes")
50+
/** JDK Serialized version of startReqs. */
51+
@Order(1)
5352
byte[] startRequestsBytes;
5453

5554
/** */
@@ -100,33 +99,7 @@ UUID requestId() {
10099
* @return Cache start requests.
101100
*/
102101
@Nullable Map<String, DynamicCacheChangeRequest> startRequests() {
103-
if (startReqs != null)
104-
return startReqs;
105-
106-
try {
107-
return (startRequestsBytes != null) ? (startReqs = U.unmarshal(Marshallers.jdk(), startRequestsBytes, null)) : null;
108-
}
109-
catch (IgniteCheckedException e) {
110-
throw new IgniteException("Failed to unmarshal start requests", e);
111-
}
112-
}
113-
114-
/** */
115-
byte[] startRequestsBytes() {
116-
if (startRequestsBytes != null)
117-
return startRequestsBytes;
118-
119-
try {
120-
return (startReqs != null) ? U.marshal(Marshallers.jdk(), startReqs) : null;
121-
}
122-
catch (IgniteCheckedException e) {
123-
throw new IgniteException("Failed to marshal start requests", e);
124-
}
125-
}
126-
127-
/** */
128-
void startRequestsBytes(byte[] startRequestsBytes) {
129-
this.startRequestsBytes = startRequestsBytes;
102+
return startReqs;
130103
}
131104

132105
/**
@@ -146,6 +119,18 @@ Set<String> cachesToClose() {
146119
throw new UnsupportedOperationException();
147120
}
148121

122+
/** {@inheritDoc} */
123+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
124+
if (startReqs != null)
125+
startRequestsBytes = U.marshal(marsh, startReqs);
126+
}
127+
128+
/** {@inheritDoc} */
129+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
130+
if (startRequestsBytes != null)
131+
startReqs = U.unmarshal(marsh, startRequestsBytes, clsLdr);
132+
}
133+
149134
/** {@inheritDoc} */
150135
@Override public short directType() {
151136
return 522;

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.Collection;
2121
import java.util.Set;
2222
import org.apache.ignite.IgniteCheckedException;
23-
import org.apache.ignite.IgniteException;
2423
import org.apache.ignite.internal.Order;
2524
import org.apache.ignite.internal.managers.discovery.DiscoCache;
2625
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
@@ -33,15 +32,14 @@
3332
import org.apache.ignite.internal.util.typedef.internal.S;
3433
import org.apache.ignite.internal.util.typedef.internal.U;
3534
import org.apache.ignite.lang.IgniteUuid;
36-
import org.apache.ignite.plugin.extensions.communication.Message;
35+
import org.apache.ignite.marshaller.Marshaller;
36+
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
3737
import org.jetbrains.annotations.Nullable;
3838

39-
import static org.apache.ignite.marshaller.Marshallers.jdk;
40-
4139
/**
4240
* Cache change batch.
4341
*/
44-
public class DynamicCacheChangeBatch implements DiscoveryCustomMessage, Message {
42+
public class DynamicCacheChangeBatch implements DiscoveryCustomMessage, MarshallableMessage {
4543
/** */
4644
private static final long serialVersionUID = 0L;
4745

@@ -54,7 +52,7 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage, Message
5452
Collection<DynamicCacheChangeRequest> reqs;
5553

5654
/** JDK Serialized version of reqs. */
57-
@Order(value = 1, method = "requestsBytes")
55+
@Order(1)
5856
byte[] requestsBytes;
5957

6058
/** Cache updates to be executed on exchange. */
@@ -107,33 +105,7 @@ public DynamicCacheChangeBatch(Collection<DynamicCacheChangeRequest> reqs) {
107105
* @return Collection of change requests.
108106
*/
109107
public Collection<DynamicCacheChangeRequest> requests() {
110-
if (reqs != null)
111-
return reqs;
112-
113-
try {
114-
return (requestsBytes != null) ? (reqs = U.unmarshal(jdk(), requestsBytes, null)) : null;
115-
}
116-
catch (IgniteCheckedException e) {
117-
throw new IgniteException("Failed to unmarshal cache change requests", e);
118-
}
119-
}
120-
121-
/** */
122-
byte[] requestsBytes() {
123-
if (requestsBytes != null)
124-
return requestsBytes;
125-
126-
try {
127-
return (reqs != null) ? U.marshal(jdk(), reqs) : null;
128-
}
129-
catch (IgniteCheckedException e) {
130-
throw new IgniteException("Failed to marshal cache change requests", e);
131-
}
132-
}
133-
134-
/** */
135-
void requestsBytes(byte[] requestsBytes) {
136-
this.requestsBytes = requestsBytes;
108+
return reqs;
137109
}
138110

139111
/**
@@ -203,9 +175,21 @@ public void startCaches(boolean startCaches) {
203175
this.startCaches = startCaches;
204176
}
205177

178+
/** {@inheritDoc} */
179+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
180+
if (reqs != null)
181+
requestsBytes = U.marshal(marsh, reqs);
182+
}
183+
184+
/** {@inheritDoc} */
185+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
186+
if (requestsBytes != null)
187+
reqs = U.unmarshal(marsh, requestsBytes, clsLdr);
188+
}
189+
206190
/** {@inheritDoc} */
207191
@Override public short directType() {
208-
return 510;
192+
return 523;
209193
}
210194

211195
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ChangeGlobalStateMessage.java

Lines changed: 27 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.List;
2121
import java.util.UUID;
2222
import org.apache.ignite.IgniteCheckedException;
23-
import org.apache.ignite.IgniteException;
2423
import org.apache.ignite.cluster.ClusterState;
2524
import org.apache.ignite.internal.Order;
2625
import org.apache.ignite.internal.managers.discovery.DiscoCache;
@@ -34,14 +33,14 @@
3433
import org.apache.ignite.internal.util.typedef.internal.S;
3534
import org.apache.ignite.internal.util.typedef.internal.U;
3635
import org.apache.ignite.lang.IgniteUuid;
37-
import org.apache.ignite.marshaller.Marshallers;
38-
import org.apache.ignite.plugin.extensions.communication.Message;
36+
import org.apache.ignite.marshaller.Marshaller;
37+
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
3938
import org.jetbrains.annotations.Nullable;
4039

4140
/**
4241
* Message represent request for change cluster global state.
4342
*/
44-
public class ChangeGlobalStateMessage implements DiscoveryCustomMessage, Message {
43+
public class ChangeGlobalStateMessage implements DiscoveryCustomMessage, MarshallableMessage {
4544
/** */
4645
private static final long serialVersionUID = 0L;
4746

@@ -64,19 +63,15 @@ public class ChangeGlobalStateMessage implements DiscoveryCustomMessage, Message
6463
/** Configurations read from persistent store. */
6564
private List<StoredCacheData> storedCfgs;
6665

67-
/**
68-
* JDK Serialized version of storedCfgs
69-
*/
70-
@Order(value = 4, method = "storedCacheConfigurationsBytes")
66+
/** JDK Serialized version of storedCfgs. */
67+
@Order(4)
7168
byte[] storedCfgsBytes;
7269

7370
/** */
7471
@Nullable private BaselineTopology baselineTopology;
7572

76-
/**
77-
* JDK Serialized version of baselineTopology
78-
*/
79-
@Order(value = 5, method = "baselineTopologyBytes")
73+
/** JDK Serialized version of baselineTopology. */
74+
@Order(5)
8075
byte[] baselineTopologyBytes;
8176

8277
/** */
@@ -136,35 +131,7 @@ public ChangeGlobalStateMessage(
136131
* @return Configurations read from persistent store.
137132
*/
138133
@Nullable public List<StoredCacheData> storedCacheConfigurations() {
139-
if (storedCfgs != null) {
140-
return storedCfgs;
141-
}
142-
143-
try {
144-
return (storedCfgsBytes != null) ? (storedCfgs = U.unmarshal(Marshallers.jdk(), storedCfgsBytes, null)) : null;
145-
}
146-
catch (IgniteCheckedException e) {
147-
throw new IgniteException("Failed to unmarshal schema operation", e);
148-
}
149-
}
150-
151-
/** */
152-
byte[] storedCacheConfigurationsBytes() {
153-
if (storedCfgsBytes != null) {
154-
return storedCfgsBytes;
155-
}
156-
157-
try {
158-
return (storedCfgs != null) ? U.marshal(Marshallers.jdk(), storedCfgs) : null;
159-
}
160-
catch (IgniteCheckedException e) {
161-
throw new IgniteException("Failed to marshal schema operation", e);
162-
}
163-
}
164-
165-
/** */
166-
byte[] storedCacheConfigurationsBytes(byte[] storedCfgsBytes) {
167-
return this.storedCfgsBytes = storedCfgsBytes;
134+
return storedCfgs;
168135
}
169136

170137
/**
@@ -250,37 +217,7 @@ public boolean forceChangeBaselineTopology() {
250217
* @return Baseline topology.
251218
*/
252219
@Nullable public BaselineTopology baselineTopology() {
253-
if (baselineTopology != null) {
254-
return baselineTopology;
255-
}
256-
257-
try {
258-
return (baselineTopologyBytes != null) ?
259-
(baselineTopology = U.unmarshal(Marshallers.jdk(), baselineTopologyBytes, null)) :
260-
null;
261-
}
262-
catch (IgniteCheckedException e) {
263-
throw new IgniteException("Failed to unmarshal schema operation", e);
264-
}
265-
}
266-
267-
/** */
268-
byte[] baselineTopologyBytes() {
269-
if (baselineTopologyBytes != null) {
270-
return baselineTopologyBytes;
271-
}
272-
273-
try {
274-
return (baselineTopology != null) ? U.marshal(Marshallers.jdk(), baselineTopology) : null;
275-
}
276-
catch (IgniteCheckedException e) {
277-
throw new IgniteException("Failed to marshal schema operation", e);
278-
}
279-
}
280-
281-
/** */
282-
void baselineTopologyBytes(byte[] baselineTopologyBytes) {
283-
this.baselineTopologyBytes = baselineTopologyBytes;
220+
return baselineTopology;
284221
}
285222

286223
/**
@@ -298,6 +235,24 @@ public UUID requestId() {
298235
return reqId;
299236
}
300237

238+
/** {@inheritDoc} */
239+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
240+
if (storedCfgs != null)
241+
storedCfgsBytes = U.marshal(marsh, storedCfgs);
242+
243+
if (baselineTopology != null)
244+
baselineTopologyBytes = U.marshal(marsh, baselineTopology);
245+
}
246+
247+
/** {@inheritDoc} */
248+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
249+
if (storedCfgsBytes != null)
250+
storedCfgs = U.unmarshal(marsh, storedCfgsBytes, clsLdr);
251+
252+
if (baselineTopologyBytes != null)
253+
baselineTopology = U.unmarshal(marsh, baselineTopologyBytes, clsLdr);
254+
}
255+
301256
/** {@inheritDoc} */
302257
@Override public short directType() {
303258
return 521;

0 commit comments

Comments
 (0)