Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,22 +184,78 @@ public void testWalChangeForMultiCacheGroup() throws Exception {
srv.cluster().state(ClusterState.ACTIVE);

srv.createCache(new CacheConfiguration<>("cache1")
.setGroupName("testGroup"));
.setGroupName("group1"));
srv.createCache(new CacheConfiguration<>("cache2")
.setGroupName("testGroup"));
.setGroupName("group1"));
srv.createCache(new CacheConfiguration<>("cache3")
.setGroupName("group2"));
srv.createCache("cache4");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "group1,group2,cache4"));
outputContains(".*group1.*true.*true.*true.*true.*false");
outputContains(".*group2.*true.*true.*true.*true.*false");
outputContains(".*cache4.*true.*true.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "disable", "--groups", "group1"));
outputContains("Successfully disabled WAL for groups:");
outputContains("group1");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "group1"));
outputContains(".*group1.*true.*false.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "disable", "--groups", "group1,group2"));
outputContains("Successfully disabled WAL for groups:");
outputContains("group1");
outputContains("group2");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "group1,group2"));
outputContains(".*group1.*true.*false.*true.*true.*false");
outputContains(".*group2.*true.*false.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "disable", "--groups", "cache4,nonExistentGroup"));
outputContains("Successfully disabled WAL for groups:");
outputContains("cache4");
outputContains("Failed to disable WAL for groups:");
outputContains("nonExistentGroup - Cache group not found");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "cache4"));
outputContains(".*cache4.*true.*false.*true.*true.*false");

//Error when using cache name instead of group name
assertEquals(EXIT_CODE_OK, execute("--wal", "enable", "--groups", "cache3"));
outputContains("Failed to enable WAL for groups:");
outputContains("cache3 - Cache group not found");

assertEquals(EXIT_CODE_OK, execute("--wal", "enable", "--groups", "group2,cache4"));
outputContains("Successfully enabled WAL for groups:");
outputContains("group2");
outputContains("cache4");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "group2,cache4"));
outputContains(".*group2.*true.*true.*true.*true.*false");
outputContains(".*cache4.*true.*true.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "disable"));
outputContains("Successfully disabled WAL for groups:");
outputContains("group1");
outputContains("group2");
outputContains("cache4");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "testGroup"));
outputContains(".*testGroup.*true.*true.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "disable", "--groups", "testGroup"));

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "testGroup"));
outputContains(".*testGroup.*true.*false.*true.*true.*false");
assertEquals(EXIT_CODE_OK, execute("--wal", "state"));
outputContains(".*group1.*true.*false.*true.*true.*false");
outputContains(".*group2.*true.*false.*true.*true.*false");
outputContains(".*cache4.*true.*false.*true.*true.*false");

assertEquals(EXIT_CODE_OK, execute("--wal", "enable", "--groups", "testGroup"));
assertEquals(EXIT_CODE_OK, execute("--wal", "enable"));
outputContains("Successfully enabled WAL for groups:");
outputContains("group1");
outputContains("group2");
outputContains("cache4");

assertEquals(EXIT_CODE_OK, execute("--wal", "state", "--groups", "testGroup"));
outputContains(".*testGroup.*true.*true.*true.*true.*false");
assertEquals(EXIT_CODE_OK, execute("--wal", "state"));
outputContains(".*group1.*true.*true.*true.*true.*false");
outputContains(".*group2.*true.*true.*true.*true.*false");
outputContains(".*cache4.*true.*true.*true.*true.*false");
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

package org.apache.ignite.internal.management.wal;

import java.util.function.Consumer;
import org.apache.ignite.internal.management.api.ComputeCommand;

