Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/lint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ var commandRules = []linter.CommandRule{
linter.ExcludeCommand("connect custom-plugin version update"),
linter.ExcludeCommand("pipeline update"),
linter.ExcludeCommand("flink statement update"),
linter.ExcludeCommand("flink materialized-table update")),
linter.ExcludeCommand("flink materialized-table update"),
linter.ExcludeCommand("kafka cluster update")),

// Soft Requirements
linter.Filter(linter.RequireLengthBetween("Short", 10, 60),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/confluentinc/ccloud-sdk-go-v2/cdx v0.0.5
github.com/confluentinc/ccloud-sdk-go-v2/certificate-authority v0.0.2
github.com/confluentinc/ccloud-sdk-go-v2/cli v0.3.0
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.25.0
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.26.0
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.11.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ github.com/confluentinc/ccloud-sdk-go-v2/certificate-authority v0.0.2 h1:stsiO1J
github.com/confluentinc/ccloud-sdk-go-v2/certificate-authority v0.0.2/go.mod h1:OU1RGuP2y5l54jX5rA++QBAKeRvSa7GmkfNgJvB9J6M=
github.com/confluentinc/ccloud-sdk-go-v2/cli v0.3.0 h1:OOFNqtZN3Spuzz4TX6K6JfDM7zNDIE6BE1TtK78jFHQ=
github.com/confluentinc/ccloud-sdk-go-v2/cli v0.3.0/go.mod h1:Mv0WTsBXUfKjmF+r2t2Dv/xJzZf17shhf5J1cttU2Qo=
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.25.0 h1:EdZzQZ4SI5q+f0DQPjH3lWpygz1wYz7IE0K62Mv06bY=
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.25.0/go.mod h1:FSSO9mkNPJKMa7Ky66IlXEG7o5H6cpyuKKClvRf9y+0=
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.26.0 h1:GVGsu/DtcHaQH3yuvovpPbjNrcwez0N7+e4rbCQ/QKU=
github.com/confluentinc/ccloud-sdk-go-v2/cmk v0.26.0/go.mod h1:wtfggR9vJMVda+fMBamv3/nNxSDB+unboplVzzcQlGY=
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0 h1:ISrVOX9qJ2Sxiu/fGBqqHeaA0SRJQujc8yP7qAZRL3Y=
github.com/confluentinc/ccloud-sdk-go-v2/connect v0.7.0/go.mod h1:zHG/3DzsnoHC81B1AY9K/8bMX3mxbIp5/nHHdypa//w=
github.com/confluentinc/ccloud-sdk-go-v2/connect-custom-plugin v0.0.9 h1:o1zKZlKbnN9uv+Y8TxwesBRryUl3lEU6lnfndEJigxQ=
Expand Down
13 changes: 13 additions & 0 deletions internal/kafka/command_cluster_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@
Text: "Create a new Freight cluster that uses a customer-managed encryption key in AWS:",
Code: "confluent kafka cluster create my-cluster --cloud aws --region us-west-2 --type freight --cku 1 --byok cck-a123z --availability high",
},
examples.Example{
Text: "Create a new cluster with deletion protection enabled:",
Code: "confluent kafka cluster create my-cluster --cloud aws --region us-west-2 --deletion-protection",
},
examples.Example{
Text: "For more information, see https://docs.confluent.io/current/cloud/clusters/byok-encrypted-clusters.html.",
},
Expand All @@ -65,6 +69,7 @@
cmd.Flags().Int("cku", 0, `Number of Confluent Kafka Units (non-negative). Required for Kafka clusters of type "dedicated".`)
cmd.Flags().Int("max-ecku", 0, `Maximum number of Elastic Confluent Kafka Units (eCKUs) that Kafka clusters should auto-scale to. `+
`Kafka clusters with "HIGH" availability must have at least two eCKUs.`)
cmd.Flags().Bool("deletion-protection", false, "Enable deletion protection for the Kafka cluster.")

Check failure on line 72 in internal/kafka/command_cluster_create.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal "deletion-protection" 3 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3318&issues=211dac99-2ba5-459e-bb90-3ec1b2721bd6&open=211dac99-2ba5-459e-bb90-3ec1b2721bd6
pcmd.AddByokKeyFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddNetworkFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddContextFlag(cmd, c.CLICommand)
Expand All @@ -77,7 +82,7 @@
return cmd
}

