@@ -74,18 +74,14 @@ func PourItemToBucket(
7474
7575 sigclosed := 0
7676 failed_sent := 0
77- attempts := 0
7877 start := time .Now ().UTC ()
7978
80- for {
81- attempts += 1
82- /* Warn the user if we used more than a 100 ms to pour an event, it's at least an half lock*/
83- if attempts % 100000 == 0 && start .Add (100 * time .Millisecond ).Before (time .Now ().UTC ()) {
84- holder .logger .Warningf ("stuck for %s sending event to %s (sigclosed:%d failed_sent:%d attempts:%d)" , time .Since (start ),
85- buckey , sigclosed , failed_sent , attempts )
86- }
79+ // Warn if we're stuck for too long trying to pour
80+ warnTicker := time .NewTicker (100 * time .Millisecond )
81+ defer warnTicker .Stop ()
8782
88- /* check if leak routine is up */
83+ for {
84+ // If bucket is dead, recreate and retry.
8985 select {
9086 case <- bucket .done :
9187 // the bucket was found and dead, get a new one and continue
@@ -98,6 +94,8 @@ func PourItemToBucket(
9894 }
9995 continue
10096 // holder.logger.Tracef("Signal exists, try to pour :)")
97+ case <- ctx .Done ():
98+ return ctx .Err ()
10199 default :
102100 // nothing to read, but not closed, try to pour
103101 // holder.logger.Tracef("Signal exists but empty, try to pour :)")
@@ -129,7 +127,9 @@ func PourItemToBucket(
129127 }
130128 }
131129 }
132- // the bucket seems to be up & running
130+
131+ // Block until we can send, or we learn it's dead/canceled, or we warn periodically.
132+
133133 select {
134134 case bucket .In <- parsed :
135135 // holder.logger.Tracef("Successfully sent !")
@@ -139,11 +139,23 @@ func PourItemToBucket(
139139 }
140140 holder .logger .Debugf ("bucket '%s' is poured" , holder .Spec .Name )
141141 return nil
142+ // XXX: bucket died while we were waiting to send.
143+ // case <- bucket.done:
144+
145+ case <- ctx .Done ():
146+ return ctx .Err ()
147+
148+ case <- warnTicker .C :
149+ // We are blocked because bucket.In isn't being read fast enough (or at all).
150+ holder .logger .Warningf (
151+ "stuck for %s sending event to %s (sigclosed:%d failed_sent:%d" ,
152+ time .Since (start ),
153+ buckey ,
154+ sigclosed ,
155+ failed_sent ,
156+ )
157+ failed_sent ++
142158 default :
143- failed_sent += 1
144- // holder.logger.Tracef("Failed to send, try again")
145- continue
146-
147159 }
148160 }
149161}
0 commit comments