Carpe Diem

備忘録

OpenTelemetry Collectorでデータを一元的に管理する

概要

OpenTelemetryでは直接ExportせずにCollectorと呼ばれるProxyを挟むことで以下のことが可能になります。

  • アプリケーションコードでExport先を意識しなくて済み、ベンダーロックインを避けることができる
  • データを一元的に管理できる
    • アプリケーションやインフラストラクチャの複数の部分からデータを収集できる
    • データの変換を集約できる
    • Jaeger, Prometheus, Zipkinなど複数のExporterにデータを送信できる
  • 水平スケールが可能
  • Tail based samplingが可能

今回はマイクロサービス環境でOpenTelemetry Collectorを用いた実装例を紹介します。

環境

  • OpenTelemetry Collector v0.67.0
  • Jaeger v1.46.0
  • Zipkin 2.24.2
  • Go v1.20.4

実装

今回例として、

christina04.hatenablog.com

で行った実装をOpenTelemetry Collectorに対応させ、JaegerとZipkin両方に送信するようにしてみます。

アーキテクチャ図は以下です。

docker-compose.yaml

opentelemetry-collector-contrib/examples/demo at main · open-telemetry/opentelemetry-collector-contrib · GitHub

にデモ用のサンプルコードがあるのでこちらを参考にします。

version: "3"

services:
  gateway:
    build:
      context: .
      dockerfile: Dockerfile
      args:
        - TARGET=gateway
    ports:
      - 8000:8000
    environment:
      - BACKEND_ADDR=backend-grpc:8080
      - SAMPLING_RATIO=1
      - EXPORTER_ENDPOINT=otel-collector:4317
    depends_on:
      - otel-collector
  backend-grpc:
    build:
      context: .
      dockerfile: Dockerfile
      args:
        - TARGET=backend-grpc
    environment:
      - EXPORTER_ENDPOINT=otel-collector:4317
      - SAMPLING_RATIO=0.5
    depends_on:
      - otel-collector
  jaeger:
    image: "jaegertracing/all-in-one:latest"
    ports:
      - "16686:16686"
      - "14268"
      - "14250"
    environment:
      - COLLECTOR_ZIPKIN_HOST_PORT=:9411
      - COLLECTOR_OTLP_ENABLED=true
  # Zipkin
  zipkin:
    image: openzipkin/zipkin:latest
    restart: always
    ports:
      - "9411:9411"
  # Collector
  otel-collector:
    image: otel/opentelemetry-collector:0.67.0
    restart: always
    command: ["--config=/etc/otel-collector-config.yaml", ""]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317" # OTLP gRPC receiver
    depends_on:
      - jaeger
      - zipkin

otel-collector-config.yaml

OpenTelemetry Collectorの設定です。

ref: Collector | OpenTelemetry

にあるコンポーネント

  • receivers
  • exporters
  • processors
  • extensions

の設定を行います。

receivers:
  otlp:
    protocols:
      grpc:

exporters:
  zipkin:
    endpoint: "http://zipkin:9411/api/v2/spans"
    format: proto

  jaeger:
    endpoint: jaeger:14250
    tls:
      insecure: true

processors:
  batch:

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [zipkin, jaeger]

詳細は↓を確認して下さい。

Configuration | OpenTelemetry

endpointはdocker-composeで名前解決できるサービス名になっています。

アプリケーション

次にアプリケーションコードの修正です。

とは言ってもExporterをCollectorに向けるように変更すれば良く、OpenTelemetry Protocolのライブラリ使えば簡単に実装できます。

before

以前のコード

func NewTracerProvider(serviceName string) (*sdktrace.TracerProvider, func(), error) {
        exporter, err := NewJaegerExporter()
        if err != nil {
                return nil, nil, err
        }

        r := NewResource(serviceName, "1.0.0", "local")
        tp := sdktrace.NewTracerProvider(
                sdktrace.WithBatcher(exporter),
                sdktrace.WithResource(r),
                sdktrace.WithSampler(sdktrace.AlwaysSample()),
        )

        otel.SetTracerProvider(tp)

        cleanup := func() {
                ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
                if err := tp.ForceFlush(ctx); err != nil {
                        log.Print(err)
                }
                ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
                if err := tp.Shutdown(ctx2); err != nil {
                        log.Print(err)
                }
                cancel()
                cancel2()
        }
        return tp, cleanup, nil
}

func NewJaegerExporter() (sdktrace.SpanExporter, error) {
        // Port details: https://www.jaegertracing.io/docs/getting-started/
        endpoint := os.Getenv("EXPORTER_ENDPOINT")

        exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(endpoint)))
        if err != nil {
                return nil, err
        }
        return exporter, nil
}

after

OpenTelemetry Collector対応のコード。

OpenTelemetry CollectorはgRPC, HTTPのどちらかのプロトコルで通信が可能です。
今回はgRPCの方を使っています。

import (
        "context"
        "log"
        "net/http"
        "os"
        "time"

        "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
        "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/filters"
        "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
        "go.opentelemetry.io/otel"
        "go.opentelemetry.io/otel/attribute"
        "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
        "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
        "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
        "go.opentelemetry.io/otel/propagation"
        "go.opentelemetry.io/otel/sdk/resource"
        sdktrace "go.opentelemetry.io/otel/sdk/trace"
        semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
        "google.golang.org/grpc"
)

func NewTracerProvider(ctx context.Context, otelAgentAddr, serviceName string, fraction float64) (*sdktrace.TracerProvider, func(), error) {
        traceClient := otlptracegrpc.NewClient(
                otlptracegrpc.WithInsecure(),
                otlptracegrpc.WithEndpoint(otelAgentAddr),
                otlptracegrpc.WithDialOption(grpc.WithBlock()))
        exporter, err := otlptrace.New(ctx, traceClient)
        if err != nil {
                return nil, nil, err
        }

        r := NewResource(serviceName, "1.0.0", "local")
        tp := sdktrace.NewTracerProvider(
                sdktrace.WithBatcher(exporter),
                sdktrace.WithResource(r),
                sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(fraction))),
        )

        otel.SetTracerProvider(tp)

        cleanup := func() {
                ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
                if err := tp.ForceFlush(ctx); err != nil {
                        log.Print(err)
                }
                ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
                if err := tp.Shutdown(ctx2); err != nil {
                        log.Print(err)
                }
                cancel()
                cancel2()
        }
        return tp, cleanup, nil
}

Exporter(SpanExporter)にもShutdownメソッドはありますが、TracerProviderでShutdownを呼び出せば内部でExporterのShutdownも呼び出されます。

動作確認

リクエストを投げてみます。

$ curl http://localhost:8000/hello

Jaeger

Jaegerでちゃんと分散トレースが保存されてます。

Zipkin

Zipkinでもちゃんと分散トレースが保存されてます。

その他

今回のサンプルコードはこちら

github.com

まとめ

OpenTelemetry Collectorの説明とGoでの実装例を紹介しました。