源码深度解析

OpenTelemetry Collector Contrib

从组件工厂到可观测性管道,逐层拆解云原生可观测数据处理引擎的设计精髓

01

什么是 OTel Collector Contrib

云原生可观测数据处理引擎

OpenTelemetry Collector Contrib 是 OpenTelemetry 项目的社区贡献组件仓库,基于 OTel Collector Core 框架构建。它提供了 200+ 个开箱即用的组件,涵盖主流云平台、数据库、消息队列和可观测性后端,是当今最全面的可观测性数据采集、处理和导出平台。

核心能力一览

🔌

插件化架构

200+ 组件通过 Factory 模式注册,OCB (Collector Builder) 按需组装自定义发行版

🔬

三信号统一

Traces、Metrics、Logs 三大信号统一采集、处理和导出,一套管道搞定全链路可观测

管道式处理

Receiver → Processor → Exporter 链式管道,数据流经 Consumer 接口逐级传递

云原生全覆盖

AWS / Azure / GCP / K8s / Prometheus / Kafka / Elasticsearch 等 99 个 Receiver、45 个 Exporter

组件数量统计 (v0.117.0)

类型数量说明
Receiver99数据接入:Prometheus、Kafka、K8s、云平台、数据库、日志文件等
Processor26数据处理:属性修改、过滤、采样、路由、K8s 元数据注入等
Exporter45数据导出:Kafka、ES、Prometheus、Datadog、Splunk、Loki 等
Extension24辅助服务:健康检查、认证、存储、编码、OPAMP 远程管理
Connector12跨信号桥接:Traces→Metrics、路由分发、负载均衡、故障转移
合计206Go 源文件 5,700+,metadata.yaml 300+
02

整体架构

架构全景图

                        ┌──────────────────────────────┐
                        │    YAML Configuration        │
                        │  receivers / processors /     │
                        │  exporters / extensions /     │
                        │  connectors / service         │
                        └──────────────┬───────────────┘
                                       │
                                       ▼
┌──────────────────────────────────────────────────────────────────────┐
│                     OTel Collector Core Framework                     │
│                                                                      │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐   ┌────────────┐  │
│  │ Component   │   │ Config     │   │ Pipeline   │   │ Extension  │  │
│  │ Registry    │   │ Resolver   │   │ Builder    │   │ Manager    │  │
│  │            │   │            │   │            │   │            │  │
│  │ Factory    │   │ YAML Parse │   │ Wire       │   │ HealthChk  │  │
│  │ Discovery  │   │ Validate   │   │ Consumer   │   │ Auth       │  │
│  │ Metadata   │   │ Env Expand │   │ Chain      │   │ OPAMP      │  │
│  └────────────┘   └────────────┘   └────────────┘   └────────────┘  │
└──────────────────────────────────────────────────────────────────────┘
         │                                      │
         ▼                                      ▼
┌──────────────────────────────────────────────────────────────────────┐
│                        Signal Pipelines                              │
│                                                                      │
│  ┌─────────┐     ┌─────────────┐     ┌──────────┐                   │
│  │Receiver │────▶│ Processor   │────▶│ Exporter │                   │
│  │         │     │ Chain       │     │          │                   │
│  │ kafka   │     │ attributes  │     │ kafka    │                   │
│  │ prom    │     │ filter      │     │ ES       │                   │
│  │ otlp    │     │ sampling    │     │ prom     │                   │
│  │ k8s     │     │ transform   │     │ datadog  │                   │
│  └─────────┘     └──────┬──────┘     └──────────┘                   │
│                         │                                            │
│                    ┌────▼─────┐                                      │
│                    │Connector │  Traces ──▶ Metrics                  │
│                    │ (cross-  │  spanmetrics / servicegraph          │
│                    │  signal) │  routing / failover                  │
│                    └──────────┘                                      │
└──────────────────────────────────────────────────────────────────────┘

配置驱动的管道拼装

Collector 通过 YAML 配置文件声明管道结构,将组件按信号类型串联:

# collector-config.yaml
receivers:
  kafka:
    brokers: ["kafka:9092"]
    topic: otel-traces
  prometheus:
    config:
      scrape_configs:
        - job_name: 'app'
          static_configs:
            - targets: ['app:8080']

processors:
  attributes:
    actions:
      - key: env
        value: production
        action: upsert
  filter:
    traces:
      span_conditions:
        - 'attributes["http.status_code"] >= 500'

exporters:
  elasticsearch:
    endpoints: ["https://es:9200"]
  prometheus:
    endpoint: "0.0.0.0:8889"

service:
  pipelines:
    traces:
      receivers: [kafka]
      processors: [attributes, filter]
      exporters: [elasticsearch]
    metrics:
      receivers: [prometheus]
      processors: [attributes]
      exporters: [prometheus]
03

项目结构与组件生态

顶层目录结构

opentelemetry-collector-contrib/
├── receiver/                  # 99 个 Receiver 组件
│   ├── kafkareceiver/         #   消息队列接入
│   ├── prometheusreceiver/    #   Prometheus 抓取
│   ├── hostmetricsreceiver/   #   主机指标采集
│   ├── k8sclusterreceiver/    #   K8s 集群状态
│   ├── filelogreceiver/       #   文件日志采集
│   └── ...                    #   还有 94 个
│
├── processor/                 # 26 个 Processor 组件
│   ├── attributesprocessor/   #   属性增删改
│   ├── filterprocessor/       #   条件过滤
│   ├── tailsamplingprocessor/ #   尾部采样
│   ├── k8sattributesprocessor/#   K8s 元数据注入
│   └── ...
│
├── exporter/                  # 45 个 Exporter 组件
│   ├── kafkaexporter/         #   发布到 Kafka
│   ├── elasticsearchexporter/ #   写入 ES
│   ├── prometheusexporter/    #   暴露 Prometheus 端点
│   ├── datadogexporter/       #   发送到 Datadog
│   └── ...
│
├── extension/                 # 24 个 Extension 组件
│   ├── healthcheckextension/  #   健康检查 HTTP 端点
│   ├── basicauthextension/    #   Basic Auth 认证
│   ├── opaborextension/       #   OpAMP 远程管理
│   └── ...
│
├── connector/                 # 12 个 Connector 组件
│   ├── spanmetricsconnector/  #   Traces → RED Metrics
│   ├── servicegraphconnector/ #   Traces → 服务拓扑
│   ├── routingconnector/      #   OTTL 条件路由
│   └── ...
│
├── cmd/                       # 入口与工具
│   ├── otelcontribcol/        #   官方 Contrib 发行版
│   │   └── builder-config.yaml#   OCB 构建配置
│   ├── telemetrygen/          #   测试数据生成器
│   └── opampsupervisor/       #   OpAMP 管理器
│
├── pkg/                       # 15 个公共可复用库
│   ├── ottl/                  #   OTel Transformation Language
│   ├── stanza/                #   日志采集处理库
│   ├── translator/            #   协议翻译器
│   └── ...
│
├── internal/                  # 21 个内部共享库
│   ├── filter/                #   跨组件过滤引擎
│   ├── coreinternal/          #   Core 重导出工具
│   ├── kafka/                 #   Kafka 共享工具
│   └── ...
│
├── confmap/provider/          # 配置源提供器
│   ├── s3provider/            #   从 S3 加载配置
│   └── secretsmanagerprovider/#   从 Secrets Manager 加载
│
├── testbed/                   # 稳定性与正确性测试
└── examples/                  # 7 个实战示例

