Skip to content

Commit c6c48b7

Browse files
committed
fix(replication): prevent WAL exhaustion from slow consumers
The replication feed thread could block indefinitely when sending data to a slow replica. If the replica wasn't consuming data fast enough, the TCP send buffer would fill and the feed thread would block on write() with no timeout. During this time, WAL files would rotate and be pruned, leaving the replica's sequence unavailable when the thread eventually unblocked or the connection dropped.

This commit adds three mechanisms to address the issue:

1. Socket send timeout: New SockSendWithTimeout() function that uses poll() to wait for socket writability with a configurable timeout (default 30 seconds). This prevents indefinite blocking.

2. Replication lag detection: At the start of each loop iteration, check if the replica has fallen too far behind (configurable via max-replication-lag, default 100M sequences). If exceeded, disconnect the slow consumer before WAL is exhausted, allowing psync on reconnect.

3. Exponential backoff on reconnection: When a replica is disconnected, it now waits with exponential backoff (1s, 2s, 4s... up to 60s) before reconnecting. This prevents rapid reconnection loops for persistently slow replicas. The backoff resets on successful psync or fullsync.

New configuration options:
- max-replication-lag: Maximum sequence lag before disconnecting (default: 100M)
- replication-send-timeout-ms: Socket send timeout in ms (default: 30000)

Fixes #3356
1 parent 4404eb7 commit c6c48b7

10 files changed

Lines changed: 778 additions & 6 deletions

File tree

src/cluster/replication.cc

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb::
6363
next_repl_seq_(next_repl_seq),
6464
req_(srv),
6565
max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes),
66-
max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {}
66+
max_delay_updates_(srv->GetConfig()->max_replication_delay_updates),
67+
max_replication_lag_(srv->GetConfig()->max_replication_lag),
68+
send_timeout_ms_(srv->GetConfig()->replication_send_timeout_ms) {}
6769

