diff --git a/go.sum b/go.sum index 2b59513408..91f1821038 100644 --- a/go.sum +++ b/go.sum @@ -307,8 +307,6 @@ github.com/containerd/typeurl/v2 v2.1.1/go.mod h1:IDp2JFvbwZ31H8dQbEIY7sDl2L3o3H github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E= github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= -github.com/cyphar/filepath-securejoin v0.4.1 h1:JyxxyPEaktOD+GAnqIqTf9A8tHyAG22rowi7HkoSU1s= -github.com/cyphar/filepath-securejoin v0.4.1/go.mod h1:Sdj7gXlvMcPZsbhwhQ33GguGLDGQL7h7bg04C/+u9jI= github.com/cyphar/filepath-securejoin v0.6.1 h1:5CeZ1jPXEiYt3+Z6zqprSAgSWiggmpVyciv8syjIpVE= github.com/cyphar/filepath-securejoin v0.6.1/go.mod h1:A8hd4EnAeyujCJRrICiOWqjS1AX0a9kM5XL+NwKoYSc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -376,14 +374,10 @@ github.com/gliderlabs/ssh v0.3.8 h1:a4YXD1V7xMF9g5nTkdfnja3Sxy1PVDCj1Zg4Wb8vY6c= github.com/gliderlabs/ssh v0.3.8/go.mod h1:xYoytBv1sV0aL3CavoDuJIQNURXkkfPA/wxQ1pL1fAU= github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 h1:+zs/tPmkDkHx3U66DAb0lQFJrpS6731Oaa12ikc+DiI= github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376/go.mod h1:an3vInlBmSxCcxctByoQdvwPiA7DTK7jaaFDBTtu0ic= -github.com/go-git/go-billy/v5 v5.8.0 h1:I8hjc3LbBlXTtVuFNJuwYuMiHvQJDq1AT6u4DwDzZG0= -github.com/go-git/go-billy/v5 v5.8.0/go.mod h1:RpvI/rw4Vr5QA+Z60c6d6LXH0rYJo0uD5SqfmrrheCY= github.com/go-git/go-billy/v5 v5.9.0 h1:jItGXszUDRtR/AlferWPTMN4j38BQ88XnXKbilmmBPA= github.com/go-git/go-billy/v5 v5.9.0/go.mod h1:jCnQMLj9eUgGU7+ludSTYoZL/GGmii14RxKFj7ROgHw= github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMje31YglSBqCdIqdhKBW8lokaMrL3uTkpGYlE2OOT4= github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII= -github.com/go-git/go-git/v5 v5.18.0 h1:O831KI+0PR51hM2kep6T8k+w0/LIAD490gvqMCvL5hM= -github.com/go-git/go-git/v5 v5.18.0/go.mod h1:pW/VmeqkanRFqR6AljLcs7EA7FbZaN5MQqO7oZADXpo= github.com/go-git/go-git/v5 v5.19.0 h1:+WkVUQZSy/F1Gb13udrMKjIM2PrzsNfDKFSfo5tkMtc= github.com/go-git/go-git/v5 v5.19.0/go.mod h1:Pb1v0c7/g8aGQJwx9Us09W85yGoyvSwuhEGMH7zjDKQ= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= @@ -715,8 +709,6 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI= github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE= -github.com/pjbgf/sha1cd v0.3.2 h1:a9wb0bp1oC2TGwStyn0Umc/IGKQnEgF0vVaZ8QF8eo4= -github.com/pjbgf/sha1cd v0.3.2/go.mod h1:zQWigSxVmsHEZow5qaLtPYxpcKMMQpa09ixqBxuCS6A= github.com/pjbgf/sha1cd v0.6.0 h1:3WJ8Wz8gvDz29quX1OcEmkAlUg9diU4GxJHqs0/XiwU= github.com/pjbgf/sha1cd v0.6.0/go.mod h1:lhpGlyHLpQZoxMv8HcgXvZEhcGs0PG/vsZnEJ7H0iCM= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= @@ -946,8 +938,6 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= -golang.org/x/crypto v0.46.0 h1:cKRW/pmt1pKAfetfu+RCEvjvZkA9RimPbh7bhFjGVBU= -golang.org/x/crypto v0.46.0/go.mod h1:Evb/oLKmMraqjZ2iQTwDwvCtJkczlDuTmdJXoZVzqU0= golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -960,8 +950,6 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= -golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f h1:W3F4c+6OLc6H2lb//N1q4WpJkhzJCK5J6kUi1NTVXfM= golang.org/x/exp v0.0.0-20260410095643-746e56fc9e2f/go.mod h1:J1xhfL/vlindoeF/aINzNzt2Bket5bjo9sdOYzOsU80= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= @@ -990,8 +978,6 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI= -golang.org/x/mod v0.31.0/go.mod h1:43JraMp9cGx1Rx3AqioxrbrhNsLl2l/iNAvuBkrezpg= golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM= golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1032,8 +1018,6 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= -golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1061,8 +1045,6 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1125,8 +1107,6 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= -golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -1134,8 +1114,6 @@ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuX golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= -golang.org/x/term v0.38.0 h1:PQ5pkm/rLO6HnxFR7N2lJHOZX6Kez5Y1gDSJla6jo7Q= -golang.org/x/term v0.38.0/go.mod h1:bSEAKrOT1W+VSu9TSCMtoGEOUcKxOKgl3LE5QEF/xVg= golang.org/x/term v0.42.0 h1:UiKe+zDFmJobeJ5ggPwOshJIVt6/Ft0rcfrXZDLWAWY= golang.org/x/term v0.42.0/go.mod h1:Dq/D+snpsbazcBG5+F9Q1n2rXV8Ma+71xEjTRufARgY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1149,8 +1127,6 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= -golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1212,8 +1188,6 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= -golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= golang.org/x/tools v0.44.0 h1:UP4ajHPIcuMjT1GqzDWRlalUEoY+uzoZKnhOjbIPD2c= golang.org/x/tools v0.44.0/go.mod h1:KA0AfVErSdxRZIsOVipbv3rQhVXTnlU6UhKxHd1seDI= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/flink/command_catalog.go b/internal/flink/command_catalog.go index 77b540e2e5..8bfe75434f 100644 --- a/internal/flink/command_catalog.go +++ b/internal/flink/command_catalog.go @@ -25,6 +25,7 @@ func (c *command) newCatalogCommand() *cobra.Command { cmd.AddCommand(c.newCatalogDeleteCommand()) cmd.AddCommand(c.newCatalogDescribeCommand()) cmd.AddCommand(c.newCatalogListCommand()) + cmd.AddCommand(c.newCatalogDatabaseCommand()) return cmd } diff --git a/internal/flink/command_catalog_database.go b/internal/flink/command_catalog_database.go new file mode 100644 index 0000000000..605f875b90 --- /dev/null +++ b/internal/flink/command_catalog_database.go @@ -0,0 +1,113 @@ +package flink + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/spf13/cobra" + "gopkg.in/yaml.v3" + + cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/output" +) + +type databaseOut struct { + CreationTime string `human:"Creation Time" serialized:"creation_time"` + Name string `human:"Name" serialized:"name"` + Catalog string `human:"Catalog" serialized:"catalog"` +} + +func (c *command) newCatalogDatabaseCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "database", + Short: "Manage Flink databases in Confluent Platform.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogout}, + } + + cmd.AddCommand(c.newCatalogDatabaseCreateCommand()) + cmd.AddCommand(c.newCatalogDatabaseDeleteCommand()) + cmd.AddCommand(c.newCatalogDatabaseDescribeCommand()) + cmd.AddCommand(c.newCatalogDatabaseListCommand()) + cmd.AddCommand(c.newCatalogDatabaseUpdateCommand()) + + return cmd +} + +func printDatabaseOutput(cmd *cobra.Command, sdkDatabase cmfsdk.KafkaDatabase, catalogName string) error { + if output.GetFormat(cmd) == output.Human { + table := output.NewTable(cmd) + var creationTime string + if sdkDatabase.GetMetadata().CreationTimestamp != nil { + creationTime = *sdkDatabase.GetMetadata().CreationTimestamp + } + table.Add(&databaseOut{ + CreationTime: creationTime, + Name: sdkDatabase.GetMetadata().Name, + Catalog: catalogName, + }) + return table.Print() + } + + localDatabase := convertSdkDatabaseToLocalDatabase(sdkDatabase) + return output.SerializedOutput(cmd, localDatabase) +} + +func readDatabaseResourceFile(resourceFilePath string) (cmfsdk.KafkaDatabase, error) { + data, err := os.ReadFile(resourceFilePath) + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to read file: %w", err) + } + + var genericData map[string]interface{} + ext := filepath.Ext(resourceFilePath) + switch ext { + case ".json": + err = json.Unmarshal(data, &genericData) + case ".yaml", ".yml": + err = yaml.Unmarshal(data, &genericData) + default: + return cmfsdk.KafkaDatabase{}, errors.NewErrorWithSuggestions(fmt.Sprintf("unsupported file format: %s", ext), "Supported file formats are .json, .yaml, and .yml.") + } + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to parse input file: %w", err) + } + + jsonBytes, err := json.Marshal(genericData) + if err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to marshal intermediate data: %w", err) + } + + var sdkDatabase cmfsdk.KafkaDatabase + if err = json.Unmarshal(jsonBytes, &sdkDatabase); err != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf("failed to bind data to KafkaDatabase model: %w", err) + } + + return sdkDatabase, nil +} + +func convertSdkDatabaseToLocalDatabase(sdkDatabase cmfsdk.KafkaDatabase) LocalKafkaDatabase { + return LocalKafkaDatabase{ + ApiVersion: sdkDatabase.ApiVersion, + Kind: sdkDatabase.Kind, + Metadata: LocalDatabaseMetadata{ + Name: sdkDatabase.Metadata.Name, + CreationTimestamp: sdkDatabase.Metadata.CreationTimestamp, + UpdateTimestamp: sdkDatabase.Metadata.UpdateTimestamp, + Uid: sdkDatabase.Metadata.Uid, + Labels: sdkDatabase.Metadata.Labels, + Annotations: sdkDatabase.Metadata.Annotations, + }, + Spec: LocalKafkaDatabaseSpec{ + KafkaCluster: LocalKafkaDatabaseSpecKafkaCluster{ + ConnectionConfig: sdkDatabase.Spec.KafkaCluster.ConnectionConfig, + ConnectionSecretId: sdkDatabase.Spec.KafkaCluster.ConnectionSecretId, + }, + DdlEnvironments: sdkDatabase.Spec.DdlEnvironments, + }, + } +} diff --git a/internal/flink/command_catalog_database_create.go b/internal/flink/command_catalog_database_create.go new file mode 100644 index 0000000000..dbae647b6b --- /dev/null +++ b/internal/flink/command_catalog_database_create.go @@ -0,0 +1,50 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" +) + +func (c *command) newCatalogDatabaseCreateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "create ", + Short: "Create a Flink database.", + Long: "Create a Flink database in a catalog in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseCreate, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseCreate(cmd *cobra.Command, args []string) error { + resourceFilePath := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkDatabase, err := readDatabaseResourceFile(resourceFilePath) + if err != nil { + return err + } + + sdkOutputDatabase, err := client.CreateDatabase(c.createContext(), catalogName, sdkDatabase) + if err != nil { + return err + } + + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) +} diff --git a/internal/flink/command_catalog_database_delete.go b/internal/flink/command_catalog_database_delete.go new file mode 100644 index 0000000000..a836b38e46 --- /dev/null +++ b/internal/flink/command_catalog_database_delete.go @@ -0,0 +1,58 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/deletion" + "github.com/confluentinc/cli/v4/pkg/errors" + "github.com/confluentinc/cli/v4/pkg/resource" +) + +func (c *command) newCatalogDatabaseDeleteCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "delete ", + Short: "Delete a Flink database in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseDelete, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddForceFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseDelete(cmd *cobra.Command, args []string) error { + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + existenceFunc := func(name string) bool { + _, err := client.DescribeDatabase(c.createContext(), catalogName, name) + return err == nil + } + + if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkDatabase); err != nil { + // We are validating only the existence of the resources (there is no prefix validation). + // Thus, we can add some extra context for the error. + suggestions := "List available Flink databases with `confluent flink catalog database list`." + suggestions += "\nCheck that CMF is running and accessible." + return errors.NewErrorWithSuggestions(err.Error(), suggestions) + } + + deleteFunc := func(name string) error { + return client.DeleteDatabase(c.createContext(), catalogName, name) + } + + _, err = deletion.Delete(cmd, args, deleteFunc, resource.FlinkDatabase) + return err +} diff --git a/internal/flink/command_catalog_database_describe.go b/internal/flink/command_catalog_database_describe.go new file mode 100644 index 0000000000..24b9952f8a --- /dev/null +++ b/internal/flink/command_catalog_database_describe.go @@ -0,0 +1,44 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" +) + +func (c *command) newCatalogDatabaseDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe a Flink database in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseDescribe, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseDescribe(cmd *cobra.Command, args []string) error { + name := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, name) + if err != nil { + return err + } + + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) +} diff --git a/internal/flink/command_catalog_database_list.go b/internal/flink/command_catalog_database_list.go new file mode 100644 index 0000000000..2e0ff82f6e --- /dev/null +++ b/internal/flink/command_catalog_database_list.go @@ -0,0 +1,64 @@ +package flink + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *command) newCatalogDatabaseListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List Flink databases in a catalog in Confluent Platform.", + Args: cobra.NoArgs, + RunE: c.catalogDatabaseList, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseList(cmd *cobra.Command, _ []string) error { + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkDatabases, err := client.ListDatabases(c.createContext(), catalogName) + if err != nil { + return err + } + + if output.GetFormat(cmd) == output.Human { + list := output.NewList(cmd) + for _, db := range sdkDatabases { + var creationTime string + if db.GetMetadata().CreationTimestamp != nil { + creationTime = *db.GetMetadata().CreationTimestamp + } + list.Add(&databaseOut{ + CreationTime: creationTime, + Name: db.GetMetadata().Name, + Catalog: catalogName, + }) + } + return list.Print() + } + + localDatabases := make([]LocalKafkaDatabase, 0, len(sdkDatabases)) + for _, sdkDatabase := range sdkDatabases { + localDatabases = append(localDatabases, convertSdkDatabaseToLocalDatabase(sdkDatabase)) + } + + return output.SerializedOutput(cmd, localDatabases) +} diff --git a/internal/flink/command_catalog_database_update.go b/internal/flink/command_catalog_database_update.go new file mode 100644 index 0000000000..8411a8b5cd --- /dev/null +++ b/internal/flink/command_catalog_database_update.go @@ -0,0 +1,58 @@ +package flink + +import ( + "fmt" + + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" +) + +func (c *command) newCatalogDatabaseUpdateCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "update ", + Short: "Update a Flink database.", + Long: "Update a Flink database in a catalog in Confluent Platform.", + Args: cobra.ExactArgs(1), + RunE: c.catalogDatabaseUpdate, + } + + cmd.Flags().String("catalog", "", "Name of the catalog.") + cobra.CheckErr(cmd.MarkFlagRequired("catalog")) + addCmfFlagSet(cmd) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *command) catalogDatabaseUpdate(cmd *cobra.Command, args []string) error { + resourceFilePath := args[0] + + catalogName, err := cmd.Flags().GetString("catalog") + if err != nil { + return err + } + + client, err := c.GetCmfClient(cmd) + if err != nil { + return err + } + + sdkDatabase, err := readDatabaseResourceFile(resourceFilePath) + if err != nil { + return err + } + + databaseName := sdkDatabase.Metadata.Name + + if err := client.UpdateDatabase(c.createContext(), catalogName, databaseName, sdkDatabase); err != nil { + return err + } + + sdkOutputDatabase, err := client.DescribeDatabase(c.createContext(), catalogName, databaseName) + if err != nil { + return fmt.Errorf("database %q was updated successfully, but failed to retrieve updated details: %w", databaseName, err) + } + + return printDatabaseOutput(cmd, sdkOutputDatabase, catalogName) +} diff --git a/internal/flink/local_types.go b/internal/flink/local_types.go index 19e0b756d9..1c6e52b47f 100644 --- a/internal/flink/local_types.go +++ b/internal/flink/local_types.go @@ -142,6 +142,32 @@ type LocalKafkaCatalogSpecSrInstance struct { ConnectionSecretId *string `json:"connectionSecretId,omitempty" yaml:"connectionSecretId,omitempty"` } +type LocalKafkaDatabase struct { + ApiVersion string `json:"apiVersion" yaml:"apiVersion"` + Kind string `json:"kind" yaml:"kind"` + Metadata LocalDatabaseMetadata `json:"metadata" yaml:"metadata"` + Spec LocalKafkaDatabaseSpec `json:"spec" yaml:"spec"` +} + +type LocalDatabaseMetadata struct { + Name string `json:"name" yaml:"name"` + CreationTimestamp *string `json:"creationTimestamp,omitempty" yaml:"creationTimestamp,omitempty"` + UpdateTimestamp *string `json:"updateTimestamp,omitempty" yaml:"updateTimestamp,omitempty"` + Uid *string `json:"uid,omitempty" yaml:"uid,omitempty"` + Labels *map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"` + Annotations *map[string]string `json:"annotations,omitempty" yaml:"annotations,omitempty"` +} + +type LocalKafkaDatabaseSpec struct { + KafkaCluster LocalKafkaDatabaseSpecKafkaCluster `json:"kafkaCluster" yaml:"kafkaCluster"` + DdlEnvironments *[]string `json:"ddlEnvironments,omitempty" yaml:"ddlEnvironments,omitempty"` +} + +type LocalKafkaDatabaseSpecKafkaCluster struct { + ConnectionConfig map[string]string `json:"connectionConfig" yaml:"connectionConfig"` + ConnectionSecretId *string `json:"connectionSecretId,omitempty" yaml:"connectionSecretId,omitempty"` +} + type LocalResultSchema struct { Columns []LocalResultSchemaColumn `json:"columns" yaml:"columns"` } diff --git a/pkg/flink/cmf_rest_client.go b/pkg/flink/cmf_rest_client.go index a24b34b1e2..e55405d4c8 100644 --- a/pkg/flink/cmf_rest_client.go +++ b/pkg/flink/cmf_rest_client.go @@ -570,6 +570,57 @@ func (cmfClient *CmfRestClient) DeleteCatalog(ctx context.Context, catalogName s return parseSdkError(httpResp, err) } +func (cmfClient *CmfRestClient) DeleteDatabase(ctx context.Context, catalogName, databaseName string) error { + httpResp, err := cmfClient.SQLApi.DeleteKafkaDatabase(ctx, catalogName, databaseName).Execute() + if parsedErr := parseSdkError(httpResp, err); parsedErr != nil { + return fmt.Errorf(`failed to delete database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return nil +} + +func (cmfClient *CmfRestClient) CreateDatabase(ctx context.Context, catalogName string, kafkaDatabase cmfsdk.KafkaDatabase) (cmfsdk.KafkaDatabase, error) { + databaseName := kafkaDatabase.Metadata.Name + outputDatabase, httpResponse, err := cmfClient.SQLApi.CreateKafkaDatabase(ctx, catalogName).KafkaDatabase(kafkaDatabase).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf(`failed to create database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return outputDatabase, nil +} + +func (cmfClient *CmfRestClient) UpdateDatabase(ctx context.Context, catalogName, databaseName string, kafkaDatabase cmfsdk.KafkaDatabase) error { + httpResponse, err := cmfClient.SQLApi.UpdateKafkaDatabase(ctx, catalogName, databaseName).KafkaDatabase(kafkaDatabase).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return fmt.Errorf(`failed to update database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return nil +} + +func (cmfClient *CmfRestClient) DescribeDatabase(ctx context.Context, catalogName, databaseName string) (cmfsdk.KafkaDatabase, error) { + outputDatabase, httpResponse, err := cmfClient.SQLApi.GetKafkaDatabase(ctx, catalogName, databaseName).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return cmfsdk.KafkaDatabase{}, fmt.Errorf(`failed to get database "%s" in catalog "%s": %s`, databaseName, catalogName, parsedErr) + } + return outputDatabase, nil +} + +func (cmfClient *CmfRestClient) ListDatabases(ctx context.Context, catalogName string) ([]cmfsdk.KafkaDatabase, error) { + databases := make([]cmfsdk.KafkaDatabase, 0) + done := false + const pageSize = 100 + var currentPageNumber int32 = 0 + + for !done { + databasePage, httpResponse, err := cmfClient.SQLApi.GetKafkaDatabases(ctx, catalogName).Page(currentPageNumber).Size(pageSize).Execute() + if parsedErr := parseSdkError(httpResponse, err); parsedErr != nil { + return nil, fmt.Errorf(`failed to list databases in catalog "%s": %s`, catalogName, parsedErr) + } + databases = append(databases, databasePage.GetItems()...) + currentPageNumber, done = extractPageOptions(len(databasePage.GetItems()), currentPageNumber) + } + + return databases, nil +} + // Returns the next page number and whether we need to fetch more pages or not. func extractPageOptions(receivedItemsLength int, currentPageNumber int32) (int32, bool) { if receivedItemsLength == 0 { diff --git a/pkg/resource/resource.go b/pkg/resource/resource.go index 57de4a56c8..eaed00b843 100644 --- a/pkg/resource/resource.go +++ b/pkg/resource/resource.go @@ -45,6 +45,7 @@ const ( FlinkDetachedSavepoint = "Flink detached savepoint" FlinkApplication = "Flink application" FlinkCatalog = "Flink catalog" + FlinkDatabase = "Flink database" FlinkEnvironment = "Flink environment" FlinkRegion = "Flink region" FlinkEndpoint = "Flink endpoint" diff --git a/test/fixtures/input/flink/catalog/database/create-invalid-failure.json b/test/fixtures/input/flink/catalog/database/create-invalid-failure.json new file mode 100644 index 0000000000..e45d244060 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-invalid-failure.json @@ -0,0 +1,12 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "invalid-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": {} + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml b/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml new file mode 100644 index 0000000000..ea5bb88064 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml @@ -0,0 +1,7 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: invalid-database +spec: + kafkaCluster: + connectionConfig: {} diff --git a/test/fixtures/input/flink/catalog/database/create-successful.json b/test/fixtures/input/flink/catalog/database/create-successful.json new file mode 100644 index 0000000000..4422ebedb8 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-successful.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/create-successful.yaml b/test/fixtures/input/flink/catalog/database/create-successful.yaml new file mode 100644 index 0000000000..6e4df0bbf0 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/create-successful.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/input/flink/catalog/database/update-invalid-failure.json b/test/fixtures/input/flink/catalog/database/update-invalid-failure.json new file mode 100644 index 0000000000..62292591c4 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-invalid-failure.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "invalid-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml b/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml new file mode 100644 index 0000000000..93e62665e9 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: invalid-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/input/flink/catalog/database/update-successful.json b/test/fixtures/input/flink/catalog/database/update-successful.json new file mode 100644 index 0000000000..4422ebedb8 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-successful.json @@ -0,0 +1,14 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/input/flink/catalog/database/update-successful.yaml b/test/fixtures/input/flink/catalog/database/update-successful.yaml new file mode 100644 index 0000000000..6e4df0bbf0 --- /dev/null +++ b/test/fixtures/input/flink/catalog/database/update-successful.yaml @@ -0,0 +1,8 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/create-help-onprem.golden b/test/fixtures/output/flink/catalog/database/create-help-onprem.golden new file mode 100644 index 0000000000..4b0f03902e --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-help-onprem.golden @@ -0,0 +1,17 @@ +Create a Flink database in a catalog in Confluent Platform. + +Usage: + confluent flink catalog database create [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden b/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden new file mode 100644 index 0000000000..d2b4965a8a --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-invalid-failure.golden @@ -0,0 +1 @@ +Error: failed to create database "invalid-database" in catalog "test-catalog": The Kafka database object from resource file is invalid diff --git a/test/fixtures/output/flink/catalog/database/create-success-json.golden b/test/fixtures/output/flink/catalog/database/create-success-json.golden new file mode 100644 index 0000000000..065e5eba7c --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success-json.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database", + "creationTimestamp": "2025-03-12 23:42:00 +0000 UTC" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/create-success-yaml.golden b/test/fixtures/output/flink/catalog/database/create-success-yaml.golden new file mode 100644 index 0000000000..0d9dc39dc6 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success-yaml.golden @@ -0,0 +1,9 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database + creationTimestamp: 2025-03-12 23:42:00 +0000 UTC +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/create-success.golden b/test/fixtures/output/flink/catalog/database/create-success.golden new file mode 100644 index 0000000000..b1b1a2a1ff --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/create-success.golden @@ -0,0 +1,5 @@ ++---------------+-------------------------------+ +| Creation Time | 2025-03-12 23:42:00 +0000 UTC | +| Name | test-database | +| Catalog | test-catalog | ++---------------+-------------------------------+ diff --git a/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden b/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden new file mode 100644 index 0000000000..91228009b3 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-help-onprem.golden @@ -0,0 +1,17 @@ +Delete a Flink database in Confluent Platform. + +Usage: + confluent flink catalog database delete [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + --force Skip the deletion confirmation prompt. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden b/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden new file mode 100644 index 0000000000..b6cedfb005 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-non-exist-failure.golden @@ -0,0 +1 @@ +Are you sure you want to delete Flink database "non-exist-database"? (y/n): Error: failed to delete non-exist-database: failed to delete database "non-exist-database" in catalog "test-catalog": 404 Not Found diff --git a/test/fixtures/output/flink/catalog/database/delete-single-force.golden b/test/fixtures/output/flink/catalog/database/delete-single-force.golden new file mode 100644 index 0000000000..3517fbbf14 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-single-force.golden @@ -0,0 +1 @@ +Deleted Flink database "test-database-1". diff --git a/test/fixtures/output/flink/catalog/database/delete-single-successful.golden b/test/fixtures/output/flink/catalog/database/delete-single-successful.golden new file mode 100644 index 0000000000..ecc2f170b7 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/delete-single-successful.golden @@ -0,0 +1 @@ +Are you sure you want to delete Flink database "test-database-1"? (y/n): Deleted Flink database "test-database-1". diff --git a/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden b/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden new file mode 100644 index 0000000000..23cffb3b6d --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-help-onprem.golden @@ -0,0 +1,17 @@ +Describe a Flink database in Confluent Platform. + +Usage: + confluent flink catalog database describe [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/describe-not-found.golden b/test/fixtures/output/flink/catalog/database/describe-not-found.golden new file mode 100644 index 0000000000..0d2858c098 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-not-found.golden @@ -0,0 +1 @@ +Error: failed to get database "invalid-database" in catalog "test-catalog": The database name is invalid diff --git a/test/fixtures/output/flink/catalog/database/describe-success-json.golden b/test/fixtures/output/flink/catalog/database/describe-success-json.golden new file mode 100644 index 0000000000..87b3be9876 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-success-json.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database", + "creationTimestamp": "2025-08-05T12:00:00Z" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden b/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden new file mode 100644 index 0000000000..f0878e9e53 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-success-yaml.golden @@ -0,0 +1,9 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database + creationTimestamp: "2025-08-05T12:00:00Z" +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/describe-success.golden b/test/fixtures/output/flink/catalog/database/describe-success.golden new file mode 100644 index 0000000000..3702c544e4 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/describe-success.golden @@ -0,0 +1,5 @@ ++---------------+----------------------+ +| Creation Time | 2025-08-05T12:00:00Z | +| Name | test-database | +| Catalog | test-catalog | ++---------------+----------------------+ diff --git a/test/fixtures/output/flink/catalog/database/help-onprem.golden b/test/fixtures/output/flink/catalog/database/help-onprem.golden new file mode 100644 index 0000000000..49da534a01 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/help-onprem.golden @@ -0,0 +1,18 @@ +Manage Flink databases in Confluent Platform. + +Usage: + confluent flink catalog database [command] + +Available Commands: + create Create a Flink database. + delete Delete a Flink database in Confluent Platform. + describe Describe a Flink database in Confluent Platform. + list List Flink databases in a catalog in Confluent Platform. + update Update a Flink database. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + +Use "confluent flink catalog database [command] --help" for more information about a command. diff --git a/test/fixtures/output/flink/catalog/database/list-help-onprem.golden b/test/fixtures/output/flink/catalog/database/list-help-onprem.golden new file mode 100644 index 0000000000..4dbf71ed16 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-help-onprem.golden @@ -0,0 +1,17 @@ +List Flink databases in a catalog in Confluent Platform. + +Usage: + confluent flink catalog database list [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/list-success-json.golden b/test/fixtures/output/flink/catalog/database/list-success-json.golden new file mode 100644 index 0000000000..36349dc6d4 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-success-json.golden @@ -0,0 +1,32 @@ +[ + { + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database-1", + "creationTimestamp": "2025-08-05T12:00:00Z" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } + }, + { + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database-2", + "creationTimestamp": "2025-08-05T12:00:00Z" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } + } +] diff --git a/test/fixtures/output/flink/catalog/database/list-success-yaml.golden b/test/fixtures/output/flink/catalog/database/list-success-yaml.golden new file mode 100644 index 0000000000..c421dc678f --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-success-yaml.golden @@ -0,0 +1,18 @@ +- apiVersion: cmf/api/v1/database + kind: KafkaDatabase + metadata: + name: test-database-1 + creationTimestamp: "2025-08-05T12:00:00Z" + spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 +- apiVersion: cmf/api/v1/database + kind: KafkaDatabase + metadata: + name: test-database-2 + creationTimestamp: "2025-08-05T12:00:00Z" + spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/list-success.golden b/test/fixtures/output/flink/catalog/database/list-success.golden new file mode 100644 index 0000000000..38bfed9632 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/list-success.golden @@ -0,0 +1,4 @@ + Creation Time | Name | Catalog +-----------------------+-----------------+--------------- + 2025-08-05T12:00:00Z | test-database-1 | test-catalog + 2025-08-05T12:00:00Z | test-database-2 | test-catalog diff --git a/test/fixtures/output/flink/catalog/database/update-help-onprem.golden b/test/fixtures/output/flink/catalog/database/update-help-onprem.golden new file mode 100644 index 0000000000..f5c10ad850 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-help-onprem.golden @@ -0,0 +1,17 @@ +Update a Flink database in a catalog in Confluent Platform. + +Usage: + confluent flink catalog database update [flags] + +Flags: + --catalog string REQUIRED: Name of the catalog. + --url string Base URL of the Confluent Manager for Apache Flink (CMF). Environment variable "CONFLUENT_CMF_URL" may be set in place of this flag. + --client-key-path string Path to client private key for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_KEY_PATH" may be set in place of this flag. + --client-cert-path string Path to client cert to be verified by Confluent Manager for Apache Flink. Include for mTLS authentication. Environment variable "CONFLUENT_CMF_CLIENT_CERT_PATH" may be set in place of this flag. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent Manager for Apache Flink connection. Environment variable "CONFLUENT_CMF_CERTIFICATE_AUTHORITY_PATH" may be set in place of this flag. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden b/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden new file mode 100644 index 0000000000..16726790eb --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-invalid-failure.golden @@ -0,0 +1 @@ +Error: failed to update database "invalid-database" in catalog "test-catalog": The database name is invalid diff --git a/test/fixtures/output/flink/catalog/database/update-success-json.golden b/test/fixtures/output/flink/catalog/database/update-success-json.golden new file mode 100644 index 0000000000..87b3be9876 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-success-json.golden @@ -0,0 +1,15 @@ +{ + "apiVersion": "cmf/api/v1/database", + "kind": "KafkaDatabase", + "metadata": { + "name": "test-database", + "creationTimestamp": "2025-08-05T12:00:00Z" + }, + "spec": { + "kafkaCluster": { + "connectionConfig": { + "bootstrap.servers": "localhost:9092" + } + } + } +} diff --git a/test/fixtures/output/flink/catalog/database/update-success-yaml.golden b/test/fixtures/output/flink/catalog/database/update-success-yaml.golden new file mode 100644 index 0000000000..f0878e9e53 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-success-yaml.golden @@ -0,0 +1,9 @@ +apiVersion: cmf/api/v1/database +kind: KafkaDatabase +metadata: + name: test-database + creationTimestamp: "2025-08-05T12:00:00Z" +spec: + kafkaCluster: + connectionConfig: + bootstrap.servers: localhost:9092 diff --git a/test/fixtures/output/flink/catalog/database/update-success.golden b/test/fixtures/output/flink/catalog/database/update-success.golden new file mode 100644 index 0000000000..3702c544e4 --- /dev/null +++ b/test/fixtures/output/flink/catalog/database/update-success.golden @@ -0,0 +1,5 @@ ++---------------+----------------------+ +| Creation Time | 2025-08-05T12:00:00Z | +| Name | test-database | +| Catalog | test-catalog | ++---------------+----------------------+ diff --git a/test/fixtures/output/flink/catalog/help-onprem.golden b/test/fixtures/output/flink/catalog/help-onprem.golden index cbc8042d2d..1ce122027c 100644 --- a/test/fixtures/output/flink/catalog/help-onprem.golden +++ b/test/fixtures/output/flink/catalog/help-onprem.golden @@ -5,6 +5,7 @@ Usage: Available Commands: create Create a Flink catalog. + database Manage Flink databases in Confluent Platform. delete Delete one or more Flink catalogs in Confluent Platform. describe Describe a Flink catalog in Confluent Platform. list List Flink catalogs in Confluent Platform. diff --git a/test/flink_onprem_test.go b/test/flink_onprem_test.go index 5536f0a14c..55b0583a6c 100644 --- a/test/flink_onprem_test.go +++ b/test/flink_onprem_test.go @@ -410,6 +410,94 @@ func (s *CLITestSuite) TestFlinkCatalogListOnPrem() { runIntegrationTestsWithMultipleAuth(s, tests) } +func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"}, + // failure + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + +func (s *CLITestSuite) TestFlinkCatalogDatabaseDeleteOnPrem() { + tests := []CLITest{ + // success scenarios + {args: "flink catalog database delete test-database-1 --catalog test-catalog", input: "y\n", fixture: "flink/catalog/database/delete-single-successful.golden"}, + {args: "flink catalog database delete test-database-1 --catalog test-catalog --force", fixture: "flink/catalog/database/delete-single-force.golden"}, + // failure scenarios + {args: "flink catalog database delete non-exist-database --catalog test-catalog", input: "y\n", fixture: "flink/catalog/database/delete-non-exist-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + +func (s *CLITestSuite) TestFlinkCatalogDatabaseDescribeOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database describe test-database --catalog test-catalog", fixture: "flink/catalog/database/describe-success.golden"}, + {args: "flink catalog database describe test-database --catalog test-catalog --output json", fixture: "flink/catalog/database/describe-success-json.golden"}, + {args: "flink catalog database describe test-database --catalog test-catalog --output yaml", fixture: "flink/catalog/database/describe-success-yaml.golden"}, + // failure + {args: "flink catalog database describe invalid-database --catalog test-catalog", fixture: "flink/catalog/database/describe-not-found.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + +func (s *CLITestSuite) TestFlinkCatalogDatabaseListOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database list --catalog test-catalog", fixture: "flink/catalog/database/list-success.golden"}, + {args: "flink catalog database list --catalog test-catalog --output json", fixture: "flink/catalog/database/list-success-json.golden"}, + {args: "flink catalog database list --catalog test-catalog --output yaml", fixture: "flink/catalog/database/list-success-yaml.golden"}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + +func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateOnPrem() { + tests := []CLITest{ + // success + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"}, + // failure + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + +func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPremWithYAML() { + tests := []CLITest{ + // success + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.yaml --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"}, + // failure + {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.yaml --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + +func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateOnPremWithYAML() { + tests := []CLITest{ + // success + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog", fixture: "flink/catalog/database/update-success.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog --output json", fixture: "flink/catalog/database/update-success-json.golden"}, + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-successful.yaml --catalog test-catalog --output yaml", fixture: "flink/catalog/database/update-success-yaml.golden"}, + // failure + {args: "flink catalog database update test/fixtures/input/flink/catalog/database/update-invalid-failure.yaml --catalog test-catalog", fixture: "flink/catalog/database/update-invalid-failure.golden", exitCode: 1}, + } + + runIntegrationTestsWithMultipleAuth(s, tests) +} + func (s *CLITestSuite) TestFlinkStatementCreateOnPrem() { tests := []CLITest{ // success diff --git a/test/test-server/flink_onprem_handler.go b/test/test-server/flink_onprem_handler.go index ef179075d8..57e77d30d5 100644 --- a/test/test-server/flink_onprem_handler.go +++ b/test/test-server/flink_onprem_handler.go @@ -16,6 +16,8 @@ import ( cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" ) +const invalidDatabaseName = "invalid-database" + // Helper function to create a Flink application. func createApplication(name string) cmfsdk.FlinkApplication { status := map[string]interface{}{ @@ -224,6 +226,112 @@ func createKafkaCatalog(catName string) cmfsdk.KafkaCatalog { } } +func createKafkaDatabase(dbName string) cmfsdk.KafkaDatabase { + timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).Format(time.RFC3339) + return cmfsdk.KafkaDatabase{ + ApiVersion: "cmf/api/v1/database", + Kind: "KafkaDatabase", + Metadata: cmfsdk.DatabaseMetadata{ + Name: dbName, + CreationTimestamp: &timeStamp, + }, + Spec: cmfsdk.KafkaDatabaseSpec{ + KafkaCluster: cmfsdk.KafkaDatabaseSpecKafkaCluster{ + ConnectionConfig: map[string]string{ + "bootstrap.servers": "localhost:9092", + }, + }, + }, + } +} + +func handleCmfCatalogDatabases(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + switch r.Method { + case http.MethodGet: + databases := []cmfsdk.KafkaDatabase{ + createKafkaDatabase("test-database-1"), + createKafkaDatabase("test-database-2"), + } + databasesPage := cmfsdk.KafkaDatabasesPage{} + page := r.URL.Query().Get("page") + + if page == "0" { + databasesPage.SetItems(databases) + } + + err := json.NewEncoder(w).Encode(databasesPage) + require.NoError(t, err) + case http.MethodPost: + reqBody, err := io.ReadAll(r.Body) + require.NoError(t, err) + var database cmfsdk.KafkaDatabase + err = json.Unmarshal(reqBody, &database) + require.NoError(t, err) + + dbName := database.GetMetadata().Name + + if dbName == invalidDatabaseName { + http.Error(w, "The Kafka database object from resource file is invalid", http.StatusUnprocessableEntity) + return + } + + timeStamp := time.Date(2025, time.March, 12, 23, 42, 0, 0, time.UTC).String() + database.Metadata.CreationTimestamp = &timeStamp + err = json.NewEncoder(w).Encode(database) + require.NoError(t, err) + return + default: + require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method)) + } + } +} + +func handleCmfCatalogDatabase(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + handleLoginType(t, r) + + vars := mux.Vars(r) + dbName := vars["dbName"] + + switch r.Method { + case http.MethodGet: + if dbName == invalidDatabaseName { + http.Error(w, "The database name is invalid", http.StatusNotFound) + return + } + + database := createKafkaDatabase(dbName) + err := json.NewEncoder(w).Encode(database) + require.NoError(t, err) + return + case http.MethodPut: + if dbName == invalidDatabaseName { + http.Error(w, "The database name is invalid", http.StatusNotFound) + return + } + + // Read and validate the request body. + req := new(cmfsdk.KafkaDatabase) + err := json.NewDecoder(r.Body).Decode(req) + require.NoError(t, err) + + w.WriteHeader(http.StatusOK) + return + case http.MethodDelete: + if dbName == "non-exist-database" { + http.Error(w, "", http.StatusNotFound) + return + } + w.WriteHeader(http.StatusOK) + return + default: + require.Fail(t, fmt.Sprintf("Unexpected method %s", r.Method)) + } + } +} + func createFlinkStatement(stmtName string, stopped bool, parallelism int32) cmfsdk.Statement { timeStamp := time.Date(2025, time.August, 5, 12, 00, 0, 0, time.UTC).String() status := cmfsdk.StatementStatus{ diff --git a/test/test-server/flink_onprem_router.go b/test/test-server/flink_onprem_router.go index 2267f01d5a..8d597732f6 100644 --- a/test/test-server/flink_onprem_router.go +++ b/test/test-server/flink_onprem_router.go @@ -9,6 +9,8 @@ import ( var flinkRoutes = []route{ {"/cmf/api/v1/catalogs/kafka", handleCmfCatalogs}, {"/cmf/api/v1/catalogs/kafka/{catName}", handleCmfCatalog}, + {"/cmf/api/v1/catalogs/kafka/{catName}/databases", handleCmfCatalogDatabases}, + {"/cmf/api/v1/catalogs/kafka/{catName}/databases/{dbName}", handleCmfCatalogDatabase}, {"/cmf/api/v1/environments/{environment}/applications", handleCmfApplications}, {"/cmf/api/v1/environments/{environment}/applications/{application}", handleCmfApplication}, {"/cmf/api/v1/environments", handleCmfEnvironments},