func (c *clusterCommand) create(cmd *cobra.Command, args []string) error {

Check failure on line 85 in internal/kafka/command_cluster_create.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 40 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3318&issues=de54829c-7fd9-4178-8c58-e2143ee0c141&open=de54829c-7fd9-4178-8c58-e2143ee0c141
cloud, err := cmd.Flags().GetString("cloud")
if err != nil {
return err
Expand Down Expand Up @@ -171,6 +176,14 @@
setClusterConfigCku(&createCluster, int32(cku))
}

if cmd.Flags().Changed("deletion-protection") {
deletionProtection, err := cmd.Flags().GetBool("deletion-protection")
if err != nil {
return err
}
createCluster.Spec.SetDeletionProtection(deletionProtection)
}

if cmd.Flags().Changed("network") {
network, err := cmd.Flags().GetString("network")
if err != nil {
Expand Down
71 changes: 69 additions & 2 deletions internal/kafka/command_cluster_delete.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package kafka

import (
"bytes"
"encoding/json"
"io"
"net/http"
"strings"

"github.com/hashicorp/go-multierror"
"github.com/spf13/cobra"

Expand All @@ -10,6 +16,11 @@
"github.com/confluentinc/cli/v4/pkg/resource"
)

const (
errorCodeDeletionProtectionEnabled = "deletion_protection_enabled"
clusterDeletionProtectionDetail = "Cluster deletion is blocked by deletion protection."
)

func (c *clusterCommand) newDeleteCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "delete <id-1> [id-2] ... [id-n]",
Expand Down Expand Up @@ -43,8 +54,12 @@
return errors.NewErrorWithSuggestions(err.Error(), PluralClusterEnvironmentSuggestions)
}

deletionProtectionDetail := ""
deleteFunc := func(id string) error {
if httpResp, err := c.V2Client.DeleteKafkaCluster(id, environmentId); err != nil {
if detail, ok := parseDeletionProtectionErrDetail(httpResp); ok {
deletionProtectionDetail = detail
}
return errors.CatchKafkaNotFoundError(err, id, httpResp)
}
return nil
Expand All @@ -54,16 +69,68 @@

errs := multierror.Append(err, c.removeKafkaClusterConfigs(deletedIds))
if errs.ErrorOrNil() != nil {
if suggestion := deletionProtectionErrorToSuggestion(deletionProtectionDetail); suggestion != "" {
return errors.NewErrorWithSuggestions(errs.Error(), suggestion)
}
if len(args)-len(deletedIds) > 1 {
return errors.NewErrorWithSuggestions(err.Error(), "Ensure the clusters are not associated with any active Connect clusters.")
return errors.NewErrorWithSuggestions(errs.Error(),
"Ensure the clusters are not associated with any active Connect clusters.")
} else {
return errors.NewErrorWithSuggestions(err.Error(), "Ensure the cluster is not associated with any active Connect clusters.")
return errors.NewErrorWithSuggestions(errs.Error(),
"Ensure the cluster is not associated with any active Connect clusters.")
Comment thread
hydradon marked this conversation as resolved.
}
}

return nil
}

type apiError struct {
Code string `json:"code"`
Detail string `json:"detail"`
}

type apiErrorResponse struct {
Errors []apiError `json:"errors"`
}

// parseDeletionProtectionErrDetail checks if the HTTP response indicates a deletion protection error
// Returns the error detail and true if the error is a deletion protection error, or empty string and false otherwise.
func parseDeletionProtectionErrDetail(r *http.Response) (string, bool) {
if r == nil || r.StatusCode != http.StatusConflict || r.Body == nil {
return "", false
}

body, err := io.ReadAll(r.Body)
if err != nil {
return "", false
}
// Restore the body so downstream handlers can read it
r.Body = io.NopCloser(bytes.NewBuffer(body))

var res apiErrorResponse
if err := json.Unmarshal(body, &res); err != nil {

Check warning on line 111 in internal/kafka/command_cluster_delete.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Remove this unnecessary variable declaration and use the expression directly in the condition.

[S8193] Variables in if short statements should be used beyond just the condition See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3318&issues=99d01350-8cb5-471a-9280-1d42436211fa&open=99d01350-8cb5-471a-9280-1d42436211fa
return "", false
}

for _, apiErr := range res.Errors {
if apiErr.Code == errorCodeDeletionProtectionEnabled {
return apiErr.Detail, true
}
}

return "", false
}

// deletionProtectionErrorToSuggestion maps a deletion protection error detail to a user-facing suggestion.
func deletionProtectionErrorToSuggestion(errorMsg string) string {
switch {
case strings.EqualFold(errorMsg, clusterDeletionProtectionDetail):
return `Disable deletion_protection before deleting the cluster.`
default:
return ""
}
}

func (c *clusterCommand) removeKafkaClusterConfigs(deletedIds []string) error {
for _, id := range deletedIds {
c.Context.KafkaClusterContext.RemoveKafkaCluster(id)
Expand Down
146 changes: 146 additions & 0 deletions internal/kafka/command_cluster_delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package kafka

import (
"io"
"net/http"
"strings"
"testing"

"github.com/stretchr/testify/require"
)

func TestParseDeletionProtectionErrDetail(t *testing.T) {
tests := []struct {
name string
response *http.Response
expectedDetail string
expectedOk bool
}{
{
name: "nil response",
response: nil,
expectedOk: false,
},
{
name: "non-conflict status code",
response: &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(strings.NewReader(`{"errors": [{"code": "deletion_protection_enabled", "detail": "test"}]}`)),
},
expectedOk: false,
},
{
name: "conflict with deletion protection error code",
response: &http.Response{
StatusCode: http.StatusConflict,
Body: io.NopCloser(strings.NewReader(`{"errors": [{"code": "deletion_protection_enabled", "detail": "Cluster deletion is blocked by deletion protection."}]}`)),
},
expectedDetail: "Cluster deletion is blocked by deletion protection.",

Check failure on line 38 in internal/kafka/command_cluster_delete_test.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal "Cluster deletion is blocked by deletion protection." 4 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3318&issues=823afd9b-9e81-4682-bc19-2b39ecacccc3&open=823afd9b-9e81-4682-bc19-2b39ecacccc3
expectedOk: true,
},
{
name: "conflict with different error code",
response: &http.Response{
StatusCode: http.StatusConflict,
Body: io.NopCloser(strings.NewReader(`{"errors": [{"code": "some_other_error", "detail": "some detail"}]}`)),
},
expectedOk: false,
},
{
name: "conflict with empty errors array",
response: &http.Response{
StatusCode: http.StatusConflict,
Body: io.NopCloser(strings.NewReader(`{"errors": []}`)),
},
expectedOk: false,
},
{
name: "conflict with invalid JSON body",
response: &http.Response{
StatusCode: http.StatusConflict,
Body: io.NopCloser(strings.NewReader(`not json`)),
},
expectedOk: false,
},
{
name: "conflict with nil body",
response: &http.Response{
StatusCode: http.StatusConflict,
Body: nil,
},
expectedOk: false,
},
{
name: "deletion protection error is not first in errors array",
response: &http.Response{
StatusCode: http.StatusConflict,
Body: io.NopCloser(strings.NewReader(`{"errors": [` +
`{"code": "some_other_error", "detail": "other error"},` +
`{"code": "deletion_protection_enabled", "detail": "Cluster deletion is blocked by deletion protection."}` +
`]}`)),
},
expectedDetail: "Cluster deletion is blocked by deletion protection.",
expectedOk: true,
},
{
name: "body is restored after reading",
response: &http.Response{
StatusCode: http.StatusConflict,
Body: io.NopCloser(strings.NewReader(`{"errors": [{"code": "deletion_protection_enabled", "detail": "Cluster deletion is blocked by deletion protection."}]}`)),
},
expectedDetail: "Cluster deletion is blocked by deletion protection.",
expectedOk: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
detail, ok := parseDeletionProtectionErrDetail(tt.response)
require.Equal(t, tt.expectedOk, ok)
require.Equal(t, tt.expectedDetail, detail)

// Verify body is restored for downstream handlers
if tt.response != nil && tt.response.Body != nil && ok {
body, err := io.ReadAll(tt.response.Body)
require.NoError(t, err)
require.NotEmpty(t, body)
}
})
}
}

func TestDeletionProtectionErrorToSuggestion(t *testing.T) {
tests := []struct {
name string
errorMsg string
expectedSuggestion string
}{
{
name: "cluster deletion protection",
errorMsg: "Cluster deletion is blocked by deletion protection.",
expectedSuggestion: `Disable deletion_protection before deleting the cluster.`,
},
{
name: "cluster deletion protection case insensitive",
errorMsg: "cluster deletion is blocked by deletion protection.",
expectedSuggestion: `Disable deletion_protection before deleting the cluster.`,
},
{
name: "unknown deletion protection error",
errorMsg: "Some other deletion protection error.",
expectedSuggestion: "",
},
{
name: "empty string",
errorMsg: "",
expectedSuggestion: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
suggestion := deletionProtectionErrorToSuggestion(tt.errorMsg)
require.Equal(t, tt.expectedSuggestion, suggestion)
})
}
}
4 changes: 3 additions & 1 deletion internal/kafka/command_cluster_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/confluentinc/cli/v4/pkg/resource"
)