组件分类 — Receiver 矩阵

分类代表组件说明
云平台awscloudwatch, azuremonitor, googlecloudAWS/Azure/GCP 原生监控指标
数据库mysql, postgresql, mongodb, redis, elasticsearch数据库性能指标
消息队列kafka, pulsar, solace消息消费接入
基础设施hostmetrics, k8scluster, docker, kubeletstats主机/容器/K8s 指标
协议otlp, jaeger, zipkin, prometheus, opencensus标准可观测协议
日志filelog, syslog, tcplog, udplog日志采集
应用apache, nginx, iis, haproxy, rabbitmq中间件指标

构建体系:OCB (Collector Builder)

Collector 使用 builder-config.yaml 声明组件列表,OCB 自动生成 import 代码并编译:

# cmd/otelcontribcol/builder-config.yaml
dist:
  module: go.opentelemetry.io/collector/cmd/otelcontribcol
  name: otelcontribcol
  version: 0.117.0-dev

receivers:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.117.0
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.117.0
  # ... 97 more receivers

processors:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.117.0
  # ... 25 more processors

exporters:
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.117.0
  # ... 44 more exporters

用户可以创建自己的 builder-config.yaml 只包含所需组件,构建最小化 Collector 二进制。

第二部分

核心机制

Factory 工厂模式、配置系统与 Pipeline 管道模型

04

Factory 工厂模式

一切皆 Factory

OTel Collector 中所有组件(Receiver、Processor、Exporter、Extension、Connector)都遵循同一个设计原则:通过 NewFactory() 函数注册自身。Factory 封装了组件类型、默认配置和各信号类型的创建方法。

Factory 三要素

// 所有组件共享的 Factory 模式
func NewFactory() receiver.Factory {
    return receiver.NewFactory(
        metadata.Type,            // 1. 组件类型标识 (如 "kafka")
        createDefaultConfig,       // 2. 默认配置工厂
        // 3. 各信号类型的创建方法
        receiver.WithTraces(createTracesReceiver, metadata.TracesStability),
        receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability),
        receiver.WithLogs(createLogsReceiver, metadata.LogsStability),
    )
}

// 默认配置工厂
func createDefaultConfig() component.Config {
    return &Config{
        Brokers:  []string{"localhost:9092"},
        Encoding: "otlp_proto",
        GroupID:  "otel-collector",
    }
}

// Traces 信号创建方法
func createTracesReceiver(
    ctx context.Context,
    set receiver.Settings,         // Logger, ID, Telemetry
    cfg component.Config,          // 用户配置
    nextConsumer consumer.Traces,  // 下游消费者
) (receiver.Traces, error) {
    // 创建并返回 Receiver 实例
}

五大组件的 Factory 签名对比

组件类型Factory 函数信号方法
Receiverreceiver.NewFactory()WithTraces / WithMetrics / WithLogs
Processorprocessor.NewFactory()WithTraces / WithMetrics / WithLogs
Exporterexporter.NewFactory()WithTraces / WithMetrics / WithLogs
Extensionextension.NewFactory()无信号方法,单一 createExtension
Connectorconnector.NewFactory()WithTracesToMetrics / WithLogsToLogs 等跨信号

metadata.yaml 与代码生成

每个组件通过 metadata.yaml 声明元数据,mdatagen 工具自动生成 internal/metadata/ 代码:

# receiver/kafkareceiver/metadata.yaml
type: kafka
status:
  class: receiver
  stability:
    beta: [traces, metrics, logs]
  distributions: [contrib]
  codeowners:
    active: [pavolloffay, MovieStoreGuy]
// 自动生成: internal/metadata/generated_status.go
const (
    Type                     = component.MustNewType("kafka")
    ScopeName                = "github.com/open-telemetry/.../kafkareceiver"
    TracesStability          = component.StabilityLevelBeta
    MetricsStability         = component.StabilityLevelBeta
    LogsStability            = component.StabilityLevelBeta
)
05

配置系统

Config 结构体约定

每个组件必须提供一个 Config 结构体,实现 component.Config 接口:

// 典型 Config 结构体 (kafkareceiver/config.go)
type Config struct {
    Brokers         []string        `mapstructure:"brokers"`
    ProtocolVersion string          `mapstructure:"protocol_version"`
    SessionTimeout  time.Duration   `mapstructure:"session_timeout"`
    Topic           string          `mapstructure:"topic"`
    Encoding        string          `mapstructure:"encoding"`
    GroupID         string          `mapstructure:"group_id"`
    Authentication  kafka.Authentication `mapstructure:"auth"`
    AutoCommit      AutoCommit      `mapstructure:"autocommit"`
}

// 编译时接口检查
var _ component.Config = (*Config)(nil)

// 语义校验
func (cfg *Config) Validate() error {
    if len(cfg.Brokers) == 0 {
        return errors.New("brokers must not be empty")
    }
    return nil
}

配置加载流程

YAML 文件
    │
    ├── confmap.Resolver 解析
    │   ├── 环境变量展开 ${ENV_VAR}
    │   ├── S3 / Secrets Manager / HTTP 远程加载
    │   └── AES 加密字段解密
    │
    ├── mapstructure 反序列化
    │   └── YAML field → Go struct field (via mapstructure tags)
    │
    ├── Config.Validate() 语义校验
    │   ├── 必填字段检查
    │   ├── 取值范围校验
    │   └── 逻辑关系校验
    │
    └── Factory.CreateReceiver/Processor/Exporter(cfg)
        └── 配置传入组件实例

Exporter 常用嵌入配置

Exporter 通常嵌入框架提供的通用配置:

type Config struct {
    // 嵌入通用配置 (mapstructure:",squash" 扁平化)
    exporterhelper.TimeoutConfig  `mapstructure:",squash"`  // 超时控制
    exporterhelper.QueueConfig    `mapstructure:",squash"`  // 发送队列
    configretry.BackOffConfig     `mapstructure:",squash"`  // 重试退避

    // 组件特有配置
    Endpoint  string `mapstructure:"endpoint"`
    Index     string `mapstructure:"index"`
}
06

Pipeline 管道模型

管道拓扑结构