6870
Status FeedSlaveThread::Start() {
6971
auto s = util::CreateThread("feed-replica", [this] {
@@ -184,6 +186,18 @@ void FeedSlaveThread::loop() {
184186
while (!IsStopped()) {
185187
auto curr_seq = next_repl_seq_.load();
186188

189+
// Check replication lag - disconnect slow consumers before WAL is exhausted
190+
auto latest_seq = srv_->storage->LatestSeqNumber();
191+
if (latest_seq > curr_seq) {
192+
auto lag = static_cast<int64_t>(latest_seq - curr_seq);
193+
if (lag > max_replication_lag_) {
194+
ERROR("Replication lag {} exceeds max allowed {} for slave {}:{}, disconnecting to prevent WAL exhaustion",
195+
lag, max_replication_lag_, conn_->GetAnnounceIP(), conn_->GetListeningPort());
196+
Stop();
197+
return;
198+
}
199+
}
200+
187201
if (!iter_ || !iter_->Valid()) {
188202
if (iter_) INFO("WAL was rotated, would reopen again");
189203
if (!srv_->storage->WALHasNewData(curr_seq) || !srv_->storage->GetWALIter(curr_seq, &iter_).IsOK()) {
@@ -221,10 +235,12 @@ void FeedSlaveThread::loop() {
221235
batches_bulk += redis::BulkString("_getack");
222236
}
223237

224-
// Send entire bulk which contain multiple batches
225-
auto s = util::SockSend(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent());
238+
// Send entire bulk which contain multiple batches with timeout
239+
// This prevents blocking indefinitely on slow consumers
240+
auto s = util::SockSendWithTimeout(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent(), send_timeout_ms_);
226241
if (!s.IsOK()) {
227-
ERROR("Write error while sending batch to slave: {}. batches: 0x{}", s.Msg(), util::StringToHex(batches_bulk));
242+
ERROR("Write error while sending batch to slave {}:{}: {}. batch_size={}",
243+
conn_->GetAnnounceIP(), conn_->GetListeningPort(), s.Msg(), batches_bulk.size());
228244
Stop();
229245
return;
230246
}
@@ -260,9 +276,15 @@ void ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, int
260276
}
261277
if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) {
262278
ERROR("[replication] connection error/eof, reconnect the master");
263-
// Wait a bit and reconnect
279+
// Wait with exponential backoff before reconnecting
280+
constexpr int kMaxBackoffSeconds = 60;
281+
constexpr int kMaxShiftBits = 6; // Cap shift to avoid UB; 2^6 = 64 then clamped to 60
264282
repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed);
265-
std::this_thread::sleep_for(std::chrono::seconds(1));
283+
int attempts = repl_->reconnect_attempts_.fetch_add(1, std::memory_order_relaxed);
284+
int backoff_secs = std::min(1 << std::min(attempts, kMaxShiftBits), kMaxBackoffSeconds);
285+
WARN("[replication] waiting {} seconds before reconnecting (attempt {})",
286+
backoff_secs, attempts + 1);
287+
std::this_thread::sleep_for(std::chrono::seconds(backoff_secs));
266288
Stop();
267289
Start();
268290
}
@@ -634,6 +656,7 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) {
634656
} else {
635657
// PSYNC is OK, use IncrementBatchLoop
636658
INFO("[replication] PSync is ok, start increment batch loop");
659+
reconnect_attempts_.store(0, std::memory_order_relaxed); // Reset backoff counter on successful connection
637660
return CBState::NEXT;
638661
}
639662
}
@@ -879,6 +902,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
879902
return CBState::RESTART;
880903
}
881904
INFO("[replication] Succeeded restoring the backup, fullsync was finish");
905+
reconnect_attempts_.store(0, std::memory_order_relaxed); // Reset backoff counter on successful fullsync
882906
post_fullsync_cb_();
883907

884908
// It needs to reload namespaces from DB after the full sync is done,

src/cluster/replication.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ class FeedSlaveThread {
9191
// Configurable delay limits
9292
size_t max_delay_bytes_;
9393
size_t max_delay_updates_;
94+
int64_t max_replication_lag_;
95+
int send_timeout_ms_;
9496

9597
void loop();
9698
void checkLivenessIfNeed();
@@ -166,6 +168,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
166168
const bool replication_group_sync_ = false;
167169
std::atomic<int64_t> last_io_time_secs_ = 0;
168170
int64_t last_ack_time_secs_ = 0;
171+
std::atomic<int> reconnect_attempts_ = 0; // For exponential backoff on reconnection
169172
bool next_try_old_psync_ = false;
170173
bool next_try_without_announce_ip_address_ = false;
171174

src/common/io_util.cc

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@
2929
#include <poll.h>
3030
#include <sys/types.h>
3131

32+
#include <chrono>
33+
3234
#include "fmt/ostream.h"
35+
#include "scope_exit.h"
3336
#include "server/tls_util.h"
3437

3538
#ifdef __linux__
@@ -468,6 +471,93 @@ Status SockSend(int fd, const std::string &data, [[maybe_unused]] bufferevent *b
468471
#endif
469472
}
470473

474+
// Send the entire buffer over a plain (non-TLS) socket, waiting at most
// timeout_ms in total for the peer to accept the data.
//
// The deadline is measured against a monotonic clock, so wall-clock
// adjustments cannot extend or shorten it. The remaining budget is
// recomputed before every wait, so repeated short writes cannot exceed
// the overall timeout.
//
// Returns Status::OK() once all bytes are written, or NotOK with a
// description of how much was sent when the deadline expires or a
// non-retryable write error occurs.
Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms) {
  ssize_t n = 0;
  auto start = std::chrono::steady_clock::now();

  while (n < static_cast<ssize_t>(data.size())) {
    // Check if we've exceeded the overall timeout budget.
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - start).count();
    if (elapsed >= timeout_ms) {
      return {Status::NotOK, fmt::format("send timeout after {} ms, sent {} of {} bytes",
                                         elapsed, n, data.size())};
    }

    // Wait for writability only for the time remaining in the budget.
    int remaining_ms = timeout_ms - static_cast<int>(elapsed);
    int ready = AeWait(fd, AE_WRITABLE, remaining_ms);
    if (ready == 0) {
      return {Status::NotOK, fmt::format("send timeout waiting for socket, sent {} of {} bytes",
                                         n, data.size())};
    }
    if (ready < 0) {
      return Status::FromErrno("poll error while sending");
    }

    ssize_t nwritten = write(fd, data.data() + n, data.size() - n);
    if (nwritten == -1) {
      // EINTR: interrupted by a signal before any byte was written — retry.
      // EAGAIN/EWOULDBLOCK: the buffer refilled between poll() and write()
      // (spurious wakeup) — go back to waiting. Anything else is fatal.
      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
        continue;
      }
      return Status::FromErrno();
    }
    n += nwritten;
  }
  return Status::OK();
}
512+
513+
// Send the entire buffer with a total timeout of timeout_ms, handling both
// TLS (when the bufferevent carries an SSL object) and plain sockets.
//
// Plain sockets are delegated to the fd-only overload. For TLS, the fd is
// temporarily switched to non-blocking so SSL_write cannot block past the
// deadline; the original flags are restored on every exit path via the
// scope guard.
Status SockSendWithTimeout(int fd, const std::string &data, [[maybe_unused]] bufferevent *bev, int timeout_ms) {
#ifdef ENABLE_OPENSSL
  auto ssl = bufferevent_openssl_get_ssl(bev);
  if (ssl) {
    // Save original flags and set socket to non-blocking for timeout support.
    int orig_flags = fcntl(fd, F_GETFL);
    if (orig_flags == -1) return Status::FromErrno("fcntl(F_GETFL)");

    auto s = SockSetBlocking(fd, 0);
    if (!s.IsOK()) return s;

    // Restore original flags on scope exit.
    auto restore_flags = MakeScopeExit([fd, orig_flags] { fcntl(fd, F_SETFL, orig_flags); });

    ssize_t n = 0;
    // SSL_write can demand a socket *read* (SSL_ERROR_WANT_READ, e.g. during
    // renegotiation). Track which readiness event to wait for so we don't
    // poll for writability while OpenSSL is actually blocked on input.
    int wait_event = AE_WRITABLE;
    auto start = std::chrono::steady_clock::now();

    while (n < static_cast<ssize_t>(data.size())) {
      auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
          std::chrono::steady_clock::now() - start).count();
      if (elapsed >= timeout_ms) {
        return {Status::NotOK, fmt::format("SSL send timeout after {} ms, sent {} of {} bytes",
                                           elapsed, n, data.size())};
      }

      int remaining_ms = timeout_ms - static_cast<int>(elapsed);
      int ready = AeWait(fd, wait_event, remaining_ms);
      if (ready == 0) {
        return {Status::NotOK, fmt::format("SSL send timeout waiting for socket, sent {} of {} bytes",
                                           n, data.size())};
      }
      if (ready < 0) {
        // Distinguish a poll failure from an honest timeout.
        return Status::FromErrno("poll error while sending");
      }

      int nwritten = SSL_write(ssl, data.data() + n, static_cast<int>(data.size() - n));
      if (nwritten <= 0) {
        int err = SSL_get_error(ssl, nwritten);
        if (err == SSL_ERROR_WANT_WRITE) {
          wait_event = AE_WRITABLE;
          continue;
        }
        if (err == SSL_ERROR_WANT_READ) {
          // OpenSSL needs incoming data before it can make progress.
          wait_event = AE_READABLE;
          continue;
        }
        return {Status::NotOK, fmt::format("SSL_write error: {}", err)};
      }
      // Progress was made; go back to waiting for writability by default.
      wait_event = AE_WRITABLE;
      n += nwritten;
    }
    return Status::OK();
  }
