Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/dapr/kit
go 1.26.0

require (
github.com/Microsoft/go-winio v0.6.2
github.com/alphadose/haxmap v1.3.1
github.com/cenkalti/backoff/v4 v4.2.1
github.com/fsnotify/fsnotify v1.7.0
Expand Down
47 changes: 47 additions & 0 deletions signals/pipe_posix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//go:build !windows

/*
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package signals

import (
"fmt"
"os"
"strconv"
"syscall"
)

// ReloadPipeName returns the named pipe path used by a dapr process with the
// given PID to listen for reload signals on Windows. On POSIX systems this is
// not used (SIGHUP is used instead), but returns the same value as the Windows
// implementation so that cross-platform code can compute the expected name.
func ReloadPipeName(pid int) string {
return `\\.\pipe\dapr-reload-` + strconv.Itoa(pid)
}

// SignalReload sends SIGHUP to the process with the given PID on POSIX
// systems, triggering a runtime reload.
func SignalReload(pid int) error {
proc, err := os.FindProcess(pid)
if err != nil {
return fmt.Errorf("failed to find process %d: %w", pid, err)
}

err = proc.Signal(syscall.SIGHUP)
if err != nil {
return fmt.Errorf("failed to send SIGHUP to process %d: %w", pid, err)
}

return nil
}
53 changes: 53 additions & 0 deletions signals/pipe_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what the _windows suffix of the filename is doing

Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package signals

import (
"fmt"
"net"
"strconv"
"time"

"github.com/Microsoft/go-winio"
)

// ReloadPipeName returns the named pipe path used by a dapr process with the
// given PID to listen for reload signals on Windows. This is exported so that
// external tooling (CLI, tests) can connect to trigger a reload.
func ReloadPipeName(pid int) string {
return `\\.\pipe\dapr-reload-` + strconv.Itoa(pid)
}

// listenPipe creates a Windows named pipe listener at the given path.
// The pipe is secured so that the creating user (Creator Owner),
// Built-in Administrators, and Local System have full access.
func listenPipe(name string) (net.Listener, error) {
return winio.ListenPipe(name, &winio.PipeConfig{
// CO = Creator Owner, BA = Built-in Administrators, SY = Local System.
SecurityDescriptor: "D:P(A;;GA;;;CO)(A;;GA;;;BA)(A;;GA;;;SY)",
})
}

// SignalReload connects to the reload named pipe for the given PID, triggering
// a reload of that dapr process.
func SignalReload(pid int) error {
pipeName := ReloadPipeName(pid)
timeout := 5 * time.Second
conn, err := winio.DialPipe(pipeName, &timeout)
if err != nil {
return fmt.Errorf("failed to connect to reload pipe %s: %w", pipeName, err)
}
conn.Close()
return nil
}
2 changes: 1 addition & 1 deletion signals/signals_posix.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//go:build !windows

/*
Copyright 2023 The Dapr Authors
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Expand Down
19 changes: 19 additions & 0 deletions signals/signals_posix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,25 @@ func TestOnHUP(t *testing.T) {
}
})

t.Run("SignalReload should cancel context via SIGHUP", func(t *testing.T) {
defer signal.Reset()

hupCh := OnHUP(t.Context())
ctx := <-hupCh

require.NoError(t, SignalReload(syscall.Getpid()))

select {
case <-ctx.Done():
cause := context.Cause(ctx)
require.Error(t, cause)
assert.Contains(t, cause.Error(), "SIGHUP",
"cause should contain SIGHUP, got: %s", cause.Error())
case <-time.After(1 * time.Second):
t.Error("context should be cancelled in time")
}
})

t.Run("channel should be closed when main context is cancelled", func(t *testing.T) {
defer signal.Reset()

Expand Down
82 changes: 73 additions & 9 deletions signals/signals_windows.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2023 The Dapr Authors
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Expand All @@ -15,22 +15,86 @@ package signals

import (
"context"
"errors"
"net"
"os"
"time"
)

var shutdownSignals = []os.Signal{os.Interrupt}

// OnHUP is a no-op on Windows as SIGHUP is not supported. It returns a channel
// that yields the original parent context, and closes when the parent
// context is canceled.
// OnHUP returns a channel that yields a new context each time a reload signal
// is received via a Windows named pipe. Each context is canceled when the next
// reload signal arrives or when the parent context is canceled. The channel is
// closed when the parent context is canceled.
//
// On Windows, SIGHUP is not supported. Instead, this function listens on a
// named pipe (\\.\pipe\dapr-reload-<PID>). Any connection to the pipe
// triggers a reload, equivalent to sending SIGHUP on POSIX systems.
func OnHUP(ctx context.Context) <-chan context.Context {
ctxCh := make(chan context.Context, 1)
ctxhupCh := make(chan context.Context, 1)

go func() {
defer close(ctxCh)
ctxCh <- ctx
<-ctx.Done()
defer close(ctxhupCh)

pipeName := ReloadPipeName(os.Getpid())
listener, err := listenPipe(pipeName)
if err != nil {
log.Errorf("Failed to create reload named pipe %s: %v", pipeName, err)
// Fall back to the old no-op behavior: send ctx once, wait for
// cancellation.
ctxhupCh <- ctx
<-ctx.Done()
return
}

log.Infof("Listening for reload signals on named pipe %s", pipeName)

go func() {
<-ctx.Done()
listener.Close()
}()

for {
ctxhup, cancel := context.WithCancelCause(ctx)

select {
case ctxhupCh <- ctxhup:
case <-ctx.Done():
cancel(ctx.Err())
return
}

// Wait for a connection on the named pipe. A connection (and
// immediate close) is the reload trigger, equivalent to SIGHUP.
// Retry on transient Accept errors without canceling the
// current context to avoid unintended restart loops.
for {
conn, err := listener.Accept()
if err != nil {
if ctx.Err() != nil {
cancel(ctx.Err())
return
}
// If the listener is permanently closed, exit rather
// than spinning in a tight retry loop.
if errors.Is(err, net.ErrClosed) {
log.Errorf("Reload pipe listener closed unexpectedly: %v", err)
cancel(errors.New("reload pipe listener closed"))
return
}
log.Warnf("Error accepting reload pipe connection, retrying in 1s: %v", err)
time.Sleep(time.Second)
continue
}
conn.Close()
break
}

log.Info("Received reload signal via named pipe; restarting")
cancel(errors.New("received reload signal via named pipe"))
}
}()

return ctxCh
return ctxhupCh
}
91 changes: 91 additions & 0 deletions signals/signals_windows_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package signals

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOnHUP_WindowsPipe(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ctxCh := OnHUP(ctx)

// First context should be emitted immediately.
var firstCtx context.Context
select {
case c, ok := <-ctxCh:
require.True(t, ok, "channel should yield a context")
firstCtx = c
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for initial context from OnHUP")
}

// First context should not be canceled yet.
assert.NoError(t, firstCtx.Err(), "initial context should not be canceled")

// Send a reload signal via the named pipe.
err := SignalReload(os.Getpid())
require.NoError(t, err, "SignalReload should connect to the pipe")

// First context should be canceled after the reload signal.
select {
case <-firstCtx.Done():
// expected
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for first context to be canceled after reload signal")
}

// A new context should be emitted for the next reload cycle.
var secondCtx context.Context
select {
case c, ok := <-ctxCh:
require.True(t, ok, "channel should yield a second context")
secondCtx = c
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for second context from OnHUP")
}

assert.NoError(t, secondCtx.Err(), "second context should not be canceled yet")

// Cancel the parent context to shut down the pipe listener.
cancel()

select {
case <-secondCtx.Done():
// expected
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for second context to be canceled after parent cancel")
}

// Channel should be closed after parent context cancellation.
select {
case _, ok := <-ctxCh:
assert.False(t, ok, "channel should be closed after parent context is canceled")
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for channel to close")
}
}

func TestReloadPipeName(t *testing.T) {
name := ReloadPipeName(12345)
assert.Equal(t, `\\.\pipe\dapr-reload-12345`, name)
}
Loading