┌──────────────────────────────────────────────────────────────────────┐
│                       service.pipelines                              │
│                                                                      │
│  traces/default:                                                     │
│  ┌─────────┐   ┌────────────┐   ┌────────────┐   ┌──────────┐      │
│  │ kafka    │──▶│ attributes │──▶│ filter     │──▶│ ES       │      │
│  │ receiver │   │ processor  │   │ processor  │   │ exporter │      │
│  └─────────┘   └────────────┘   └────────────┘   └──────────┘      │
│                                                                      │
│  metrics/default:                                                    │
│  ┌─────────┐   ┌────────────┐   ┌──────────┐   ┌──────────┐       │
│  │ prom     │──▶│ k8sattrs   │──▶│ prom     │ + │ datadog  │       │
│  │ receiver │   │ processor  │   │ exporter │   │ exporter │       │
│  └─────────┘   └────────────┘   └──────────┘   └──────────┘       │
│                                                                      │
│  traces/spanmetrics (Connector 桥接):                                 │
│  ┌─────────┐   ┌────────────────┐   ┌──────────┐                    │
│  │ traces   │──▶│ spanmetrics    │──▶│ metrics  │                    │
│  │ pipeline │   │ connector      │   │ pipeline │                    │
│  └─────────┘   │ Traces→Metrics │   └──────────┘                    │
│                 └────────────────┘                                    │
└──────────────────────────────────────────────────────────────────────┘

Fan-out 模型

一个管道可以配置多个 Exporter,数据会被 fan-out 到每个 Exporter:

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [attributes]
      exporters: [elasticsearch, kafka, datadog]  # 同时发往 3 个目标

同样,一个管道也可以有多个 Receiver,数据会被 fan-in 合并:

    metrics:
      receivers: [prometheus, hostmetrics, k8scluster]  # 3 个源汇聚
      processors: [resourcedetection]
      exporters: [prometheus]
07

Consumer 链式调用

Consumer 接口 — 管道的粘合剂

整个管道的数据流通过三个 Consumer 接口驱动,每个组件要么生产数据(Receiver),要么消费并转发(Processor),要么消费并导出(Exporter):

三信号 Consumer 接口

// consumer 包定义了三个信号的消费接口
type Traces interface {
    ConsumeTraces(ctx context.Context, td ptrace.Traces) error
}

type Metrics interface {
    ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error
}

type Logs interface {
    ConsumeLogs(ctx context.Context, ld plog.Logs) error
}

链式调用流转

Receiver.Start()
    │
    │  (数据到达)
    │
    ▼
nextConsumer.ConsumeTraces(ctx, traces)     ← Processor1
    │
    │  (处理后转发)
    │
    ▼
nextConsumer.ConsumeTraces(ctx, traces)     ← Processor2
    │
    │  (处理后转发)
    │
    ▼
exporter.ConsumeTraces(ctx, traces)         ← Exporter (终点)
    │
    └── 发送到外部系统 (Kafka / ES / Prometheus / ...)

每个组件在创建时接收 nextConsumer 参数,形成链式调用。框架负责串联,组件只需关注自身逻辑。

错误处理分类

// 永久性错误 — 不可重试,丢弃数据
consumererror.NewPermanent(err)

// 瞬时性错误 — 可重试(配合 exporterhelper 的 Retry/Queue)
// 直接返回普通 error 即可

// 跳过处理 — 当前批次不处理但不报错
processorhelper.ErrSkipProcessingData
第三部分

Receiver 源码剖析

数据入口:从 Kafka 消费、Prometheus 抓取到文件采集

08

Receiver 设计模式

Receiver 核心契约

// Receiver 必须实现的接口
type Receiver interface {
    component.Component  // Start(ctx, host) + Shutdown(ctx)
}

// 具体信号类型
type Traces interface {
    Receiver
    // 不需要额外方法 — Receiver 主动推送数据给 nextConsumer
}

// Receiver 的职责:
// 1. Start() — 启动数据采集 (开启 goroutine、监听端口、连接消息队列等)
// 2. 采集到数据 → 调用 nextConsumer.ConsumeTraces/Metrics/Logs()
// 3. Shutdown() — 优雅关闭 (关闭连接、等待 goroutine 退出)

Receiver 文件组织约定

receiver/kafkareceiver/
├── metadata.yaml           # 组件元数据 (type, stability, codeowners)
├── factory.go              # NewFactory() + createDefaultConfig + create*Receiver
├── config.go               # Config 结构体 + Validate()
├── kafka_receiver.go       # 核心实现 (Start / Shutdown / 消费循环)
├── internal/metadata/      # mdatagen 生成的代码
│   ├── generated_status.go # Type, ScopeName, Stability 常量
│   └── generated_telemetry.go  # 自定义指标 (TelemetryBuilder)
├── go.mod                  # 独立 Go Module
└── *_test.go               # 测试文件

Receiver 三大模式

模式特点代表组件
消息驱动型消费消息队列,事件驱动kafkareceiver, pulsarreceiver
拉取抓取型定时抓取目标端点prometheusreceiver, hostmetricsreceiver
服务监听型启动 HTTP/gRPC 服务接收推送otlpreceiver, jaegerreceiver
09

Kafka Receiver 实战

核心结构体

// kafka_receiver.go
type kafkaTracesConsumer struct {
    config            Config
    consumerGroup     sarama.ConsumerGroup    // Kafka Consumer Group
    nextConsumer      consumer.Traces         // 下游 Pipeline Consumer
    topics            []string
    cancelConsumeLoop context.CancelFunc      // 取消信号
    unmarshaler       TracesUnmarshaler       // 数据反序列化
    consumeLoopWG     *sync.WaitGroup         // 等待 goroutine 退出
    settings          receiver.Settings
    telemetryBuilder  *metadata.TelemetryBuilder  // 自定义指标
    autocommitEnabled bool
    messageMarking    MessageMarking
}

// 编译时接口检查
var _ receiver.Traces = (*kafkaTracesConsumer)(nil)

Start — 启动消费循环

func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
    // 1. 创建取消上下文
    ctx, cancel := context.WithCancel(context.Background())
    c.cancelConsumeLoop = cancel

    // 2. 创建观测性报告器
    obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
        ReceiverID: c.settings.ID,
        Transport:  transport,
    })

    // 3. 加载编码扩展 (Extension 优先于内置编码)
    if unmarshaler, err := loadEncodingExtension[ptrace.Unmarshaler](
        host, c.config.Encoding); err == nil {
        c.unmarshaler = &tracesEncodingUnmarshaler{unmarshaler: *unmarshaler}
    }

    // 4. 创建 Kafka Consumer Group
    c.consumerGroup, err = createKafkaClient(ctx, c.config)

    // 5. 启动消费循环 goroutine
    handler := &tracesConsumerGroupHandler{
        unmarshaler:  c.unmarshaler,
        nextConsumer: c.nextConsumer,
        ready:        make(chan bool),
        obsrecv:      obsrecv,
    }
    c.consumeLoopWG.Add(1)
    go c.consumeLoop(ctx, handler)

    // 6. 等待 Consumer Group 就绪
    <-handler.ready
    return nil
}

