Commit fcac438

fix(replication): prevent WAL exhaustion from slow consumers
The replication feed thread could block indefinitely when sending data to a slow replica. If the replica wasn't consuming data fast enough, the TCP send buffer would fill and the feed thread would block on write() with no timeout. During this time, WAL files would rotate and be pruned, leaving the replica's sequence unavailable when the thread eventually unblocked or the connection dropped.

This commit adds three mechanisms to address the issue:

1. Socket send timeout: a new SockSendWithTimeout() function uses poll() to wait for socket writability with a configurable timeout (default 30 seconds). This prevents indefinite blocking.

2. Replication lag detection: at the start of each loop iteration, check whether the replica has fallen too far behind (configurable via max-replication-lag). If the limit is exceeded, disconnect the slow consumer before the WAL is exhausted, allowing psync on reconnect. Disabled by default (0); set a positive value to enable.

3. Exponential backoff on reconnection: when a replica is disconnected, it now waits with exponential backoff (1s, 2s, 4s, ... up to 60s) before reconnecting. This prevents rapid reconnection loops for persistently slow replicas. The backoff resets on a successful psync or fullsync.

New configuration options:

- max-replication-lag: maximum sequence lag before disconnecting (default: 0 = disabled)
- replication-send-timeout-ms: socket send timeout in ms (default: 30000)

Fixes #3356
1 parent 4404eb7 commit fcac438
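
The backoff schedule described in item 3 boils down to a capped power-of-two delay. The following is a minimal standalone sketch of that arithmetic; the function name BackoffSeconds is illustrative, and the committed logic lives in ReplicationThread::CallbacksStateMachine::ConnEventCB in src/cluster/replication.cc below:

#include <algorithm>

// Sketch: the delay grows 1s, 2s, 4s, ... and is clamped at 60s.
// The shift count is capped at 6 so `1 << attempts` cannot overflow.
int BackoffSeconds(int attempts) {
  constexpr int kMaxBackoffSeconds = 60;
  constexpr int kMaxShiftBits = 6;  // 2^6 = 64, then clamped to 60
  return std::min(1 << std::min(attempts, kMaxShiftBits), kMaxBackoffSeconds);
}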

11 files changed

Lines changed: 679 additions & 6 deletions

kvrocks.conf

Lines changed: 15 additions & 0 deletions
@@ -231,6 +231,21 @@ replication-delay-bytes 16384
 # Default: 16 updates
 replication-delay-updates 16
 
+# Maximum sequence lag allowed before disconnecting a slow replica.
+# If a replica falls behind by more than this many sequences, the master will
+# disconnect it to prevent WAL exhaustion. The replica can then reconnect and
+# attempt partial sync (psync) if the sequence is still available.
+# Set to 0 to disable this check (default).
+# Default: 0 (disabled)
+max-replication-lag 0
+
+# Timeout in milliseconds for socket send operations to replicas.
+# If sending data to a replica blocks for longer than this timeout,
+# the connection will be dropped. This prevents the replication feed thread
+# from blocking indefinitely on slow consumers.
+# Default: 30000 (30 seconds)
+replication-send-timeout-ms 30000
+
 # TCP listen() backlog.
 #
 # In high requests-per-second environments you need an high backlog in order
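
Usage note: the lag check ships disabled and the send timeout defaults to 30 seconds; the Go test added in this commit exercises CONFIG SET on both options, so they can also be adjusted at runtime. An illustrative static configuration (the values are examples, not recommendations) would be:

# Allow a replica to lag at most 100M sequences before it is disconnected,
# and give up on a stalled send after 10 seconds.
max-replication-lag 100000000
replication-send-timeout-ms 10000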

src/cluster/replication.cc

Lines changed: 32 additions & 6 deletions
@@ -63,7 +63,9 @@ FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::
       next_repl_seq_(next_repl_seq),
       req_(srv),
       max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes),
-      max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {}
+      max_delay_updates_(srv->GetConfig()->max_replication_delay_updates),
+      max_replication_lag_(srv->GetConfig()->max_replication_lag),
+      send_timeout_ms_(srv->GetConfig()->replication_send_timeout_ms) {}
 
 Status FeedSlaveThread::Start() {
   auto s = util::CreateThread("feed-replica", [this] {
@@ -184,6 +186,21 @@ void FeedSlaveThread::loop() {
   while (!IsStopped()) {
     auto curr_seq = next_repl_seq_.load();
 
+    // Check replication lag - disconnect slow consumers before WAL is exhausted
+    // Skip check if max_replication_lag_ is 0 (feature disabled)
+    if (max_replication_lag_ > 0) {
+      auto latest_seq = srv_->storage->LatestSeqNumber();
+      if (latest_seq > curr_seq) {
+        auto lag = static_cast<int64_t>(latest_seq - curr_seq);
+        if (lag > max_replication_lag_) {
+          ERROR("Replication lag {} exceeds max allowed {} for slave {}:{}, disconnecting to prevent WAL exhaustion",
+                lag, max_replication_lag_, conn_->GetAnnounceIP(), conn_->GetListeningPort());
+          Stop();
+          return;
+        }
+      }
+    }
+
     if (!iter_ || !iter_->Valid()) {
       if (iter_) INFO("WAL was rotated, would reopen again");
       if (!srv_->storage->WALHasNewData(curr_seq) || !srv_->storage->GetWALIter(curr_seq, &iter_).IsOK()) {
@@ -221,10 +238,12 @@ void FeedSlaveThread::loop() {
       batches_bulk += redis::BulkString("_getack");
     }
 
-    // Send entire bulk which contain multiple batches
-    auto s = util::SockSend(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent());
+    // Send entire bulk which contain multiple batches with timeout
+    // This prevents blocking indefinitely on slow consumers
+    auto s = util::SockSendWithTimeout(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent(), send_timeout_ms_);
     if (!s.IsOK()) {
-      ERROR("Write error while sending batch to slave: {}. batches: 0x{}", s.Msg(), util::StringToHex(batches_bulk));
+      ERROR("Write error while sending batch to slave {}:{}: {}. batch_size={}", conn_->GetAnnounceIP(),
+            conn_->GetListeningPort(), s.Msg(), batches_bulk.size());
       Stop();
       return;
     }
@@ -260,9 +279,14 @@ ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, int
   }
   if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
     ERROR("[replication] connection error/eof, reconnect the master");
-    // Wait a bit and reconnect
+    // Wait with exponential backoff before reconnecting
+    constexpr int kMaxBackoffSeconds = 60;
+    constexpr int kMaxShiftBits = 6;  // Cap shift to avoid UB; 2^6 = 64 then clamped to 60
     repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
-    std::this_thread::sleep_for(std::chrono::seconds(1));
+    int attempts = repl_->reconnect_attempts_.fetch_add(1, std::memory_order_relaxed);
+    int backoff_secs = std::min(1 << std::min(attempts, kMaxShiftBits), kMaxBackoffSeconds);
+    WARN("[replication] waiting {} seconds before reconnecting (attempt {})", backoff_secs, attempts + 1);
+    std::this_thread::sleep_for(std::chrono::seconds(backoff_secs));
     Stop();
     Start();
   }
@@ -634,6 +658,7 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
   } else {
     // PSYNC is OK, use IncrementBatchLoop
    INFO("[replication] PSync is ok, start increment batch loop");
+    reconnect_attempts_.store(0, std::memory_order_relaxed);  // Reset backoff counter on successful connection
     return CBState::NEXT;
   }
 }
@@ -879,6 +904,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
     return CBState::RESTART;
   }
   INFO("[replication] Succeeded restoring the backup, fullsync was finish");
+  reconnect_attempts_.store(0, std::memory_order_relaxed);  // Reset backoff counter on successful fullsync
   post_fullsync_cb_();
 
   // It needs to reload namespaces from DB after the full sync is done,
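
The lag check above compares the storage's latest WAL sequence with the sequence the feed thread is about to send. As a hedged standalone restatement of that predicate (the function and parameter names below are illustrative placeholders for srv_->storage->LatestSeqNumber(), next_repl_seq_, and max_replication_lag_):

#include <cstdint>

// Sketch: true when the replica is far enough behind that the master should
// disconnect it; a max_lag of 0 disables the check, matching max-replication-lag.
bool ShouldDisconnectSlowReplica(uint64_t latest_seq, uint64_t replica_seq, int64_t max_lag) {
  if (max_lag <= 0) return false;               // feature disabled
  if (latest_seq <= replica_seq) return false;  // replica has caught up
  return static_cast<int64_t>(latest_seq - replica_seq) > max_lag;
}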

src/cluster/replication.h

Lines changed: 3 additions & 0 deletions
@@ -91,6 +91,8 @@ class FeedSlaveThread {
   // Configurable delay limits
   size_t max_delay_bytes_;
   size_t max_delay_updates_;
+  int64_t max_replication_lag_;
+  int send_timeout_ms_;
 
   void loop();
   void checkLivenessIfNeed();
@@ -166,6 +168,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
   const bool replication_group_sync_ = false;
   std::atomic<int64_t> last_io_time_secs_ = 0;
   int64_t last_ack_time_secs_ = 0;
+  std::atomic<int> reconnect_attempts_ = 0;  // For exponential backoff on reconnection
   bool next_try_old_psync_ = false;
   bool next_try_without_announce_ip_address_ = false;

src/common/io_util.cc

Lines changed: 97 additions & 0 deletions
@@ -29,7 +29,10 @@
 #include <poll.h>
 #include <sys/types.h>
 
+#include <chrono>
+
 #include "fmt/ostream.h"
+#include "scope_exit.h"
 #include "server/tls_util.h"
 
 #ifdef __linux__
@@ -468,6 +471,100 @@ Status SockSend(int fd, const std::string &data, [[maybe_unused]] bufferevent *b
 #endif
 }
 
+Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms) {
+  // Fall back to blocking send if timeout is non-positive
+  if (timeout_ms <= 0) {
+    return SockSend(fd, data);
+  }
+
+  ssize_t n = 0;
+  auto start = std::chrono::steady_clock::now();
+
+  while (n < static_cast<ssize_t>(data.size())) {
+    // Check if we've exceeded the timeout
+    auto elapsed =
+        std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start).count();
+    if (elapsed >= timeout_ms) {
+      return {Status::NotOK, fmt::format("send timeout after {} ms, sent {} of {} bytes", elapsed, n, data.size())};
+    }
+
+    // Calculate remaining timeout
+    int remaining_ms = timeout_ms - static_cast<int>(elapsed);
+
+    // Wait for socket to be writable with timeout
+    int ready = AeWait(fd, AE_WRITABLE, remaining_ms);
+    if (ready == 0) {
+      return {Status::NotOK, fmt::format("send timeout waiting for socket, sent {} of {} bytes", n, data.size())};
+    }
+    if (ready < 0) {
+      return Status::FromErrno("poll error while sending");
+    }
+
+    ssize_t nwritten = write(fd, data.data() + n, data.size() - n);
+    if (nwritten == -1) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        // Socket buffer is full, continue waiting
+        continue;
+      }
+      return Status::FromErrno();
+    }
+    n += nwritten;
+  }
+  return Status::OK();
+}
+
+Status SockSendWithTimeout(int fd, const std::string &data, [[maybe_unused]] bufferevent *bev, int timeout_ms) {
+  // Fall back to blocking send if timeout is non-positive
+  if (timeout_ms <= 0) {
+    return SockSend(fd, data, bev);
+  }
+
+#ifdef ENABLE_OPENSSL
+  auto ssl = bufferevent_openssl_get_ssl(bev);
+  if (ssl) {
+    // Save original flags and set socket to non-blocking for timeout support
+    int orig_flags = fcntl(fd, F_GETFL);
+    if (orig_flags == -1) return Status::FromErrno("fcntl(F_GETFL)");
+
+    auto s = SockSetBlocking(fd, 0);
+    if (!s.IsOK()) return s;
+
+    // Restore original flags on scope exit
+    auto restore_flags = MakeScopeExit([fd, orig_flags] { fcntl(fd, F_SETFL, orig_flags); });
+
+    ssize_t n = 0;
+    auto start = std::chrono::steady_clock::now();
+
+    while (n < static_cast<ssize_t>(data.size())) {
+      auto elapsed =
+          std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start).count();
+      if (elapsed >= timeout_ms) {
+        return {Status::NotOK,
+                fmt::format("SSL send timeout after {} ms, sent {} of {} bytes", elapsed, n, data.size())};
+      }
+
+      int remaining_ms = timeout_ms - static_cast<int>(elapsed);
+      int ready = AeWait(fd, AE_WRITABLE, remaining_ms);
+      if (ready <= 0) {
+        return {Status::NotOK, fmt::format("SSL send timeout waiting for socket, sent {} of {} bytes", n, data.size())};
+      }
+
+      int nwritten = SSL_write(ssl, data.data() + n, static_cast<int>(data.size() - n));
+      if (nwritten <= 0) {
+        int err = SSL_get_error(ssl, nwritten);
+        if (err == SSL_ERROR_WANT_WRITE || err == SSL_ERROR_WANT_READ) {
+          continue;
+        }
+        return {Status::NotOK, fmt::format("SSL_write error: {}", err)};
+      }
+      n += nwritten;
+    }
+    return Status::OK();
+  }
+#endif
+  return SockSendWithTimeout(fd, data, timeout_ms);
+}
+
 StatusOr<int> SockConnect(const std::string &host, uint32_t port, [[maybe_unused]] ssl_st *ssl, int conn_timeout,
                           int timeout) {
 #ifdef ENABLE_OPENSSL
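
Both overloads delegate the writability wait to AeWait(fd, AE_WRITABLE, remaining_ms). The commit message describes this as a poll()-based wait; for readers unfamiliar with that helper, a minimal poll() equivalent (a sketch, not the project's actual helper) looks like this:

#include <poll.h>

// Sketch: wait until fd is writable or timeout_ms elapses.
// Returns >0 when writable, 0 on timeout, <0 on error, mirroring poll().
int WaitWritable(int fd, int timeout_ms) {
  pollfd pfd{};
  pfd.fd = fd;
  pfd.events = POLLOUT;
  return poll(&pfd, 1, timeout_ms);
}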

src/common/io_util.h

Lines changed: 4 additions & 0 deletions
@@ -54,6 +54,10 @@ Status Pwrite(int fd, const std::string &data, off_t offset);
 Status SockSend(int fd, const std::string &data, ssl_st *ssl);
 Status SockSend(int fd, const std::string &data, bufferevent *bev);
 
+// Send with timeout - returns error if send would block for longer than timeout_ms
+Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms);
+Status SockSendWithTimeout(int fd, const std::string &data, bufferevent *bev, int timeout_ms);
+
 Status SockSendFile(int out_fd, int in_fd, size_t size, ssl_st *ssl);
 Status SockSendFile(int out_fd, int in_fd, size_t size, bufferevent *bev);

src/config/config.cc

Lines changed: 2 additions & 0 deletions
@@ -206,6 +206,8 @@ Config::Config() {
       {"replication-no-slowdown", false, new YesNoField(&replication_no_slowdown, true)},
       {"replication-delay-bytes", false, new IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)},
       {"replication-delay-updates", false, new IntField(&max_replication_delay_updates, 16, 1, INT_MAX)},
+      {"max-replication-lag", false, new Int64Field(&max_replication_lag, 0, 0, INT64_MAX)},
+      {"replication-send-timeout-ms", false, new IntField(&replication_send_timeout_ms, 30000, 1000, 300000)},
       {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
       {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)},
       {"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},

src/config/config.h

Lines changed: 2 additions & 0 deletions
@@ -126,6 +126,8 @@ struct Config {
   int replication_recv_timeout_ms = 3200;
   int max_replication_delay_bytes = 16 * 1024;  // 16KB default
   int max_replication_delay_updates = 16;       // 16 updates default
+  int64_t max_replication_lag = 0;              // 0 = disabled, otherwise max sequences before disconnecting slow consumer
+  int replication_send_timeout_ms = 30000;      // 30 second timeout for socket sends to replicas
   int max_db_size = 0;
   int max_replication_mb = 0;
   int max_io_mb = 0;

tests/cppunit/config_test.cc

Lines changed: 2 additions & 0 deletions
@@ -86,6 +86,8 @@ TEST(Config, GetAndSet) {
       {"rocksdb.max_background_jobs", "4"},
       {"rocksdb.compression_start_level", "2"},
       {"rocksdb.sst_file_delete_rate_bytes_per_sec", "0"},
+      {"max-replication-lag", "50000000"},
+      {"replication-send-timeout-ms", "60000"},
   };
   std::vector<std::string> values;
   for (const auto &iter : mutable_cases) {

tests/gocase/integration/replication/replication_test.go

Lines changed: 85 additions & 0 deletions
@@ -711,3 +711,88 @@ func TestReplicationWatermark(t *testing.T) {
     // The small command should be processed much faster than 1 second
     require.Less(t, duration, 1*time.Second, "small command should be processed promptly")
 }
+
+func TestReplicationSlowConsumerConfig(t *testing.T) {
+    t.Parallel()
+    ctx := context.Background()
+
+    // This test verifies the slow consumer protection config options are working:
+    // - max-replication-lag: threshold before disconnecting slow consumers
+    // - replication-send-timeout-ms: timeout for socket sends to replicas
+    master := util.StartServer(t, map[string]string{
+        "max-replication-lag":         "100000000",
+        "replication-send-timeout-ms": "30000",
+    })
+    defer master.Close()
+    masterClient := master.NewClient()
+    defer func() { require.NoError(t, masterClient.Close()) }()
+
+    slave := util.StartServer(t, map[string]string{})
+    defer slave.Close()
+    slaveClient := slave.NewClient()
+    defer func() { require.NoError(t, slaveClient.Close()) }()
+
+    t.Run("Slow consumer config options are readable and settable", func(t *testing.T) {
+        // Verify initial config values
+        maxLag := masterClient.ConfigGet(ctx, "max-replication-lag").Val()
+        require.Equal(t, "100000000", maxLag["max-replication-lag"])
+
+        sendTimeout := masterClient.ConfigGet(ctx, "replication-send-timeout-ms").Val()
+        require.Equal(t, "30000", sendTimeout["replication-send-timeout-ms"])
+
+        // Test CONFIG SET for max-replication-lag
+        require.NoError(t, masterClient.ConfigSet(ctx, "max-replication-lag", "50000000").Err())
+        maxLag = masterClient.ConfigGet(ctx, "max-replication-lag").Val()
+        require.Equal(t, "50000000", maxLag["max-replication-lag"])
+
+        // Test CONFIG SET for replication-send-timeout-ms
+        require.NoError(t, masterClient.ConfigSet(ctx, "replication-send-timeout-ms", "15000").Err())
+        sendTimeout = masterClient.ConfigGet(ctx, "replication-send-timeout-ms").Val()
+        require.Equal(t, "15000", sendTimeout["replication-send-timeout-ms"])
+
+        // Verify replication still works normally with these config options
+        util.SlaveOf(t, slaveClient, master)
+        util.WaitForSync(t, slaveClient)
+        require.Equal(t, "slave", util.FindInfoEntry(slaveClient, "role"))
+
+        require.NoError(t, masterClient.Set(ctx, "test_key", "test_value", 0).Err())
+        util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second)
+        require.Equal(t, "test_value", slaveClient.Get(ctx, "test_key").Val())
+    })
+}
+
+func TestReplicationExponentialBackoff(t *testing.T) {
+    t.Parallel()
+    ctx := context.Background()
+
+    master := util.StartServer(t, map[string]string{})
+    defer master.Close()
+    masterClient := master.NewClient()
+    defer func() { require.NoError(t, masterClient.Close()) }()
+
+    slave := util.StartServer(t, map[string]string{})
+    defer slave.Close()
+    slaveClient := slave.NewClient()
+    defer func() { require.NoError(t, slaveClient.Close()) }()
+
+    t.Run("Slave uses exponential backoff on reconnection", func(t *testing.T) {
+        // Connect slave to master
+        util.SlaveOf(t, slaveClient, master)
+        util.WaitForSync(t, slaveClient)
+
+        // Kill the slave connection from master side to trigger reconnection
+        _, err := masterClient.ClientKillByFilter(ctx, "type", "slave").Result()
+        require.NoError(t, err)
+
+        // The slave should log backoff messages when reconnecting
+        // First reconnection attempt should wait 1 second
+        require.Eventually(t, func() bool {
+            return slave.LogFileMatches(t, ".*waiting 1 seconds before reconnecting.*")
+        }, 10*time.Second, 200*time.Millisecond, "slave should log backoff on first reconnection")
+
+        // Slave should eventually reconnect
+        require.Eventually(t, func() bool {
+            return util.FindInfoEntry(slaveClient, "master_link_status") == "up"
+        }, 15*time.Second, 500*time.Millisecond, "slave should reconnect with backoff")
+    })
+}