var basicDescribeFields = []string{"IsCurrent", "Id", "Name", "Type", "IngressLimit", "EgressLimit", "Storage", "Cloud", "Availability", "Region", "Network", "Status", "Endpoint", "RestEndpoint"}
var basicDescribeFields = []string{"IsCurrent", "Id", "Name", "Type", "IngressLimit", "EgressLimit", "Storage", "Cloud", "Availability", "Region", "Network", "Status", "DeletionProtection", "Endpoint", "RestEndpoint"}

type describeStruct struct {
IsCurrent bool `human:"Current" serialized:"is_current"`
Expand All @@ -35,6 +35,7 @@ type describeStruct struct {
Availability string `human:"Availability" serialized:"availability"`
Network string `human:"Network,omitempty" serialized:"network,omitempty"`
Status string `human:"Status" serialized:"status"`
DeletionProtection bool `human:"Deletion Protection" serialized:"deletion_protection"`
Endpoint string `human:"Endpoint" serialized:"endpoint"`
ByokKeyId string `human:"BYOK Key ID" serialized:"byok_key_id"`
EncryptionKeyId string `human:"Encryption Key ID" serialized:"encryption_key_id"`
Expand Down Expand Up @@ -146,6 +147,7 @@ func convertClusterToDescribeStruct(cluster *cmkv2.CmkV2Cluster, usageLimits *ka
Availability: ccloudv2.ToLower(cluster.Spec.GetAvailability()),
Network: cluster.Spec.Network.GetId(),
Status: getCmkClusterStatus(cluster),
DeletionProtection: cluster.Spec.GetDeletionProtection(),
Endpoint: cluster.Spec.GetKafkaBootstrapEndpoint(),
ByokKeyId: getCmkByokId(cluster),
EncryptionKeyId: getCmkEncryptionKey(cluster),
Expand Down
2 changes: 1 addition & 1 deletion internal/kafka/command_cluster_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ func (c *clusterCommand) list(cmd *cobra.Command, _ []string) error {
for _, cluster := range clusters {
list.Add(convertClusterToDescribeStruct(&cluster, nil, c.Context))
}
list.Filter([]string{"IsCurrent", "Id", "Name", "Type", "Cloud", "Region", "Availability", "Network", "Status", "Endpoint"})
list.Filter([]string{"IsCurrent", "Id", "Name", "Type", "Cloud", "Region", "Availability", "Network", "Status", "DeletionProtection", "Endpoint"})
return list.Print()
}
19 changes: 18 additions & 1 deletion internal/kafka/command_cluster_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,34 @@
Text: `Update the Max eCKU count of a Kafka cluster:`,
Code: `confluent kafka cluster update lkc-123456 --max-ecku 5`,
},
examples.Example{
Text: "Enable deletion protection on a Kafka cluster:",
Code: "confluent kafka cluster update lkc-123456 --deletion-protection",
},
examples.Example{
Text: "Disable deletion protection on a Kafka cluster:",
Code: "confluent kafka cluster update lkc-123456 --deletion-protection=false",
},
),
}

