@@ -19,17 +19,16 @@ package accountidprocessor
1919
2020import (
2121 "context"
22- "fmt"
2322
2423 "go.opentelemetry.io/collector/confmap"
24+ semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
2525)
2626
2727const (
28- serviceKey = "service"
29- pipelinesKey = "pipelines"
30- processorsKey = "processors"
31- resourceProc = "resource/aws-account-id"
32- accountIDAttrKey = "cloud.account.id"
28+ serviceKey = "service"
29+ pipelinesKey = "pipelines"
30+ processorsKey = "processors"
31+ resourceProc = "resource/aws-account-id"
3332)
3433
3534type converter struct {
@@ -41,63 +40,49 @@ func New(accountID string) confmap.Converter {
4140 return & converter {accountID : accountID }
4241}
4342
44- func (c converter ) Convert (_ context.Context , conf * confmap.Conf ) error {
43+ func (c * converter ) Convert (_ context.Context , conf * confmap.Conf ) error {
4544 if c .accountID == "" {
46- return nil // Skip if no account ID
47- }
48-
49- // Navigate to service.pipelines
50- serviceVal := conf .Get (serviceKey )
51- service , ok := serviceVal .(map [string ]interface {})
52- if ! ok {
5345 return nil
5446 }
5547
56- pipelinesVal , ok := service [ pipelinesKey ]
48+ service , ok := conf . Get ( serviceKey ).( map [ string ] any )
5749 if ! ok {
5850 return nil
5951 }
6052
61- pipelines , ok := pipelinesVal .(map [string ]interface {} )
53+ pipelines , ok := service [ pipelinesKey ] .(map [string ]any )
6254 if ! ok {
6355 return nil
6456 }
6557
66- updates := make (map [string ]interface {} )
58+ updates := make (map [string ]any )
6759
68- // For each pipeline, add resource processor to beginning
69- for telemetryType , pipelineVal := range pipelines {
70- pipeline , ok := pipelineVal .(map [string ]interface {})
60+ for pipelineName , pipelineVal := range pipelines {
61+ pipeline , ok := pipelineVal .(map [string ]any )
7162 if ! ok {
7263 continue
7364 }
7465
7566 processorsVal , _ := pipeline [processorsKey ]
76- processors , ok := processorsVal .([]interface {})
77- if ! ok {
78- processors = []interface {}{}
67+ processors , _ := processorsVal .([]any )
68+
69+ // Idempotency: skip if already prepended
70+ if len (processors ) > 0 && processors [0 ] == resourceProc {
71+ continue
7972 }
8073
81- // Prepend resource/aws-account-id processor
82- processors = append ([]interface {}{resourceProc }, processors ... )
83- updates [fmt .Sprintf ("%s::%s::%s::%s" , serviceKey , pipelinesKey , telemetryType , processorsKey )] = processors
74+ processors = append ([]any {resourceProc }, processors ... )
75+ updates [serviceKey + "::" + pipelinesKey + "::" + pipelineName + "::" + processorsKey ] = processors
8476 }
8577
86- // Configure the resource processor with cloud.account.id attribute
87- updates [fmt . Sprintf ( "processors::%s ::attributes" , resourceProc ) ] = []map [string ]interface {} {
78+ // Add the resource processor definition
79+ updates ["processors::" + resourceProc + " ::attributes" ] = []map [string ]any {
8880 {
89- "key" : accountIDAttrKey ,
81+ "key" : string ( semconv . CloudAccountIDKey ) ,
9082 "value" : c .accountID ,
9183 "action" : "insert" ,
9284 },
9385 }
9486
95- // Apply all updates
96- if len (updates ) > 0 {
97- if err := conf .Merge (confmap .NewFromStringMap (updates )); err != nil {
98- return err
99- }
100- }
101-
102- return nil
87+ return conf .Merge (confmap .NewFromStringMap (updates ))
10388}
0 commit comments