Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions pkg/ctl/topic/set_replication_clusters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package topic

import (
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

func SetReplicationClustersCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Set the replication clusters for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
setReplication := cmdutils.Example{
Desc: "Set the replication clusters for a topic",
Command: "pulsarctl topics set-replication-clusters tenant/namespace/topic --clusters cluster1,cluster2",
}
examples = append(examples, setReplication)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Set the replication clusters for [topic] successfully",
}

noTopicName := cmdutils.Output{
Desc: "you must specify a tenant/namespace/topic name, please check if the tenant/namespace/topic name is provided",
Out: "[✖] the topic name is not specified or the topic name is specified more than one",
}

tenantNotExistError := cmdutils.Output{
Desc: "the tenant does not exist",
Out: "[✖] code: 404 reason: Tenant does not exist",
}

nsNotExistError := cmdutils.Output{
Desc: "the namespace does not exist",
Out: "[✖] code: 404 reason: Namespace (tenant/namespace) does not exist",
}

out = append(out, successOut, noTopicName, tenantNotExistError, nsNotExistError)
desc.CommandOutput = out

vc.SetDescription(
"set-replication-clusters",
"Set the replication clusters for a topic",
desc.ToString(),
desc.ExampleToString(),
"set-replication-clusters",
)

var clusters []string

vc.FlagSetGroup.InFlagSet("Set replication clusters", func(flagSet *pflag.FlagSet) {
flagSet.StringSliceVarP(&clusters, "clusters", "c", nil,
"Replication cluster ids.")
_ = cobra.MarkFlagRequired(flagSet, "clusters")
})
vc.EnableOutputFlagSet()

vc.SetRunFuncWithNameArg(func() error {
return doSetReplicationClusters(vc, clusters)
}, "the topic name is not specified or the topic name is specified more than one")
}

func doSetReplicationClusters(vc *cmdutils.VerbCmd, clusters []string) error {
topic := vc.NameArg

if len(clusters) == 0 {
return errors.New("clusters cannot be empty")
}

admin := cmdutils.NewPulsarClient()
topicName, err := utils.GetTopicName(topic)
if err != nil {
return err
}

err = admin.Topics().SetReplicationClusters(*topicName, clusters)
if err == nil {
vc.Command.Printf("Set the replication clusters successfully on [%s]\n", topic)
}

return err
}
31 changes: 31 additions & 0 deletions pkg/ctl/topic/set_replication_clusters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package topic

import (
"fmt"
"testing"

"github.com/onsi/gomega"
"github.com/streamnative/pulsarctl/pkg/test"
)

func TestSetReplicationClustersCmd(t *testing.T) {
g := gomega.NewWithT(t)

topic := fmt.Sprintf("test-replication-clusters-topic-%s", test.RandomSuffix())

args := []string{"create", topic, "0"}
_, execErr, _, _ := TestTopicCommands(CreateTopicCmd, args)
g.Expect(execErr).Should(gomega.BeNil())

args = []string{"set-replication-clusters", topic, "--clusters", "standalone"}
out, execErr, nameErr, cmdErr := TestTopicCommands(SetReplicationClustersCmd, args)
g.Expect(execErr).Should(gomega.BeNil())
g.Expect(nameErr).Should(gomega.BeNil())
g.Expect(cmdErr).Should(gomega.BeNil())
g.Expect(out).ShouldNot(gomega.BeNil())
g.Expect(out.String()).ShouldNot(gomega.BeEmpty())

// Since there is no get-replication-clusters command in this PR, we only test the set command success.
// In a real scenario, we might want to verify using the client or adding a get command.
// The set command output verification implies the call was successful.
}
1 change: 1 addition & 0 deletions pkg/ctl/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
RemoveInactiveTopicCmd,
SetDispatchRateCmd,
RemoveDispatchRateCmd,
SetReplicationClustersCmd,
}

cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...)
Expand Down
Loading