diff --git a/server/common/module_shared.go b/server/common/module_shared.go index 69d4d1e44c5..03e03499b97 100644 --- a/server/common/module_shared.go +++ b/server/common/module_shared.go @@ -33,6 +33,14 @@ var log = logging.MustGetLogger("server_common") const QUEUE_SIZE = 1 << 16 +type IngesterStatus uint8 + +const ( + IngesterOk IngesterStatus = iota + IngesterAbnormal + IngesterInitializing +) + type ControllerIngesterShared struct { ResourceEventQueue *queue.OverwriteQueue TraceTreeQueue *queue.OverwriteQueue @@ -83,15 +91,16 @@ func ExportersEnabled(configPath string) bool { return false } -type OrgHanderInterface interface { +type OrgHandlerInterface interface { DropOrg(orgId uint16) error UpdateNativeTag(nativetag.NativeTagOP, uint16, *nativetag.NativeTag) error + IsHealthy() bool } -var ingesterOrgHanders []OrgHanderInterface +var ingesterOrgHandlers []OrgHandlerInterface -func SetOrgHandler(orgHandler OrgHanderInterface) { - ingesterOrgHanders = append(ingesterOrgHanders, orgHandler) +func SetOrgHandler(orgHandler OrgHandlerInterface) { + ingesterOrgHandlers = append(ingesterOrgHandlers, orgHandler) } /* @@ -115,11 +124,11 @@ func SetOrgHandler(orgHandler OrgHanderInterface) { */ func DropOrg(orgId uint16) error { log.Info("drop org id:", orgId) - if ingesterOrgHanders == nil { - return fmt.Errorf("ingesterOrgHanders is nil, drop org id %d failed", orgId) + if ingesterOrgHandlers == nil { + return fmt.Errorf("ingesterOrgHandlers is nil, drop org id %d failed", orgId) } - for _, ingesterOrgHander := range ingesterOrgHanders { - err := ingesterOrgHander.DropOrg(orgId) + for _, orgHandler := range ingesterOrgHandlers { + err := orgHandler.DropOrg(orgId) if err != nil { return err } @@ -142,13 +151,13 @@ func PushNativeTags(orgId uint16, nativeTags []nativetag.NativeTag) { // When adding or removing native_tag, you need to call the interface func UpdateNativeTag(op nativetag.NativeTagOP, orgId uint16, nativeTag *nativetag.NativeTag) error { log.Infof("orgId %d %s native tag: %+v", orgId, op, nativeTag) - if ingesterOrgHanders == nil { + if ingesterOrgHandlers == nil { err := fmt.Errorf("ingester is not ready, update native tag failed") log.Error(err) return err } - for _, ingesterOrgHander := range ingesterOrgHanders { - err := ingesterOrgHander.UpdateNativeTag(op, orgId, nativeTag) + for _, orgHandler := range ingesterOrgHandlers { + err := orgHandler.UpdateNativeTag(op, orgId, nativeTag) if err != nil { log.Error(err) return err @@ -157,3 +166,18 @@ func UpdateNativeTag(op nativetag.NativeTagOP, orgId uint16, nativeTag *nativeta nativetag.UpdateNativeTag(op, orgId, nativeTag) return nil } + +func CheckIngesterStatus() IngesterStatus { + if ingesterOrgHandlers == nil { + log.Infof("ingester is initializing") + return IngesterInitializing + } + for _, orgHandler := range ingesterOrgHandlers { + // Treat the ingester as abnormal only after a handler has observed write failures without any successful writes. + if !orgHandler.IsHealthy() { + log.Errorf("ingester is abnormal") + return IngesterAbnormal + } + } + return IngesterOk +} diff --git a/server/ingester/ingester/org_handler.go b/server/ingester/ingester/org_handler.go index 7ead6593852..1811eac7278 100644 --- a/server/ingester/ingester/org_handler.go +++ b/server/ingester/ingester/org_handler.go @@ -67,6 +67,10 @@ func (o *OrgHandler) DropOrg(orgId uint16) error { return o.dropOrgDatabase(orgId) } +func (o *OrgHandler) IsHealthy() bool { + return true +} + // FIXME: After clearing the Org data, if the same Org ID is created again later, data writing will fail. You can restart deepflow-server to solve it. func (o *OrgHandler) dropOrgDatabase(orgId uint16) error { if ckdb.IsDefaultOrgID(orgId) { diff --git a/server/ingester/pkg/ckwriter/ckwriter.go b/server/ingester/pkg/ckwriter/ckwriter.go index ae8cdab8431..c7152cebf1d 100644 --- a/server/ingester/pkg/ckwriter/ckwriter.go +++ b/server/ingester/pkg/ckwriter/ckwriter.go @@ -93,6 +93,24 @@ func (m *CKWriterManager) EndpointsChange(addrs []string) { ckwriterManager.Unlock() } +func (m *CKWriterManager) IsHealthy() bool { + ckwriterManager.Lock() + defer ckwriterManager.Unlock() + hasWriteFailure := false + for _, writer := range m.ckwriters { + for _, queueCtx := range writer.queueContexts { + // Consider the writer healthy once any queue succeeds; before that, only an observed failure makes it abnormal. + if queueCtx.counter.WriteSuccessCount > 0 { + return true + } + if queueCtx.counter.WriteFailedCount > 0 { + hasWriteFailure = true + } + } + } + return !hasWriteFailure +} + type CKWriter struct { addrs []string user string