diff --git a/e2e/e2e_metadata_test.go b/e2e/e2e_metadata_test.go index 6acd48339b..6e018482b0 100644 --- a/e2e/e2e_metadata_test.go +++ b/e2e/e2e_metadata_test.go @@ -5,11 +5,18 @@ package e2e import ( "bytes" + "context" "encoding/json" "fmt" + "io" + "net" + "net/http" "os" + "os/exec" "path/filepath" + "strings" "testing" + "time" fn "knative.dev/func/pkg/functions" ) @@ -552,13 +559,310 @@ func Handle(w http.ResponseWriter, _ *http.Request) { } } -// TODO: TestMetadata_Subscriptions ensures that function instances can be -// subscribed to events. +// Tests the complete event flow using func subscribe func TestMetadata_Subscriptions(t *testing.T) { - // TODO - // Create a function which emits an event with as much defaults as possible - // Create a function which subscribes to those events - // Succeed the test as soon as it receives the event - // https://github.com/knative/func/issues/3202 - t.Skip("Subscription E2E tests not yet implemented") + brokerName := "default" + if !createBrokerWithCheck(t, Namespace, brokerName) { + t.Fatal("Failed to create broker") + } + defer deleteBroker(t, Namespace, brokerName) + + uniqueEventID := fmt.Sprintf("e2e-test-%d", time.Now().UnixNano()) + eventReceived := make(chan string, 10) + + callbackURL, cleanup := startCallbackServer(t, eventReceived) + defer cleanup() + + subscriberName := "func-e2e-test-subscriber" + subscriberRoot := fromCleanEnv(t, subscriberName) + if err := newCmd(t, "init", "-l=go", "-t=cloudevents").Run(); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(subscriberRoot, "handle.go"), + []byte(subscriberCode()), 0644); err != nil { + t.Fatal(err) + } + + subscribeCmd := exec.Command(Bin, "subscribe", "--filter", "type=test.event") + subscribeCmd.Stdout, subscribeCmd.Stderr = os.Stdout, os.Stderr + if err := subscribeCmd.Run(); err != nil { + t.Fatal(err) + } + if err := newCmd(t, "config", "envs", "add", + "--name=CALLBACK_URL", "--value="+callbackURL).Run(); err != nil { + t.Fatal(err) + } + + f, err := fn.NewFunction(subscriberRoot) + if err != nil { + t.Fatal(err) + } + if len(f.Deploy.Subscriptions) != 1 { + t.Fatalf("expected 1 subscription, got %d", len(f.Deploy.Subscriptions)) + } + + if err := newCmd(t, "deploy").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, subscriberName, Namespace) + + subscriberURL := fmt.Sprintf("http://%s.%s.%s", subscriberName, Namespace, Domain) + if !waitFor(t, subscriberURL, withTemplate("cloudevents")) { + t.Fatal("subscriber not ready") + } + waitForTrigger(t, Namespace, subscriberName) + + producerName := "func-e2e-test-producer" + producerRoot := fromCleanEnv(t, producerName) + if err := newCmd(t, "init", "-l=go", "-t=http").Run(); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(producerRoot, "handle.go"), + []byte(producerCode(Namespace, brokerName)), 0644); err != nil { + t.Fatal(err) + } + if err := newCmd(t, "deploy").Run(); err != nil { + t.Fatal(err) + } + defer clean(t, producerName, Namespace) + + producerURL := fmt.Sprintf("http://%s.%s.%s", producerName, Namespace, Domain) + if !waitFor(t, producerURL, withContentMatch("Producer is ready")) { + t.Fatal("producer not ready") + } + + client := http.Client{Timeout: 30 * time.Second} + resp, err := client.Post(producerURL+"?event_id="+uniqueEventID, "application/json", strings.NewReader("{}")) + if err != nil { + t.Fatalf("Failed to invoke producer: %v", err) + } + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + if resp.StatusCode != 200 { + t.Fatalf("Broker rejected event: %s", body) + } + t.Logf("Broker accepted event %s", uniqueEventID) + + select { + case receivedID := <-eventReceived: + t.Logf("Event flow verified (received: %s)", receivedID) + case <-time.After(60 * time.Second): + t.Fatal("Timeout: No callback from subscriber") + } +} + +// Starts HTTP server to receive callbacks from subscriber pod +func startCallbackServer(t *testing.T, ch chan<- string) (string, func()) { + t.Helper() + hostIP := getHostIPForCluster(t) + listener, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + port := listener.Addr().(*net.TCPAddr).Port + + mux := http.NewServeMux() + mux.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + select { + case ch <- string(body): + default: + } + w.WriteHeader(200) + }) + + srv := &http.Server{Handler: mux} + go srv.Serve(listener) + return fmt.Sprintf("http://%s:%d/callback", hostIP, port), func() { srv.Shutdown(context.Background()) } +} + +// CloudEvents handler that calls back to test server +func subscriberCode() string { + return `package function + +import ( + "bytes" + "context" + "fmt" + "net/http" + "os" + "time" + "github.com/cloudevents/sdk-go/v2/event" +) + +func Handle(ctx context.Context, e event.Event) (*event.Event, error) { + fmt.Printf("EVENT_RECEIVED: id=%s type=%s source=%s\n", e.ID(), e.Type(), e.Source()) + if url := os.Getenv("CALLBACK_URL"); url != "" { + c := &http.Client{Timeout: 5 * time.Second} + if resp, err := c.Post(url, "text/plain", bytes.NewBufferString(e.ID())); err == nil { + resp.Body.Close() + fmt.Printf("Callback sent to %s\n", url) + } else { + fmt.Printf("Callback failed: %v\n", err) + } + } + r := event.New() + r.SetID("response-" + e.ID()) + r.SetSource("subscriber") + r.SetType("test.response") + r.SetData("application/json", map[string]string{"status": "received"}) + return &r, nil +} +` +} + +// HTTP handler that sends CloudEvents to broker +func producerCode(namespace, broker string) string { + return fmt.Sprintf(`package function + +import ( + "fmt" + "io" + "net/http" + "strings" + "time" +) + +func Handle(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + fmt.Fprint(w, "Producer is ready") + return + } + eventID := r.URL.Query().Get("event_id") + if eventID == "" { + eventID = fmt.Sprintf("evt-%%d", time.Now().UnixNano()) + } + url := "http://broker-ingress.knative-eventing.svc.cluster.local/%s/%s" + req, _ := http.NewRequest("POST", url, strings.NewReader(`+"`"+`{}`+"`"+`)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("ce-specversion", "1.0") + req.Header.Set("ce-type", "test.event") + req.Header.Set("ce-source", "producer") + req.Header.Set("ce-id", eventID) + resp, err := (&http.Client{Timeout: 30 * time.Second}).Do(req) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + fmt.Fprintf(w, "Event %%s sent (%%d)", eventID, resp.StatusCode) + } else { + http.Error(w, string(body), 500) + } +} +`, namespace, broker) +} + +// Returns host IP accessible from kind cluster +func getHostIPForCluster(t *testing.T) string { + t.Helper() + cmd := exec.Command("docker", "network", "inspect", "kind", "-f", "{{(index .IPAM.Config 0).Gateway}}") + output, err := cmd.Output() + if err == nil && len(output) > 0 { + ip := strings.TrimSpace(string(output)) + + if ip != "" && !strings.Contains(ip, ":") { + t.Logf("Using kind network gateway: %s", ip) + return ip + } + } + + cmd = exec.Command("ip", "route", "get", "1") + output, err = cmd.Output() + if err == nil { + // Parse output like "1.0.0.0 via 192.168.1.1 dev eth0 src 192.168.1.100" + fields := strings.Fields(string(output)) + for i, f := range fields { + if f == "src" && i+1 < len(fields) { + t.Logf("Using host IP from route: %s", fields[i+1]) + return fields[i+1] + } + } + } + + // Last resort: use common Docker bridge IP + t.Log("Warning: Could not determine host IP, using 172.17.0.1 (Docker default)") + return "172.17.0.1" +} + +// createBrokerWithCheck creates a Knative Broker and returns true if successful. +func createBrokerWithCheck(t *testing.T, namespace, name string) bool { + t.Helper() + + brokerYAML := fmt.Sprintf(`apiVersion: eventing.knative.dev/v1 +kind: Broker +metadata: + name: %s + namespace: %s +`, name, namespace) + + cmd := exec.Command("kubectl", "apply", "-f", "-") + cmd.Stdin = strings.NewReader(brokerYAML) + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Failed to create broker: %v, output: %s", err, string(output)) + return false + } + t.Logf("Created broker %s in namespace %s", name, namespace) + + waitCmd := exec.Command("kubectl", "wait", "--for=condition=Ready", + fmt.Sprintf("broker/%s", name), "-n", namespace, "--timeout=60s") + waitCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + waitOutput, err := waitCmd.CombinedOutput() + if err != nil { + t.Logf("Broker not ready: %v, output: %s", err, string(waitOutput)) + return false + } + t.Logf("Broker %s is ready", name) + + // Wait for broker-ingress service to be available + t.Log("Waiting for broker-ingress service to be available...") + for i := 0; i < 30; i++ { + checkCmd := exec.Command("kubectl", "get", "svc", "-n", "knative-eventing", "broker-ingress") + checkCmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + if err := checkCmd.Run(); err == nil { + t.Log("broker-ingress service is available") + return true + } + time.Sleep(2 * time.Second) + } + t.Log("broker-ingress service check timed out") + return false +} + +// deleteBroker removes a Knative Broker from the given namespace. +func deleteBroker(t *testing.T, namespace, name string) { + t.Helper() + + cmd := exec.Command("kubectl", "delete", "broker", name, "-n", namespace, "--ignore-not-found") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: could not delete broker: %v, output: %s", err, string(output)) + return + } + t.Logf("Deleted broker %s from namespace %s", name, namespace) +} + +// waitForTrigger waits for the function's trigger to become ready. +func waitForTrigger(t *testing.T, namespace, functionName string) { + t.Helper() + + triggerName := fmt.Sprintf("%s-function-trigger-0", functionName) + + cmd := exec.Command("kubectl", "wait", "--for=condition=Ready", + fmt.Sprintf("trigger/%s", triggerName), "-n", namespace, "--timeout=60s") + cmd.Env = append(os.Environ(), "KUBECONFIG="+Kubeconfig) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("Warning: trigger may not be ready: %v, output: %s", err, string(output)) + } else { + t.Logf("Trigger %s is ready", triggerName) + } }