消费循环 — 消息处理核心

// ConsumeClaim — Kafka Consumer Group Handler 的核心方法
func (c *tracesConsumerGroupHandler) ConsumeClaim(
    session sarama.ConsumerGroupSession,
    claim sarama.ConsumerGroupClaim,
) error {
    for {
        select {
        case message, ok := <-claim.Messages():
            if !ok { return nil }

            // 1. 开始观测性 Span
            ctx := c.obsrecv.StartTracesOp(session.Context())

            // 2. 反序列化消息
            traces, err := c.unmarshaler.UnmarshalTraces(message.Value)
            if err != nil {
                c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), 0, err)
                return err
            }

            // 3. 提取 Header (如果配置)
            c.headerExtractor.extractHeadersTraces(traces, message)

            // 4. *** 关键: 发送到下游 Pipeline ***
            err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
            c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(),
                traces.SpanCount(), err)

            // 5. 标记消费位移
            if c.messageMarking.After {
                session.MarkMessage(message, "")
            }

        case <-session.Context().Done():
            return nil
        }
    }
}

Shutdown — 优雅关闭

func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
    if c.cancelConsumeLoop == nil { return nil }
    c.cancelConsumeLoop()         // 1. 发送取消信号
    c.consumeLoopWG.Wait()        // 2. 等待 goroutine 退出
    if c.consumerGroup == nil { return nil }
    return c.consumerGroup.Close() // 3. 关闭 Kafka 连接
}
10

Prometheus Receiver

架构:复用 Prometheus 原生代码

Prometheus Receiver 直接嵌入了 Prometheus 的 DiscoveryManagerScrapeManager,复用其成熟的服务发现和抓取逻辑:

prometheusreceiver
    │
    ├── DiscoveryManager   ← Prometheus 原生服务发现
    │   ├── static_configs
    │   ├── kubernetes_sd
    │   ├── consul_sd
    │   └── ...
    │
    ├── ScrapeManager      ← Prometheus 原生抓取引擎
    │   └── 定时抓取目标 /metrics 端点
    │
    ├── Appendable (适配器)
    │   └── 将 Prometheus 时序数据 → OTLP Metrics (pmetric.Metrics)
    │
    └── nextConsumer.ConsumeMetrics(ctx, metrics)
        └── 发送到下游 Pipeline

Start — 启动抓取引擎

func (r *pReceiver) Start(ctx context.Context, host component.Host) error {
    discoveryCtx, cancel := context.WithCancel(context.Background())
    r.cancelFunc = cancel

    // 1. 初始化 Prometheus Discovery Manager
    r.discoveryManager = discovery.NewManager(discoveryCtx, logger, r.registerer, sdMetrics)
    go func() {
        r.discoveryManager.Run()  // goroutine: 持续发现目标
    }()

    // 2. 创建 Appendable 适配器 (Prometheus → OTLP)
    store, _ := internal.NewAppendable(r.consumer, r.settings, ...)

    // 3. 初始化 Prometheus Scrape Manager
    r.scrapeManager, _ = scrape.NewManager(opts, logger, store, r.registerer)
    go func() {
        <-r.configLoaded  // 等待配置加载完成
        r.scrapeManager.Run(r.discoveryManager.SyncCh())  // goroutine: 持续抓取
    }()

    return nil
}

func (r *pReceiver) Shutdown(context.Context) error {
    r.cancelFunc()                           // 停止服务发现
    r.scrapeManager.Stop()                   // 停止抓取
    r.targetAllocatorManager.Shutdown()      // 停止 Target Allocator
    return nil
}

Scraper Helper 模式

对于定时采集型 Receiver(如 hostmetricsreceiver),框架提供 ScraperControllerReceiver 简化开发:

// hostmetricsreceiver/factory.go — 使用 Scraper Helper
func createMetricsReceiver(ctx context.Context, set receiver.Settings,
    cfg component.Config, consumer consumer.Metrics,
) (receiver.Metrics, error) {

    scraperOptions := createAddScraperOptions(ctx, set, oCfg, scraperFactories)

    // 框架自动管理: 定时调度 + 生命周期 + 错误处理
    return scraperhelper.NewScraperControllerReceiver(
        &oCfg.ControllerConfig,  // 采集间隔等配置
        set,
        consumer,                // 下游 Consumer
        scraperOptions...,       // 各子采集器 (CPU, Memory, Disk...)
    )
}
第四部分

Processor 源码剖析

数据中间处理:属性修改、条件过滤与采样决策

11

Processor 设计模式

Processor 核心职责

Processor 接收上游数据,原地修改或过滤后传递给下游。它既是 Consumer(消费上游数据),又持有 nextConsumer(转发给下游)。

processorhelper 封装

// Processor 通常通过 processorhelper 创建
func createTracesProcessor(ctx context.Context, set processor.Settings,
    cfg component.Config, nextConsumer consumer.Traces,
) (processor.Traces, error) {

    proc := newMyProcessor(cfg)

    return processorhelper.NewTraces(
        ctx,
        set,
        cfg,
        nextConsumer,                  // 自动串联下游
        proc.processTraces,            // 核心处理函数
        processorhelper.WithCapabilities(consumer.Capabilities{
            MutatesData: true,         // 声明是否修改数据
        }),
    )
}

// 核心处理函数签名
func (p *myProcessor) processTraces(
    ctx context.Context,
    td ptrace.Traces,       // 输入
) (ptrace.Traces, error) { // 输出 (可以是修改后的同一对象)
    // ... 处理逻辑 ...
    return td, nil
}

Processor 分类

类型特点代表组件
无状态/同步每条数据独立处理,无内存状态attributes, filter, transform
有状态/异步维护内存状态,批量/延迟决策tailsampling, groupbytrace, interval
富化增强型外部数据源注入元数据k8sattributes, resourcedetection, geoip
12

Attributes Processor

功能:属性增删改查

最常用的 Processor 之一,支持 7 种属性操作:

# 配置示例
processors:
  attributes:
    actions:
      - key: env
        value: production
        action: upsert          # 存在则更新,不存在则插入
      - key: password
        action: delete          # 删除敏感属性
      - key: user.id
        action: hash            # SHA-256 哈希脱敏
      - key: http.url
        pattern: "^(?P<host>[^/]+)"
        action: extract         # 正则提取子属性

核心处理逻辑

