Skip to content

Commit 904b61f

Browse files
committed
fix merge conflicts
1 parent 02f7c14 commit 904b61f

1 file changed

Lines changed: 78 additions & 41 deletions

File tree

collector/receiver/telemetryapireceiver/receiver.go

Lines changed: 78 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"time"
2929

3030
"github.com/golang-collections/go-datastructures/queue"
31+
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
32+
"github.com/open-telemetry/opentelemetry-lambda/collector/lambdalifecycle"
3133
"go.opentelemetry.io/collector/component"
3234
"go.opentelemetry.io/collector/consumer"
3335
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -37,8 +39,6 @@ import (
3739
"go.opentelemetry.io/collector/receiver"
3840
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
3941
"go.uber.org/zap"
40-
41-
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
4242
)
4343

4444
const (
@@ -48,7 +48,7 @@ const (
4848
telemetryFailureStatus = "failure"
4949
telemetryErrorStatus = "error"
5050
telemetryTimeoutStatus = "timeout"
51-
platformReportLogFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB"
51+
platformReportLogFmt = "REPORT RequestId: %s"
5252
platformStartLogFmt = "START RequestId: %s Version: %s"
5353
platformRuntimeDoneLogFmt = "END RequestId: %s Version: %s"
5454
platformInitStartLogFmt = "INIT_START Runtime Version: %s Runtime Version ARN: %s"
@@ -80,6 +80,7 @@ type telemetryAPIReceiver struct {
8080
faasName string
8181
faaSMetricBuilders *FaaSMetricBuilders
8282
currentFaasInvocationID string
83+
lambdaInitType lambdalifecycle.InitType
8384
logReport bool
8485
}
8586

@@ -148,8 +149,10 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ
148149
switch el.Type {
149150
// Function initialization started.
150151
case string(telemetryapi.PlatformInitStart):
151-
r.logger.Info(fmt.Sprintf("Init start: %s", r.lastPlatformStartTime), zap.Any("event", el))
152-
r.lastPlatformStartTime = el.Time
152+
if el.Time != "" {
153+
r.lastPlatformStartTime = el.Time
154+
r.logger.Info(fmt.Sprintf("Init start: %s", r.lastPlatformStartTime), zap.Any("event", el))
155+
}
153156

154157
if record, ok := el.Record.(map[string]any); ok {
155158
functionName, _ := record["functionName"].(string)
@@ -159,8 +162,10 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ
159162
}
160163
// Function initialization completed.
161164
case string(telemetryapi.PlatformInitRuntimeDone):
162-
r.logger.Info(fmt.Sprintf("Init end: %s", r.lastPlatformEndTime), zap.Any("event", el))
163-
r.lastPlatformEndTime = el.Time
165+
if r.lastPlatformStartTime != "" && el.Time != "" {
166+
r.lastPlatformEndTime = el.Time
167+
r.logger.Info(fmt.Sprintf("Init end: %s", r.lastPlatformEndTime), zap.Any("event", el))
168+
}
164169

165170
if len(r.lastPlatformStartTime) > 0 && len(r.lastPlatformEndTime) > 0 {
166171
if record, ok := el.Record.(map[string]any); ok {
@@ -175,6 +180,11 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ
175180
}
176181
}
177182
}
183+
case string(telemetryapi.PlatformInitReport):
184+
if r.lastPlatformStartTime != "" && el.Time != "" {
185+
r.lastPlatformEndTime = el.Time
186+
r.logger.Info(fmt.Sprintf("Init end: %s", r.lastPlatformEndTime), zap.Any("event", el))
187+
}
178188
}
179189
// TODO: add support for additional events, see https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html
180190
// A report of function initialization.
@@ -225,14 +235,27 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ
225235
}
226236