#endif
  return SockSendWithTimeout(fd, data, timeout_ms);
}
560+
471561
StatusOr<int> SockConnect(const std::string &host, uint32_t port, [[maybe_unused]] ssl_st *ssl, int conn_timeout,
472562
int timeout) {
473563
#ifdef ENABLE_OPENSSL

src/common/io_util.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ Status Pwrite(int fd, const std::string &data, off_t offset);
5454
Status SockSend(int fd, const std::string &data, ssl_st *ssl);
5555
Status SockSend(int fd, const std::string &data, bufferevent *bev);
5656

57+
// Send with timeout - returns error if send would block for longer than timeout_ms
58+
Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms);
59+
Status SockSendWithTimeout(int fd, const std::string &data, bufferevent *bev, int timeout_ms);
60+
5761
Status SockSendFile(int out_fd, int in_fd, size_t size, ssl_st *ssl);
5862
Status SockSendFile(int out_fd, int in_fd, size_t size, bufferevent *bev);
5963

src/config/config.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ Config::Config() {
206206
{"replication-no-slowdown", false, new YesNoField(&replication_no_slowdown, true)},
207207
{"replication-delay-bytes", false, new IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)},
208208
{"replication-delay-updates", false, new IntField(&max_replication_delay_updates, 16, 1, INT_MAX)},
209+
{"max-replication-lag", false, new Int64Field(&max_replication_lag, 100000000, 1, INT64_MAX)},
210+
{"replication-send-timeout-ms", false, new IntField(&replication_send_timeout_ms, 30000, 1000, 300000)},
209211
{"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)},
210212
{"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)},
211213
{"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)},

src/config/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ struct Config {
126126
int replication_recv_timeout_ms = 3200;
127127
int max_replication_delay_bytes = 16 * 1024; // 16KB default
128128
int max_replication_delay_updates = 16; // 16 updates default
129+
int64_t max_replication_lag = 100000000; // 100M sequences before disconnecting slow consumer
130+
int replication_send_timeout_ms = 30000; // 30 second timeout for socket sends to replicas
129131
int max_db_size = 0;
130132
int max_replication_mb = 0;
131133
int max_io_mb = 0;

tests/cppunit/config_test.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ TEST(Config, GetAndSet) {
8686
{"rocksdb.max_background_jobs", "4"},
8787
{"rocksdb.compression_start_level", "2"},
8888
{"rocksdb.sst_file_delete_rate_bytes_per_sec", "0"},
89+
{"max-replication-lag", "50000000"},
90+
{"replication-send-timeout-ms", "60000"},
8991
};
9092
std::vector<std::string> values;
9193
for (const auto &iter : mutable_cases) {

tests/gocase/integration/replication/replication_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -711,3 +711,88 @@ func TestReplicationWatermark(t *testing.T) {
711711
// The small command should be processed much faster than 1 second
712712
require.Less(t, duration, 1*time.Second, "small command should be processed promptly")
713713
}
714+
715+
func TestReplicationSlowConsumerConfig(t *testing.T) {
716+
t.Parallel()
717+
ctx := context.Background()
718+
719+
// This test verifies the slow consumer protection config options are working:
720+
// - max-replication-lag: threshold before disconnecting slow consumers
721+
// - replication-send-timeout-ms: timeout for socket sends to replicas
722+
master := util.StartServer(t, map[string]string{
723+
"max-replication-lag": "100000000",
724+
"replication-send-timeout-ms": "30000",
725+
})
726+
defer master.Close()
727+
masterClient := master.NewClient()
728+
defer func() { require.NoError(t, masterClient.Close()) }()
729+
730+
slave := util.StartServer(t, map[string]string{})
731+
defer slave.Close()
732+
slaveClient := slave.NewClient()
733+
defer func() { require.NoError(t, slaveClient.Close()) }()
734+
735+
t.Run("Slow consumer config options are readable and settable", func(t *testing.T) {
736+
// Verify initial config values
737+
maxLag := masterClient.ConfigGet(ctx, "max-replication-lag").Val()
738+
require.Equal(t, "100000000", maxLag["max-replication-lag"])
739+
740+
sendTimeout := masterClient.ConfigGet(ctx, "replication-send-timeout-ms").Val()
741+
require.Equal(t, "30000", sendTimeout["replication-send-timeout-ms"])
742+
743+
// Test CONFIG SET for max-replication-lag
744+
require.NoError(t, masterClient.ConfigSet(ctx, "max-replication-lag", "50000000").Err())
745+
maxLag = masterClient.ConfigGet(ctx, "max-replication-lag").Val()
746+
require.Equal(t, "50000000", maxLag["max-replication-lag"])
747+
748+
// Test CONFIG SET for replication-send-timeout-ms
749+
require.NoError(t, masterClient.ConfigSet(ctx, "replication-send-timeout-ms", "15000").Err())
750+
sendTimeout = masterClient.ConfigGet(ctx, "replication-send-timeout-ms").Val()
751+
require.Equal(t, "15000", sendTimeout["replication-send-timeout-ms"])
752+
753+
// Verify replication still works normally with these config options
754+
util.SlaveOf(t, slaveClient, master)
755+
util.WaitForSync(t, slaveClient)
756+
require.Equal(t, "slave", util.FindInfoEntry(slaveClient, "role"))
757+
758+
require.NoError(t, masterClient.Set(ctx, "test_key", "test_value", 0).Err())
759+
util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second)
760+
require.Equal(t, "test_value", slaveClient.Get(ctx, "test_key").Val())
761+
})
762+
}
763+
764+
func TestReplicationExponentialBackoff(t *testing.T) {
765+
t.Parallel()
766+
ctx := context.Background()
767+
768+
master := util.StartServer(t, map[string]string{})
769+
defer master.Close()
770+
masterClient := master.NewClient()
771+
defer func() { require.NoError(t, masterClient.Close()) }()
772+
773+
slave := util.StartServer(t, map[string]string{})
774+
defer slave.Close()
775+
slaveClient := slave.NewClient()
776+
defer func() { require.NoError(t, slaveClient.Close()) }()
777+
778+
t.Run("Slave uses exponential backoff on reconnection", func(t *testing.T) {
779+
// Connect slave to master
780+
util.SlaveOf(t, slaveClient, master)
781+
util.WaitForSync(t, slaveClient)
782+
783+
// Kill the slave connection from master side to trigger reconnection
784+
_, err := masterClient.ClientKillByFilter(ctx, "type", "slave").Result()
785+
require.NoError(t, err)
786+
787+
// The slave should log backoff messages when reconnecting
788+
// First reconnection attempt should wait 1 second
789+
require.Eventually(t, func() bool {
790+
return slave.LogFileMatches(t, ".*waiting 1 seconds before reconnecting.*")
791+
}, 10*time.Second, 200*time.Millisecond, "slave should log backoff on first reconnection")
792+
793+
// Slave should eventually reconnect
794+
require.Eventually(t, func() bool {
795+
return util.FindInfoEntry(slaveClient, "master_link_status") == "up"
796+
}, 15*time.Second, 500*time.Millisecond, "slave should reconnect with backoff")
797+
})
798+
}

0 commit comments

Comments
 (0)