// attributes_trace.go — 三层遍历 + 属性处理
func (a *spanAttributesProcessor) processTraces(
    ctx context.Context, td ptrace.Traces,
) (ptrace.Traces, error) {

    rss := td.ResourceSpans()
    for i := 0; i < rss.Len(); i++ {          // 遍历 ResourceSpans
        rs := rss.At(i)
        ilss := rs.ScopeSpans()
        for j := 0; j < ilss.Len(); j++ {     // 遍历 ScopeSpans
            spans := ilss.At(j).Spans()
            for k := 0; k < spans.Len(); k++ { // 遍历每个 Span
                span := spans.At(k)

                // OTTL 条件跳过
                if a.skipExpr != nil {
                    skip, _ := a.skipExpr.Eval(ctx,
                        ottlspan.NewTransformContext(span, ...))
                    if skip { continue }
                }

                // 执行属性操作 (insert/update/upsert/delete/hash/extract/convert)
                a.attrProc.Process(ctx, a.logger, span.Attributes())
            }
        }
    }
    return td, nil
}

AttrProc — 7 种操作

// internal/coreinternal/attraction/attraction.go
const (
    INSERT  Action = "insert"   // 仅在不存在时插入
    UPDATE  Action = "update"   // 仅在存在时更新
    UPSERT  Action = "upsert"   // 存在更新,不存在插入
    DELETE  Action = "delete"   // 删除属性
    HASH    Action = "hash"     // SHA-256 哈希值替换
    EXTRACT Action = "extract"  // 正则提取子属性
    CONVERT Action = "convert"  // 类型转换 (string→int 等)
)
13

Filter Processor

双模式过滤

Filter Processor 支持两种过滤模式:传统 Matcher 和 OTTL 表达式条件:

# OTTL 条件模式 (推荐)
processors:
  filter:
    error_mode: ignore   # ignore / propagate / silent
    traces:
      span_conditions:
        - 'attributes["http.status_code"] >= 500'
        - 'name == "health-check"'
    logs:
      log_record_conditions:
        - 'severity_number < SEVERITY_NUMBER_WARN'
    metrics:
      metric_conditions:
        - 'name == "system.cpu.time"'

RemoveIf 模式 — 零拷贝过滤

// filterprocessor/traces.go — 嵌套 RemoveIf 实现高效过滤
func (fsp *filterSpanProcessor) processTraces(
    ctx context.Context, td ptrace.Traces,
) (ptrace.Traces, error) {

    td.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool {
        rs.ScopeSpans().RemoveIf(func(ss ptrace.ScopeSpans) bool {
            ss.Spans().RemoveIf(func(span ptrace.Span) bool {
                // OTTL 表达式求值
                skip, _ := fsp.skipSpanExpr.Eval(ctx,
                    ottlspan.NewTransformContext(span, ss.Scope(),
                        rs.Resource(), ss, rs))
                return skip  // true = 移除该 Span
            })
            // 过滤 SpanEvent
            if fsp.skipSpanEventExpr != nil {
                span.Events().RemoveIf(func(event ptrace.SpanEvent) bool {
                    skip, _ := fsp.skipSpanEventExpr.Eval(ctx, ...)
                    return skip
                })
            }
            return ss.Spans().Len() == 0  // 空则移除 ScopeSpans
        })
        return rs.ScopeSpans().Len() == 0  // 空则移除 ResourceSpans
    })

    return td, nil
}

RemoveIf 是 pdata 提供的原地删除方法,通过交换元素到末尾并截断切片实现零拷贝过滤。

14

Tail Sampling Processor

有状态 Processor 典范

Tail Sampling 是最复杂的 Processor 之一:它需要缓存完整 Trace,等待所有 Span 到达后再做采样决策。这与无状态 Processor 完全不同。

核心架构

type tailSamplingSpanProcessor struct {
    ctx            context.Context
    nextConsumer   consumer.Traces
    maxNumTraces   uint64              // 最大缓存 Trace 数
    policies       []*policy           // 采样策略列表
    idToTrace      sync.Map            // TraceID → Trace 数据 (并发安全)
    sampledIDCache cache.Cache[bool]   // 已决策 TraceID 缓存
    deleteChan     chan pcommon.TraceID // 删除通知通道
    policyTicker   timeutils.TTicker   // 定时决策
}

type policy struct {
    name      string
    evaluator sampling.PolicyEvaluator  // 策略求值器
}

// 11+ 策略类型
// AlwaysSample, Latency, NumericAttribute, Probabilistic,
// StatusCode, StringAttribute, RateLimiting, Composite,
// And, SpanCount, OTTLCondition

工作流程

Span 到达 → processTraces()
    │
    ├── 按 TraceID 分组
    ├── 加入 idToTrace 缓存
    │
    ▼
定时器触发 (每 decision_wait 秒)
    │
    ├── 遍历所有缓存的 Trace
    ├── 依次评估 policies
    │   ├── Latency Policy: trace 总延迟 > 阈值?
    │   ├── StatusCode Policy: 包含错误 Span?
    │   ├── Probabilistic: 概率采样
    │   ├── OTTL Condition: 自定义表达式
    │   └── Composite: 多策略组合
    │
    ├── 决策: Sampled / NotSampled / InvertSampled
    │
    ├── Sampled → nextConsumer.ConsumeTraces(trace)  发送下游
    │
    └── 清理缓存 + 写入 sampledIDCache (防重复)
        └── 后续同 TraceID 的 Span 直接查缓存决定
第五部分

Exporter 源码剖析

数据出口:消息发布、HTTP 暴露与批量索引

15

Exporter 设计模式

exporterhelper — 强大的导出助手

与 Receiver/Processor 不同,Exporter 通过 exporterhelper 获得重试、队列、超时等开箱即用的能力:

// 典型 Exporter 创建 (kafkaexporter/factory.go)
func createTracesExporter(ctx context.Context, set exporter.Settings,
    cfg component.Config,
) (exporter.Traces, error) {

    exp := newTracesExporter(oCfg, set)

    return exporterhelper.NewTraces(
        ctx,
        set,
        cfg,
        exp.tracesPusher,           // 核心发送函数
        // 能力声明
        exporterhelper.WithCapabilities(consumer.Capabilities{
            MutatesData: false,
        }),
        // 超时控制
        exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{
            Timeout: 30 * time.Second,
        }),
        // 重试退避
        exporterhelper.WithRetry(oCfg.BackOffConfig),
        // 发送队列 (异步缓冲)
        exporterhelper.WithQueue(oCfg.QueueSettings),
        // 生命周期钩子
        exporterhelper.WithStart(exp.start),
        exporterhelper.WithShutdown(exp.Close),
    )
}

exporterhelper 内置能力

ConsumeTraces(ctx, traces)
    │
    ├── Timeout 控制
    │   └── context.WithTimeout(ctx, timeout)
    │
    ├── Queue (异步发送队列)
    │   ├── 内存队列 / 持久化队列 (File Storage Extension)
    │   ├── num_consumers: 并发消费者数
    │   └── queue_size: 队列容量
    │
    ├── Retry (指数退避重试)
    │   ├── enabled: true
    │   ├── initial_interval: 5s
    │   ├── max_interval: 30s
    │   ├── max_elapsed_time: 300s
    │   └── 仅重试非 Permanent 错误
    │
    └── tracesPusher(ctx, traces)
        └── 实际发送逻辑
