Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.thread.context;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteException;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.thread.context.OperationContextAttribute.MAX_ATTR_CNT;

/** */
public class DistributedOperationAttributeRegistry {
/** */
private static final DistributedOperationAttributeRegistry INSTANCE = new DistributedOperationAttributeRegistry();

/** */
private final AtomicInteger bitmaskGen = new AtomicInteger();

/** Attributes by message type. */
private final Map<Class<? extends Message>, OperationContextAttribute<? extends Message>> msgAttrs = new ConcurrentHashMap<>();

/** Attributes by message id. */
private final Map<Byte, OperationContextAttribute<? extends Message>> idAttrs = new ConcurrentHashMap<>();

/** Attributes mapping: bitmask -> id */
private final Map<Integer, Byte> attrsMaskIdMap = new ConcurrentHashMap<>();

/** */
public static DistributedOperationAttributeRegistry get() {
return INSTANCE;
}

/** */
public <T extends Message> OperationContextAttribute<T> register(int id, Class<T> msgType, @Nullable T initVal) {
assert id >= 0;
assert initVal == null || msgType.isAssignableFrom(initVal.getClass());

OperationContextAttribute<T> attr;

synchronized (bitmaskGen) {
try {
assert msgAttrs.size() < MAX_ATTR_CNT
: "Exceeded maximum number of created Attributes instances [maxCnt=" + MAX_ATTR_CNT + ']';
assert id < MAX_ATTR_CNT : "Exceeded maximum attribute id " + (MAX_ATTR_CNT - 1);

byte id0 = (byte)id;

int bitmask = 1 << bitmaskGen.getAndIncrement();

assert attrsMaskIdMap.get(bitmask) == null;
assert idAttrs.get(id0) == null;

attr = new OperationContextAttribute<>(bitmask, initVal);

if (msgAttrs.putIfAbsent(msgType, attr) != null)
throw new IgniteException("Attribute with message type " + msgType.getSimpleName() + " already exists.");

attrsMaskIdMap.put(bitmask, id0);
idAttrs.put(id0, attr);
}
catch (Throwable t) {
bitmaskGen.decrementAndGet();

throw t;
}
}

return attr;
}

/** */
public Collection<OperationContextAttribute<? extends Message>> attributes() {
return Collections.unmodifiableCollection(msgAttrs.values());
}

/** */
public @Nullable <T extends Message> OperationContextAttribute<T> attribute(Class<T> msgType) {
return (OperationContextAttribute<T>)INSTANCE.msgAttrs.get(msgType);
}

/** */
public @Nullable <T extends Message> OperationContextAttribute<T> attribute(byte attrId) {
return (OperationContextAttribute<T>)INSTANCE.idAttrs.get(attrId);
}

/** */
public @Nullable Byte attributeId(OperationContextAttribute<? extends Message> attr) {
return attrsMaskIdMap.get(attr.bitmask());
}

/** */
public static Message attributeMessage(OperationContextAttribute<?> attr, @Nullable Object val) {
if (val == null)
return null;

assert val instanceof Message
: "Distributed context attribute value have to be a Message. Current type is: " + val.getClass().getSimpleName();

assert attr.initialValue() == null || attr.initialValue().getClass() == val.getClass();

return (Message)val;
}

/** Mostly for testing purposes. */
public void clear() {
msgAttrs.clear();
attrsMaskIdMap.clear();
bitmaskGen.set(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.ignite.internal.thread.context.concurrent.OperationContextAwareExecutor;
import org.apache.ignite.internal.thread.context.function.OperationContextAwareCallable;
import org.apache.ignite.internal.thread.context.function.OperationContextAwareRunnable;
Expand Down Expand Up @@ -96,6 +97,24 @@ public static <T> Scope set(OperationContextAttribute<T> attr, T val) {
return ctx.getInternal(attr) == val ? NOOP_SCOPE : ctx.applyAttributeUpdates(new AttributeValueHolder<>(attr, val));
}

/**
* Updates the value of the specified attribute for the {@link OperationContext} bound to the thread this method
* is called from.
*
* @param attrs Context attributes.
* @return Scope instance that, when closed, undoes the applied update. It is crucial to undo all applied
* {@link OperationContext} updates to free up thread-bound resources and avoid memory leaks, so it is highly
* encouraged to use a try-with-resource block to close the returned Scope. Note, updates must be undone in the
* same order and in the same thread they were applied.
*/
public static Scope set(Map<OperationContextAttribute<Object>, Object> attrs) {
ContextUpdater updater = ContextUpdater.create();

attrs.forEach(updater::set);

return updater.apply();
}

/**
* Updates the values of the specified attributes for the {@link OperationContext} bound to the thread this method
* is called from.
Expand Down Expand Up @@ -300,7 +319,7 @@ private int mergeUpdatedAttributeBits(AttributeValueHolder<?>[] attrVals) {
return res;
}

/** */
/** {@inheritDoc} */
@Override public void close() {
undo(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.thread.context;

import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.Nullable;

/**
Expand All @@ -29,10 +28,7 @@
*/
public class OperationContextAttribute<T> {
/** */
static final AtomicInteger ID_GEN = new AtomicInteger();

/** */
static final int MAX_ATTR_CNT = Integer.SIZE;
public static final int MAX_ATTR_CNT = Integer.SIZE;

/** */
private final int bitmask;
Expand All @@ -41,7 +37,9 @@ public class OperationContextAttribute<T> {
@Nullable private final T initVal;

/** */
private OperationContextAttribute(int bitmask, @Nullable T initVal) {
OperationContextAttribute(int bitmask, @Nullable T initVal) {
assert Integer.numberOfTrailingZeros(bitmask) + Integer.numberOfLeadingZeros(bitmask) == Integer.SIZE - 1;

this.bitmask = bitmask;
this.initVal = initVal;
}
Expand All @@ -59,7 +57,7 @@ private OperationContextAttribute(int bitmask, @Nullable T initVal) {
* Unique attribute bitmask calculated by shifting one from 0 to {@link Integer#SIZE}. It provides an ability to
* use {@link OperationContext} Attribute with bit fields.
*/
int bitmask() {
public int bitmask() {
return bitmask;
}

Expand All @@ -78,32 +76,4 @@ int bitmask() {
@Override public int hashCode() {
return bitmask;
}

/**
* Creates new instance of the {@link OperationContext} Attribute with Initial Value set to {@code null}.
* <p>
* Note, that the maximum number of attribute instances that can be created is currently limited to
* {@link #MAX_ATTR_CNT} for implementation reasons.
* </p>
*/
public static <T> OperationContextAttribute<T> newInstance() {
return newInstance(null);
}

/**
* Creates new instance of the {@link OperationContext} Attribute with the specified Initial Value. The Initial
* Value is returned by {@link OperationContext#get} method if the Attribute's value is not explicitly set in the
* {@link OperationContext}.
* <p>
* Note, that the maximum number of attribute instances that can be created is currently limited to
* {@link #MAX_ATTR_CNT} for implementation reasons.
* </p>
*/
public static <T> OperationContextAttribute<T> newInstance(T initVal) {
int id = ID_GEN.getAndIncrement();

assert id < MAX_ATTR_CNT : "Exceeded maximum supported number of created Attributes instances [maxCnt=" + MAX_ATTR_CNT + ']';

return new OperationContextAttribute<>(1 << id, initVal);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.managers.communication.UnknownMessageException;

/**
* Base class for all communication messages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
* limitations under the License.
*/

package org.apache.ignite.internal.managers.communication;
package org.apache.ignite.plugin.extensions.communication;

import org.apache.ignite.IgniteException;
import org.apache.ignite.plugin.extensions.communication.Message;

/**
* Exception to be thrown when unregistered class serialized or unknown message deserialized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.managers.deployment.GridDeploymentRequest;
import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse;
import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper;
import org.apache.ignite.internal.managers.encryption.ChangeCacheEncryptionRequest;
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyRequest;
import org.apache.ignite.internal.managers.encryption.GenerateEncryptionKeyResponse;
Expand All @@ -44,6 +43,8 @@
import org.apache.ignite.internal.managers.encryption.NodeEncryptionKeys;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage;
import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider;
import org.apache.ignite.internal.processors.authentication.SecurityContextImpl;
import org.apache.ignite.internal.processors.authentication.SecuritySubjectImpl;
import org.apache.ignite.internal.processors.authentication.User;
import org.apache.ignite.internal.processors.authentication.UserAcceptedMessage;
import org.apache.ignite.internal.processors.authentication.UserAuthenticateRequestMessage;
Expand Down Expand Up @@ -425,7 +426,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
withNoSchema(FullMessage.class);
withNoSchema(InitMessage.class);
withNoSchema(CacheStatisticsModeChangeMessage.class);
withNoSchema(SecurityAwareCustomMessageWrapper.class);
++msgIdx; // Former SecurityAwareCustomMessageWrapper.
withNoSchema(MetadataRemoveAcceptedMessage.class);
withNoSchema(MetadataRemoveProposedMessage.class);
withNoSchema(WalStateFinishMessage.class);
Expand Down Expand Up @@ -666,6 +667,11 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
withNoSchema(PartitionHashRecord.class);
withNoSchema(TransactionsHashRecord.class);

// [13400 - 13600]: Operation context messages.
msgIdx = 13400;
withNoSchema(SecuritySubjectImpl.class);
withNoSchema(SecurityContextImpl.class);

assert msgIdx <= MAX_MESSAGE_ID;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.apache.ignite.plugin.extensions.communication.UnknownMessageException;
import org.jetbrains.annotations.Nullable;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.authentication.SecurityContextImpl;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.ClientCacheChangeDummyDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
Expand All @@ -95,7 +96,9 @@
import org.apache.ignite.internal.systemview.NodeAttributeViewWalker;
import org.apache.ignite.internal.systemview.NodeMetricsViewWalker;
import org.apache.ignite.internal.thread.OomExceptionHandler;
import org.apache.ignite.internal.thread.context.DistributedOperationAttributeRegistry;
import org.apache.ignite.internal.thread.context.OperationContext;
import org.apache.ignite.internal.thread.context.OperationContextAttribute;
import org.apache.ignite.internal.thread.context.Scope;
import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper;
import org.apache.ignite.internal.util.GridAtomicLong;
Expand Down Expand Up @@ -134,7 +137,6 @@
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoveryNotification;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
Expand Down Expand Up @@ -924,12 +926,13 @@ public SecurityAwareNotificationTask(DiscoveryNotification notification) {

/** */
@Override public void run() {
DiscoverySpiCustomMessage customMsg = notification.customMessage();
OperationContextAttribute<SecurityContextImpl> attr = DistributedOperationAttributeRegistry.get()
.attribute(SecurityContextImpl.class);

if (customMsg instanceof SecurityAwareCustomMessageWrapper) {
UUID secSubjId = ((SecurityAwareCustomMessageWrapper)customMsg).securitySubjectId();
SecurityContext secCtxMsg = OperationContext.get(attr);

try (Scope ignored = ctx.security().withContext(secSubjId)) {
if (secCtxMsg != null) {
try (Scope ignored = ctx.security().withContext(secCtxMsg.subject().id())) {
super.run();
}
}
Expand Down Expand Up @@ -2324,12 +2327,19 @@ public GridFutureAdapter<DiscoveryLocalJoinData> localJoinFuture() {
* @throws IgniteCheckedException If failed.
*/
public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteCheckedException {
IgniteSecurity sec = ctx.security();

try {
IgniteSecurity security = ctx.security();
if (sec.enabled()) {
OperationContextAttribute<SecurityContextImpl> secAttr = DistributedOperationAttributeRegistry.get()
.attribute(SecurityContextImpl.class);

getSpi().sendCustomEvent(security.enabled()
? new SecurityAwareCustomMessageWrapper(msg, security.securityContext().subject().id())
: msg);
try (Scope ignored = OperationContext.set(secAttr, SecurityContextImpl.of(sec.securityContext()))) {
getSpi().sendCustomEvent(msg);
}
}
else
getSpi().sendCustomEvent(msg);
}
catch (IgniteClientDisconnectedException e) {
IgniteFuture<?> reconnectFut = ctx.cluster().clientReconnectFuture();
Expand Down
Loading