Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 330 additions & 8 deletions e2e/e2e_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ package e2e

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"

fn "knative.dev/func/pkg/functions"
"knative.dev/func/pkg/k8s"
)

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -552,13 +559,328 @@ func Handle(w http.ResponseWriter, _ *http.Request) {
}
}

// TODO: TestMetadata_Subscriptions ensures that function instances can be
// subscribed to events.
// TestMetadata_Subscriptions verifies the full event flow using Knative Eventing:
// Producer function -> Broker -> Trigger -> Subscriber function
func TestMetadata_Subscriptions(t *testing.T) {
// TODO
// Create a function which emits an event with as much defaults as possible
// Create a function which subscribes to those events
// Succeed the test as soon as it receives the event
// https://github.com/knative/func/issues/3202
t.Skip("Subscription E2E tests not yet implemented")
brokerName := "default"
createBroker(t, Namespace, brokerName)
defer deleteBroker(t, Namespace, brokerName)

// Create subscriber function that receives CloudEvents
subscriberName := "func-e2e-test-subscriber"
subscriberRoot := fromCleanEnv(t, subscriberName)

if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil {
t.Fatal(err)
}

subscriberImpl := `package function

import (
"context"
"fmt"
"os"
"time"

"github.com/cloudevents/sdk-go/v2/event"
)

func Handle(ctx context.Context, e event.Event) (*event.Event, error) {
os.WriteFile("/tmp/received_event", []byte(e.Type()), 0644)
fmt.Printf("Received event: type=%s, source=%s, id=%s\n", e.Type(), e.Source(), e.ID())

response := event.New()
response.SetID(fmt.Sprintf("response-%d", time.Now().UnixNano()))
response.SetSource("subscriber")
response.SetType("test.response")
response.SetData("application/json", map[string]string{
"received_type": e.Type(),
"status": "received",
})
return &response, nil
}
`
if err := os.WriteFile(filepath.Join(subscriberRoot, "handle.go"), []byte(subscriberImpl), 0644); err != nil {
t.Fatal(err)
}

// Run func subscribe (without -v flag which subscribe doesn't support)
subscribeCmd := exec.Command(Bin, "subscribe", "--filter", "type=test.event")
subscribeCmd.Stdout = os.Stdout
subscribeCmd.Stderr = os.Stderr
t.Log("$ func subscribe --filter type=test.event")
if err := subscribeCmd.Run(); err != nil {
t.Fatal(err)
}

// Verify subscription config
f, err := fn.NewFunction(subscriberRoot)
if err != nil {
t.Fatal(err)
}
if len(f.Deploy.Subscriptions) != 1 {
t.Fatalf("expected 1 subscription, got %d", len(f.Deploy.Subscriptions))
}

if err := newCmd(t, "deploy").Run(); err != nil {
t.Fatal(err)
}
defer clean(t, subscriberName, Namespace)

subscriberURL := fmt.Sprintf("http://%s.%s.%s", subscriberName, Namespace, Domain)
if !waitFor(t, subscriberURL, withTemplate("cloudevents")) {
t.Fatal("subscriber did not become ready")
}
t.Log("Subscriber deployed and ready")
waitForTrigger(t, Namespace, subscriberName)

// Create producer function that sends CloudEvents to the broker
producerName := "func-e2e-test-producer"
_ = fromCleanEnv(t, producerName)

if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil {
t.Fatal(err)
}

producerImpl := fmt.Sprintf(`package function

import (
"fmt"
"io"
"net/http"
"strings"
"time"
)

const (
brokerNamespace = "%s"
brokerName = "%s"
)

func Handle(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
w.WriteHeader(200)
fmt.Fprintf(w, "Producer is ready")
return
}

brokerURL := fmt.Sprintf("http://broker-ingress.knative-eventing.svc.cluster.local/%%s/%%s", brokerNamespace, brokerName)
eventBody := `+"`"+`{"message": "hello from producer"}`+"`"+`
httpReq, err := http.NewRequest("POST", brokerURL, strings.NewReader(eventBody))
if err != nil {
w.WriteHeader(500)
fmt.Fprintf(w, "Failed to create request: %%v", err)
return
}

httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("ce-specversion", "1.0")
httpReq.Header.Set("ce-type", "test.event")
httpReq.Header.Set("ce-source", "producer-function")
httpReq.Header.Set("ce-id", fmt.Sprintf("evt-%%d", time.Now().UnixNano()))

client := &http.Client{Timeout: 30 * time.Second}
resp, err := client.Do(httpReq)
if err != nil {
w.WriteHeader(500)
fmt.Fprintf(w, "Failed to send to broker at %%s: %%v", brokerURL, err)
return
}
defer resp.Body.Close()

body, _ := io.ReadAll(resp.Body)
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
w.WriteHeader(200)
fmt.Fprintf(w, "Event sent successfully to %%s (status %%d)", brokerURL, resp.StatusCode)
} else {
w.WriteHeader(500)
fmt.Fprintf(w, "Broker %%s returned status %%d. Body: %%s", brokerURL, resp.StatusCode, string(body))
}
}

`, Namespace, brokerName)

producerRoot, _ := os.Getwd()
if err := os.WriteFile(filepath.Join(producerRoot, "handle.go"), []byte(producerImpl), 0644); err != nil {
t.Fatal(err)
}

if err := newCmd(t, "deploy").Run(); err != nil {
t.Fatal(err)
}
defer clean(t, producerName, Namespace)

producerURL := fmt.Sprintf("http://%s.%s.%s", producerName, Namespace, Domain)
if !waitFor(t, producerURL, withContentMatch("Producer is ready")) {
t.Fatal("producer did not become ready")
}
t.Log("Producer deployed and ready")

// Invoke producer to trigger event flow
t.Log("Invoking producer to send event to broker...")
client := http.Client{Timeout: 30 * time.Second}
resp, err := client.Post(producerURL, "application/json", strings.NewReader("{}"))
if err != nil {
t.Fatalf("Failed to invoke producer: %v", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
t.Logf("Producer response: %s (Status: %d)", string(body), resp.StatusCode)

if resp.StatusCode != 200 {
t.Fatalf("Broker failed to accept event: Status %d. Body: %s", resp.StatusCode, string(body))
}

t.Log("Event sent to broker successfully. Waiting for subscriber...")
if !waitFor(t, subscriberURL, withTemplate("cloudevents")) {
t.Fatal("subscriber did not respond after event was sent")
}
t.Log("Event flow verified: Producer -> Broker -> Subscriber")
}

// createBroker creates a Knative Broker in the given namespace.
func createBroker(t *testing.T, namespace, name string) {
t.Helper()

brokerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: %s
namespace: %s
`, name, namespace)

cmd := exec.Command("kubectl", "apply", "-f", "-")
cmd.Stdin = strings.NewReader(brokerYAML)
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)

output, err := cmd.CombinedOutput()
if err != nil {
t.Logf("Warning: could not create broker: %v, output: %s", err, string(output))
return
}
t.Logf("Created broker %s in namespace %s", name, namespace)

waitCmd := exec.Command("kubectl", "wait", "--for=condition=Ready",
fmt.Sprintf("broker/%s", name), "-n", namespace, "--timeout=60s")
waitCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
waitOutput, err := waitCmd.CombinedOutput()
if err != nil {
t.Logf("Warning: broker may not be ready: %v, output: %s", err, string(waitOutput))
} else {
t.Logf("Broker %s is ready", name)
}

// Wait for broker-ingress service to be available (critical for CI)
// This ensures DNS has propagated before we try to send events
t.Log("Waiting for broker-ingress service to be available...")
for i := 0; i < 30; i++ {
checkCmd := exec.Command("kubectl", "get", "svc", "-n", "knative-eventing", "broker-ingress")
checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)
if err := checkCmd.Run(); err == nil {
t.Log("broker-ingress service is available")
return
}
time.Sleep(2 * time.Second)
}
t.Log("Warning: broker-ingress service check timed out, proceeding anyway")
}

// deleteBroker removes a Knative Broker from the given namespace.
func deleteBroker(t *testing.T, namespace, name string) {
t.Helper()

cmd := exec.Command("kubectl", "delete", "broker", name, "-n", namespace, "--ignore-not-found")
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)

output, err := cmd.CombinedOutput()
if err != nil {
t.Logf("Warning: could not delete broker: %v, output: %s", err, string(output))
return
}
t.Logf("Deleted broker %s from namespace %s", name, namespace)
}

// waitForTrigger waits for the function's trigger to become ready.
func waitForTrigger(t *testing.T, namespace, functionName string) {
t.Helper()

triggerName := fmt.Sprintf("%s-function-trigger-0", functionName)

cmd := exec.Command("kubectl", "wait", "--for=condition=Ready",
fmt.Sprintf("trigger/%s", triggerName), "-n", namespace, "--timeout=60s")
cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig)

output, err := cmd.CombinedOutput()
if err != nil {
t.Logf("Warning: trigger may not be ready: %v, output: %s", err, string(output))
} else {
t.Logf("Trigger %s is ready", triggerName)
}
}

// sendEventToBrokerWithRetry attempts to send a CloudEvent to the broker with retries.
func sendEventToBrokerWithRetry(t *testing.T, brokerURL, eventType, eventSource string, maxRetries int) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @Kunal1522 , I noticed that sendEventToBrokerWithRetry and sendEventToBrokerInternal are defined but never called anywhere in the codebase. Are these intended for future use, or can they be removed to reduce dead code?

t.Helper()

for i := 0; i < maxRetries; i++ {
if i > 0 {
t.Logf("Retry %d/%d for sending event to broker", i+1, maxRetries)
time.Sleep(5 * time.Second)
}

success := sendEventToBrokerInternal(t, brokerURL, eventType, eventSource)
if success {
return true
}
}
return false
}

// sendEventToBrokerInternal sends a CloudEvent to the broker using in-cluster networking.
func sendEventToBrokerInternal(t *testing.T, brokerURL, eventType, eventSource string) bool {
t.Helper()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

clientConfig := k8s.GetClientConfig()
dialer, err := k8s.NewInClusterDialer(ctx, clientConfig)
if err != nil {
t.Logf("Warning: could not create in-cluster dialer: %v", err)
return false
}
defer dialer.Close()

transport := &http.Transport{
DialContext: dialer.DialContext,
}
client := &http.Client{
Transport: transport,
Timeout: 30 * time.Second,
}

eventID := fmt.Sprintf("test-event-%d", time.Now().UnixNano())
eventJSON := fmt.Sprintf(`{"specversion":"1.0","type":"%s","source":"%s","id":"%s","datacontenttype":"application/json","data":{"message":"test subscription event"}}`, eventType, eventSource, eventID)

req, err := http.NewRequestWithContext(ctx, "POST", brokerURL, strings.NewReader(eventJSON))
if err != nil {
t.Logf("Warning: could not create request to broker: %v", err)
return false
}

req.Header.Set("Content-Type", "application/cloudevents+json")

resp, err := client.Do(req)
if err != nil {
t.Logf("Warning: could not send event to broker: %v", err)
return false
}
defer resp.Body.Close()

if resp.StatusCode >= 200 && resp.StatusCode < 300 {
t.Logf("Event sent to broker successfully (status: %d)", resp.StatusCode)
return true
}
t.Logf("Broker responded with status: %d", resp.StatusCode)
return false
}
Loading