16

Kafka Exporter 实战

核心发送逻辑

// kafkaexporter/kafka_exporter.go
type kafkaTracesProducer struct {
    cfg       Config
    producer  sarama.SyncProducer  // Kafka 同步生产者
    marshaler TracesMarshaler      // 序列化器
    logger    *zap.Logger
}

// tracesPusher — 被 exporterhelper 调用
func (e *kafkaTracesProducer) tracesPusher(
    ctx context.Context, td ptrace.Traces,
) error {
    // 1. 序列化为 Kafka 消息
    messages, err := e.marshaler.Marshal(td,
        getTopic(ctx, &e.cfg, td.ResourceSpans()))
    if err != nil {
        return consumererror.NewPermanent(err)  // 序列化失败 → 不可重试
    }

    // 2. 发送到 Kafka
    err = e.producer.SendMessages(messages)
    if err != nil {
        return err  // 发送失败 → 可重试 (exporterhelper 会自动重试)
    }
    return nil
}

// start — 在 exporterhelper.WithStart() 中调用
func (e *kafkaTracesProducer) start(ctx context.Context, host component.Host) error {
    // 优先从 Extension 加载编码器
    if marshaler, err := loadEncodingExtension[ptrace.Marshaler](
        host, e.cfg.Encoding); err == nil {
        e.marshaler = &tracesEncodingMarshaler{marshaler: *marshaler}
    }
    // 回退到内置编码
    if e.marshaler == nil {
        e.marshaler, _ = createTracesMarshaler(e.cfg)
    }
    // 创建 Kafka 生产者
    e.producer, _ = newSaramaProducer(ctx, e.cfg)
    return nil
}
17

Prometheus Exporter

拉取模式:启动 HTTP Server

与大多数 push 模式的 Exporter 不同,Prometheus Exporter 启动一个 HTTP 服务,等待 Prometheus Server 来抓取:

// prometheusexporter/prometheus.go
func (pe *prometheusExporter) Start(ctx context.Context, host component.Host) error {
    ln, _ := pe.config.ToListener(ctx)

    mux := http.NewServeMux()
    mux.Handle("/metrics", pe.handler)  // 暴露 /metrics 端点

    srv, _ := pe.config.ToServer(ctx, host, pe.settings, mux)
    pe.shutdownFunc = func(ctx context.Context) error {
        return srv.Shutdown(ctx)
    }

    go func() {
        _ = srv.Serve(ln)  // goroutine: 持续监听
    }()
    return nil
}

// ConsumeMetrics — 接收数据并缓存 (非发送)
func (pe *prometheusExporter) ConsumeMetrics(
    _ context.Context, md pmetric.Metrics,
) error {
    rmetrics := md.ResourceMetrics()
    for i := 0; i < rmetrics.Len(); i++ {
        pe.collector.processMetrics(rmetrics.At(i))  // 转换并缓存
    }
    return nil
    // Prometheus Server 抓取 /metrics 时读取缓存
}
18

Elasticsearch Exporter

批量索引模式

// elasticsearchexporter/exporter.go
func (e *elasticsearchExporter) pushLogsData(
    ctx context.Context, ld plog.Logs,
) error {
    e.wg.Add(1)
    defer e.wg.Done()

    // 1. 获取批量索引会话
    session, _ := e.bulkIndexer.StartSession(ctx)
    defer session.End()

    var errs []error

    // 2. 三层遍历: ResourceLogs → ScopeLogs → LogRecords
    rls := ld.ResourceLogs()
    for i := 0; i < rls.Len(); i++ {
        rl := rls.At(i)
        ills := rl.ScopeLogs()
        for j := 0; j < ills.Len(); j++ {
            logs := ills.At(j).LogRecords()
            for k := 0; k < logs.Len(); k++ {
                // 3. 逐条写入 Bulk 缓冲
                err := e.pushLogRecord(ctx, rl.Resource(),
                    logs.At(k), ills.At(j).Scope(), session)
                if err != nil {
                    errs = append(errs, err)
                }
            }
        }
    }

    // 4. Flush 批量请求
    if err := session.Flush(ctx); err != nil {
        errs = append(errs, err)
    }

    return multierr.Combine(errs...)
}
第六部分

Extension & Connector

辅助服务与跨信号桥接

19

Extension 扩展机制

Extension 的角色

Extension 不参与数据管道,而是提供辅助功能。其他组件通过 component.Host 在 Start 时发现并使用 Extension:

Extension 分类:

认证类:
├── basicauth        — Basic Auth 认证
├── bearertoken      — Bearer Token 认证
├── oauth2           — OAuth2 Client Credentials
├── oidc             — OpenID Connect 认证
└── sigv4            — AWS Signature V4

编码类:
├── otlpencoding     — OTLP 编解码
├── jaegerencoding   — Jaeger 编解码
├── zipkinencoding   — Zipkin 编解码
├── avrologencoding  — Avro 日志编解码
└── jsonlogencoding  — JSON 日志编解码

运维类:
├── healthcheck      — /health HTTP 健康检查端点
├── pprof            — Go pprof 性能分析端点
├── remotetap        — 远程数据采样
└── opamp            — 远程 Agent 管理 (OpAMP 协议)

存储类:
├── filestorage      — 文件持久化 (用于 Queue)
└── dbstorage        — 数据库持久化

发现类:
├── k8sobserver      — K8s Pod 自动发现
├── dockerobserver   — Docker 容器发现
└── ecsobserver      — ECS Task 发现

Extension Factory 模式

// healthcheckextension/factory.go
func NewFactory() extension.Factory {
    return extension.NewFactory(
        metadata.Type,           // "healthcheck"
        createDefaultConfig,
        createExtension,         // 只有一个创建方法 (无信号区分)
        metadata.ExtensionStability,
    )
}

func createExtension(_ context.Context, set extension.Settings,
    cfg component.Config,
) (extension.Extension, error) {
    return newServer(*cfg.(*Config), set.TelemetrySettings), nil
}

Extension 被消费方式

// Receiver/Exporter 在 Start() 中通过 Host 获取 Extension
func (r *myReceiver) Start(ctx context.Context, host component.Host) error {
    // 从 Host 查找编码扩展
    if marshaler, err := loadEncodingExtension[ptrace.Marshaler](
        host, r.config.Encoding,
    ); err == nil {
        r.marshaler = marshaler  // 使用 Extension 提供的编码器
    }

    // 从 Host 查找存储扩展
    storageClient, _ := adapter.GetStorageClient(ctx, host, r.storageID, r.id)
    return r.input.Start(storageClient)
}
20

Connector 跨信号桥接

Connector = Exporter + Receiver

Connector 是 OTel Collector 中最独特的组件类型:它同时充当一个管道的 Exporter 和另一个管道的 Receiver,实现跨信号类型的数据转换。

spanmetricsconnector — Traces → Metrics