227237
func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{}) string {
228-
if requestId, ok := record["requestId"].(string); ok {
229-
return requestId
230-
} else if r.currentFaasInvocationID != "" {
238+
if record != nil {
239+
if requestId, ok := record["requestId"].(string); ok {
240+
return requestId
241+
}
242+
}
243+
return ""
244+
}
245+
246+
func (r *telemetryAPIReceiver) getCurrentRequestId() string {
247+
if r.lambdaInitType != lambdalifecycle.LambdaManagedInstances {
231248
return r.currentFaasInvocationID
232249
}
233250
return ""
234251
}
235252

253+
func (r *telemetryAPIReceiver) updateCurrentRequestId(requestId string) {
254+
if r.lambdaInitType != lambdalifecycle.LambdaManagedInstances {
255+
r.currentFaasInvocationID = requestId
256+
}
257+
}
258+
236259
func (r *telemetryAPIReceiver) createMetrics(slice []event) (pmetric.Metrics, error) {
237260
metric := pmetric.NewMetrics()
238261
resourceMetric := metric.ResourceMetrics().AppendEmpty()
@@ -340,6 +363,17 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
340363
}
341364
if record, ok := el.Record.(map[string]interface{}); ok {
342365
requestId := r.getRecordRequestId(record)
366+
367+
// If this is the first event in the invocation with a request id (i.e. the "platform.start" event),
368+
// set the current invocation id to this request id.
369+
if requestId != "" && el.Type == string(telemetryapi.PlatformStart) {
370+
r.updateCurrentRequestId(requestId)
371+
}
372+
373+
if requestId == "" {
374+
requestId = r.getCurrentRequestId()
375+
}
376+
343377
if requestId != "" {
344378
logRecord.Attributes().PutStr(string(semconv.FaaSInvocationIDKey), requestId)
345379

@@ -370,6 +404,10 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
370404
if functionVersion != "" {
371405
r.faasFunctionVersion = functionVersion
372406
}
407+
} else if el.Type == string(telemetryapi.PlatformStart) {
408+
if version, _ := record["version"].(string); version != "" {
409+
r.faasFunctionVersion = version
410+
}
373411
}
374412

375413
message := createPlatformMessage(requestId, r.faasFunctionVersion, el.Type, record)
@@ -380,16 +418,22 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
380418
logRecord.Body().SetStr(line)
381419
}
382420
} else {
383-
if r.currentFaasInvocationID != "" {
384-
logRecord.Attributes().PutStr(string(semconv.FaaSInvocationIDKey), r.currentFaasInvocationID)
421+
requestId := r.getRecordRequestId(nil)
422+
if requestId == "" {
423+
requestId = r.getCurrentRequestId()
385424
}
425+
426+
if requestId != "" {
427+
logRecord.Attributes().PutStr(string(semconv.FaaSInvocationIDKey), requestId)
428+
}
429+
386430
// in plain text https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#telemetry-api-function
387431
if line, ok := el.Record.(string); ok {
388432
logRecord.Body().SetStr(line)
389433
}
390434
}
391435
if el.Type == string(telemetryapi.PlatformRuntimeDone) {
392-
r.currentFaasInvocationID = ""
436+
r.updateCurrentRequestId("")
393437
}
394438
}
395439
return log, nil
@@ -480,44 +524,34 @@ func createPlatformMessage(requestId string, functionVersion string, eventType s
480524
}
481525
return ""
482526
}
483-
484527
func createPlatformReportMessage(requestId string, record map[string]interface{}) string {
485-
// gathering metrics
486-
metrics, ok := record["metrics"].(map[string]interface{})
487-
if !ok {
528+
if requestId == "" {
488529
return ""
489530
}
490-
var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64
491-
if durationMs, ok = metrics[string(telemetryapi.MetricDurationMs)].(float64); !ok {
492-
return ""
531+
532+
message := fmt.Sprintf(platformReportLogFmt, requestId)
533+
metrics, ok := record["metrics"].(map[string]interface{})
534+
if !ok {
535+
return message
493536
}
494-
if billedDurationMs, ok = metrics[string(telemetryapi.MetricBilledDurationMs)].(float64); !ok {
495-
return ""
537+
538+
if durationMs, ok := metrics["durationMs"].(float64); ok {
539+
message += fmt.Sprintf(" Duration: %.2f ms", durationMs)
496540
}
497-
if memorySizeMB, ok = metrics[string(telemetryapi.MetricMemorySizeMB)].(float64); !ok {
498-
return ""
541+
542+
if billedDurationMs, ok := metrics["billedDurationMs"].(float64); ok {
543+
message += fmt.Sprintf(" Billed Duration: %.0f ms", billedDurationMs)
499544
}
500-
if maxMemoryUsedMB, ok = metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); !ok {
501-
return ""
545+
546+
if memorySizeMB, ok := metrics["memorySizeMB"].(float64); ok {
547+
message += fmt.Sprintf(" Memory Size: %.0f MB", memorySizeMB)
502548
}
503549

504-
// optionally gather information about cold start time
505-
var initDurationMs float64
506-
if initDurationMsVal, exists := metrics[string(telemetryapi.MetricInitDurationMs)]; exists {
507-
if val, ok := initDurationMsVal.(float64); ok {
508-
initDurationMs = val
509-
}
550+
if maxMemoryUsedMB, ok := metrics["maxMemoryUsedMB"].(float64); ok {
551+
message += fmt.Sprintf(" Max Memory Used: %.0f MB", maxMemoryUsedMB)
510552
}
511553

512-
message := fmt.Sprintf(
513-
platformReportLogFmt,
514-
requestId,
515-
durationMs,
516-
billedDurationMs,
517-
memorySizeMB,
518-
maxMemoryUsedMB,
519-
)
520-
if initDurationMs > 0 {
554+
if initDurationMs, ok := metrics["initDurationMs"].(float64); ok {
521555
message += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs)
522556
}
523557

@@ -666,6 +700,8 @@ func newTelemetryAPIReceiver(
666700
}
667701
}
668702

703+
lambdaInitType := lambdalifecycle.InitTypeFromEnv(lambdalifecycle.InitTypeEnvVar)
704+
669705
return &telemetryAPIReceiver{
670706
logger: set.Logger,
671707
queue: queue.New(initialQueueSize),
@@ -674,6 +710,7 @@ func newTelemetryAPIReceiver(
674710
types: subscribedTypes,
675711
resource: r,
676712
faaSMetricBuilders: NewFaaSMetricBuilders(pcommon.NewTimestampFromTime(time.Now()), getMetricsTemporality(cfg)),
713+
lambdaInitType: lambdaInitType,
677714
logReport: cfg.LogReport,
678715
}, nil
679716
}

0 commit comments

Comments
 (0)