/** */
public class WalDisableCommand implements ComputeCommand<WalDisableCommand.WalDisableCommandArg, Void> {
public class WalDisableCommand implements ComputeCommand<WalDisableCommand.WalDisableCommandArg, WalSetStateTaskResult> {
/** {@inheritDoc} */
@Override public Class<WalSetStateTask> taskClass() {
return WalSetStateTask.class;
Expand All @@ -41,6 +42,11 @@ public class WalDisableCommand implements ComputeCommand<WalDisableCommand.WalDi
return "Are you sure? Any node failure without WAL can lead to the loss of all PDS data. CDC events will be lost without WAL.";
}

/** {@inheritDoc} */
@Override public void printResult(WalDisableCommandArg arg, WalSetStateTaskResult res, Consumer<String> printer) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move logic to a separate class

WalSetStateResultReporter.printResult(arg, res, printer);
}

/** */
public static class WalDisableCommandArg extends WalStateCommandArg {
/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

package org.apache.ignite.internal.management.wal;

import java.util.function.Consumer;
import org.apache.ignite.internal.management.api.ComputeCommand;
import org.apache.ignite.internal.management.wal.WalDisableCommand.WalDisableCommandArg;

/** */
public class WalEnableCommand implements ComputeCommand<WalDisableCommandArg, Void> {
public class WalEnableCommand implements ComputeCommand<WalDisableCommandArg, WalSetStateTaskResult> {
/** {@inheritDoc} */
@Override public Class<WalSetStateTask> taskClass() {
return WalSetStateTask.class;
Expand All @@ -37,6 +38,11 @@ public class WalEnableCommand implements ComputeCommand<WalDisableCommandArg, Vo
return WalEnableCommandArg.class;
}

/** {@inheritDoc} */
@Override public void printResult(WalDisableCommandArg arg, WalSetStateTaskResult res, Consumer<String> printer) {
WalSetStateResultReporter.printResult(arg, res, printer);
}

/** */
public static class WalEnableCommandArg extends WalDisableCommandArg {
/** */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.management.wal;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** */
public class WalSetStateResultReporter {
/**
* Print WAL disable/enable command result.
*
* @param arg Command argument.
* @param res Command result.
* @param printer Output consumer.
*/
public static void printResult(WalStateCommandArg arg, WalSetStateTaskResult res, Consumer<String> printer) {
String operation = arg instanceof WalEnableCommand.WalEnableCommandArg ? "enable" : "disable";
List<String> successGrps = res.successGroups();
Map<String, String> errors = res.errorsByGroup();

if (!successGrps.isEmpty()) {
printer.accept("Successfully " + operation + "d WAL for groups:");
for (String grp : successGrps)
printer.accept(" " + grp);
}

if (errors != null && !errors.isEmpty()) {
printer.accept("Failed to " + operation + " WAL for groups:");
for (Map.Entry<String, String> entry : errors.entrySet())
printer.accept(" " + entry.getKey() + " - " + entry.getValue());
}
}

/** Default constructor. */
private WalSetStateResultReporter() {
// No-op.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,32 @@

package org.apache.ignite.internal.management.wal;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.internal.management.wal.WalDisableCommand.WalDisableCommandArg;
import org.apache.ignite.internal.management.wal.WalEnableCommand.WalEnableCommandArg;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorMultiNodeTask;
import org.apache.ignite.internal.visor.VisorOneNodeTask;
import org.jetbrains.annotations.Nullable;

/** */
public class WalSetStateTask extends VisorMultiNodeTask<WalDisableCommandArg, Void, Void> {
public class WalSetStateTask extends VisorOneNodeTask<WalDisableCommandArg, WalSetStateTaskResult> {
/** */
private static final long serialVersionUID = 0;

/** {@inheritDoc} */
@Override protected VisorJob<WalDisableCommandArg, Void> job(WalDisableCommandArg arg) {
return new WalDisableJob(arg, false);
}

/** {@inheritDoc} */
@Override protected @Nullable Void reduce0(List<ComputeJobResult> res) throws IgniteException {
return null;
@Override protected VisorJob<WalDisableCommandArg, WalSetStateTaskResult> job(WalDisableCommandArg arg) {
return new WalDisableJob(arg, debug);
}

/** */
private static class WalDisableJob extends VisorJob<WalDisableCommandArg, Void> {
private static class WalDisableJob extends VisorJob<WalDisableCommandArg, WalSetStateTaskResult> {
/** */
private static final long serialVersionUID = 0;

Expand All @@ -57,22 +52,35 @@ protected WalDisableJob(@Nullable WalDisableCommandArg arg, boolean debug) {
}

/** {@inheritDoc} */
@Override protected Void run(@Nullable WalDisableCommandArg arg) throws IgniteException {
Set<String> grps = F.isEmpty(arg.groups()) ? null : new HashSet<>(Arrays.asList(arg.groups()));
@Override protected WalSetStateTaskResult run(@Nullable WalDisableCommandArg arg) throws IgniteException {
List<String> requestedGrps = F.isEmpty(arg.groups()) ? null : new ArrayList<>(Arrays.asList(arg.groups()));
List<String> successGrps = new ArrayList<>();
Map<String, String> errorsByGrp = new HashMap<>();

for (CacheGroupContext gctx : ignite.context().cache().cacheGroups()) {
String grpName = gctx.cacheOrGroupName();

if (grps != null && !grps.contains(grpName))
if (requestedGrps != null && !requestedGrps.remove(grpName))
continue;

if (arg instanceof WalEnableCommandArg)
ignite.cluster().enableWal(grpName);
else
ignite.cluster().disableWal(grpName);
try {
if (arg instanceof WalEnableCommandArg)
ignite.cluster().enableWal(grpName);
else
ignite.cluster().disableWal(grpName);

successGrps.add(grpName);
}
catch (Exception e) {
errorsByGrp.put(grpName, e.getMessage());
}
}

return null;
if (requestedGrps != null && !requestedGrps.isEmpty())
for (String requestedGrp : requestedGrps)
errorsByGrp.put(requestedGrp, "Cache group not found");

return new WalSetStateTaskResult(successGrps, errorsByGrp);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.management.wal;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.ignite.internal.dto.IgniteDataTransferObject;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
* Result of WAL enable/disable operation.
*/
public class WalSetStateTaskResult extends IgniteDataTransferObject {
/** */
private static final long serialVersionUID = 0L;

/** Successfully processed groups. */
private List<String> successGrps;

/** Errors by group name. */
private Map<String, String> errorsByGrp;

/** Default constructor. */
public WalSetStateTaskResult() {
// No-op.
}

/**
* Constructor.
*
* @param successGrps Successfully processed groups.
* @param errorsByGrp Error messages.
*/
public WalSetStateTaskResult(List<String> successGrps, Map<String, String> errorsByGrp) {
this.successGrps = new ArrayList<>(successGrps);
this.errorsByGrp = new HashMap<>(errorsByGrp);
}

/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) throws IOException {
U.writeCollection(out, successGrps);
U.writeMap(out, errorsByGrp);
}

/** {@inheritDoc} */
@Override protected void readExternalData(ObjectInput in) throws IOException, ClassNotFoundException {
successGrps = U.readList(in);
errorsByGrp = U.readMap(in);
}

/**
* @return Successfully processed groups.
*/
public List<String> successGroups() {
return Collections.unmodifiableList(successGrps);
}

/**
* @return Error messages by group name if operation failed.
*/
public Map<String, String> errorsByGroup() {
return Collections.unmodifiableMap(errorsByGrp);
}
}