// spanmetricsconnector/factory.go
func NewFactory() connector.Factory {
    return connector.NewFactory(
        metadata.Type,
        createDefaultConfig,
        // 注意: WithTracesToMetrics — 输入 Traces, 输出 Metrics
        connector.WithTracesToMetrics(
            createTracesToMetricsConnector,
            metadata.TracesToMetricsStability),
    )
}

func createTracesToMetricsConnector(ctx context.Context,
    params connector.Settings, cfg component.Config,
    nextConsumer consumer.Metrics,  // 下游是 Metrics Consumer
) (connector.Traces, error) {      // 返回 Traces 接口 (作为上游的 Exporter)
    c, _ := newConnector(params.Logger, cfg)
    c.metricsConsumer = nextConsumer
    return c, nil
}

跨信号管道配置

# Traces → spanmetrics Connector → Metrics
receivers:
  otlp:
    protocols:
      grpc:

connectors:
  spanmetrics:
    histogram:
      explicit:
        boundaries: [10ms, 50ms, 100ms, 500ms, 1s]
    dimensions:
      - name: http.method
      - name: http.status_code

exporters:
  prometheus:
    endpoint: "0.0.0.0:8889"

service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [spanmetrics]       # Connector 作为 Exporter
    metrics:
      receivers: [spanmetrics]        # 同一 Connector 作为 Receiver
      exporters: [prometheus]
数据流:
OTLP Traces ──▶ spanmetrics Connector ──▶ Prometheus Metrics
                     │
                     ├── 计算 Span 延迟直方图
                     ├── 计算 Span 计数
                     ├── 按 dimensions 分组
                     └── 生成 RED Metrics (Rate, Errors, Duration)

Connector 类型矩阵

Connector输入 → 输出功能
spanmetricsTraces → Metrics生成 RED 指标 (延迟/计数/错误率)
servicegraphTraces → Metrics构建服务依赖拓扑图
routingAny → AnyOTTL 条件路由到不同管道
failoverAny → Any主备故障转移
roundrobinAny → Any轮询负载均衡
countAny → Metrics计数信号量并输出为指标
第七部分

共享库与工具

OTTL 转换语言与 Internal 共享库

21

OTTL 转换语言

OpenTelemetry Transformation Language

OTTL 是 OTel Collector 内置的 DSL(领域特定语言),提供类型安全的表达式求值引擎。它被 filterprocessor、transformprocessor、routingconnector 等多个组件共用,是整个 Contrib 项目中最重要的共享库之一。

OTTL 表达式示例

# 过滤条件
attributes["http.status_code"] >= 500
name == "health-check"
resource.attributes["service.name"] == "payment"
severity_number < SEVERITY_NUMBER_WARN

# 转换操作
set(attributes["env"], "production")
replace_match(attributes["url"], "/api/v1/*", "/api/latest/*")
truncate_all(attributes, 256)
delete_key(attributes, "password")

# 复杂表达式
IsMatch(attributes["http.target"], "/api/.*") and
    attributes["http.method"] == "POST"

OTTL 架构

pkg/ottl/
├── parser.go           # 表达式解析器
├── expression.go       # 表达式求值
├── boolean_value.go    # 布尔表达式
├── compare.go          # 比较运算
├── functions.go        # 函数注册表
├── grammar.go          # 语法定义
│
├── ottlfuncs/          # 50+ 内置函数
│   ├── func_set.go
│   ├── func_delete_key.go
│   ├── func_replace_match.go
│   ├── func_concat.go
│   ├── func_is_match.go
│   ├── func_sha256.go
│   └── ...
│
└── contexts/           # 信号上下文
    ├── ottlspan/       # Span 上下文 (访问 span.attributes, span.name 等)
    ├── ottlspanevent/  # SpanEvent 上下文
    ├── ottlmetric/     # Metric 上下文
    ├── ottldatapoint/  # DataPoint 上下文
    ├── ottllog/        # Log 上下文
    └── ottlresource/   # Resource 上下文

OTTL 类型系统

类型说明示例
int64位整数attributes["code"] == 200
float64位浮点attributes["latency"] > 1.5
string字符串name == "GET /api"
bool布尔值attributes["sampled"] == true
bytes字节数组span_id, trace_id
map键值映射attributes
slice数组attributes["tags"]
enum枚举常量SEVERITY_NUMBER_WARN

BoolExpr 通用过滤接口

// internal/filter/expr/matcher.go — 跨组件共用
type BoolExpr[K any] interface {
    Eval(ctx context.Context, tCtx K) (bool, error)
}

// 组合器
func Or[K any](matchers ...BoolExpr[K]) BoolExpr[K]   // 任一匹配
func And[K any](matchers ...BoolExpr[K]) BoolExpr[K]  // 全部匹配
func Not[K any](matcher BoolExpr[K]) BoolExpr[K]      // 取反

// 使用方 (filterprocessor)
type filterSpanProcessor struct {
    skipSpanExpr expr.BoolExpr[ottlspan.TransformContext]
}
// 使用方 (routingconnector)
type routingConnector struct {
    routeExpr expr.BoolExpr[ottllog.TransformContext]
}
22

Internal 共享库

关键共享库概览

路径功能使用方
attractioninternal/coreinternal/属性操作引擎 (7 种 Action)attributesprocessor, resourceprocessor
filterinternal/filter/通用过滤引擎 (BoolExpr)filterprocessor, routingconnector
kafkainternal/kafka/Kafka 认证/配置工具kafkareceiver, kafkaexporter
k8sconfiginternal/k8sconfig/K8s 客户端配置k8sattributesprocessor, k8sclusterreceiver
awsinternal/aws/AWS SDK 工具/元数据awscloudwatchreceiver, awss3exporter
splunkinternal/splunk/Splunk HEC 协议工具splunkhecreceiver, splunkhecexporter
sharedcomponentinternal/sharedcomponent/组件共享/复用工具需要多管道共享同一实例的组件
otelarrowinternal/otelarrow/OTel Arrow 协议支持otelarrowreceiver, otelarrowexporter

pkg/ vs internal/ 的边界

pkg/ (公共 API — 外部可引用)
├── ottl/               # OTTL 语言引擎
├── stanza/             # 日志处理库
├── translator/         # 协议翻译器 (Prometheus, Datadog 等)
├── pdatautil/          # pdata 工具函数
├── pdatatest/          # 测试辅助
├── golden/             # Golden File 测试
├── sampling/           # 采样算法
└── resourcetotelemetry/# Resource → Telemetry 属性转换

internal/ (内部实现 — 仅 Contrib 内部使用)
├── coreinternal/       # Core 重导出 (attraction, textutils 等)
├── filter/             # 过滤表达式引擎
├── kafka/              # Kafka 认证/连接工具
├── aws/                # AWS 特定工具
├── k8sconfig/          # K8s 客户端配置
└── docker/             # Docker API 工具

