Commit b57639c

Fix/server cpu exhaustion (#353)
* Fix server CPU exhaustion under high stream load

  - Enforce maxSessions limit: reject new TCP connections when at capacity instead of accepting and logging a debug message
  - Add per-session concurrent stream limit (2048) using a semaphore to prevent a single session (e.g. setup-node) from spawning unbounded goroutines that starve the CPU
  - Add backoff delay (50ms) on non-fatal stream accept errors to prevent tight CPU spin loops when persistent errors occur
  - Streams that exceed the concurrency limit are immediately closed rather than queued, providing backpressure to the client

* Revert maxSessions rejection to original behavior

  maxSessions only controls discovery advertisement, not connection acceptance. Services and visors connect to all servers regardless of advertised load, so rejecting sessions would break connectivity.

* Add stream read deadline and fix indentation

  - Add read deadline (HandshakeTimeout) on the initial stream request read so slow or malicious clients cannot hold goroutines and semaphore slots indefinitely. The deadline is cleared before the long-lived bidirectional copy loop.
  - Remove stale TODO comment in server accept loop
  - Fix indentation from previous revert

* Ensure pprof HTTP server remains responsive under high load

  Run the pprof HTTP server on a dedicated OS thread via runtime.LockOSThread() and bump GOMAXPROCS by 1 to reserve a thread for it. This ensures the kernel scheduler gives pprof CPU time even when the Go runtime is saturated with thousands of stream-handling goroutines, which is exactly when pprof is needed most to diagnose the problem.
1 parent 3016c2e commit b57639c

3 files changed with 98 additions and 42 deletions
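Before the per-file diffs, here is a minimal, self-contained sketch of the per-session limiting pattern the commit message describes: a buffered channel used as a counting semaphore, a non-blocking acquire that closes excess streams instead of queuing them, a short back-off on accept errors, and a release when the handler returns. This is illustrative Go, not the dmsg code itself; the `stream` type and `serveSession` function are invented for the example.

```go
package example

import (
	"errors"
	"fmt"
	"io"
	"time"
)

// stream is a stand-in for an smux/yamux stream; it exists only for this sketch.
type stream struct{ id int }

func (s *stream) Close() error { return nil }

const maxConcurrentStreams = 2048 // same per-session limit the commit adds

// serveSession accepts streams in a loop and hands each one to handle,
// never allowing more than maxConcurrentStreams handlers at once.
func serveSession(accept func() (*stream, error), handle func(*stream)) {
	// Buffered channel as a counting semaphore: one slot per in-flight stream.
	sem := make(chan struct{}, maxConcurrentStreams)

	for {
		st, err := accept()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return // session closed
			}
			// Brief back-off so a persistent accept error cannot spin the CPU.
			time.Sleep(50 * time.Millisecond)
			continue
		}

		// Non-blocking acquire: when the session is at its limit, close the
		// stream immediately instead of queuing it, which pushes backpressure
		// to the client.
		select {
		case sem <- struct{}{}:
		default:
			fmt.Printf("stream %d rejected: concurrency limit reached\n", st.id)
			_ = st.Close()
			continue
		}

		go func(st *stream) {
			defer func() { <-sem }() // free the slot when the handler returns
			handle(st)
		}(st)
	}
}
```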


pkg/cmdutil/pprof.go

Lines changed: 46 additions & 39 deletions
@@ -21,30 +21,7 @@ func InitPProf(log *logging.Logger, mode string, addr string) func() { //nolint:
 
     switch mode {
     case "http":
-        go func() {
-            mux := http.NewServeMux()
-            mux.HandleFunc("/debug/pprof/", pprof.Index)
-            mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
-            mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
-            mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
-            mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
-
-            for _, profile := range []string{"heap", "goroutine", "threadcreate", "block", "mutex", "allocs"} {
-                mux.Handle("/debug/pprof/"+profile, pprof.Handler(profile))
-            }
-
-            srv := &http.Server{
-                Addr:              addr,
-                Handler:           mux,
-                ReadHeaderTimeout: 5 * time.Second,
-                WriteTimeout:      30 * time.Second,
-            }
-            log.Infof("Serving pprof on http://%s", addr)
-            if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
-                log.Errorf("pprof http server failed: %v", err)
-            }
-        }()
-
+        startPProfHTTP(log, addr, false)
         time.Sleep(100 * time.Millisecond)
         return noop
 

@@ -122,21 +99,7 @@ func InitPProf(log *logging.Logger, mode string, addr string) func() { //nolint:
        }
 
    case "trace":
-        go func() {
-            mux := http.NewServeMux()
-            mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
-            srv := &http.Server{
-                Addr:              addr,
-                Handler:           mux,
-                ReadHeaderTimeout: 5 * time.Second,
-                WriteTimeout:      60 * time.Second,
-            }
-            log.Infof("Serving trace endpoint on http://%s/debug/pprof/trace", addr)
-            if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
-                log.Errorf("pprof trace server failed: %v", err)
-            }
-        }()
-
+        startPProfHTTP(log, addr, true)
         time.Sleep(100 * time.Millisecond)
         return noop
 

@@ -149,3 +112,47 @@ func InitPProf(log *logging.Logger, mode string, addr string) func() { //nolint:
 
    return noop
 }
+
+// startPProfHTTP starts a pprof HTTP server on a dedicated OS thread.
+// Locking the goroutine to its own thread ensures the kernel scheduler
+// gives it CPU time even when the Go runtime is saturated with goroutines,
+// which is exactly when pprof is needed most.
+func startPProfHTTP(log *logging.Logger, addr string, traceOnly bool) {
+    // Reserve an extra OS thread for the pprof server so it doesn't
+    // compete with application goroutines for GOMAXPROCS slots.
+    runtime.GOMAXPROCS(runtime.GOMAXPROCS(0) + 1)
+
+    go func() {
+        // Pin this goroutine to a dedicated OS thread so the kernel
+        // scheduler guarantees it CPU time independent of Go's
+        // cooperative goroutine scheduler.
+        runtime.LockOSThread()
+
+        mux := http.NewServeMux()
+        if traceOnly {
+            mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+            log.Infof("Serving trace endpoint on http://%s/debug/pprof/trace (dedicated thread)", addr)
+        } else {
+            mux.HandleFunc("/debug/pprof/", pprof.Index)
+            mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
+            mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
+            mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
+            mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+
+            for _, profile := range []string{"heap", "goroutine", "threadcreate", "block", "mutex", "allocs"} {
+                mux.Handle("/debug/pprof/"+profile, pprof.Handler(profile))
+            }
+            log.Infof("Serving pprof on http://%s (dedicated thread)", addr)
+        }
+
+        srv := &http.Server{
+            Addr:              addr,
+            Handler:           mux,
+            ReadHeaderTimeout: 5 * time.Second,
+            WriteTimeout:      30 * time.Second,
+        }
+        if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
+            log.Errorf("pprof http server failed: %v", err)
+        }
+    }()
+}
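
A quick way to confirm the dedicated-thread pprof server stays reachable while the relay is saturated is to pull a profile over HTTP. The sketch below assumes pprof was started in "http" mode on localhost:6060; that address is an assumption for illustration, not something this commit configures.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{Timeout: 10 * time.Second}

	// The goroutine profile is usually the most telling one under this kind
	// of load: it shows how many stream-handling goroutines are alive.
	resp, err := client.Get("http://localhost:6060/debug/pprof/goroutine?debug=1")
	if err != nil {
		fmt.Println("pprof not reachable:", err)
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Printf("fetched %d bytes of goroutine profile (status %s)\n", len(body), resp.Status)
}
```

Because GOMAXPROCS is bumped by one and the serving goroutine is locked to its own OS thread, this request should still be scheduled even when every other processor slot is busy with stream-handling goroutines.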

pkg/dmsg/server.go

Lines changed: 0 additions & 1 deletion
@@ -150,7 +150,6 @@ func (s *Server) Serve(lis net.Listener, addr string) error {
            return err
        }
 
-        // TODO(evanlinjin): Implement proper load-balancing.
        if s.SessionCount() >= s.maxSessions {
            s.log.
                WithField("max_sessions", s.maxSessions).

pkg/dmsg/server_session.go

Lines changed: 52 additions & 2 deletions
@@ -5,6 +5,7 @@ import (
    "fmt"
    "io"
    "net"
+   "time"
 
    "github.com/hashicorp/yamux"
    "github.com/sirupsen/logrus"
@@ -15,6 +16,16 @@ import (
    "github.com/skycoin/dmsg/pkg/noise"
 )
 
+const (
+   // maxConcurrentStreams limits how many streams can be served concurrently
+   // per session to prevent a single session from exhausting server resources.
+   maxConcurrentStreams = 2048
+
+   // streamErrorBackoff is the delay after a non-fatal stream accept error
+   // to prevent CPU spin on persistent errors.
+   streamErrorBackoff = 50 * time.Millisecond
+)
+
 // ServerSession represents a session from the perspective of a dmsg server.
 type ServerSession struct {
    *SessionCommon
@@ -45,6 +56,10 @@ func (ss *ServerSession) Close() error {
 func (ss *ServerSession) Serve() {
    ss.m.RecordSession(metrics.DeltaConnect)          // record successful connection
    defer ss.m.RecordSession(metrics.DeltaDisconnect) // record disconnection
+
+   // Semaphore to limit concurrent streams per session.
+   sem := make(chan struct{}, maxConcurrentStreams)
+
    if ss.sm.smux != nil {
        for {
            sStr, err := ss.sm.smux.AcceptStream()
@@ -54,13 +69,24 @@
                    return
                }
                ss.log.WithError(err).Warn("Failed to accept smux stream, continuing...")
+               time.Sleep(streamErrorBackoff)
                continue
            }
 
            log := ss.log.WithField("smux_id", sStr.ID())
-           log.Info("Initiating stream.")
 
+           // Acquire semaphore slot; if full, reject the stream.
+           select {
+           case sem <- struct{}{}:
+           default:
+               log.Warn("Max concurrent streams reached, rejecting stream.")
+               sStr.Close() //nolint:errcheck,gosec
+               continue
+           }
+
+           log.Info("Initiating stream.")
            go func(sStr *smux.Stream) {
+               defer func() { <-sem }()
                defer func() {
                    if r := recover(); r != nil {
                        log.WithField("panic", r).Error("Recovered from panic in serveStream")
@@ -79,13 +105,24 @@ func (ss *ServerSession) Serve() {
                    return
                }
                ss.log.WithError(err).Warn("Failed to accept yamux stream, continuing...")
+               time.Sleep(streamErrorBackoff)
                continue
            }
 
            log := ss.log.WithField("yamux_id", yStr.StreamID())
-           log.Info("Initiating stream.")
 
+           // Acquire semaphore slot; if full, reject the stream.
+           select {
+           case sem <- struct{}{}:
+           default:
+               log.Warn("Max concurrent streams reached, rejecting stream.")
+               yStr.Close() //nolint:errcheck,gosec
+               continue
+           }
+
+           log.Info("Initiating stream.")
            go func(yStr *yamux.Stream) {
+               defer func() { <-sem }()
                defer func() {
                    if r := recover(); r != nil {
                        log.WithField("panic", r).Error("Recovered from panic in serveStream")
@@ -101,6 +138,14 @@ func (ss *ServerSession) Serve() {
 // struct
 
 func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCloser, addr net.Addr) error {
+   // Set a deadline for the initial stream request read so a slow or
+   // malicious client cannot hold a goroutine and semaphore slot indefinitely.
+   if conn, ok := yStr.(net.Conn); ok {
+       if err := conn.SetReadDeadline(time.Now().Add(HandshakeTimeout)); err != nil {
+           return fmt.Errorf("set read deadline: %w", err)
+       }
+   }
+
    readRequest := func() (StreamRequest, error) {
        obj, err := ss.readObject(yStr)
        if err != nil {
@@ -183,6 +228,11 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCloser, addr net.Addr) error {
    }
    log.Debug("Forwarded stream response.")
 
+   // Clear the read deadline before the long-lived bidirectional copy.
+   if conn, ok := yStr.(net.Conn); ok {
+       conn.SetReadDeadline(time.Time{}) //nolint:errcheck,gosec
+   }
+
    // Serve stream.
    log.Info("Serving stream.")
    ss.m.RecordStream(metrics.DeltaConnect) // record successful stream
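
The read-deadline change in serveStream follows a common set-then-clear pattern. Below is an isolated sketch of that shape on a plain net.Conn; handleConn and its parameters are illustrative names, not part of the dmsg codebase.

```go
package example

import (
	"fmt"
	"io"
	"net"
	"time"
)

// handleConn bounds only the handshake read with a deadline, then clears it
// so the long-lived relay is not killed by the same timeout.
func handleConn(conn net.Conn, handshakeTimeout time.Duration) error {
	// Bound the initial request read so a slow or stalled client cannot pin
	// this goroutine (and, in the server, its semaphore slot) indefinitely.
	if err := conn.SetReadDeadline(time.Now().Add(handshakeTimeout)); err != nil {
		return fmt.Errorf("set read deadline: %w", err)
	}

	req := make([]byte, 512)
	if _, err := conn.Read(req); err != nil {
		return fmt.Errorf("read handshake: %w", err)
	}

	// Clear the deadline before the long-lived copy; otherwise an idle but
	// healthy stream would be torn down after handshakeTimeout.
	if err := conn.SetReadDeadline(time.Time{}); err != nil {
		return fmt.Errorf("clear read deadline: %w", err)
	}

	_, err := io.Copy(io.Discard, conn) // stand-in for the bidirectional relay
	return err
}
```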