cmd.Flags().String("name", "", "Name of the Kafka cluster.")
cmd.Flags().Uint32("cku", 0, `Number of Confluent Kafka Units. For Kafka clusters of type "dedicated" only. When shrinking a cluster, you must reduce capacity one CKU at a time.`)
cmd.Flags().String("type", "", `Type of the Kafka cluster. Only supports upgrading from "Basic" to "Standard".`)
cmd.Flags().Int("max-ecku", 0, `Maximum number of Elastic Confluent Kafka Units (eCKUs) that Kafka clusters should auto-scale to. `+

Check failure on line 53 in internal/kafka/command_cluster_update.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal "max-ecku" 4 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3318&issues=53dcc62d-ff31-4f63-924d-d8d1aaf9a097&open=53dcc62d-ff31-4f63-924d-d8d1aaf9a097
`Kafka clusters with "HIGH" availability must have at least two eCKUs.`)
cmd.Flags().Bool("deletion-protection", false, "Enable deletion protection for the Kafka cluster. Use \"--deletion-protection=false\" to disable.")

Check failure on line 55 in internal/kafka/command_cluster_update.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal "deletion-protection" 4 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3318&issues=c26c8c9f-3815-4b4c-89ee-e2c3bede020d&open=c26c8c9f-3815-4b4c-89ee-e2c3bede020d
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand)
pcmd.AddOutputFlag(cmd)

cmd.MarkFlagsOneRequired("name", "cku", "type", "max-ecku")
cmd.MarkFlagsOneRequired("name", "cku", "type", "max-ecku", "deletion-protection")

return cmd
}

func (c *clusterCommand) update(cmd *cobra.Command, args []string) error {

Check failure on line 66 in internal/kafka/command_cluster_update.go

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 45 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=cli&pullRequest=3318&issues=2d029223-029d-474d-9b46-80633de59e4c&open=2d029223-029d-474d-9b46-80633de59e4c
environmentId, err := c.Context.EnvironmentId()
if err != nil {
return err
Expand Down Expand Up @@ -85,6 +94,14 @@
update.Spec.SetDisplayName(name)
}

if cmd.Flags().Changed("deletion-protection") {
deletionProtection, err := cmd.Flags().GetBool("deletion-protection")
if err != nil {
return err
}
update.Spec.SetDeletionProtection(deletionProtection)
}

if cmd.Flags().Changed("cku") {
cku, err := cmd.Flags().GetUint32("cku")
if err != nil {
Expand Down
Loading