规则pkg/ 是稳定的公共 API,版本语义化管理;internal/ 可随时修改,不对外暴露。

第八部分

设计精华

核心设计模式总结与完整数据流转示例

23

核心设计模式总结

十大设计模式

模式机制应用场景
Factory 工厂NewFactory() 统一注册所有 206 个组件
Consumer Chain 链nextConsumer 参数传递管道数据流转
Helper 封装processorhelper / exporterhelper通用能力注入 (重试/队列/超时)
BoolExpr 泛型BoolExpr[K] 接口跨组件过滤/路由表达式
Encoding ExtensionExtension 提供编解码Receiver/Exporter 可插拔编码
Context Cancelcontext.WithCancelGoroutine 生命周期管理
WaitGroup 协调sync.WaitGroup确保优雅关闭
metadata 代码生成mdatagen + metadata.yaml类型常量/自定义指标自动生成
mapstructure 配置YAML → Go struct统一配置反序列化
RemoveIf 零拷贝pdata 原地删除Filter/Sampling 高效过滤

组件开发四步法

Step 1: 定义元数据
────────────────
创建 metadata.yaml:
  type, stability, distributions, codeowners

Step 2: 定义配置
────────────────
创建 config.go:
  type Config struct { ... }
  func (c *Config) Validate() error { ... }

Step 3: 实现工厂
────────────────
创建 factory.go:
  func NewFactory() xxx.Factory {
      return xxx.NewFactory(metadata.Type, createDefaultConfig, ...)
  }

Step 4: 实现核心逻辑
────────────────
创建 receiver.go / processor.go / exporter.go:
  Start(ctx, host) error     — 初始化
  processData(ctx, data)     — 处理 (Processor)
  pushData(ctx, data) error  — 发送 (Exporter)
  Shutdown(ctx) error        — 清理
24

完整数据流转示例

场景:Kafka Traces → 过滤 → ES + Prometheus Metrics

# 配置
receivers:
  kafka:
    brokers: ["kafka:9092"]
    topic: otel-traces
    encoding: otlp_proto

processors:
  attributes:
    actions:
      - { key: env, value: prod, action: upsert }
  filter:
    traces:
      span_conditions:
        - 'name == "health-check"'

connectors:
  spanmetrics:
    dimensions: [{ name: http.method }]

exporters:
  elasticsearch:
    endpoints: ["https://es:9200"]
  prometheus:
    endpoint: "0.0.0.0:8889"

service:
  pipelines:
    traces:
      receivers: [kafka]
      processors: [attributes, filter]
      exporters: [elasticsearch, spanmetrics]
    metrics/spanmetrics:
      receivers: [spanmetrics]
      exporters: [prometheus]

数据流转全链路

Step 1: Kafka Receiver 消费消息
─────────────────────────────────
kafkaTracesConsumer.consumeLoop():
  message := <-claim.Messages()
  traces := unmarshaler.UnmarshalTraces(message.Value)
  → 产出: ptrace.Traces (10 个 Span)

Step 2: Attributes Processor 注入属性
─────────────────────────────────
attributesProcessor.processTraces(ctx, traces):
  for each span:
    span.Attributes().PutStr("env", "prod")  // upsert
  → 产出: ptrace.Traces (10 个 Span, 每个多了 env=prod)

Step 3: Filter Processor 过滤健康检查
─────────────────────────────────
filterSpanProcessor.processTraces(ctx, traces):
  spans.RemoveIf(span.Name() == "health-check")
  → 产出: ptrace.Traces (8 个 Span, 2 个 health-check 被移除)

Step 4a: Elasticsearch Exporter 批量写入
─────────────────────────────────
elasticsearchExporter.pushTracesData(ctx, traces):
  session := bulkIndexer.StartSession()
  for each span:
    session.Add(index: "traces-2026.04.03", body: spanJSON)
  session.Flush()
  → ES 写入 8 条 Trace 文档

Step 4b: spanmetrics Connector 生成指标 (并行)
─────────────────────────────────
spanmetricsConnector.ConsumeTraces(ctx, traces):
  for each span:
    计算延迟直方图 bucket
    按 http.method 分组计数
  定时器触发:
    metrics := generateMetrics()
    metricsConsumer.ConsumeMetrics(ctx, metrics)

Step 5: Prometheus Exporter 暴露指标
─────────────────────────────────
prometheusExporter.ConsumeMetrics(ctx, metrics):
  collector.processMetrics(metrics)  // 缓存到 Registry

Prometheus Server 抓取 GET /metrics:
  → traces_spanmetrics_latency_bucket{http_method="GET",le="100"} 42
  → traces_spanmetrics_calls_total{http_method="GET"} 150
25

总结与启示

OTel Collector Contrib 的核心设计哲学

通读整个源码,OTel Collector Contrib 的设计哲学可以概括为三个原则:

  1. 一切皆插件 — 206 个组件通过统一的 Factory 模式注册,OCB 按需组装,用户可构建最小化发行版
  2. 信号统一 — Traces、Metrics、Logs 共享相同的管道模型和 Consumer 接口,Connector 实现跨信号桥接
  3. 关注点分离 — 组件只需实现核心逻辑,重试/队列/超时/观测性由 Helper 自动注入

关键源码速查表

功能核心文件路径
Receiver 消息驱动kafka_receiver.goreceiver/kafkareceiver/
Receiver 抓取模式metrics_receiver.goreceiver/prometheusreceiver/
Receiver Scraperfactory.goreceiver/hostmetricsreceiver/
Processor 无状态attributes_trace.goprocessor/attributesprocessor/
Processor 过滤traces.goprocessor/filterprocessor/
Processor 有状态processor.goprocessor/tailsamplingprocessor/
Exporter 消息队列kafka_exporter.goexporter/kafkaexporter/
Exporter HTTP 拉取prometheus.goexporter/prometheusexporter/
Exporter 批量索引exporter.goexporter/elasticsearchexporter/
Connector 跨信号connector.goconnector/spanmetricsconnector/
OTTL 语言parser.gopkg/ottl/
属性操作引擎attraction.gointernal/coreinternal/attraction/
过滤表达式matcher.gointernal/filter/expr/
构建配置builder-config.yamlcmd/otelcontribcol/

可借鉴的架构思想

🎯

Factory + Builder

统一 Factory 注册 + OCB 按需构建,适用于任何需要可插拔组件的大型系统

🔗

Consumer Chain

链式 Consumer 接口串联管道,每个节点只需关注处理逻辑,框架负责串联和错误传播

Helper 能力注入

通用能力(重试/队列/超时/观测性)通过 Helper 包装注入,避免每个组件重复实现

📝

DSL 表达式引擎

OTTL 统一表达式语言被多个组件复用,避免每个组件自建过滤/转换语法

基于 OpenTelemetry Collector Contrib 源码分析 · 仅供研究学习
源码版权归 OpenTelemetry / CNCF 所有