The receivers package provides a modular system for collecting metrics, events, and logs from various sources. It defines a common Receiver interface and a ReceiveManager to orchestrate multiple receiver instances.
Receivers are the entry points for data into the ClusterCockpit monitoring stack. They collect or receive data from external sources, convert it into the internal CCMessage format, and send it to a unified sink channel.
- Receiver: A component that gathers metrics from a specific source (e.g., HTTP POSTs, NATS subscriptions, or direct hardware queries like IPMI).
- ReceiveManager: Orchestrates the lifecycle of all configured receivers. It initializes, starts, and stops them, and ensures they all point to the correct output channel.
- Message Processor: An optional component within each receiver that can filter, rename, or transform messages before they are sent to the sink.
- Ingress: Data arrives at the receiver (e.g., an HTTP request or a scheduled sensor read).
- Decoding: The raw data is decoded into one or more `CCMessage` objects.
- Processing (Optional): If `process_messages` is configured, the messages pass through a processing pipeline.
- Egress: The processed messages are sent to the receiver's sink channel.
- Aggregation: All receivers send their messages to the same output channel managed by the `ReceiveManager`, which typically leads to the `SinkManager`.
graph LR
S1[Source 1] --> R1[Receiver 1]
S2[Source 2] --> R2[Receiver 2]
R1 --> MP1[Message Processor 1]
R2 --> MP2[Message Processor 2]
MP1 --> OC[Output Channel]
MP2 --> OC
OC --> SM[Sink Manager]
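The fan-in shown in the diagram can be sketched with plain Go channels. This is a minimal illustration, not the package's implementation: `Message`, `processor`, `runReceiver`, and `fanIn` are hypothetical stand-ins for `CCMessage`, the message processor, a receiver, and the manager's shared output channel.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical stand-in for CCMessage, reduced to a few fields.
type Message struct {
	Name  string
	Tags  map[string]string
	Value float64
}

// processor is a hypothetical stand-in for a message processor: it may
// modify the message and reports whether the message should be kept.
type processor func(m *Message) bool

// runReceiver forwards already-decoded messages through an optional
// processor and into the shared output channel.
func runReceiver(input []Message, p processor, out chan<- Message, wg *sync.WaitGroup) {
	defer wg.Done()
	for _, m := range input {
		if p != nil && !p(&m) {
			continue // dropped by the processing step
		}
		out <- m
	}
}

// fanIn starts one goroutine per receiver and collects everything that
// arrives on the single output channel.
func fanIn(inputs [][]Message, procs []processor) []Message {
	out := make(chan Message)
	var wg sync.WaitGroup
	for i := range inputs {
		wg.Add(1)
		go runReceiver(inputs[i], procs[i], out, &wg)
	}
	// Close the channel once every receiver has finished sending.
	go func() {
		wg.Wait()
		close(out)
	}()
	var got []Message
	for m := range out {
		got = append(got, m)
	}
	return got
}

func main() {
	// Receiver 1 tags its messages; receiver 2 drops messages named "idle".
	addSource := func(m *Message) bool {
		if m.Tags == nil {
			m.Tags = map[string]string{}
		}
		m.Tags["source"] = "frontend-http"
		return true
	}
	dropIdle := func(m *Message) bool { return m.Name != "idle" }

	got := fanIn(
		[][]Message{
			{{Name: "cpu_load", Value: 0.7}},
			{{Name: "idle", Value: 0}, {Name: "mem_used", Value: 12.5}},
		},
		[]processor{addSource, dropIdle},
	)
	fmt.Println("messages on output channel:", len(got))
}
```

Arrival order on the shared channel depends on goroutine scheduling, so consumers should not rely on messages from different receivers being ordered.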
The ReceiveManager is configured with a JSON object where each key is a unique name for a receiver instance, and the value is its configuration.
Every receiver supports these common fields:
- `type`: (Required) The type of receiver to initialize (e.g., "http", "nats").
- `process_messages`: (Optional) A list of message processing rules. See the messageProcessor documentation for details.
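One way to read such a configuration is to decode it in two passes: first into a map of raw per-instance blobs, then into a small struct holding only the common fields. The sketch below assumes this approach for illustration; `commonConfig` and `receiverTypes` are hypothetical names, and the actual `ReceiveManager` may parse its configuration differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// commonConfig holds only the fields shared by every receiver.
// The struct name is hypothetical; the JSON keys follow this README.
type commonConfig struct {
	Type            string            `json:"type"`
	ProcessMessages []json.RawMessage `json:"process_messages,omitempty"`
}

// receiverTypes maps each configured instance name to its receiver type.
// Instances without a usable "type" field are listed in skipped.
func receiverTypes(raw []byte) (types map[string]string, skipped []string, err error) {
	var instances map[string]json.RawMessage
	if err = json.Unmarshal(raw, &instances); err != nil {
		return nil, nil, err
	}
	types = make(map[string]string)
	for name, cfg := range instances {
		var c commonConfig
		if uerr := json.Unmarshal(cfg, &c); uerr != nil || c.Type == "" {
			skipped = append(skipped, name)
			continue
		}
		types[name] = c.Type
	}
	return types, skipped, nil
}

func main() {
	raw := []byte(`{
		"my_http_receiver": {"type": "http", "port": "8080"},
		"broken_receiver":  {"address": "nats.example.com"}
	}`)
	types, skipped, err := receiverTypes(raw)
	if err != nil {
		fmt.Println("invalid configuration:", err)
		return
	}
	fmt.Println("types:", types, "skipped:", skipped)
}
```

Keeping each instance as a `json.RawMessage` lets the type-specific fields (such as `port` or `subject`) be decoded later by the receiver that owns them.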
Example:
{
"my_http_receiver" : {
"type": "http",
"port": "8080",
"process_messages": [
{ "add_meta": { "source": "frontend-http" } }
]
},
"my_nats_receiver": {
"type": "nats",
"address": "nats.example.com",
"subject": "metrics"
}
}

All receivers must implement the `Receiver` interface defined in `metricReceiver.go`:
type Receiver interface {
Start() // Start begins the metric collection process
Close() // Close stops the receiver and releases resources
Name() string // Name returns the receiver's identifier
SetSink(sink chan lp.CCMessage) // SetSink configures the output channel for collected metrics
}

- `Start()`: This method is called to begin operation. For server-like receivers (e.g., HTTP), it starts the listener. For polling receivers (e.g., IPMI), it starts the collection loop. It should typically be non-blocking or start its own goroutines.
- `Close()`: Gracefully shuts down the receiver, closing network connections, stopping goroutines, and releasing resources.
- `Name()`: Returns a human-readable name for the receiver instance, usually including its type and the name from the configuration.
- `SetSink()`: Receives the channel where all collected `CCMessage` objects should be sent.
| Type | Description | Platform |
|---|---|---|
| `http` | Receives InfluxDB line protocol via HTTP POST requests. | All |
| `nats` | Subscribes to NATS subjects to receive metrics. | All |
| `prometheus` | Scrapes metrics from Prometheus-compatible endpoints. | All |
| `eecpt` | Specialized HTTP receiver for EECPT instrumentation. | All |
| `ipmi` | Polls hardware metrics via IPMI (requires freeipmi). | Linux |
| `redfish` | Polls hardware metrics via the Redfish API. | Linux |
The file `influxDecoder.go` provides a helper function, `DecodeInfluxMessage`, that simplifies parsing InfluxDB line protocol data into `CCMessage` objects. Most receivers that accept line protocol data use it.
import "github.com/ClusterCockpit/cc-lib/v2/receivers"
// ...
msg, err := receivers.DecodeInfluxMessage(decoder)

To add a new receiver type:
- Define Configuration: Create a struct for your receiver's configuration, embedding `defaultReceiverConfig`.
- Implement the Interface: Create a struct that implements `Receiver`. Use the `receiver` base struct from `metricReceiver.go` to gain default implementations of `Name()` and `SetSink()`.
- Implement Factory Function: Create a `New<Type>Receiver(name string, config json.RawMessage)` function.
- Register Receiver: Add your factory function to `AvailableReceivers` in `availableReceivers.go` (or `availableReceiversLinux.go` for Linux-only receivers).
- Documentation: Create a `<type>Receiver.md` file and update this README.
Refer to sampleReceiver.go for a complete, documented template.
- Goroutine Management: Always ensure goroutines started in `Start()` are cleaned up in `Close()`. Use `sync.WaitGroup` and stop channels.
- Error Handling: Use the `ccLogger` package to log errors and debug information.
- Message Processing: Always initialize and use a `messageProcessor` in your receiver to support the `process_messages` config option.
- Testing: Provide a `_test.go` file. Use the `ccMessage` package to verify the messages produced by your receiver.
- Receiver won't start: Check the logs for "ReceiveManager: SKIP" messages. This usually indicates a configuration error (e.g., missing `type` field or unknown receiver type).
- No data received:
  - Verify the network connectivity and address/port configuration.
  - Check if basic authentication is required and configured correctly.
  - If using `process_messages`, verify that rules are not accidentally dropping all messages.
- IPMI/Redfish issues: Ensure the required external tools (like `freeipmi`) are installed and that the user running the collector has the necessary permissions.
Each receiver should have unit tests that:
- Initialize the receiver with a valid configuration.
- Simulate ingress data (e.g., sending a mock HTTP request or providing mock command output).
- Verify that the correct `CCMessage` objects are sent to the sink channel.
- Verify that `Close()` successfully stops all background activities.