diff --git a/go.mod b/go.mod index 586846c..923a9cf 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/dapr/kit go 1.26.0 require ( + github.com/Microsoft/go-winio v0.6.2 github.com/alphadose/haxmap v1.3.1 github.com/cenkalti/backoff/v4 v4.2.1 github.com/fsnotify/fsnotify v1.7.0 diff --git a/signals/pipe_posix.go b/signals/pipe_posix.go new file mode 100644 index 0000000..fa7925e --- /dev/null +++ b/signals/pipe_posix.go @@ -0,0 +1,47 @@ +//go:build !windows + +/* +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package signals + +import ( + "fmt" + "os" + "strconv" + "syscall" +) + +// ReloadPipeName returns the named pipe path used by a dapr process with the +// given PID to listen for reload signals on Windows. On POSIX systems this is +// not used (SIGHUP is used instead), but returns the same value as the Windows +// implementation so that cross-platform code can compute the expected name. +func ReloadPipeName(pid int) string { + return `\\.\pipe\dapr-reload-` + strconv.Itoa(pid) +} + +// SignalReload sends SIGHUP to the process with the given PID on POSIX +// systems, triggering a runtime reload. +func SignalReload(pid int) error { + proc, err := os.FindProcess(pid) + if err != nil { + return fmt.Errorf("failed to find process %d: %w", pid, err) + } + + err = proc.Signal(syscall.SIGHUP) + if err != nil { + return fmt.Errorf("failed to send SIGHUP to process %d: %w", pid, err) + } + + return nil +} diff --git a/signals/pipe_windows.go b/signals/pipe_windows.go new file mode 100644 index 0000000..84b00dd --- /dev/null +++ b/signals/pipe_windows.go @@ -0,0 +1,53 @@ +/* +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package signals + +import ( + "fmt" + "net" + "strconv" + "time" + + "github.com/Microsoft/go-winio" +) + +// ReloadPipeName returns the named pipe path used by a dapr process with the +// given PID to listen for reload signals on Windows. This is exported so that +// external tooling (CLI, tests) can connect to trigger a reload. +func ReloadPipeName(pid int) string { + return `\\.\pipe\dapr-reload-` + strconv.Itoa(pid) +} + +// listenPipe creates a Windows named pipe listener at the given path. +// The pipe is secured so that the creating user (Creator Owner), +// Built-in Administrators, and Local System have full access. +func listenPipe(name string) (net.Listener, error) { + return winio.ListenPipe(name, &winio.PipeConfig{ + // CO = Creator Owner, BA = Built-in Administrators, SY = Local System. + SecurityDescriptor: "D:P(A;;GA;;;CO)(A;;GA;;;BA)(A;;GA;;;SY)", + }) +} + +// SignalReload connects to the reload named pipe for the given PID, triggering +// a reload of that dapr process. +func SignalReload(pid int) error { + pipeName := ReloadPipeName(pid) + timeout := 5 * time.Second + conn, err := winio.DialPipe(pipeName, &timeout) + if err != nil { + return fmt.Errorf("failed to connect to reload pipe %s: %w", pipeName, err) + } + conn.Close() + return nil +} diff --git a/signals/signals_posix.go b/signals/signals_posix.go index 2dca96d..d164436 100644 --- a/signals/signals_posix.go +++ b/signals/signals_posix.go @@ -1,7 +1,7 @@ //go:build !windows /* -Copyright 2023 The Dapr Authors +Copyright 2026 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/signals/signals_posix_test.go b/signals/signals_posix_test.go index b35c78e..e923686 100644 --- a/signals/signals_posix_test.go +++ b/signals/signals_posix_test.go @@ -195,6 +195,25 @@ func TestOnHUP(t *testing.T) { } }) + t.Run("SignalReload should cancel context via SIGHUP", func(t *testing.T) { + defer signal.Reset() + + hupCh := OnHUP(t.Context()) + ctx := <-hupCh + + require.NoError(t, SignalReload(syscall.Getpid())) + + select { + case <-ctx.Done(): + cause := context.Cause(ctx) + require.Error(t, cause) + assert.Contains(t, cause.Error(), "SIGHUP", + "cause should contain SIGHUP, got: %s", cause.Error()) + case <-time.After(1 * time.Second): + t.Error("context should be cancelled in time") + } + }) + t.Run("channel should be closed when main context is cancelled", func(t *testing.T) { defer signal.Reset() diff --git a/signals/signals_windows.go b/signals/signals_windows.go index c7aab39..1d91808 100644 --- a/signals/signals_windows.go +++ b/signals/signals_windows.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Dapr Authors +Copyright 2026 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -15,22 +15,86 @@ package signals import ( "context" + "errors" + "net" "os" + "time" ) var shutdownSignals = []os.Signal{os.Interrupt} -// OnHUP is a no-op on Windows as SIGHUP is not supported. It returns a channel -// that yields the original parent context, and closes when the parent -// context is canceled. +// OnHUP returns a channel that yields a new context each time a reload signal +// is received via a Windows named pipe. Each context is canceled when the next +// reload signal arrives or when the parent context is canceled. The channel is +// closed when the parent context is canceled. +// +// On Windows, SIGHUP is not supported. Instead, this function listens on a +// named pipe (\\.\pipe\dapr-reload-). Any connection to the pipe +// triggers a reload, equivalent to sending SIGHUP on POSIX systems. func OnHUP(ctx context.Context) <-chan context.Context { - ctxCh := make(chan context.Context, 1) + ctxhupCh := make(chan context.Context, 1) go func() { - defer close(ctxCh) - ctxCh <- ctx - <-ctx.Done() + defer close(ctxhupCh) + + pipeName := ReloadPipeName(os.Getpid()) + listener, err := listenPipe(pipeName) + if err != nil { + log.Errorf("Failed to create reload named pipe %s: %v", pipeName, err) + // Fall back to the old no-op behavior: send ctx once, wait for + // cancellation. + ctxhupCh <- ctx + <-ctx.Done() + return + } + + log.Infof("Listening for reload signals on named pipe %s", pipeName) + + go func() { + <-ctx.Done() + listener.Close() + }() + + for { + ctxhup, cancel := context.WithCancelCause(ctx) + + select { + case ctxhupCh <- ctxhup: + case <-ctx.Done(): + cancel(ctx.Err()) + return + } + + // Wait for a connection on the named pipe. A connection (and + // immediate close) is the reload trigger, equivalent to SIGHUP. + // Retry on transient Accept errors without canceling the + // current context to avoid unintended restart loops. + for { + conn, err := listener.Accept() + if err != nil { + if ctx.Err() != nil { + cancel(ctx.Err()) + return + } + // If the listener is permanently closed, exit rather + // than spinning in a tight retry loop. + if errors.Is(err, net.ErrClosed) { + log.Errorf("Reload pipe listener closed unexpectedly: %v", err) + cancel(errors.New("reload pipe listener closed")) + return + } + log.Warnf("Error accepting reload pipe connection, retrying in 1s: %v", err) + time.Sleep(time.Second) + continue + } + conn.Close() + break + } + + log.Info("Received reload signal via named pipe; restarting") + cancel(errors.New("received reload signal via named pipe")) + } }() - return ctxCh + return ctxhupCh } diff --git a/signals/signals_windows_test.go b/signals/signals_windows_test.go new file mode 100644 index 0000000..f98a1cb --- /dev/null +++ b/signals/signals_windows_test.go @@ -0,0 +1,91 @@ +/* +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package signals + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestOnHUP_WindowsPipe(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ctxCh := OnHUP(ctx) + + // First context should be emitted immediately. + var firstCtx context.Context + select { + case c, ok := <-ctxCh: + require.True(t, ok, "channel should yield a context") + firstCtx = c + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for initial context from OnHUP") + } + + // First context should not be canceled yet. + assert.NoError(t, firstCtx.Err(), "initial context should not be canceled") + + // Send a reload signal via the named pipe. + err := SignalReload(os.Getpid()) + require.NoError(t, err, "SignalReload should connect to the pipe") + + // First context should be canceled after the reload signal. + select { + case <-firstCtx.Done(): + // expected + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for first context to be canceled after reload signal") + } + + // A new context should be emitted for the next reload cycle. + var secondCtx context.Context + select { + case c, ok := <-ctxCh: + require.True(t, ok, "channel should yield a second context") + secondCtx = c + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for second context from OnHUP") + } + + assert.NoError(t, secondCtx.Err(), "second context should not be canceled yet") + + // Cancel the parent context to shut down the pipe listener. + cancel() + + select { + case <-secondCtx.Done(): + // expected + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for second context to be canceled after parent cancel") + } + + // Channel should be closed after parent context cancellation. + select { + case _, ok := <-ctxCh: + assert.False(t, ok, "channel should be closed after parent context is canceled") + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for channel to close") + } +} + +func TestReloadPipeName(t *testing.T) { + name := ReloadPipeName(12345) + assert.Equal(t, `\\.\pipe\dapr-reload-12345`, name) +}