99import io .split .engine .segments .SegmentFetcher ;
1010import io .split .engine .segments .SegmentSynchronizationTask ;
1111import io .split .engine .sse .dtos .SplitKillNotification ;
12+ import io .split .storages .RuleBasedSegmentCacheProducer ;
1213import io .split .storages .SegmentCacheProducer ;
1314import io .split .storages .SplitCacheProducer ;
1415import io .split .telemetry .synchronizer .TelemetrySyncTask ;
@@ -34,6 +35,7 @@ public class SynchronizerImp implements Synchronizer {
3435 private final SplitFetcher _splitFetcher ;
3536 private final SegmentSynchronizationTask _segmentSynchronizationTaskImp ;
3637 private final SplitCacheProducer _splitCacheProducer ;
38+ private final RuleBasedSegmentCacheProducer _ruleBasedSegmentCacheProducer ;
3739 private final SegmentCacheProducer segmentCacheProducer ;
3840 private final ImpressionsManager _impressionManager ;
3941 private final EventsTask _eventsTask ;
@@ -48,6 +50,7 @@ public SynchronizerImp(SplitTasks splitTasks,
4850 SplitFetcher splitFetcher ,
4951 SplitCacheProducer splitCacheProducer ,
5052 SegmentCacheProducer segmentCacheProducer ,
53+ RuleBasedSegmentCacheProducer ruleBasedSegmentCacheProducer ,
5154 int onDemandFetchRetryDelayMs ,
5255 int onDemandFetchMaxRetries ,
5356 int failedAttemptsBeforeLogging ,
@@ -56,6 +59,7 @@ public SynchronizerImp(SplitTasks splitTasks,
5659 _splitFetcher = checkNotNull (splitFetcher );
5760 _segmentSynchronizationTaskImp = checkNotNull (splitTasks .getSegmentSynchronizationTask ());
5861 _splitCacheProducer = checkNotNull (splitCacheProducer );
62+ _ruleBasedSegmentCacheProducer = checkNotNull (ruleBasedSegmentCacheProducer );
5963 this .segmentCacheProducer = checkNotNull (segmentCacheProducer );
6064 _onDemandFetchRetryDelayMs = checkNotNull (onDemandFetchRetryDelayMs );
6165 _onDemandFetchMaxRetries = onDemandFetchMaxRetries ;
@@ -103,7 +107,7 @@ private static class SyncResult {
103107 private final FetchResult _fetchResult ;
104108 }
105109
106- private SyncResult attemptSplitsSync (long targetChangeNumber ,
110+ private SyncResult attemptSplitsSync (long targetChangeNumber , long ruleBasedSegmentChangeNumber ,
107111 FetchOptions opts ,
108112 Function <Void , Long > nextWaitMs ,
109113 int maxRetries ) {
@@ -114,7 +118,8 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
114118 if (fetchResult != null && !fetchResult .retry () && !fetchResult .isSuccess ()) {
115119 return new SyncResult (false , remainingAttempts , fetchResult );
116120 }
117- if (targetChangeNumber <= _splitCacheProducer .getChangeNumber ()) {
121+ if (targetChangeNumber <= _splitCacheProducer .getChangeNumber ()
122+ && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer .getChangeNumber ()) {
118123 return new SyncResult (true , remainingAttempts , fetchResult );
119124 } else if (remainingAttempts <= 0 ) {
120125 return new SyncResult (false , remainingAttempts , fetchResult );
@@ -130,9 +135,17 @@ private SyncResult attemptSplitsSync(long targetChangeNumber,
130135 }
131136
132137 @ Override
133- public void refreshSplits (Long targetChangeNumber ) {
138+ public void refreshSplits (Long targetChangeNumber , Long ruleBasedSegmentChangeNumber ) {
134139
135- if (targetChangeNumber <= _splitCacheProducer .getChangeNumber ()) {
140+ if (targetChangeNumber == null || targetChangeNumber == 0 ) {
141+ targetChangeNumber = _splitCacheProducer .getChangeNumber ();
142+ }
143+ if (ruleBasedSegmentChangeNumber == null || ruleBasedSegmentChangeNumber == 0 ) {
144+ ruleBasedSegmentChangeNumber = _ruleBasedSegmentCacheProducer .getChangeNumber ();
145+ }
146+
147+ if (targetChangeNumber <= _splitCacheProducer .getChangeNumber ()
148+ && ruleBasedSegmentChangeNumber <= _ruleBasedSegmentCacheProducer .getChangeNumber ()) {
136149 return ;
137150 }
138151
@@ -142,7 +155,7 @@ public void refreshSplits(Long targetChangeNumber) {
142155 .flagSetsFilter (_sets )
143156 .build ();
144157
145- SyncResult regularResult = attemptSplitsSync (targetChangeNumber , opts ,
158+ SyncResult regularResult = attemptSplitsSync (targetChangeNumber , ruleBasedSegmentChangeNumber , opts ,
146159 (discard ) -> (long ) _onDemandFetchRetryDelayMs , _onDemandFetchMaxRetries );
147160
148161 int attempts = _onDemandFetchMaxRetries - regularResult .remainingAttempts ();
@@ -157,7 +170,7 @@ public void refreshSplits(Long targetChangeNumber) {
157170 _log .info (String .format ("No changes fetched after %s attempts. Will retry bypassing CDN." , attempts ));
158171 FetchOptions withCdnBypass = new FetchOptions .Builder (opts ).targetChangeNumber (targetChangeNumber ).build ();
159172 Backoff backoff = new Backoff (ON_DEMAND_FETCH_BACKOFF_BASE_MS , ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS );
160- SyncResult withCDNBypassed = attemptSplitsSync (targetChangeNumber , withCdnBypass ,
173+ SyncResult withCDNBypassed = attemptSplitsSync (targetChangeNumber , ruleBasedSegmentChangeNumber , withCdnBypass ,
161174 (discard ) -> backoff .interval (), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES );
162175
163176 int withoutCDNAttempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - withCDNBypassed ._remainingAttempts ;
@@ -175,7 +188,7 @@ public void localKillSplit(SplitKillNotification splitKillNotification) {
175188 if (splitKillNotification .getChangeNumber () > _splitCacheProducer .getChangeNumber ()) {
176189 _splitCacheProducer .kill (splitKillNotification .getSplitName (), splitKillNotification .getDefaultTreatment (),
177190 splitKillNotification .getChangeNumber ());
178- refreshSplits (splitKillNotification .getChangeNumber ());
191+ refreshSplits (splitKillNotification .getChangeNumber (), 0L );
179192 }
180193 }
181194
0 commit comments