diff --git a/modules/binary/api/src/main/java/org/apache/ignite/binary/BinaryRawWriter.java b/modules/binary/api/src/main/java/org/apache/ignite/binary/BinaryRawWriter.java index ffa89ce274ceb..a2c408af6c919 100644 --- a/modules/binary/api/src/main/java/org/apache/ignite/binary/BinaryRawWriter.java +++ b/modules/binary/api/src/main/java/org/apache/ignite/binary/BinaryRawWriter.java @@ -128,6 +128,14 @@ public interface BinaryRawWriter { */ public void writeByteArray(@Nullable byte[] val) throws BinaryObjectException; + /** + * @param val Value to write. + * @param off Offset to start write from. + * @param len Count of bytes to write. + * @throws BinaryObjectException In case of error. + */ + public void writeByteArray(@Nullable byte[] val, int off, int len) throws BinaryObjectException; + /** * @param val Value to write. * @throws BinaryObjectException In case of error. diff --git a/modules/binary/impl/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java b/modules/binary/impl/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java index 033be969abcf7..b3e1042f6d380 100644 --- a/modules/binary/impl/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java +++ b/modules/binary/impl/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java @@ -904,6 +904,21 @@ void writeBooleanField(@Nullable Boolean val) { } } + /** {@inheritDoc} */ + @Override public void writeByteArray(@Nullable byte[] val, int pos, int len) throws BinaryObjectException { + if (val == null) + out.writeByte(GridBinaryMarshaller.NULL); + else { + if (len > val.length) + throw new IllegalArgumentException("len must be less then val.length"); + + out.unsafeEnsure(1 + 4); + out.unsafeWriteByte(GridBinaryMarshaller.BYTE_ARR); + out.unsafeWriteInt(len); + out.writeByteArray(val, pos, len); + } + } + /** {@inheritDoc} */ @Override public void writeShortArray(String fieldName, @Nullable short[] val) throws BinaryObjectException { writeFieldId(fieldName); diff --git a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java index 0a9c0fd58ab8b..e90cadd0f8848 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java +++ b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java @@ -33,12 +33,14 @@ import javax.lang.model.SourceVersion; import javax.lang.model.element.Element; import javax.lang.model.element.ElementKind; +import javax.lang.model.element.ExecutableElement; import javax.lang.model.element.Modifier; import javax.lang.model.element.TypeElement; import javax.lang.model.element.VariableElement; import javax.lang.model.type.TypeKind; import javax.lang.model.type.TypeMirror; import javax.tools.Diagnostic; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteBiTuple; import static org.apache.ignite.internal.MessageSerializerGenerator.DLFT_ENUM_MAPPER_CLS; @@ -115,6 +117,9 @@ public class MessageProcessor extends AbstractProcessor { if (clazz.getModifiers().contains(Modifier.ABSTRACT)) continue; + if (!checkConstructors(clazz)) + continue; + List fields = orderedFields(clazz); if (fields.isEmpty() && emptyMsgs.stream().noneMatch(t -> isAssignable(t, clazz))) { @@ -146,6 +151,34 @@ public class MessageProcessor extends AbstractProcessor { return true; } + /** */ + private boolean checkConstructors(TypeElement clazz) { + boolean isMarshallableMsg = isAssignable( + clazz.asType(), + processingEnv.getElementUtils().getTypeElement("org.apache.ignite.internal.MarshallableMessage") + ); + + for (Element el : clazz.getEnclosedElements()) { + if (el.getKind() != ElementKind.CONSTRUCTOR) + continue; + + ExecutableElement c = (ExecutableElement)el; + + boolean isDfltConstructor = F.isEmpty(c.getParameters()); + + if (isDfltConstructor && !isMarshallableMsg) + return true; + } + + processingEnv.getMessager().printMessage( + Diagnostic.Kind.ERROR, + "Class must have default constructor: " + clazz.getQualifiedName(), + clazz + ); + + return false; + } + /** * Collects all fields annotated with {@link Order} from the given {@link TypeElement} and all its superclasses. *

diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index 8bf794062ef45..6c38c62040684 100755 --- a/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -2153,4 +2153,18 @@ public static Collection emptyIfNull(@Nullable Collection col) { public static Map emptyIfNull(@Nullable Map map) { return map == null ? Collections.emptyMap() : map; } + + /** + * @param arr Array. + * @param el Element. + * @return Element index or {@code -1} if not found. + */ + public static int indexOf(T[] arr, T el) { + for (int i = 0; i < arr.length; i++) { + if (Objects.equals(arr[i], el)) + return i; + } + + return -1; + } } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java index eae0f1e86dcbb..2fad7652666c1 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite2.java @@ -22,6 +22,7 @@ import org.apache.ignite.util.CacheMetricsCommandTest; import org.apache.ignite.util.CdcCommandTest; import org.apache.ignite.util.CdcResendCommandTest; +import org.apache.ignite.util.GridCommandHandlerClassPathTest; import org.apache.ignite.util.GridCommandHandlerConsistencyBinaryTest; import org.apache.ignite.util.GridCommandHandlerConsistencySensitiveTest; import org.apache.ignite.util.GridCommandHandlerConsistencyTest; @@ -74,7 +75,8 @@ SecurityCommandHandlerPermissionsTest.class, - IdleVerifyDumpTest.class + IdleVerifyDumpTest.class, + GridCommandHandlerClassPathTest.class }) public class IgniteControlUtilityTestSuite2 { } diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClassPathTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClassPathTest.java new file mode 100644 index 0000000000000..33f5cd2ee4101 --- /dev/null +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerClassPathTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.util; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.classpath.IgniteClassPath; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.junit.Test; + +import static org.apache.ignite.internal.classpath.ClassPathProcessor.METASTORE_PREFIX; +import static org.apache.ignite.internal.classpath.IgniteClassPathState.READY; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; +import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_UNEXPECTED_ERROR; +import static org.apache.ignite.testframework.GridTestUtils.assertContains; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** + * Test for --classpath command. + */ +public class GridCommandHandlerClassPathTest extends GridCommandHandlerAbstractTest { + /** */ + public static final int GRID_CNT = 3; + + /** */ + public static final int FAIL_NODE_IDX = 2; + + /** */ + private Set cpFiles; + + /** */ + private String filesArg; + + /** */ + private ListeningTestLogger lsnrLog; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName).setGridLogger(lsnrLog); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cpFiles = cpFiles(); + + filesArg = cpFiles.stream().map(Path::toFile).map(File::getAbsolutePath).collect(Collectors.joining(",")); + + cleanPersistenceDir(); + + lsnrLog = new ListeningTestLogger(log); + + startGrids(GRID_CNT); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + // TODO: add test that classpath can be created only with ADMIN_OPS privelege. + // TODO: add CRC or other check of file integrity. + // TODO add in production code checks of files integriy. Perform file integrity check on startup. + // Support pretty print for command. + // Test for different failure types: node fail, file write fail, etc. + // Node fail: upload node vs not upload node. + // Check cleanup. + // Concurrent creation of CP with the same name. + + /** Tests --create command. */ + @Test + public void testCreate() throws Exception { + injectTestSystemOut(); + + final TestCommandHandler hnd = newCommandHandler(createTestLogger()); + + String cpName = "mysuperapp_" + commandHandler; + + // Empty root doesn't affect ClassPath creation. + if (commandHandler.equals(JMX_CMD_HND)) + assertTrue(grid(0).context().pdsFolderResolver().fileTree().classPathRoot(cpName).mkdirs()); + + LogListener cpReadyLsnr = readyLogListener(GRID_CNT); + + lsnrLog.registerListener(cpReadyLsnr); + + assertEquals(EXIT_CODE_OK, execute(hnd, "--class-path", "create", "--name", cpName, "--files", filesArg)); + + IgniteClassPath icp = classPath(cpName); + + assertEquals(READY, icp.state()); + + checkFilesExists(cpName, -1); + + assertTrue(waitForCondition(cpReadyLsnr::check, 30_000)); + + try { + // Attemp to create ClassPath with the same name must fail. + assertEquals( + EXIT_CODE_UNEXPECTED_ERROR, + execute(hnd, "--class-path", "create", "--name", cpName, "--files", filesArg) + ); + } + finally { + assertTrue(testOut.toString().contains("Fail to register ClassPath. Same ClassPath exists, already?")); + + assertEquals("Metastorage state must not change", icp, classPath(cpName)); + + checkFilesExists(cpName, -1); + } + } + + /** Tests --create command. */ + @Test + public void testCreateWhenRootExists() throws Exception { + injectTestSystemOut(); + + String cpName = "mysuperapp_" + commandHandler; + + final TestCommandHandler hnd = newCommandHandler(createTestLogger()); + + File cpRoot = grid(FAIL_NODE_IDX).context().pdsFolderResolver().fileTree().classPathRoot(cpName); + + assertTrue(cpRoot.mkdirs()); + + File f = new File(cpRoot, ".must_fail_cp_creation"); + + assertTrue(f.createNewFile()); + + LogListener cpReadyLsnr = readyLogListener(GRID_CNT - 1); + + lsnrLog.registerListener(cpReadyLsnr); + + assertEquals(EXIT_CODE_OK, execute(hnd, "--class-path", "create", "--name", cpName, "--files", filesArg)); + + IgniteClassPath icp = classPath(cpName); + + assertEquals(READY, icp.state()); + + checkFilesExists(cpName, FAIL_NODE_IDX); + + assertTrue(waitForCondition(cpReadyLsnr::check, 30_000)); + } + + /** Tests --create command arguments format. */ + @Test + public void testEmptyFilesArgument() { + injectTestSystemOut(); + + assertContains( + log, + executeCommand(EXIT_CODE_INVALID_ARGUMENTS, "--class-path", "create", "--name", "mysuperapp", "--files"), + "Please specify a value for argument: --files" + ); + + assertContains( + log, + executeCommand(EXIT_CODE_INVALID_ARGUMENTS, "--class-path", "create", "--name", "mysuperapp"), + "Mandatory argument(s) missing: [--files]" + ); + + assertContains( + log, + executeCommand(EXIT_CODE_INVALID_ARGUMENTS, "--class-path", "create", "--name", "--files", "some_files"), + "Please specify a value for argument: --name" + ); + + assertContains( + log, + executeCommand(EXIT_CODE_INVALID_ARGUMENTS, "--class-path", "create", "--files", "some_files"), + "Mandatory argument(s) missing: [--name]" + ); + + assertContains( + log, + executeCommand( + EXIT_CODE_INVALID_ARGUMENTS, + "--class-path", "create", "--name", "mysuperapp", "--files", "" + ), + cliCommandHandler() ? "File name must not be empty" : "Argument --files required" + ); + } + + /** */ + private Set cpFiles() throws IOException { + File emptyFile = File.createTempFile("empty", ".txt"); + + emptyFile.deleteOnExit(); + + return files( + Path.of(getClass().getClassLoader().getResource(".").getPath() + "../"), + Path.of(getClass().getClassLoader().getResource(".").getPath() + "../../../core/target"), + emptyFile.toPath() + ); + } + + /** */ + private Set fileNames(Set dirs) { + return dirs.stream().map(Path::getFileName).map(Path::toString).collect(Collectors.toSet()); + } + + /** */ + private Set files(Path... paths) { + return Stream.of(paths).flatMap(path -> { + try { + return Files.isDirectory(path) + ? Files.list(path).filter(p -> p.getFileName().toString().endsWith("jar") + || p.getFileName().toString().endsWith("txt")) + : Stream.of(path); + } + catch (IOException e) { + throw new RuntimeException(e); + } + }).map(Path::toAbsolutePath).collect(Collectors.toSet()); + } + + /** */ + private void checkFilesExists(String cpName, int skip) { + for (int i = 0; i < GRID_CNT; i++) { + if (skip == i) + continue; + + NodeFileTree ft = grid(i).context().pdsFolderResolver().fileTree(); + + assertEquals( + "Files must be deployed on each node", + fileNames(cpFiles), + fileNames(files(ft.classPathRoot(cpName).toPath())) + ); + } + } + + /** */ + private IgniteClassPath classPath(String cpName) throws IgniteCheckedException { + return grid(0).context().distributedMetastorage().read(METASTORE_PREFIX + cpName); + } + + /** */ + private static LogListener readyLogListener(int succeed) { + return LogListener + .matches("ClassPath is READY. " + succeed + " of " + GRID_CNT + " nodes has its files") + .times(1) + .build(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 9da592635d229..8eabba605795d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -23,6 +23,10 @@ import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta; import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition; import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings; +import org.apache.ignite.internal.classpath.ClassPathDeployToAllRequest; +import org.apache.ignite.internal.classpath.ClassPathDeployToAllResponse; +import org.apache.ignite.internal.classpath.DownloadClassPathFailureMessage; +import org.apache.ignite.internal.classpath.DownloadClassPathMessage; import org.apache.ignite.internal.management.cache.PartitionKey; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest; import org.apache.ignite.internal.managers.communication.CompressedMessage; @@ -666,6 +670,11 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(PartitionHashRecord.class); withNoSchema(TransactionsHashRecord.class); + withNoSchema(ClassPathDeployToAllRequest.class); + withNoSchema(ClassPathDeployToAllResponse.class); + withNoSchema(DownloadClassPathMessage.class); + withNoSchema(DownloadClassPathFailureMessage.class); + assert msgIdx <= MAX_MESSAGE_ID; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 744a844f88e2f..447813aaac8ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; +import org.apache.ignite.internal.classpath.ClassPathProcessor; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; import org.apache.ignite.internal.managers.collision.GridCollisionManager; import org.apache.ignite.internal.managers.communication.GridIoManager; @@ -656,6 +657,11 @@ public interface GridKernalContext extends Iterable { */ public RollingUpgradeProcessor rollingUpgrade(); + /** + * @return Class path processor. + */ + public ClassPathProcessor classPath(); + /** * Executor that is in charge of processing user async continuations. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 5d779302d1f7b..99594853d4d15 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; +import org.apache.ignite.internal.classpath.ClassPathProcessor; import org.apache.ignite.internal.maintenance.MaintenanceProcessor; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; import org.apache.ignite.internal.managers.collision.GridCollisionManager; @@ -368,6 +369,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @GridToStringExclude private RollingUpgradeProcessor rollUpProc; + /** Classpath processor. */ + @GridToStringExclude + private ClassPathProcessor classPathProc; + /** */ private Thread.UncaughtExceptionHandler hnd; @@ -601,6 +606,8 @@ else if (comp instanceof PerformanceStatisticsProcessor) perfStatProc = (PerformanceStatisticsProcessor)comp; else if (comp instanceof RollingUpgradeProcessor) rollUpProc = (RollingUpgradeProcessor)comp; + else if (comp instanceof ClassPathProcessor) + classPathProc = (ClassPathProcessor)comp; else if (comp instanceof IndexProcessor) indexProc = (IndexProcessor)comp; else if (!(comp instanceof DiscoveryNodeValidationProcessor @@ -1111,6 +1118,11 @@ public void recoveryMode(boolean recoveryMode) { return rollUpProc; } + /** {@inheritDoc} */ + @Override public ClassPathProcessor classPath() { + return classPathProc; + } + /** {@inheritDoc} */ @Override public Executor getAsyncContinuationExecutor() { return config().getAsyncContinuationExecutor() == null diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 303b0355960ab..90b4b59d521c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -89,6 +89,7 @@ import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; +import org.apache.ignite.internal.classpath.ClassPathProcessor; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; import org.apache.ignite.internal.cluster.IgniteClusterEx; import org.apache.ignite.internal.maintenance.MaintenanceProcessor; @@ -1025,6 +1026,7 @@ public void start( // Start the encryption manager after assigning the discovery manager to context, so it will be // able to register custom event listener. startManager(new GridEncryptionManager(ctx)); + startProcessor(new ClassPathProcessor(ctx)); startProcessor(new PdsConsistentIdProcessor(ctx)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllRequest.java new file mode 100644 index 0000000000000..ea4199b86c031 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllRequest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Class path deploy to all request for {@link DistributedProcess} initiate message. + */ +public class ClassPathDeployToAllRequest implements Message { + /** + * Classpath ID. + * + * @see IgniteClassPath#id() + */ + @Order(0) + UUID icpId; + + /** Node containing class path files. */ + @Order(1) + UUID uploadNodeId; + + /** */ + public ClassPathDeployToAllRequest() { + // No-op. + } + + /** */ + public ClassPathDeployToAllRequest(UUID icpId, UUID uploadNodeId) { + this.icpId = icpId; + this.uploadNodeId = uploadNodeId; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllResponse.java new file mode 100644 index 0000000000000..c05efc2014f10 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathDeployToAllResponse.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Class path deploy to all response for {@link DistributedProcess} initiate message. + */ +public class ClassPathDeployToAllResponse implements Message { + /** + * Classpath ID. + * + * @see IgniteClassPath#id() + */ + @Order(0) + UUID icpId; + + /** */ + public ClassPathDeployToAllResponse() { + // No-op. + } + + /** */ + public ClassPathDeployToAllResponse(UUID icpId) { + this.icpId = icpId; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathFilesTransmissionHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathFilesTransmissionHandler.java new file mode 100644 index 0000000000000..8f730bbc41eba --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathFilesTransmissionHandler.java @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Predicate; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureType; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoManager.TransmissionSender; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.communication.TransmissionCancelledException; +import org.apache.ignite.internal.managers.communication.TransmissionHandler; +import org.apache.ignite.internal.managers.communication.TransmissionMeta; +import org.apache.ignite.internal.managers.communication.TransmissionPolicy; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.classpath.ClassPathProcessor.fromMetastorage; +import static org.apache.ignite.internal.classpath.IgniteClassPathState.NEW; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNP_NODE_STOPPING_ERR_MSG; + +/** + * This manager is responsible for requesting and handling files from a remote node. + */ +class ClassPathFilesTransmissionHandler implements TransmissionHandler, GridMessageListener { + /** ClassPath topic to receive files from remote node. */ + static final Object FILES_TOPIC = GridTopic.TOPIC_CLASSLOAD.topic("rmt_files"); + + /** Transmission parameter for {@link IgniteClassPath#id()}. */ + private static final String ICP_ID_PARAM = "icpId"; + + /** Transmission parameter for {@link IgniteClassPath} file name. */ + private static final String NAME_PARAM = "name"; + + /** System discovery message listener. */ + private DiscoveryEventListener discoLsnr; + + /** */ + private volatile DownloadClassPathTask active; + + /** + * Queue of asynchronous tasks to execute. + * Head of queue is taks that currently executing. + */ + private final Queue queue = new ConcurrentLinkedDeque<>(); + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** {@code true} if the node is stopping. */ + private volatile boolean stopping; + + /** */ + public ClassPathFilesTransmissionHandler(GridKernalContext ctx) { + this.ctx = ctx; + this.log = ctx.log(ClassPathFilesTransmissionHandler.class); + } + + /** + * Downloads {@link IgniteClassPath} files locally from the remote node specified by {@code rmtNodeId}. + * @param rmtNodeId Remote node id. + * @param icp ClassPath. + * @return Future for download operation. + */ + IgniteInternalFuture downloadLocally(UUID rmtNodeId, IgniteClassPath icp) { + assert !rmtNodeId.equals(ctx.localNodeId()); + + log.info("Start download ClassPath files [name=" + icp.name() + ", id=" + icp.id() + ']'); + + DownloadClassPathTask task = new DownloadClassPathTask(rmtNodeId, icp); + + try { + submit(task); + } + catch (Throwable t) { + task.res.onDone(t); + } + + return task.res; + } + + /** Starts handler. */ + synchronized void start() { + ctx.event().addDiscoveryEventListener(discoLsnr = (evt, discoCache) -> { + UUID leftNodeId = evt.eventNode().id(); + + if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) + onNodeLeft(leftNodeId); + }, EVT_NODE_LEFT, EVT_NODE_FAILED); + + ctx.io().addMessageListener(FILES_TOPIC, this); + ctx.io().addTransmissionHandler(FILES_TOPIC, this); + } + + /** Stopping handler. */ + void stop() { + synchronized (this) { + if (discoLsnr != null) + ctx.event().removeDiscoveryEventListener(discoLsnr); + + ctx.io().removeMessageListener(FILES_TOPIC); + ctx.io().removeTransmissionHandler(FILES_TOPIC); + + stopping = true; + } + + cancelAll(new IgniteException(SNP_NODE_STOPPING_ERR_MSG), r -> true); + } + + /** + * @param nodeId A node left the cluster. + */ + void onNodeLeft(UUID nodeId) { + cancelAll( + new ClusterTopologyCheckedException("The node from which a snapshot has been requested left the grid"), + r -> r.rmtNodeId.equals(nodeId) + ); + } + + /** + * @param err Task result. + * @param filter Filter to select tasks for cancel. + */ + private synchronized void cancelAll(Throwable err, Predicate filter) { + queue.forEach(r -> cancel(r, err, filter)); + + cancel(active, err, filter); + } + + /** + * @param r Task to cancel. + * @param err Result error. + * @param filter Task filter. + * @return {@code True} if task was canceled. + */ + private synchronized boolean cancel(DownloadClassPathTask r, Throwable err, Predicate filter) { + if (r == null || !filter.test(r)) + return false; + + return r.res.onDone(err); + } + + /** {@inheritDoc} */ + @Override public Consumer chunkHandler(UUID nodeId, TransmissionMeta initMeta) { + throw new UnsupportedOperationException("Loading file by chunks is not supported: " + nodeId); + } + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg0, byte plc) { + try { + if (msg0 instanceof DownloadClassPathMessage msg) { + IgniteClassPath icp = null; + + try { + icp = fromMetastorage(msg.icpId, NEW, ctx); + + NodeFileTree ft = ctx.pdsFolderResolver().fileTree(); + + File root = ft.classPathRoot(icp.name()); + + if (!root.exists()) + throw new IgniteException("Classpath root not exists: " + root); + + // TODO: make async execution. + try (TransmissionSender sndr = ctx.io().openTransmissionSender(nodeId, FILES_TOPIC)) { + for (String name : icp.files()) { + File f = new File(root, name); + + if (!f.exists()) + throw new IgniteException("Classpath file not exists: " + f); + + sndr.send(f, Map.of(ICP_ID_PARAM, icp.id(), NAME_PARAM, name), TransmissionPolicy.FILE); + } + } + } + catch (Throwable t) { + U.error( + log, + "Error processing classpath file request [request=" + msg + ", nodeId=" + nodeId + ']', + t + ); + + if (icp != null) { + ctx.io().sendToCustomTopic(nodeId, + FILES_TOPIC, + new DownloadClassPathFailureMessage(icp, t.getMessage()), + SYSTEM_POOL + ); + } + } + } + else if (msg0 instanceof DownloadClassPathFailureMessage msg) { + String errMsg = "File download cancelled. ClassPath operation stopped on the remote node. " + + "Error: " + msg.err; + + if (log.isDebugEnabled()) + log.debug(errMsg); + + if (!cancel(active, new IgniteException(errMsg), t -> t.icp.id().equals(msg.icpId))) { + if (log.isInfoEnabled()) { + log.warning("A stale ClassPath failure message has been received. Will be ignored " + + "[fromNodeId=" + nodeId + ", icpId=" + msg.icpId + "]: " + msg.err); + } + } + } + } + catch (Throwable e) { + U.error(log, "Processing snapshot request from remote node fails with an error", e); + + ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } + } + + /** {@inheritDoc} */ + @Override public void onEnd(UUID nodeId) { + DownloadClassPathTask task = active; + + if (!ensureTask(nodeId, task)) + return; + + int filesLeft = task.filesLeft.get(); + + if (filesLeft != 0) { + String msg = "onEnd invoked, but more files left: " + filesLeft + + ", completing download process with an error"; + + log.warning(msg); + + task.res.onDone(new IllegalStateException(msg)); + } + + if (log.isInfoEnabled()) + log.info("Classpath files from remote node has been fully received [icp=" + task.icp.name() + ']'); + + task.res.onDone((Void)null); + } + + /** {@inheritDoc} */ + @Override public void onException(UUID nodeId, Throwable ex) { + DownloadClassPathTask task = active; + + if (!ensureTask(nodeId, task)) + return; + + task.res.onDone(ex); + } + + /** {@inheritDoc} */ + @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) { + UUID icpId = (UUID)fileMeta.params().get(ICP_ID_PARAM); + String name = (String)fileMeta.params().get(NAME_PARAM); + + IgniteClassPath icp = fromMetastorage(icpId, NEW, ctx); + + DownloadClassPathTask task = active; + + if (!ensureTask(nodeId, task)) { + throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " + + "[icpId=" + icp.id() + ", file=" + name + ']'); + } + + NodeFileTree ft = ctx.pdsFolderResolver().fileTree(); + + File root = ft.classPathRoot(icp.name()); + + NodeFileTree.mkdir(root, "classpath root"); + + return new File(root, name).getAbsolutePath(); + } + + /** {@inheritDoc} */ + @Override public Consumer fileHandler(UUID nodeId, TransmissionMeta initMeta) { + UUID icpId = (UUID)initMeta.params().get(ICP_ID_PARAM); + String name = (String)initMeta.params().get(NAME_PARAM); + + IgniteClassPath icp = fromMetastorage(icpId, NEW, ctx); + + return file -> { + DownloadClassPathTask task = active; + + if (!ensureTask(nodeId, task)) { + throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " + + "[icpId=" + icp.id() + ", file=" + name + ']'); + } + + int filesLeft = task.filesLeft.decrementAndGet(); + + if (log.isInfoEnabled()) { + log.info("Classpath file from remote node has been received " + + "[icp=" + task.icp.name() + ", file=" + name + ", filesLeft=" + filesLeft + ']'); + } + }; + } + + /** + * Starts {@code task} or adds it to queue. + * + * @param next Task to execute. + */ + private void submit(DownloadClassPathTask next) { + ClusterNode rmtNode; + + synchronized (this) { + if (stopping) { + next.res.onDone(new IgniteException(SNP_NODE_STOPPING_ERR_MSG)); + + return; + } + + if (active != null && !active.res.isDone()) { + if (!queue.offer(next)) { + next.res.onDone(new IgniteException("Can't put task in queue: " + next.icp)); + } + + return; + } + + rmtNode = ctx.discovery().node(next.rmtNodeId); + + if (rmtNode == null) { + next.res.onDone(new IgniteException("Can't download classpath files. " + + "Remote node left the grid [rmtNodeId=" + rmtNode + ']')); + + return; + } + + active = next; + + next.res.listen(this::onActiveDone); + } + + try { + // submit can be invoked from discovery thread. + // sendOrderedMessage can be blocking so invok it in separate thread to release discovery. + ctx.pools().getSystemExecutorService().submit(() -> { + try { + ctx.cache().context().gridIO().sendOrderedMessage( + rmtNode, + FILES_TOPIC, + new DownloadClassPathMessage(next.icp), + SYSTEM_POOL, + Long.MAX_VALUE, + true + ); + } + catch (Throwable e) { + log.warning("Can't start download ClassPath files", e); + + next.res.onDone(new IgniteException("Can't download classpath files. " + + "Remote node left the grid [rmtNodeId=" + next.rmtNodeId + ']')); + } + }); + } + catch (RejectedExecutionException e) { + log.warning("Submit to system pool rejected", e); + + next.res.onDone(e); + } + } + + /** Starts next task if exists. */ + private void onActiveDone(IgniteInternalFuture doneFut) { + DownloadClassPathTask next; + + synchronized (this) { + if (active == null || doneFut != active.res) + return; + + active = null; + + next = queue.poll(); + + while (next != null && next.res.isDone()) + next = queue.poll(); + } + + if (next != null) + submit(next); + } + + /** */ + private static boolean ensureTask(UUID nodeId, DownloadClassPathTask task) { + return task != null + && !task.res.isDone() + && task.rmtNodeId.equals(nodeId); + } + + /** + * Task responsible for downloading {@link IgniteClassPath} files from remote node. + */ + private static class DownloadClassPathTask { + /** Node to download files from. */ + final UUID rmtNodeId; + + /** ClassPath to download files for. */ + final IgniteClassPath icp; + + /** Result of download. */ + final GridFutureAdapter res; + + /** Files counter. */ + final AtomicInteger filesLeft; + + /** */ + public DownloadClassPathTask(UUID rmtNodeId, IgniteClassPath icp) { + this.rmtNodeId = rmtNodeId; + this.icp = icp; + this.res = new GridFutureAdapter<>(); + this.filesLeft = new AtomicInteger(icp.files().length); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathProcessor.java new file mode 100644 index 0000000000000..ea78a0b286754 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/ClassPathProcessor.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.classpath.IgniteClassPathState.NEW; + +/** + * TODO: + * 1. How to check data integrity on start? + * Do we want to do this for txt file or for jar only? + * 2. Check and remove obsolete icp from dist on start. + * Do we want to have some flag to skip remove in this case? (if we preparing for ICP registration). + * 3. Should we include CP into snapshots and dumps? + */ +public class ClassPathProcessor extends GridProcessorAdapter { + /** Prefix for metastorage keys. */ + public static final String METASTORE_PREFIX = "icp."; + + /** Handles download requests for {@link IgniteClassPath} files. */ + private final ClassPathFilesTransmissionHandler icpFilesHnd; + + /** Distributed process that deploys classpath files to all nodes. */ + private final DeployToAllProcess deployToAllProc; + + /** + * @param ctx Kernal context. + */ + public ClassPathProcessor(GridKernalContext ctx) { + super(ctx); + + icpFilesHnd = new ClassPathFilesTransmissionHandler(ctx); + deployToAllProc = new DeployToAllProcess(ctx, icpFilesHnd); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + icpFilesHnd.start(); + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + icpFilesHnd.stop(); + } + + /** + * Register new classpath in metastorage it same name not exists. + * Fails if exists. + * + * @param name Class path name. + * @param files Files included. + * @param lengths Files lengths. + * @return Class path id. + */ + public UUID startCreation(String name, String[] files, long[] lengths) { + assert files.length == lengths.length : "wrong arrays lengths"; + + A.ensure(U.alphanumericUnderscore(name), "Classpath name must satisfy the following name pattern: a-zA-Z0-9_"); + + for (String file : files) + ensureFilename(file); + + IgniteClassPath icp; + + Boolean metastorageWritten; + + try { + icp = new IgniteClassPath( + UUID.randomUUID(), + ctx.pdsFolderResolver().resolveFolders().consistentId(), + name, + files, + lengths, + NEW + ); + + metastorageWritten = casToMetastorageAsync(null, icp).get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + if (metastorageWritten != null && !metastorageWritten) + throw new IgniteException("Fail to register ClassPath. Same ClassPath exists, already?"); + + File root = ctx.pdsFolderResolver().fileTree().classPathRoot(name); + + try { + createRootAndCheckIsEmpty(root); + } + catch (Exception e) { + try { + if (metastorageWritten != null && metastorageWritten) + ctx.distributedMetastorage().remove(metastorageKey(icp)); + + U.delete(root); + } + catch (IgniteCheckedException ex) { + log.error("Cleanup after IgniteClassPath creation failed [key=" + metastorageKey(icp) + ", root=" + root + ']', e); + } + + throw e; + } + + log.info("New classpath created [root = " + root + ", icp=" + icp + ']'); + + return icp.id(); + } + + /** + * Writes {@code batch} to the Ignite Class Path file. + * + * @param icpId ClassPath id. + * @param name File name. + * @param offset Offset to write data to. + * @param batch Batch. + */ + public synchronized void writeFilePartFromClient( + UUID icpId, + String name, + long offset, + byte[] batch + ) { + try { + IgniteClassPath icp = fromMetastorage(icpId, NEW, ctx); + + ensureKnownFilename(name, icp); + + File root = ctx.pdsFolderResolver().fileTree().classPathRoot(icp.name()); + + File f = new File(root, name); + + if (offset == 0) { + A.ensure(root.equals(f.getParentFile()), "filename"); + + log.info("Creating new classpath file: " + f); + + if (!f.createNewFile()) + throw new IgniteException("File exists: " + f); + } + + try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) { + if (raf.length() < offset) { + throw new IgniteException("Wrong offset [icp=" + icp.name() + ", lib=" + name + ", " + + "fileLength=" + raf.length() + ", offset=" + offset + ']'); + } + + raf.seek(offset); + raf.write(batch); + } + } + catch (IOException e) { + log.error("Upload file part error :", e); + + throw new IgniteException(e); + } + } + + /** + * Copies local file to class path directory. + * + * @param icpId ClassPath id. + * @param file File to copy. + */ + public void copyClassPathFileLocally(UUID icpId, Path file) throws IOException { + IgniteClassPath icp = fromMetastorage(icpId, NEW, ctx); + + String name = file.getFileName().toString(); + + ensureKnownFilename(name, icp); + + File root = ctx.pdsFolderResolver().fileTree().classPathRoot(icp.name()); + + Path f = new File(root, name).toPath(); + + A.ensure(root.equals(f.toFile().getParentFile()), "filename"); + + log.info("Copying new classpath file: " + f); + + if (Files.exists(f) && Files.isSameFile(file, f)) { + log.info("Skip copying new classpath file, already there: " + f); + + return; + } + + Files.copy(file, f); + } + + /** + * Deploy {@link IgniteClassPath} to all nodes. + * @param icpId ClassPath id. + * @return Future for process result. + */ + public IgniteInternalFuture deployToAll(@Nullable UUID icpId) { + return deployToAllProc.start(icpId); + } + + /** + * + */ + GridFutureAdapter casToMetastorageAsync(@Nullable IgniteClassPath prev, IgniteClassPath icp) { + try { + String key = metastorageKey(icp); + + if (log.isDebugEnabled()) + log.debug("Writing new ClassPath state [new=" + icp + ", prev=" + prev + ']'); + + GridFutureAdapter res = ctx.distributedMetastorage().compareAndSetAsync(key, prev, icp); + + res.listen(casFut -> { + if (casFut.error() == null) + return; + + try { + Object val = ctx.distributedMetastorage().read(key); + + log.warning("Fail to write new ClassPath state [exp=" + prev + ", actual=" + val + ']'); + } + catch (IgniteCheckedException e) { + log.warning("Can't read metastore key", e); + } + }); + + return res; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * @param icpId ClassPath id. + * @param ctx Kernal context. + * @return Class path. + */ + static IgniteClassPath fromMetastorage(UUID icpId, IgniteClassPathState expState, GridKernalContext ctx) { + try { + IgniteClassPath[] icp = new IgniteClassPath[1]; + + ctx.distributedMetastorage().iterate(METASTORE_PREFIX, (key, icp0) -> { + if (icpId.equals(((IgniteClassPath)icp0).id())) + icp[0] = (IgniteClassPath)icp0; + }); + + if (icp[0] == null) + throw new IgniteException("ClassPath not found: " + icpId); + + if (icp[0].state() != expState) + throw new IgniteException("ClassPath in wrong state [expected=" + expState + ", status=" + icp[0].state() + ']'); + + return icp[0]; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** */ + private static String metastorageKey(IgniteClassPath icp) { + return METASTORE_PREFIX + icp.name(); + } + + /** */ + private static void ensureFilename(String file) { + Path path = Path.of(file); + + A.ensure(path.getNameCount() == 1 && !path.isAbsolute(), "simple filename expected"); + } + + /** */ + private static void ensureKnownFilename(String name, IgniteClassPath icp) { + ensureFilename(name); + + if (F.indexOf(icp.files(), name) == -1) + throw new IllegalArgumentException("Unknown lib [icp=" + icp.name() + ", unknown_lib=" + name + ']'); + } + + /** */ + static void createRootAndCheckIsEmpty(File root) { + if (!root.exists()) + NodeFileTree.mkdir(root, "Ignite Class Path root"); + else if (!F.isEmpty(root.listFiles())) + throw new IgniteException("ClassPath root exists and not empty: " + root); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/DeployToAllProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/DeployToAllProcess.java new file mode 100644 index 0000000000000..5b931066b6e2c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/DeployToAllProcess.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.persistence.filename.NodeFileTree; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.internal.classpath.ClassPathProcessor.createRootAndCheckIsEmpty; +import static org.apache.ignite.internal.classpath.ClassPathProcessor.fromMetastorage; +import static org.apache.ignite.internal.classpath.IgniteClassPathState.NEW; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.CLASSPATH_DEPLOY_TO_ALL; + +/** Distributed process to spread {@link IgniteClassPath} files across cluster. */ +class DeployToAllProcess { + /** Logger. */ + private final IgniteLogger log; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Files handler. */ + private final ClassPathFilesTransmissionHandler icpFilesHnd; + + /** Distribute process that distributes new Ignite class path across all server nodes. */ + private final DistributedProcess deployToAllProc; + + /** Future results of started distributed process. */ + private final Map> futs = new ConcurrentHashMap<>(); + + /** */ + public DeployToAllProcess(GridKernalContext ctx, ClassPathFilesTransmissionHandler icpFilesHnd) { + log = ctx.log(DeployToAllProcess.class); + + this.ctx = ctx; + this.icpFilesHnd = icpFilesHnd; + this.deployToAllProc = new DistributedProcess<>( + ctx, + CLASSPATH_DEPLOY_TO_ALL, + this::downloadLocally, + this::processDeployToAllResult + ); + } + + /** + * @param icpId ClassPath ID. + * @return Future for deploy process result. + */ + public IgniteInternalFuture start(UUID icpId) { + boolean added = false; + + try { + GridFutureAdapter deployRes = new GridFutureAdapter<>(); + + synchronized (this) { + IgniteClassPath icp = fromMetastorage(icpId, NEW, ctx); + + ClassPathDeployToAllRequest req = new ClassPathDeployToAllRequest(icpId, ctx.localNodeId()); + + added = futs.putIfAbsent(icpId, deployRes) == null; + + if (!added) + return new GridFinishedFuture<>(new IllegalStateException("Deploy to all process started, already: " + icp.name())); + + deployToAllProc.start(icpId, req); + } + + return deployRes; + } + catch (Exception e) { + if (added) + futs.remove(icpId); + + return new GridFinishedFuture<>(e); + } + } + + /** + * @param req Request on snapshot creation. + * @return Future which will be completed when a snapshot has been started. + */ + private IgniteInternalFuture downloadLocally(ClassPathDeployToAllRequest req) { + try { + if (req.uploadNodeId.equals(ctx.localNodeId())) { + if (log.isDebugEnabled()) + log.debug("Skip download ClassPath files for upload node [id=" + req.icpId + ']'); + + return new GridFinishedFuture<>(); + } + + IgniteClassPath icp = fromMetastorage(req.icpId, NEW, ctx); + + createRootAndCheckIsEmpty(ctx.pdsFolderResolver().fileTree().classPathRoot(icp.name())); + + return icpFilesHnd.downloadLocally(req.uploadNodeId, icp).chain(f -> { + if (f.error() == null) + return new ClassPathDeployToAllResponse(icp.id()); + + NodeFileTree ft = ctx.pdsFolderResolver().fileTree(); + + U.delete(ft.classPathRoot(icp.name())); + + throw new GridClosureException(f.error()); + }); + } + catch (Throwable e) { + return new GridFinishedFuture<>(e); + } + } + + /** + * @param id Request id. + * @param res Results. + * @param err Errors. + */ + private void processDeployToAllResult( + UUID id, + Map res, + Map err + ) { + GridFutureAdapter fut = futs.remove(id); + + // Only upload node manage the process. + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Unknown distribute process [id=" + id + ']'); + + return; + } + + try { + IgniteClassPath icp = fromMetastorage(id, NEW, ctx); + + if (log.isDebugEnabled()) + log.debug("Starting CAS to metastorage: " + icp); + + // Perform CAS async to release discovery thread and let CAS proceed. + ctx.classPath().casToMetastorageAsync(icp, icp.newState(IgniteClassPathState.READY)).listen(casFut -> { + log.info("ClassPath is READY. " + res.size() + " of " + (res.size() + err.size()) + + " nodes has its files"); + + if (!F.isEmpty(res)) { + log.info("Node that successfully download ClassPath files:"); + res.forEach((nodeId, resp) -> log.info(" ^-- " + nodeId)); + } + + if (!F.isEmpty(err)) { + log.info("Node that fail to download ClassPath file (will retry on first usage):"); + err.forEach((nodeId, t) -> log.info(" ^-- " + nodeId + ": " + t.getMessage())); + } + + boolean metastorageWritten = casFut.error() == null && casFut.result() != null && casFut.result(); + + Throwable t = metastorageWritten + ? null + : casFut.error() != null + ? casFut.error() + : new IgniteException("Fail to change ClassPath state. Concurrent removal?"); + + if (!fut.onDone(metastorageWritten ? "OK" : null, t)) { + log.warning("Distribute process in wrong state " + + "[canceled=" + fut.isCancelled() + ", failed=" + fut.isFailed() + ", done=" + fut.isDone() + ']'); + } + }); + } + catch (Exception e) { + fut.onDone(e); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/DownloadClassPathFailureMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/DownloadClassPathFailureMessage.java new file mode 100644 index 0000000000000..d93581013a5d6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/DownloadClassPathFailureMessage.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.communication.TransmissionHandler; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Message sent by node in case it can't send classpath to receiver. + * + * @see TransmissionHandler + * @see ClassPathFilesTransmissionHandler + * @see DownloadClassPathFailureMessage + */ +public class DownloadClassPathFailureMessage implements Message { + /** + * Classpath ID. + * + * @see IgniteClassPath#id() + */ + @Order(0) + UUID icpId; + + /** Error message. */ + @Order(1) + String err; + + /** */ + public DownloadClassPathFailureMessage() { + // No-op. + } + + /** */ + public DownloadClassPathFailureMessage(IgniteClassPath icp, String err) { + this.icpId = icp.id(); + this.err = err; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/DownloadClassPathMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/DownloadClassPathMessage.java new file mode 100644 index 0000000000000..9a3f17ae99f92 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/DownloadClassPathMessage.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.communication.TransmissionHandler; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Message to initiate download of classpath files from remote node. + * + * @see TransmissionHandler + * @see ClassPathFilesTransmissionHandler + * @see DownloadClassPathFailureMessage + */ +public class DownloadClassPathMessage implements Message { + /** + * Classpath ID. + * + * @see IgniteClassPath#id() + */ + @Order(0) + UUID icpId; + + /** */ + public DownloadClassPathMessage() { + // No-op. + } + + /** */ + public DownloadClassPathMessage(IgniteClassPath icp) { + this.icpId = icp.id(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPath.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPath.java new file mode 100644 index 0000000000000..6b973bd284029 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPath.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Objects; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Class path POJO. + */ +public class IgniteClassPath implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private UUID id; + + /** */ + private Object uploadNodeConsistentId; + + /** */ + private String name; + + /** */ + private String[] files; + + /** */ + private long[] lengths; + + /** */ + private IgniteClassPathState state; + + /** + * @param id Unique id of classpath. + * @param name User provided name. + * @param files Files to include to classpath. + */ + public IgniteClassPath( + UUID id, + Object uploadNodeConsistentId, + String name, + String[] files, + long[] lengths, + IgniteClassPathState state + ) { + this.id = id; + this.name = name; + this.files = files; + this.lengths = lengths; + this.state = state; + } + + /** + * @param state New state. + */ + IgniteClassPath newState(IgniteClassPathState state) { + return new IgniteClassPath(id, uploadNodeConsistentId, name, files, lengths, state); + } + + /** @return Consistent id of the node that starts ICP creation. */ + public Object uploadNodeConsistentId() { + return uploadNodeConsistentId; + } + + /** */ + public IgniteClassPathState state() { + return state; + } + + /** */ + public UUID id() { + return id; + } + + /** */ + public String name() { + return name; + } + + /** */ + public String[] files() { + return files; + } + + /** */ + public long[] lengths() { + return lengths; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteClassPath.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + IgniteClassPath that = (IgniteClassPath) o; + return Objects.equals(id, that.id) && Objects.equals(uploadNodeConsistentId, that.uploadNodeConsistentId) + && Objects.equals(name, that.name) && Objects.deepEquals(files, that.files) + && Objects.deepEquals(lengths, that.lengths) && state == that.state; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(id, uploadNodeConsistentId, name, Arrays.hashCode(files), Arrays.hashCode(lengths), state); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPathState.java b/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPathState.java new file mode 100644 index 0000000000000..2cf677a206dda --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/classpath/IgniteClassPathState.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +/** State of {@link IgniteClassPath}. */ +public enum IgniteClassPathState { + /** Creationg process in progress. */ + NEW, + + /** Ready for usage. */ + READY, + + /** Marked for removal. Newly started code can't use corresponding classpath. */ + REMOVING +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java index 112dc47c7125a..747dfd5867dff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java @@ -294,6 +294,9 @@ public enum ClientOperation { /** IgniteSet.iterator page. */ OP_SET_ITERATOR_GET_PAGE(9023), + /** File upload. */ + FILE_UPLOAD(9030), + /** Stop warmup. */ OP_STOP_WARMUP(10000); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java index e8f6938890d25..38d0e5d13416d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java @@ -35,8 +35,8 @@ class PayloadInputChannel { * Constructor. */ PayloadInputChannel(ClientChannel ch, ByteBuffer payload) { - in = BinaryStreams.inputStream(payload); this.ch = ch; + in = BinaryStreams.inputStream(payload); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannelImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannelImpl.java index ec6e96cb0f979..454e29189317c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannelImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannelImpl.java @@ -942,7 +942,6 @@ private T applyOnNodeChannelWithFallback(UUID tryNodeId, Function(); @@ -1034,7 +1033,7 @@ class ClientChannelHolder { private volatile ClientChannel ch; /** ID of the last server node that {@link #ch} is or was connected to. */ - private volatile UUID serverNodeId; + volatile UUID serverNodeId; /** Address that holder is bind to (chCfg.addr) is not in use now. So close the holder. */ private volatile boolean close; @@ -1075,7 +1074,7 @@ private boolean applyReconnectionThrottling() { /** * Get or create channel. */ - private ClientChannel getOrCreateChannel() + public ClientChannel getOrCreateChannel() throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError { return getOrCreateChannel(false); } @@ -1175,6 +1174,19 @@ void setConfiguration(ClientChannelConfiguration chCfg) { } } + /** + * @param id Node id. + * @return Client channel for node. + */ + public ClientChannel nodeClientChannel(UUID id) { + ClientChannelHolder cliCh = nodeChannels.get(id); + + if (cliCh == null) + throw new ClientConnectionException("Node can't be found [id=" + id + ']'); + + return cliCh.getOrCreateChannel(); + } + /** * Get holders reference. For test purposes. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java index b21eca8578160..0952322625606 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java @@ -18,16 +18,22 @@ package org.apache.ignite.internal.client.thin; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; import java.util.EventListener; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -55,6 +61,7 @@ import org.apache.ignite.client.events.ClientLifecycleEventListener; import org.apache.ignite.client.events.ClientStartEvent; import org.apache.ignite.client.events.ClientStopEvent; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.ClientTransactionConfiguration; @@ -625,6 +632,61 @@ private void retrieveBinaryConfiguration(ClientConfiguration cfg) { marsh.setBinaryConfiguration(resCfg); } + /** + * @param node Node to upload file to. + * @param icpID Classpath ID. + * @param file File to upload. + */ + public void uploadClasspathFile(ClusterNode node, UUID icpID, Path file) throws IOException { + ClientChannel cliCh = ch.nodeClientChannel(node.id()); + + String name = file.getFileName().toString(); + byte[] batch = new byte[(int)(U.MB)]; + + try (InputStream fis = Files.newInputStream(file)) { + long[] offset = new long[]{0}; + int[] bytesCnt = new int[1]; + + // We want to create empty file on the server side. + // So, even 0 bytes read, one request need to be sent. + bytesCnt[0] = fis.read(batch); + + do { + cliCh.service( + ClientOperation.FILE_UPLOAD, + ch -> { + try (BinaryWriterEx w = BinaryUtils.writer(marsh.context(), ch.out(), null)) { + w.writeUuid(node.id()); + w.writeUuid(icpID); + w.writeString(name); + w.writeLong(offset[0]); + w.writeByteArray(batch, 0, Math.max(0, bytesCnt[0])); + } + }, + in -> null + ); + + if (bytesCnt[0] > 0) + offset[0] += bytesCnt[0]; + + bytesCnt[0] = fis.read(batch); + } + while (bytesCnt[0] > 0); + + // Check all data read and sent. + if (offset[0] != Files.size(file)) + throw new IOException("Can't read all data from file"); + } + } + + /** @return Node IDs client connected to. */ + public List connectedToNodes() { + return ch.getChannelHolders().stream() + .map(hldr -> hldr.serverNodeId) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + /** * Thin client implementation of {@link BinaryMetadataHandler}. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java index f7c9ea9c57fcb..7609cbaa971f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/IgniteCommandRegistry.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.management.cache.CacheCommand; import org.apache.ignite.internal.management.cdc.CdcCommand; import org.apache.ignite.internal.management.checkpoint.CheckpointCommand; +import org.apache.ignite.internal.management.classpath.ClassPathCommand; import org.apache.ignite.internal.management.consistency.ConsistencyCommand; import org.apache.ignite.internal.management.defragmentation.DefragmentationCommand; import org.apache.ignite.internal.management.diagnostic.DiagnosticCommand; @@ -77,7 +78,8 @@ public IgniteCommandRegistry() { new PerformanceStatisticsCommand(), new CdcCommand(), new ConsistencyCommand(), - new EventCommand() + new EventCommand(), + new ClassPathCommand() ); U.loadService(CommandsProvider.class).forEach(p -> p.commands().forEach(this::register)); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCommand.java new file mode 100644 index 0000000000000..f297278661da7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCommand.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.classpath; + +import org.apache.ignite.internal.management.api.CommandRegistryImpl; + +/** Command to manage IgniteClassPath. */ +public class ClassPathCommand extends CommandRegistryImpl { + /** */ + public ClassPathCommand() { + super( + new ClassPathCreateCommand() + ); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommand.java new file mode 100644 index 0000000000000..cbab01f80dacd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommand.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.classpath; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.ignite.Ignite; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.classpath.IgniteClassPath; +import org.apache.ignite.internal.client.thin.TcpIgniteClient; +import org.apache.ignite.internal.management.api.CommandUtils; +import org.apache.ignite.internal.management.api.NativeCommand; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.jetbrains.annotations.Nullable; + +/** + * Ignite classpath creation command. + * + * @see IgniteClassPath + */ +public class ClassPathCreateCommand implements NativeCommand { + /** {@inheritDoc} */ + @Override public String description() { + return "Create ClassPath instance from set of local file"; + } + + /** {@inheritDoc} */ + @Override public Class argClass() { + return ClassPathCreateCommandArg.class; + } + + /** {@inheritDoc} */ + @Override public Void execute( + @Nullable IgniteClient client, + @Nullable Ignite ignite, + ClassPathCreateCommandArg arg, + Consumer printer + ) throws Exception { + if (client == null && !(ignite instanceof IgniteEx)) + throw new IllegalStateException("Client or IgniteEx required"); + + List files = prepareFiles(arg); + + ClusterNode uploadNode = uploadNode(client, ignite); + + printer.accept("Upload node: " + uploadNode.id()); + + UUID icpID = CommandUtils.execute( + client, + ignite, + ClassPathStartCreationTask.class, + arg, + Collections.singletonList(uploadNode) + ); + + printer.accept("New classpath created [" + + "uploadNode=" + uploadNode.id() + ", name=" + arg.name + ", id=" + icpID.toString() + ']'); + + uploadFiles(client, ignite, printer, files, uploadNode, icpID); + + CommandUtils.execute( + client, + ignite, + ClassPathDistributeTask.class, + icpID, Collections.singletonList(uploadNode) + ); + + return null; + } + + /** */ + private static void uploadFiles( + @Nullable IgniteClient client, + @Nullable Ignite ignite, + Consumer printer, + List files, + ClusterNode uploadNode, + UUID icpId + ) throws IOException { + printer.accept("Starting to upload files:"); + + // TODO: add pretty print here. + for (Path file : files) { + printer.accept(String.valueOf(file.toAbsolutePath())); + + if (client != null) + ((TcpIgniteClient)client).uploadClasspathFile(uploadNode, icpId, file); + else { + ((IgniteEx)ignite).context().classPath().copyClassPathFileLocally(icpId, file); + } + + printer.accept("DONE"); + } + } + + /** */ + private static List prepareFiles(ClassPathCreateCommandArg arg) throws IOException { + List files = new ArrayList<>(arg.files.length); + + arg.lengths = new long[arg.files.length]; + + for (int i = 0; i < arg.files.length; i++) { + A.notEmpty(arg.files[i], "File name"); + + Path f = Path.of(arg.files[i]); + + if (!Files.exists(f) || Files.isDirectory(f)) + throw new IllegalArgumentException("File not exists or directory: " + f); + + files.add(f); + + arg.lengths[i] = Files.size(f); + + // Don't want to send full path to server nodes. + // Server nodes require files names, only. + arg.files[i] = f.getFileName().toString(); + } + + return files; + } + + /** */ + private static ClusterNode uploadNode(IgniteClient client, Ignite ignite) { + if (client != null) { + List nodes = ((TcpIgniteClient)client).connectedToNodes(); + + if (F.isEmpty(nodes)) + throw new IllegalStateException("Not connected to node"); + + return client.cluster().node(F.first(nodes)); + } + + return ignite.cluster().localNode(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommandArg.java b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommandArg.java new file mode 100644 index 0000000000000..586654424a80d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathCreateCommandArg.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.classpath; + +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.dto.IgniteDataTransferObject; +import org.apache.ignite.internal.management.api.Argument; + +/** */ +public class ClassPathCreateCommandArg extends IgniteDataTransferObject { + /** */ + private static final long serialVersionUID = 0; + + /** */ + @Order(0) + @Argument(description = "Name of the classpath") + String name; + + /** */ + @Order(1) + @Argument(description = "Files to add to classpath") + String[] files; + + /** */ + @Order(2) + long[] lengths; + + /** */ + public String name() { + return name; + } + + /** */ + public void name(String name) { + this.name = name; + } + + /** */ + public String[] files() { + return files; + } + + /** */ + public void files(String[] files) { + this.files = files; + } + + /** */ + public long[] lengths() { + return lengths; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathDistributeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathDistributeTask.java new file mode 100644 index 0000000000000..f9213dab6aa67 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathDistributeTask.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.classpath; + +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.classpath.IgniteClassPath; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorOneNodeTask; +import org.jetbrains.annotations.Nullable; + +/** Task to start deploy process of newly created {@link IgniteClassPath}. */ +public class ClassPathDistributeTask extends VisorOneNodeTask { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorJob job(UUID arg) { + return new ClassPathDistributeJob(arg, debug); + } + + /** */ + private static class ClassPathDistributeJob extends VisorJob { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + protected ClassPathDistributeJob(@Nullable UUID arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected Void run(@Nullable UUID arg) throws IgniteException { + IgniteInternalFuture fut = ignite.context().classPath().deployToAll(arg); + + try { + fut.get(); + + return null; + } + catch (IgniteCheckedException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathStartCreationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathStartCreationTask.java new file mode 100644 index 0000000000000..c7c11a1c7fce8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/management/classpath/ClassPathStartCreationTask.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.management.classpath; + +import java.util.UUID; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.visor.VisorJob; +import org.apache.ignite.internal.visor.VisorOneNodeTask; +import org.jetbrains.annotations.Nullable; + +/** */ +public class ClassPathStartCreationTask extends VisorOneNodeTask { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override protected VisorJob job(ClassPathCreateCommandArg arg) { + return new ClassPathStartCreationJob(arg, debug); + } + + /** */ + private static class ClassPathStartCreationJob extends VisorJob { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + protected ClassPathStartCreationJob(@Nullable ClassPathCreateCommandArg arg, boolean debug) { + super(arg, debug); + } + + /** {@inheritDoc} */ + @Override protected UUID run(@Nullable ClassPathCreateCommandArg arg) throws IgniteException { + return ignite.context().classPath().startCreation(arg.name, arg.files, arg.lengths); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java index 159eeb77db7c9..1ec5b0477e37f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/NodeFileTree.java @@ -295,6 +295,9 @@ public class NodeFileTree extends SharedFileTree { /** Path to the storage directory. */ private final File nodeStorage; + /** Path to the root classpath directory. */ + private final File icp; + /** * Key is the path from {@link DataStorageConfiguration#getExtraStoragePaths()}, may be relative. Value is storage. * @see DataStorageConfiguration#getExtraStoragePaths() @@ -334,6 +337,7 @@ public NodeFileTree(File root, String folderName) { this.folderName = folderName; binaryMeta = new File(binaryMetaRoot, folderName); + icp = new File(icpRoot, folderName); nodeStorage = rootRelative(DB_DIR); checkpoint = new File(nodeStorage, CHECKPOINT_DIR); wal = rootRelative(DFLT_WAL_PATH); @@ -382,6 +386,7 @@ protected NodeFileTree(IgniteConfiguration cfg, File root, String folderName, bo this.folderName = folderName; binaryMeta = new File(binaryMetaRoot, folderName); + icp = new File(icpRoot, folderName); DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); @@ -431,6 +436,11 @@ public File binaryMeta() { return binaryMeta; } + /** @return Root directory for Ignite class path files. */ + public File classPathRoot() { + return icp; + } + /** @return Path to the directory containing active WAL segments. */ public @Nullable File wal() { return wal; @@ -1082,6 +1092,14 @@ public File maintenanceFile() { return new File(nodeStorage, MAINTENANCE_FILE_NAME); } + /** + * @param name IgniteClassPath name. + * @return IgniteClassPath directory. + */ + public File classPathRoot(String name) { + return new File(icp, name); + } + /** * @param includeMeta If {@code true} then include metadata directory into results. * @param filter Cache group names to filter. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SharedFileTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SharedFileTree.java index 33df1ed636040..199b9a00e38d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SharedFileTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/filename/SharedFileTree.java @@ -36,6 +36,7 @@ * ├── db ← db (shared between all local nodes). * │ ├── binary_meta ← binaryMetaRoot (shared between all local nodes). * │ ├── marshaller ← marshaller (shared between all local nodes). + * │ ├── classpath ← classpath (shared between all local nodes). * └── snapshots ← snpsRoot (shared between all local nodes). * * @@ -48,6 +49,9 @@ public class SharedFileTree { /** Name of marshaller mappings folder. */ public static final String MARSHALLER_DIR = "marshaller"; + /** Name of classpath folder. */ + public static final String CLASSPATH_DIR = "classpath"; + /** Database default folder. */ protected static final String DB_DIR = "db"; @@ -60,6 +64,9 @@ public class SharedFileTree { /** Path to the directory containing marshaller files. */ private final File marshaller; + /** Path to the directory containing classpath files. */ + protected final File icpRoot; + /** Path to the snapshot root directory. */ private final File snpsRoot; @@ -77,6 +84,7 @@ protected SharedFileTree(File root, String snpsRoot) { marshaller = Paths.get(rootStr, DB_DIR, MARSHALLER_DIR).toFile(); binaryMetaRoot = Paths.get(rootStr, DB_DIR, BINARY_METADATA_DIR).toFile(); + icpRoot = Paths.get(rootStr, DB_DIR, CLASSPATH_DIR).toFile(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 48525f5d697d4..8e0a986eece1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.cache.query.index.IndexProcessor; import org.apache.ignite.internal.cache.transform.CacheObjectTransformerProcessor; +import org.apache.ignite.internal.classpath.ClassPathProcessor; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; import org.apache.ignite.internal.managers.collision.GridCollisionManager; import org.apache.ignite.internal.managers.communication.GridIoManager; @@ -753,6 +754,11 @@ private void setField(IgniteEx kernal, String name, Object val) throws NoSuchFie return null; } + /** {@inheritDoc} */ + @Override public ClassPathProcessor classPath() { + return null; + } + /** {@inheritDoc} */ @Override public Executor getAsyncContinuationExecutor() { return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerInternalRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerInternalRequest.java new file mode 100644 index 0000000000000..733e737383ce5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerInternalRequest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc; + +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; + +/** + * Marker interface for requests that can be executed by control.sh, only + * @see ClientConnectionContext#managementClient() + */ +public interface ClientListenerInternalRequest extends ClientListenerRequest { + /** {@inheritDoc} */ + @Override default boolean internal() { + return true; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequest.java index d7006f70a617f..bb37216852cf5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequest.java @@ -33,4 +33,9 @@ public interface ClientListenerRequest { default boolean beforeStartupRequest() { return false; } + + /** @return {@code True} if request can be executed only by control.sh client. */ + default boolean internal() { + return false; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java index d0f54ef2136ba..a1bfe607c5e4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.platform.client; +import org.apache.ignite.client.ClientException; import org.apache.ignite.internal.binary.BinaryReaderEx; import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.binary.BinaryWriterEx; @@ -76,6 +77,7 @@ import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheScanQueryRequest; import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlFieldsQueryRequest; import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlQueryRequest; +import org.apache.ignite.internal.processors.platform.client.classpath.ClientClassPathFileUploadRequest; import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterChangeStateRequest; import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGetDataCenterNodesRequest; import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGetStateRequest; @@ -115,6 +117,8 @@ import org.apache.ignite.internal.processors.platform.client.streamer.ClientDataStreamerStartRequest; import org.apache.ignite.internal.processors.platform.client.tx.ClientTxEndRequest; import org.apache.ignite.internal.processors.platform.client.tx.ClientTxStartRequest; +import org.apache.ignite.internal.thread.context.Scope; +import org.apache.ignite.plugin.security.SecurityPermission; /** * Thin client message parser. @@ -410,10 +414,16 @@ public class ClientMessageParser implements ClientListenerMessageParser { /** Get service topology. */ private static final short OP_SERVICE_GET_TOPOLOGY = 7003; + /** File upload. */ + public static final short FILE_UPLOAD = 9030; + /** Operations that are performed before a node is joined to the topology. */ /** Stop warmup. */ private static final short OP_STOP_WARMUP = 10000; + /** */ + public static final String INTERNAL_REQ_ERR_MSG = "Only management client are allowed to execute this"; + /** Marshaller. */ private final GridBinaryMarshaller marsh; @@ -451,6 +461,9 @@ public class ClientMessageParser implements ClientListenerMessageParser { if (ctx.kernalContext().recoveryMode() && !req.beforeStartupRequest()) return new ClientRawRequest(req.requestId(), ClientStatus.FAILED, "Node in recovery mode."); + if (req.internal()) + checkInternalRequestAllowed(); + return req; } @@ -735,6 +748,9 @@ public ClientListenerRequest decode(BinaryReaderEx reader) { case OP_SERVICE_GET_TOPOLOGY: return new ClientServiceTopologyRequest(reader); + case FILE_UPLOAD: + return new ClientClassPathFileUploadRequest(reader); + case OP_STOP_WARMUP: return new ClientCacheStopWarmupRequest(reader); } @@ -771,4 +787,22 @@ public ClientListenerRequest decode(BinaryReaderEx reader) { @Override public long decodeRequestId(ClientMessage msg) { return 0; } + + /** + * Check permissions to invoke internal requests. + */ + private void checkInternalRequestAllowed() { + if (!ctx.managementClient()) + throw new ClientException(INTERNAL_REQ_ERR_MSG); + + // When security is enabled, only an administrator can connect and execute commands. + if (ctx.securityContext() != null) { + try (Scope ignored = ctx.kernalContext().security().withContext(ctx.securityContext())) { + ctx.kernalContext().security().authorize(SecurityPermission.ADMIN_OPS); + } + catch (SecurityException e) { + throw new ClientException("ADMIN_OPS permission required"); + } + } + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/classpath/ClientClassPathFileUploadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/classpath/ClientClassPathFileUploadRequest.java new file mode 100644 index 0000000000000..d7bc9d0dd2535 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/classpath/ClientClassPathFileUploadRequest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.client.classpath; + +import java.util.UUID; +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.internal.classpath.IgniteClassPath; +import org.apache.ignite.internal.processors.odbc.ClientListenerInternalRequest; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientRequest; +import org.apache.ignite.internal.processors.platform.client.ClientResponse; + +/** + * Must be used only from control.sh command. + * Writes part of {@link IgniteClassPath} file. + * + * @see ClientListenerInternalRequest + * @see IgniteClassPath + */ +public class ClientClassPathFileUploadRequest extends ClientRequest implements ClientListenerInternalRequest { + /** Upload node ID. */ + private final UUID uploadNodeId; + + /** ClassPath ID. */ + private final UUID icpId; + + /** File name. */ + private final String name; + + /** Offset to write data to. */ + private final long offset; + + /** Batch. */ + private final byte[] batch; + + /** + * Creates the file upload request. + * + * @param reader Reader. + */ + public ClientClassPathFileUploadRequest(BinaryRawReader reader) { + super(reader); + + uploadNodeId = reader.readUuid(); + icpId = reader.readUuid(); + name = reader.readString(); + offset = reader.readLong(); + batch = reader.readByteArray(); + } + + /** {@inheritDoc} */ + @Override public ClientResponse process(ClientConnectionContext ctx) { + UUID locNodeId = ctx.kernalContext().localNodeId(); + + if (!uploadNodeId.equals(locNodeId)) + throw new IllegalStateException("Wrong node [uploadNode=" + uploadNodeId + ", localNode=" + locNodeId + ']'); + + ctx.kernalContext().classPath().writeFilePartFromClient(icpId, name, offset, batch); + + return new ClientResponse(requestId()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java index f4aeb18fd2882..1e10bd4372302 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java @@ -499,6 +499,11 @@ public enum DistributedProcessType { /** * Snapshot partitions validation. */ - CHECK_SNAPSHOT_PARTS + CHECK_SNAPSHOT_PARTS, + + /** + * Deploy new classpath to all nodes. + */ + CLASSPATH_DEPLOY_TO_ALL; } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/classpath/ClassPathSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/classpath/ClassPathSelfTest.java new file mode 100644 index 0000000000000..2bdb3158759a2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/classpath/ClassPathSelfTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.classpath; + +import java.nio.file.Path; +import java.util.UUID; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.testframework.GridTestUtils.assertThrows; + +/** */ +public class ClassPathSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(0); + } + + /** */ + @Test + public void testMaliciousFilename() { + String[] hackNames = new String[]{ + "/", + "../../optional/ignite-cdc/myjar.jar", + "./file.txt", + "../file.txt", + "/file.txt", + "~/file.txt" + }; + + for (String hackName : hackNames) { + assertThrows( + null, + () -> cpProc().startCreation("mycp", new String[]{hackName}, new long[]{42}), + IllegalArgumentException.class, + "simple filename expected" + ); + } + } + + /** */ + @Test + public void testMaliciousClassPathName() { + String[] hackNames = new String[]{ + "/", + "../../optional/ignite-cdc", + "./files", + "../files", + "/files", + "~/files" + }; + + for (String hackName : hackNames) { + assertThrows( + null, + () -> cpProc().startCreation(hackName, new String[]{"file.txt"}, new long[]{42}), + IllegalArgumentException.class, + "Classpath name must satisfy the following name pattern: a-zA-Z0-9_" + ); + } + } + + /** */ + @Test + public void testUnknownFilename() { + UUID icpId = cpProc().startCreation("mycp", new String[]{"file.txt"}, new long[]{42}); + + assertThrows( + null, + () -> { + cpProc().copyClassPathFileLocally(icpId, Path.of("other.txt")); + return null; + }, + IllegalArgumentException.class, + "Unknown lib" + ); + + assertThrows( + null, + () -> { + cpProc().writeFilePartFromClient(icpId, "other.txt", 0, new byte[1]); + return null; + }, + IllegalArgumentException.class, + "Unknown lib" + ); + } + + /** */ + private ClassPathProcessor cpProc() { + return grid(0).context().classPath(); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ThinClientPermissionCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ThinClientPermissionCheckTest.java index 401edd85015dd..f0cf1ee674b01 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ThinClientPermissionCheckTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/client/ThinClientPermissionCheckTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.security.client; +import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -24,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -36,6 +38,7 @@ import org.apache.ignite.client.ClientAuthenticationException; import org.apache.ignite.client.ClientAuthorizationException; import org.apache.ignite.client.ClientCache; +import org.apache.ignite.client.ClientConnectionException; import org.apache.ignite.client.ClientException; import org.apache.ignite.client.Config; import org.apache.ignite.client.IgniteClient; @@ -49,6 +52,7 @@ import org.apache.ignite.configuration.ThinClientConfiguration; import org.apache.ignite.events.CacheEvent; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.client.thin.TcpIgniteClient; import org.apache.ignite.internal.processors.cache.eviction.paged.TestObject; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory; import org.apache.ignite.internal.processors.security.AbstractSecurityTest; @@ -440,6 +444,49 @@ public void testConnectAsManagementClient() throws Exception { checkDflt.run(); } + /** */ + @Test + public void testClassPathUploadFilePart() throws Exception { + File tmpFile = File.createTempFile("file", "temp"); + + tmpFile.deleteOnExit(); + + // Client has no permission to invoke internal methods. + // Trying to invoke without specifying "management client" flag must fail. + for (String name: new String[] {CLIENT, ADMIN}) { + assertThrows(log, () -> { + try (IgniteClient cli = startClient(name)) { + ((TcpIgniteClient)cli).uploadClasspathFile(F.first(cli.cluster().nodes()), UUID.randomUUID(), tmpFile.toPath()); + return null; + } + }, ClientConnectionException.class, "Channel is closed"); + } + + userAttrs = F.asMap(MANAGEMENT_CLIENT_ATTR, "true"); + + try { + // Trying to invoke as CLIENT with "management client" flag must fail, because of security. + // CLIENT has no ADMIN_OPS permission. + assertThrows(log, () -> { + try (IgniteClient cli = startClient(CLIENT)) { + ((TcpIgniteClient)cli).uploadClasspathFile(F.first(cli.cluster().nodes()), UUID.randomUUID(), tmpFile.toPath()); + return null; + } + }, ClientConnectionException.class, "Channel is closed"); + + // Check that request actually invoked and failed due to unknown ClassPath. + assertThrows(log, () -> { + try (IgniteClient cli = startClient(ADMIN)) { + ((TcpIgniteClient)cli).uploadClasspathFile(F.first(cli.cluster().nodes()), UUID.randomUUID(), tmpFile.toPath()); + return null; + } + }, ClientException.class, "ClassPath not found"); + } + finally { + userAttrs = null; + } + } + /** * Gets all operations. * diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDeploymentSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDeploymentSelfTestSuite.java index 5218686113ced..5cf0ed620aa95 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDeploymentSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDeploymentSelfTestSuite.java @@ -17,6 +17,7 @@ package org.apache.ignite.testsuites; +import org.apache.ignite.internal.classpath.ClassPathSelfTest; import org.apache.ignite.spi.deployment.local.GridLocalDeploymentSpiSelfTest; import org.apache.ignite.spi.deployment.local.GridLocalDeploymentSpiStartStopSelfTest; import org.junit.runner.RunWith; @@ -28,7 +29,8 @@ @RunWith(Suite.class) @Suite.SuiteClasses({ GridLocalDeploymentSpiSelfTest.class, - GridLocalDeploymentSpiStartStopSelfTest.class + GridLocalDeploymentSpiStartStopSelfTest.class, + ClassPathSelfTest.class }) public class IgniteSpiDeploymentSelfTestSuite { }