OpenTelemetry Collector Contrib
从组件工厂到可观测性管道,逐层拆解云原生可观测数据处理引擎的设计精髓
什么是 OTel Collector Contrib
云原生可观测数据处理引擎
OpenTelemetry Collector Contrib 是 OpenTelemetry 项目的社区贡献组件仓库,基于 OTel Collector Core 框架构建。它提供了 200+ 个开箱即用的组件,涵盖主流云平台、数据库、消息队列和可观测性后端,是当今最全面的可观测性数据采集、处理和导出平台。
核心能力一览
插件化架构
200+ 组件通过 Factory 模式注册,OCB (Collector Builder) 按需组装自定义发行版
三信号统一
Traces、Metrics、Logs 三大信号统一采集、处理和导出,一套管道搞定全链路可观测
管道式处理
Receiver → Processor → Exporter 链式管道,数据流经 Consumer 接口逐级传递
云原生全覆盖
AWS / Azure / GCP / K8s / Prometheus / Kafka / Elasticsearch 等 99 个 Receiver、45 个 Exporter
组件数量统计 (v0.117.0)
| 类型 | 数量 | 说明 |
|---|---|---|
| Receiver | 99 | 数据接入:Prometheus、Kafka、K8s、云平台、数据库、日志文件等 |
| Processor | 26 | 数据处理:属性修改、过滤、采样、路由、K8s 元数据注入等 |
| Exporter | 45 | 数据导出:Kafka、ES、Prometheus、Datadog、Splunk、Loki 等 |
| Extension | 24 | 辅助服务:健康检查、认证、存储、编码、OPAMP 远程管理 |
| Connector | 12 | 跨信号桥接:Traces→Metrics、路由分发、负载均衡、故障转移 |
| 合计 | 206 | Go 源文件 5,700+,metadata.yaml 300+ |
整体架构
架构全景图
┌──────────────────────────────┐
│ YAML Configuration │
│ receivers / processors / │
│ exporters / extensions / │
│ connectors / service │
└──────────────┬───────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────┐
│ OTel Collector Core Framework │
│ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Component │ │ Config │ │ Pipeline │ │ Extension │ │
│ │ Registry │ │ Resolver │ │ Builder │ │ Manager │ │
│ │ │ │ │ │ │ │ │ │
│ │ Factory │ │ YAML Parse │ │ Wire │ │ HealthChk │ │
│ │ Discovery │ │ Validate │ │ Consumer │ │ Auth │ │
│ │ Metadata │ │ Env Expand │ │ Chain │ │ OPAMP │ │
│ └────────────┘ └────────────┘ └────────────┘ └────────────┘ │
└──────────────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌──────────────────────────────────────────────────────────────────────┐
│ Signal Pipelines │
│ │
│ ┌─────────┐ ┌─────────────┐ ┌──────────┐ │
│ │Receiver │────▶│ Processor │────▶│ Exporter │ │
│ │ │ │ Chain │ │ │ │
│ │ kafka │ │ attributes │ │ kafka │ │
│ │ prom │ │ filter │ │ ES │ │
│ │ otlp │ │ sampling │ │ prom │ │
│ │ k8s │ │ transform │ │ datadog │ │
│ └─────────┘ └──────┬──────┘ └──────────┘ │
│ │ │
│ ┌────▼─────┐ │
│ │Connector │ Traces ──▶ Metrics │
│ │ (cross- │ spanmetrics / servicegraph │
│ │ signal) │ routing / failover │
│ └──────────┘ │
└──────────────────────────────────────────────────────────────────────┘
配置驱动的管道拼装
Collector 通过 YAML 配置文件声明管道结构,将组件按信号类型串联:
# collector-config.yaml
receivers:
kafka:
brokers: ["kafka:9092"]
topic: otel-traces
prometheus:
config:
scrape_configs:
- job_name: 'app'
static_configs:
- targets: ['app:8080']
processors:
attributes:
actions:
- key: env
value: production
action: upsert
filter:
traces:
span_conditions:
- 'attributes["http.status_code"] >= 500'
exporters:
elasticsearch:
endpoints: ["https://es:9200"]
prometheus:
endpoint: "0.0.0.0:8889"
service:
pipelines:
traces:
receivers: [kafka]
processors: [attributes, filter]
exporters: [elasticsearch]
metrics:
receivers: [prometheus]
processors: [attributes]
exporters: [prometheus]
项目结构与组件生态
顶层目录结构
opentelemetry-collector-contrib/
├── receiver/ # 99 个 Receiver 组件
│ ├── kafkareceiver/ # 消息队列接入
│ ├── prometheusreceiver/ # Prometheus 抓取
│ ├── hostmetricsreceiver/ # 主机指标采集
│ ├── k8sclusterreceiver/ # K8s 集群状态
│ ├── filelogreceiver/ # 文件日志采集
│ └── ... # 还有 94 个
│
├── processor/ # 26 个 Processor 组件
│ ├── attributesprocessor/ # 属性增删改
│ ├── filterprocessor/ # 条件过滤
│ ├── tailsamplingprocessor/ # 尾部采样
│ ├── k8sattributesprocessor/# K8s 元数据注入
│ └── ...
│
├── exporter/ # 45 个 Exporter 组件
│ ├── kafkaexporter/ # 发布到 Kafka
│ ├── elasticsearchexporter/ # 写入 ES
│ ├── prometheusexporter/ # 暴露 Prometheus 端点
│ ├── datadogexporter/ # 发送到 Datadog
│ └── ...
│
├── extension/ # 24 个 Extension 组件
│ ├── healthcheckextension/ # 健康检查 HTTP 端点
│ ├── basicauthextension/ # Basic Auth 认证
│ ├── opaborextension/ # OpAMP 远程管理
│ └── ...
│
├── connector/ # 12 个 Connector 组件
│ ├── spanmetricsconnector/ # Traces → RED Metrics
│ ├── servicegraphconnector/ # Traces → 服务拓扑
│ ├── routingconnector/ # OTTL 条件路由
│ └── ...
│
├── cmd/ # 入口与工具
│ ├── otelcontribcol/ # 官方 Contrib 发行版
│ │ └── builder-config.yaml# OCB 构建配置
│ ├── telemetrygen/ # 测试数据生成器
│ └── opampsupervisor/ # OpAMP 管理器
│
├── pkg/ # 15 个公共可复用库
│ ├── ottl/ # OTel Transformation Language
│ ├── stanza/ # 日志采集处理库
│ ├── translator/ # 协议翻译器
│ └── ...
│
├── internal/ # 21 个内部共享库
│ ├── filter/ # 跨组件过滤引擎
│ ├── coreinternal/ # Core 重导出工具
│ ├── kafka/ # Kafka 共享工具
│ └── ...
│
├── confmap/provider/ # 配置源提供器
│ ├── s3provider/ # 从 S3 加载配置
│ └── secretsmanagerprovider/# 从 Secrets Manager 加载
│
├── testbed/ # 稳定性与正确性测试
└── examples/ # 7 个实战示例
组件分类 — Receiver 矩阵
| 分类 | 代表组件 | 说明 |
|---|---|---|
| 云平台 | awscloudwatch, azuremonitor, googlecloud | AWS/Azure/GCP 原生监控指标 |
| 数据库 | mysql, postgresql, mongodb, redis, elasticsearch | 数据库性能指标 |
| 消息队列 | kafka, pulsar, solace | 消息消费接入 |
| 基础设施 | hostmetrics, k8scluster, docker, kubeletstats | 主机/容器/K8s 指标 |
| 协议 | otlp, jaeger, zipkin, prometheus, opencensus | 标准可观测协议 |
| 日志 | filelog, syslog, tcplog, udplog | 日志采集 |
| 应用 | apache, nginx, iis, haproxy, rabbitmq | 中间件指标 |
构建体系:OCB (Collector Builder)
Collector 使用 builder-config.yaml 声明组件列表,OCB 自动生成 import 代码并编译:
# cmd/otelcontribcol/builder-config.yaml
dist:
module: go.opentelemetry.io/collector/cmd/otelcontribcol
name: otelcontribcol
version: 0.117.0-dev
receivers:
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.117.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver v0.117.0
# ... 97 more receivers
processors:
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.117.0
# ... 25 more processors
exporters:
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.117.0
# ... 44 more exporters
用户可以创建自己的 builder-config.yaml 只包含所需组件,构建最小化 Collector 二进制。
核心机制
Factory 工厂模式、配置系统与 Pipeline 管道模型
Factory 工厂模式
一切皆 Factory
OTel Collector 中所有组件(Receiver、Processor、Exporter、Extension、Connector)都遵循同一个设计原则:通过 NewFactory() 函数注册自身。Factory 封装了组件类型、默认配置和各信号类型的创建方法。
Factory 三要素
// 所有组件共享的 Factory 模式
func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type, // 1. 组件类型标识 (如 "kafka")
createDefaultConfig, // 2. 默认配置工厂
// 3. 各信号类型的创建方法
receiver.WithTraces(createTracesReceiver, metadata.TracesStability),
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability),
receiver.WithLogs(createLogsReceiver, metadata.LogsStability),
)
}
// 默认配置工厂
func createDefaultConfig() component.Config {
return &Config{
Brokers: []string{"localhost:9092"},
Encoding: "otlp_proto",
GroupID: "otel-collector",
}
}
// Traces 信号创建方法
func createTracesReceiver(
ctx context.Context,
set receiver.Settings, // Logger, ID, Telemetry
cfg component.Config, // 用户配置
nextConsumer consumer.Traces, // 下游消费者
) (receiver.Traces, error) {
// 创建并返回 Receiver 实例
}
五大组件的 Factory 签名对比
| 组件类型 | Factory 函数 | 信号方法 |
|---|---|---|
| Receiver | receiver.NewFactory() | WithTraces / WithMetrics / WithLogs |
| Processor | processor.NewFactory() | WithTraces / WithMetrics / WithLogs |
| Exporter | exporter.NewFactory() | WithTraces / WithMetrics / WithLogs |
| Extension | extension.NewFactory() | 无信号方法,单一 createExtension |
| Connector | connector.NewFactory() | WithTracesToMetrics / WithLogsToLogs 等跨信号 |
metadata.yaml 与代码生成
每个组件通过 metadata.yaml 声明元数据,mdatagen 工具自动生成 internal/metadata/ 代码:
# receiver/kafkareceiver/metadata.yaml
type: kafka
status:
class: receiver
stability:
beta: [traces, metrics, logs]
distributions: [contrib]
codeowners:
active: [pavolloffay, MovieStoreGuy]
// 自动生成: internal/metadata/generated_status.go
const (
Type = component.MustNewType("kafka")
ScopeName = "github.com/open-telemetry/.../kafkareceiver"
TracesStability = component.StabilityLevelBeta
MetricsStability = component.StabilityLevelBeta
LogsStability = component.StabilityLevelBeta
)
配置系统
Config 结构体约定
每个组件必须提供一个 Config 结构体,实现 component.Config 接口:
// 典型 Config 结构体 (kafkareceiver/config.go)
type Config struct {
Brokers []string `mapstructure:"brokers"`
ProtocolVersion string `mapstructure:"protocol_version"`
SessionTimeout time.Duration `mapstructure:"session_timeout"`
Topic string `mapstructure:"topic"`
Encoding string `mapstructure:"encoding"`
GroupID string `mapstructure:"group_id"`
Authentication kafka.Authentication `mapstructure:"auth"`
AutoCommit AutoCommit `mapstructure:"autocommit"`
}
// 编译时接口检查
var _ component.Config = (*Config)(nil)
// 语义校验
func (cfg *Config) Validate() error {
if len(cfg.Brokers) == 0 {
return errors.New("brokers must not be empty")
}
return nil
}
配置加载流程
YAML 文件
│
├── confmap.Resolver 解析
│ ├── 环境变量展开 ${ENV_VAR}
│ ├── S3 / Secrets Manager / HTTP 远程加载
│ └── AES 加密字段解密
│
├── mapstructure 反序列化
│ └── YAML field → Go struct field (via mapstructure tags)
│
├── Config.Validate() 语义校验
│ ├── 必填字段检查
│ ├── 取值范围校验
│ └── 逻辑关系校验
│
└── Factory.CreateReceiver/Processor/Exporter(cfg)
└── 配置传入组件实例
Exporter 常用嵌入配置
Exporter 通常嵌入框架提供的通用配置:
type Config struct {
// 嵌入通用配置 (mapstructure:",squash" 扁平化)
exporterhelper.TimeoutConfig `mapstructure:",squash"` // 超时控制
exporterhelper.QueueConfig `mapstructure:",squash"` // 发送队列
configretry.BackOffConfig `mapstructure:",squash"` // 重试退避
// 组件特有配置
Endpoint string `mapstructure:"endpoint"`
Index string `mapstructure:"index"`
}
Pipeline 管道模型
管道拓扑结构
┌──────────────────────────────────────────────────────────────────────┐
│ service.pipelines │
│ │
│ traces/default: │
│ ┌─────────┐ ┌────────────┐ ┌────────────┐ ┌──────────┐ │
│ │ kafka │──▶│ attributes │──▶│ filter │──▶│ ES │ │
│ │ receiver │ │ processor │ │ processor │ │ exporter │ │
│ └─────────┘ └────────────┘ └────────────┘ └──────────┘ │
│ │
│ metrics/default: │
│ ┌─────────┐ ┌────────────┐ ┌──────────┐ ┌──────────┐ │
│ │ prom │──▶│ k8sattrs │──▶│ prom │ + │ datadog │ │
│ │ receiver │ │ processor │ │ exporter │ │ exporter │ │
│ └─────────┘ └────────────┘ └──────────┘ └──────────┘ │
│ │
│ traces/spanmetrics (Connector 桥接): │
│ ┌─────────┐ ┌────────────────┐ ┌──────────┐ │
│ │ traces │──▶│ spanmetrics │──▶│ metrics │ │
│ │ pipeline │ │ connector │ │ pipeline │ │
│ └─────────┘ │ Traces→Metrics │ └──────────┘ │
│ └────────────────┘ │
└──────────────────────────────────────────────────────────────────────┘
Fan-out 模型
一个管道可以配置多个 Exporter,数据会被 fan-out 到每个 Exporter:
service:
pipelines:
traces:
receivers: [otlp]
processors: [attributes]
exporters: [elasticsearch, kafka, datadog] # 同时发往 3 个目标
同样,一个管道也可以有多个 Receiver,数据会被 fan-in 合并:
metrics:
receivers: [prometheus, hostmetrics, k8scluster] # 3 个源汇聚
processors: [resourcedetection]
exporters: [prometheus]
Consumer 链式调用
Consumer 接口 — 管道的粘合剂
整个管道的数据流通过三个 Consumer 接口驱动,每个组件要么生产数据(Receiver),要么消费并转发(Processor),要么消费并导出(Exporter):
三信号 Consumer 接口
// consumer 包定义了三个信号的消费接口
type Traces interface {
ConsumeTraces(ctx context.Context, td ptrace.Traces) error
}
type Metrics interface {
ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error
}
type Logs interface {
ConsumeLogs(ctx context.Context, ld plog.Logs) error
}
链式调用流转
Receiver.Start()
│
│ (数据到达)
│
▼
nextConsumer.ConsumeTraces(ctx, traces) ← Processor1
│
│ (处理后转发)
│
▼
nextConsumer.ConsumeTraces(ctx, traces) ← Processor2
│
│ (处理后转发)
│
▼
exporter.ConsumeTraces(ctx, traces) ← Exporter (终点)
│
└── 发送到外部系统 (Kafka / ES / Prometheus / ...)
每个组件在创建时接收 nextConsumer 参数,形成链式调用。框架负责串联,组件只需关注自身逻辑。
错误处理分类
// 永久性错误 — 不可重试,丢弃数据
consumererror.NewPermanent(err)
// 瞬时性错误 — 可重试(配合 exporterhelper 的 Retry/Queue)
// 直接返回普通 error 即可
// 跳过处理 — 当前批次不处理但不报错
processorhelper.ErrSkipProcessingData
Receiver 源码剖析
数据入口:从 Kafka 消费、Prometheus 抓取到文件采集
Receiver 设计模式
Receiver 核心契约
// Receiver 必须实现的接口
type Receiver interface {
component.Component // Start(ctx, host) + Shutdown(ctx)
}
// 具体信号类型
type Traces interface {
Receiver
// 不需要额外方法 — Receiver 主动推送数据给 nextConsumer
}
// Receiver 的职责:
// 1. Start() — 启动数据采集 (开启 goroutine、监听端口、连接消息队列等)
// 2. 采集到数据 → 调用 nextConsumer.ConsumeTraces/Metrics/Logs()
// 3. Shutdown() — 优雅关闭 (关闭连接、等待 goroutine 退出)
Receiver 文件组织约定
receiver/kafkareceiver/
├── metadata.yaml # 组件元数据 (type, stability, codeowners)
├── factory.go # NewFactory() + createDefaultConfig + create*Receiver
├── config.go # Config 结构体 + Validate()
├── kafka_receiver.go # 核心实现 (Start / Shutdown / 消费循环)
├── internal/metadata/ # mdatagen 生成的代码
│ ├── generated_status.go # Type, ScopeName, Stability 常量
│ └── generated_telemetry.go # 自定义指标 (TelemetryBuilder)
├── go.mod # 独立 Go Module
└── *_test.go # 测试文件
Receiver 三大模式
| 模式 | 特点 | 代表组件 |
|---|---|---|
| 消息驱动型 | 消费消息队列,事件驱动 | kafkareceiver, pulsarreceiver |
| 拉取抓取型 | 定时抓取目标端点 | prometheusreceiver, hostmetricsreceiver |
| 服务监听型 | 启动 HTTP/gRPC 服务接收推送 | otlpreceiver, jaegerreceiver |
Kafka Receiver 实战
核心结构体
// kafka_receiver.go
type kafkaTracesConsumer struct {
config Config
consumerGroup sarama.ConsumerGroup // Kafka Consumer Group
nextConsumer consumer.Traces // 下游 Pipeline Consumer
topics []string
cancelConsumeLoop context.CancelFunc // 取消信号
unmarshaler TracesUnmarshaler // 数据反序列化
consumeLoopWG *sync.WaitGroup // 等待 goroutine 退出
settings receiver.Settings
telemetryBuilder *metadata.TelemetryBuilder // 自定义指标
autocommitEnabled bool
messageMarking MessageMarking
}
// 编译时接口检查
var _ receiver.Traces = (*kafkaTracesConsumer)(nil)
Start — 启动消费循环
func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
// 1. 创建取消上下文
ctx, cancel := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancel
// 2. 创建观测性报告器
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: c.settings.ID,
Transport: transport,
})
// 3. 加载编码扩展 (Extension 优先于内置编码)
if unmarshaler, err := loadEncodingExtension[ptrace.Unmarshaler](
host, c.config.Encoding); err == nil {
c.unmarshaler = &tracesEncodingUnmarshaler{unmarshaler: *unmarshaler}
}
// 4. 创建 Kafka Consumer Group
c.consumerGroup, err = createKafkaClient(ctx, c.config)
// 5. 启动消费循环 goroutine
handler := &tracesConsumerGroupHandler{
unmarshaler: c.unmarshaler,
nextConsumer: c.nextConsumer,
ready: make(chan bool),
obsrecv: obsrecv,
}
c.consumeLoopWG.Add(1)
go c.consumeLoop(ctx, handler)
// 6. 等待 Consumer Group 就绪
<-handler.ready
return nil
}
消费循环 — 消息处理核心
// ConsumeClaim — Kafka Consumer Group Handler 的核心方法
func (c *tracesConsumerGroupHandler) ConsumeClaim(
session sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
) error {
for {
select {
case message, ok := <-claim.Messages():
if !ok { return nil }
// 1. 开始观测性 Span
ctx := c.obsrecv.StartTracesOp(session.Context())
// 2. 反序列化消息
traces, err := c.unmarshaler.UnmarshalTraces(message.Value)
if err != nil {
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), 0, err)
return err
}
// 3. 提取 Header (如果配置)
c.headerExtractor.extractHeadersTraces(traces, message)
// 4. *** 关键: 发送到下游 Pipeline ***
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(),
traces.SpanCount(), err)
// 5. 标记消费位移
if c.messageMarking.After {
session.MarkMessage(message, "")
}
case <-session.Context().Done():
return nil
}
}
}
Shutdown — 优雅关闭
func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
if c.cancelConsumeLoop == nil { return nil }
c.cancelConsumeLoop() // 1. 发送取消信号
c.consumeLoopWG.Wait() // 2. 等待 goroutine 退出
if c.consumerGroup == nil { return nil }
return c.consumerGroup.Close() // 3. 关闭 Kafka 连接
}
Prometheus Receiver
架构:复用 Prometheus 原生代码
Prometheus Receiver 直接嵌入了 Prometheus 的 DiscoveryManager 和 ScrapeManager,复用其成熟的服务发现和抓取逻辑:
prometheusreceiver
│
├── DiscoveryManager ← Prometheus 原生服务发现
│ ├── static_configs
│ ├── kubernetes_sd
│ ├── consul_sd
│ └── ...
│
├── ScrapeManager ← Prometheus 原生抓取引擎
│ └── 定时抓取目标 /metrics 端点
│
├── Appendable (适配器)
│ └── 将 Prometheus 时序数据 → OTLP Metrics (pmetric.Metrics)
│
└── nextConsumer.ConsumeMetrics(ctx, metrics)
└── 发送到下游 Pipeline
Start — 启动抓取引擎
func (r *pReceiver) Start(ctx context.Context, host component.Host) error {
discoveryCtx, cancel := context.WithCancel(context.Background())
r.cancelFunc = cancel
// 1. 初始化 Prometheus Discovery Manager
r.discoveryManager = discovery.NewManager(discoveryCtx, logger, r.registerer, sdMetrics)
go func() {
r.discoveryManager.Run() // goroutine: 持续发现目标
}()
// 2. 创建 Appendable 适配器 (Prometheus → OTLP)
store, _ := internal.NewAppendable(r.consumer, r.settings, ...)
// 3. 初始化 Prometheus Scrape Manager
r.scrapeManager, _ = scrape.NewManager(opts, logger, store, r.registerer)
go func() {
<-r.configLoaded // 等待配置加载完成
r.scrapeManager.Run(r.discoveryManager.SyncCh()) // goroutine: 持续抓取
}()
return nil
}
func (r *pReceiver) Shutdown(context.Context) error {
r.cancelFunc() // 停止服务发现
r.scrapeManager.Stop() // 停止抓取
r.targetAllocatorManager.Shutdown() // 停止 Target Allocator
return nil
}
Scraper Helper 模式
对于定时采集型 Receiver(如 hostmetricsreceiver),框架提供 ScraperControllerReceiver 简化开发:
// hostmetricsreceiver/factory.go — 使用 Scraper Helper
func createMetricsReceiver(ctx context.Context, set receiver.Settings,
cfg component.Config, consumer consumer.Metrics,
) (receiver.Metrics, error) {
scraperOptions := createAddScraperOptions(ctx, set, oCfg, scraperFactories)
// 框架自动管理: 定时调度 + 生命周期 + 错误处理
return scraperhelper.NewScraperControllerReceiver(
&oCfg.ControllerConfig, // 采集间隔等配置
set,
consumer, // 下游 Consumer
scraperOptions..., // 各子采集器 (CPU, Memory, Disk...)
)
}
Processor 源码剖析
数据中间处理:属性修改、条件过滤与采样决策
Processor 设计模式
Processor 核心职责
Processor 接收上游数据,原地修改或过滤后传递给下游。它既是 Consumer(消费上游数据),又持有 nextConsumer(转发给下游)。
processorhelper 封装
// Processor 通常通过 processorhelper 创建
func createTracesProcessor(ctx context.Context, set processor.Settings,
cfg component.Config, nextConsumer consumer.Traces,
) (processor.Traces, error) {
proc := newMyProcessor(cfg)
return processorhelper.NewTraces(
ctx,
set,
cfg,
nextConsumer, // 自动串联下游
proc.processTraces, // 核心处理函数
processorhelper.WithCapabilities(consumer.Capabilities{
MutatesData: true, // 声明是否修改数据
}),
)
}
// 核心处理函数签名
func (p *myProcessor) processTraces(
ctx context.Context,
td ptrace.Traces, // 输入
) (ptrace.Traces, error) { // 输出 (可以是修改后的同一对象)
// ... 处理逻辑 ...
return td, nil
}
Processor 分类
| 类型 | 特点 | 代表组件 |
|---|---|---|
| 无状态/同步 | 每条数据独立处理,无内存状态 | attributes, filter, transform |
| 有状态/异步 | 维护内存状态,批量/延迟决策 | tailsampling, groupbytrace, interval |
| 富化增强型 | 外部数据源注入元数据 | k8sattributes, resourcedetection, geoip |
Attributes Processor
功能:属性增删改查
最常用的 Processor 之一,支持 7 种属性操作:
# 配置示例
processors:
attributes:
actions:
- key: env
value: production
action: upsert # 存在则更新,不存在则插入
- key: password
action: delete # 删除敏感属性
- key: user.id
action: hash # SHA-256 哈希脱敏
- key: http.url
pattern: "^(?P<host>[^/]+)"
action: extract # 正则提取子属性
核心处理逻辑
// attributes_trace.go — 三层遍历 + 属性处理
func (a *spanAttributesProcessor) processTraces(
ctx context.Context, td ptrace.Traces,
) (ptrace.Traces, error) {
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ { // 遍历 ResourceSpans
rs := rss.At(i)
ilss := rs.ScopeSpans()
for j := 0; j < ilss.Len(); j++ { // 遍历 ScopeSpans
spans := ilss.At(j).Spans()
for k := 0; k < spans.Len(); k++ { // 遍历每个 Span
span := spans.At(k)
// OTTL 条件跳过
if a.skipExpr != nil {
skip, _ := a.skipExpr.Eval(ctx,
ottlspan.NewTransformContext(span, ...))
if skip { continue }
}
// 执行属性操作 (insert/update/upsert/delete/hash/extract/convert)
a.attrProc.Process(ctx, a.logger, span.Attributes())
}
}
}
return td, nil
}
AttrProc — 7 种操作
// internal/coreinternal/attraction/attraction.go
const (
INSERT Action = "insert" // 仅在不存在时插入
UPDATE Action = "update" // 仅在存在时更新
UPSERT Action = "upsert" // 存在更新,不存在插入
DELETE Action = "delete" // 删除属性
HASH Action = "hash" // SHA-256 哈希值替换
EXTRACT Action = "extract" // 正则提取子属性
CONVERT Action = "convert" // 类型转换 (string→int 等)
)
Filter Processor
双模式过滤
Filter Processor 支持两种过滤模式:传统 Matcher 和 OTTL 表达式条件:
# OTTL 条件模式 (推荐)
processors:
filter:
error_mode: ignore # ignore / propagate / silent
traces:
span_conditions:
- 'attributes["http.status_code"] >= 500'
- 'name == "health-check"'
logs:
log_record_conditions:
- 'severity_number < SEVERITY_NUMBER_WARN'
metrics:
metric_conditions:
- 'name == "system.cpu.time"'
RemoveIf 模式 — 零拷贝过滤
// filterprocessor/traces.go — 嵌套 RemoveIf 实现高效过滤
func (fsp *filterSpanProcessor) processTraces(
ctx context.Context, td ptrace.Traces,
) (ptrace.Traces, error) {
td.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool {
rs.ScopeSpans().RemoveIf(func(ss ptrace.ScopeSpans) bool {
ss.Spans().RemoveIf(func(span ptrace.Span) bool {
// OTTL 表达式求值
skip, _ := fsp.skipSpanExpr.Eval(ctx,
ottlspan.NewTransformContext(span, ss.Scope(),
rs.Resource(), ss, rs))
return skip // true = 移除该 Span
})
// 过滤 SpanEvent
if fsp.skipSpanEventExpr != nil {
span.Events().RemoveIf(func(event ptrace.SpanEvent) bool {
skip, _ := fsp.skipSpanEventExpr.Eval(ctx, ...)
return skip
})
}
return ss.Spans().Len() == 0 // 空则移除 ScopeSpans
})
return rs.ScopeSpans().Len() == 0 // 空则移除 ResourceSpans
})
return td, nil
}
RemoveIf 是 pdata 提供的原地删除方法,通过交换元素到末尾并截断切片实现零拷贝过滤。
Tail Sampling Processor
有状态 Processor 典范
Tail Sampling 是最复杂的 Processor 之一:它需要缓存完整 Trace,等待所有 Span 到达后再做采样决策。这与无状态 Processor 完全不同。
核心架构
type tailSamplingSpanProcessor struct {
ctx context.Context
nextConsumer consumer.Traces
maxNumTraces uint64 // 最大缓存 Trace 数
policies []*policy // 采样策略列表
idToTrace sync.Map // TraceID → Trace 数据 (并发安全)
sampledIDCache cache.Cache[bool] // 已决策 TraceID 缓存
deleteChan chan pcommon.TraceID // 删除通知通道
policyTicker timeutils.TTicker // 定时决策
}
type policy struct {
name string
evaluator sampling.PolicyEvaluator // 策略求值器
}
// 11+ 策略类型
// AlwaysSample, Latency, NumericAttribute, Probabilistic,
// StatusCode, StringAttribute, RateLimiting, Composite,
// And, SpanCount, OTTLCondition
工作流程
Span 到达 → processTraces()
│
├── 按 TraceID 分组
├── 加入 idToTrace 缓存
│
▼
定时器触发 (每 decision_wait 秒)
│
├── 遍历所有缓存的 Trace
├── 依次评估 policies
│ ├── Latency Policy: trace 总延迟 > 阈值?
│ ├── StatusCode Policy: 包含错误 Span?
│ ├── Probabilistic: 概率采样
│ ├── OTTL Condition: 自定义表达式
│ └── Composite: 多策略组合
│
├── 决策: Sampled / NotSampled / InvertSampled
│
├── Sampled → nextConsumer.ConsumeTraces(trace) 发送下游
│
└── 清理缓存 + 写入 sampledIDCache (防重复)
└── 后续同 TraceID 的 Span 直接查缓存决定
Exporter 源码剖析
数据出口:消息发布、HTTP 暴露与批量索引
Exporter 设计模式
exporterhelper — 强大的导出助手
与 Receiver/Processor 不同,Exporter 通过 exporterhelper 获得重试、队列、超时等开箱即用的能力:
// 典型 Exporter 创建 (kafkaexporter/factory.go)
func createTracesExporter(ctx context.Context, set exporter.Settings,
cfg component.Config,
) (exporter.Traces, error) {
exp := newTracesExporter(oCfg, set)
return exporterhelper.NewTraces(
ctx,
set,
cfg,
exp.tracesPusher, // 核心发送函数
// 能力声明
exporterhelper.WithCapabilities(consumer.Capabilities{
MutatesData: false,
}),
// 超时控制
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{
Timeout: 30 * time.Second,
}),
// 重试退避
exporterhelper.WithRetry(oCfg.BackOffConfig),
// 发送队列 (异步缓冲)
exporterhelper.WithQueue(oCfg.QueueSettings),
// 生命周期钩子
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.Close),
)
}
exporterhelper 内置能力
ConsumeTraces(ctx, traces)
│
├── Timeout 控制
│ └── context.WithTimeout(ctx, timeout)
│
├── Queue (异步发送队列)
│ ├── 内存队列 / 持久化队列 (File Storage Extension)
│ ├── num_consumers: 并发消费者数
│ └── queue_size: 队列容量
│
├── Retry (指数退避重试)
│ ├── enabled: true
│ ├── initial_interval: 5s
│ ├── max_interval: 30s
│ ├── max_elapsed_time: 300s
│ └── 仅重试非 Permanent 错误
│
└── tracesPusher(ctx, traces)
└── 实际发送逻辑
Kafka Exporter 实战
核心发送逻辑
// kafkaexporter/kafka_exporter.go
type kafkaTracesProducer struct {
cfg Config
producer sarama.SyncProducer // Kafka 同步生产者
marshaler TracesMarshaler // 序列化器
logger *zap.Logger
}
// tracesPusher — 被 exporterhelper 调用
func (e *kafkaTracesProducer) tracesPusher(
ctx context.Context, td ptrace.Traces,
) error {
// 1. 序列化为 Kafka 消息
messages, err := e.marshaler.Marshal(td,
getTopic(ctx, &e.cfg, td.ResourceSpans()))
if err != nil {
return consumererror.NewPermanent(err) // 序列化失败 → 不可重试
}
// 2. 发送到 Kafka
err = e.producer.SendMessages(messages)
if err != nil {
return err // 发送失败 → 可重试 (exporterhelper 会自动重试)
}
return nil
}
// start — 在 exporterhelper.WithStart() 中调用
func (e *kafkaTracesProducer) start(ctx context.Context, host component.Host) error {
// 优先从 Extension 加载编码器
if marshaler, err := loadEncodingExtension[ptrace.Marshaler](
host, e.cfg.Encoding); err == nil {
e.marshaler = &tracesEncodingMarshaler{marshaler: *marshaler}
}
// 回退到内置编码
if e.marshaler == nil {
e.marshaler, _ = createTracesMarshaler(e.cfg)
}
// 创建 Kafka 生产者
e.producer, _ = newSaramaProducer(ctx, e.cfg)
return nil
}
Prometheus Exporter
拉取模式:启动 HTTP Server
与大多数 push 模式的 Exporter 不同,Prometheus Exporter 启动一个 HTTP 服务,等待 Prometheus Server 来抓取:
// prometheusexporter/prometheus.go
func (pe *prometheusExporter) Start(ctx context.Context, host component.Host) error {
ln, _ := pe.config.ToListener(ctx)
mux := http.NewServeMux()
mux.Handle("/metrics", pe.handler) // 暴露 /metrics 端点
srv, _ := pe.config.ToServer(ctx, host, pe.settings, mux)
pe.shutdownFunc = func(ctx context.Context) error {
return srv.Shutdown(ctx)
}
go func() {
_ = srv.Serve(ln) // goroutine: 持续监听
}()
return nil
}
// ConsumeMetrics — 接收数据并缓存 (非发送)
func (pe *prometheusExporter) ConsumeMetrics(
_ context.Context, md pmetric.Metrics,
) error {
rmetrics := md.ResourceMetrics()
for i := 0; i < rmetrics.Len(); i++ {
pe.collector.processMetrics(rmetrics.At(i)) // 转换并缓存
}
return nil
// Prometheus Server 抓取 /metrics 时读取缓存
}
Elasticsearch Exporter
批量索引模式
// elasticsearchexporter/exporter.go
func (e *elasticsearchExporter) pushLogsData(
ctx context.Context, ld plog.Logs,
) error {
e.wg.Add(1)
defer e.wg.Done()
// 1. 获取批量索引会话
session, _ := e.bulkIndexer.StartSession(ctx)
defer session.End()
var errs []error
// 2. 三层遍历: ResourceLogs → ScopeLogs → LogRecords
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
ills := rl.ScopeLogs()
for j := 0; j < ills.Len(); j++ {
logs := ills.At(j).LogRecords()
for k := 0; k < logs.Len(); k++ {
// 3. 逐条写入 Bulk 缓冲
err := e.pushLogRecord(ctx, rl.Resource(),
logs.At(k), ills.At(j).Scope(), session)
if err != nil {
errs = append(errs, err)
}
}
}
}
// 4. Flush 批量请求
if err := session.Flush(ctx); err != nil {
errs = append(errs, err)
}
return multierr.Combine(errs...)
}
Extension & Connector
辅助服务与跨信号桥接
Extension 扩展机制
Extension 的角色
Extension 不参与数据管道,而是提供辅助功能。其他组件通过 component.Host 在 Start 时发现并使用 Extension:
Extension 分类:
认证类:
├── basicauth — Basic Auth 认证
├── bearertoken — Bearer Token 认证
├── oauth2 — OAuth2 Client Credentials
├── oidc — OpenID Connect 认证
└── sigv4 — AWS Signature V4
编码类:
├── otlpencoding — OTLP 编解码
├── jaegerencoding — Jaeger 编解码
├── zipkinencoding — Zipkin 编解码
├── avrologencoding — Avro 日志编解码
└── jsonlogencoding — JSON 日志编解码
运维类:
├── healthcheck — /health HTTP 健康检查端点
├── pprof — Go pprof 性能分析端点
├── remotetap — 远程数据采样
└── opamp — 远程 Agent 管理 (OpAMP 协议)
存储类:
├── filestorage — 文件持久化 (用于 Queue)
└── dbstorage — 数据库持久化
发现类:
├── k8sobserver — K8s Pod 自动发现
├── dockerobserver — Docker 容器发现
└── ecsobserver — ECS Task 发现
Extension Factory 模式
// healthcheckextension/factory.go
func NewFactory() extension.Factory {
return extension.NewFactory(
metadata.Type, // "healthcheck"
createDefaultConfig,
createExtension, // 只有一个创建方法 (无信号区分)
metadata.ExtensionStability,
)
}
func createExtension(_ context.Context, set extension.Settings,
cfg component.Config,
) (extension.Extension, error) {
return newServer(*cfg.(*Config), set.TelemetrySettings), nil
}
Extension 被消费方式
// Receiver/Exporter 在 Start() 中通过 Host 获取 Extension
func (r *myReceiver) Start(ctx context.Context, host component.Host) error {
// 从 Host 查找编码扩展
if marshaler, err := loadEncodingExtension[ptrace.Marshaler](
host, r.config.Encoding,
); err == nil {
r.marshaler = marshaler // 使用 Extension 提供的编码器
}
// 从 Host 查找存储扩展
storageClient, _ := adapter.GetStorageClient(ctx, host, r.storageID, r.id)
return r.input.Start(storageClient)
}
Connector 跨信号桥接
Connector = Exporter + Receiver
Connector 是 OTel Collector 中最独特的组件类型:它同时充当一个管道的 Exporter 和另一个管道的 Receiver,实现跨信号类型的数据转换。
spanmetricsconnector — Traces → Metrics
// spanmetricsconnector/factory.go
func NewFactory() connector.Factory {
return connector.NewFactory(
metadata.Type,
createDefaultConfig,
// 注意: WithTracesToMetrics — 输入 Traces, 输出 Metrics
connector.WithTracesToMetrics(
createTracesToMetricsConnector,
metadata.TracesToMetricsStability),
)
}
func createTracesToMetricsConnector(ctx context.Context,
params connector.Settings, cfg component.Config,
nextConsumer consumer.Metrics, // 下游是 Metrics Consumer
) (connector.Traces, error) { // 返回 Traces 接口 (作为上游的 Exporter)
c, _ := newConnector(params.Logger, cfg)
c.metricsConsumer = nextConsumer
return c, nil
}
跨信号管道配置
# Traces → spanmetrics Connector → Metrics
receivers:
otlp:
protocols:
grpc:
connectors:
spanmetrics:
histogram:
explicit:
boundaries: [10ms, 50ms, 100ms, 500ms, 1s]
dimensions:
- name: http.method
- name: http.status_code
exporters:
prometheus:
endpoint: "0.0.0.0:8889"
service:
pipelines:
traces:
receivers: [otlp]
exporters: [spanmetrics] # Connector 作为 Exporter
metrics:
receivers: [spanmetrics] # 同一 Connector 作为 Receiver
exporters: [prometheus]
数据流:
OTLP Traces ──▶ spanmetrics Connector ──▶ Prometheus Metrics
│
├── 计算 Span 延迟直方图
├── 计算 Span 计数
├── 按 dimensions 分组
└── 生成 RED Metrics (Rate, Errors, Duration)
Connector 类型矩阵
| Connector | 输入 → 输出 | 功能 |
|---|---|---|
| spanmetrics | Traces → Metrics | 生成 RED 指标 (延迟/计数/错误率) |
| servicegraph | Traces → Metrics | 构建服务依赖拓扑图 |
| routing | Any → Any | OTTL 条件路由到不同管道 |
| failover | Any → Any | 主备故障转移 |
| roundrobin | Any → Any | 轮询负载均衡 |
| count | Any → Metrics | 计数信号量并输出为指标 |
共享库与工具
OTTL 转换语言与 Internal 共享库
OTTL 转换语言
OpenTelemetry Transformation Language
OTTL 是 OTel Collector 内置的 DSL(领域特定语言),提供类型安全的表达式求值引擎。它被 filterprocessor、transformprocessor、routingconnector 等多个组件共用,是整个 Contrib 项目中最重要的共享库之一。
OTTL 表达式示例
# 过滤条件
attributes["http.status_code"] >= 500
name == "health-check"
resource.attributes["service.name"] == "payment"
severity_number < SEVERITY_NUMBER_WARN
# 转换操作
set(attributes["env"], "production")
replace_match(attributes["url"], "/api/v1/*", "/api/latest/*")
truncate_all(attributes, 256)
delete_key(attributes, "password")
# 复杂表达式
IsMatch(attributes["http.target"], "/api/.*") and
attributes["http.method"] == "POST"
OTTL 架构
pkg/ottl/
├── parser.go # 表达式解析器
├── expression.go # 表达式求值
├── boolean_value.go # 布尔表达式
├── compare.go # 比较运算
├── functions.go # 函数注册表
├── grammar.go # 语法定义
│
├── ottlfuncs/ # 50+ 内置函数
│ ├── func_set.go
│ ├── func_delete_key.go
│ ├── func_replace_match.go
│ ├── func_concat.go
│ ├── func_is_match.go
│ ├── func_sha256.go
│ └── ...
│
└── contexts/ # 信号上下文
├── ottlspan/ # Span 上下文 (访问 span.attributes, span.name 等)
├── ottlspanevent/ # SpanEvent 上下文
├── ottlmetric/ # Metric 上下文
├── ottldatapoint/ # DataPoint 上下文
├── ottllog/ # Log 上下文
└── ottlresource/ # Resource 上下文
OTTL 类型系统
| 类型 | 说明 | 示例 |
|---|---|---|
| int | 64位整数 | attributes["code"] == 200 |
| float | 64位浮点 | attributes["latency"] > 1.5 |
| string | 字符串 | name == "GET /api" |
| bool | 布尔值 | attributes["sampled"] == true |
| bytes | 字节数组 | span_id, trace_id |
| map | 键值映射 | attributes |
| slice | 数组 | attributes["tags"] |
| enum | 枚举常量 | SEVERITY_NUMBER_WARN |
BoolExpr 通用过滤接口
// internal/filter/expr/matcher.go — 跨组件共用
type BoolExpr[K any] interface {
Eval(ctx context.Context, tCtx K) (bool, error)
}
// 组合器
func Or[K any](matchers ...BoolExpr[K]) BoolExpr[K] // 任一匹配
func And[K any](matchers ...BoolExpr[K]) BoolExpr[K] // 全部匹配
func Not[K any](matcher BoolExpr[K]) BoolExpr[K] // 取反
// 使用方 (filterprocessor)
type filterSpanProcessor struct {
skipSpanExpr expr.BoolExpr[ottlspan.TransformContext]
}
// 使用方 (routingconnector)
type routingConnector struct {
routeExpr expr.BoolExpr[ottllog.TransformContext]
}
Internal 共享库
关键共享库概览
| 库 | 路径 | 功能 | 使用方 |
|---|---|---|---|
| attraction | internal/coreinternal/ | 属性操作引擎 (7 种 Action) | attributesprocessor, resourceprocessor |
| filter | internal/filter/ | 通用过滤引擎 (BoolExpr) | filterprocessor, routingconnector |
| kafka | internal/kafka/ | Kafka 认证/配置工具 | kafkareceiver, kafkaexporter |
| k8sconfig | internal/k8sconfig/ | K8s 客户端配置 | k8sattributesprocessor, k8sclusterreceiver |
| aws | internal/aws/ | AWS SDK 工具/元数据 | awscloudwatchreceiver, awss3exporter |
| splunk | internal/splunk/ | Splunk HEC 协议工具 | splunkhecreceiver, splunkhecexporter |
| sharedcomponent | internal/sharedcomponent/ | 组件共享/复用工具 | 需要多管道共享同一实例的组件 |
| otelarrow | internal/otelarrow/ | OTel Arrow 协议支持 | otelarrowreceiver, otelarrowexporter |
pkg/ vs internal/ 的边界
pkg/ (公共 API — 外部可引用)
├── ottl/ # OTTL 语言引擎
├── stanza/ # 日志处理库
├── translator/ # 协议翻译器 (Prometheus, Datadog 等)
├── pdatautil/ # pdata 工具函数
├── pdatatest/ # 测试辅助
├── golden/ # Golden File 测试
├── sampling/ # 采样算法
└── resourcetotelemetry/# Resource → Telemetry 属性转换
internal/ (内部实现 — 仅 Contrib 内部使用)
├── coreinternal/ # Core 重导出 (attraction, textutils 等)
├── filter/ # 过滤表达式引擎
├── kafka/ # Kafka 认证/连接工具
├── aws/ # AWS 特定工具
├── k8sconfig/ # K8s 客户端配置
└── docker/ # Docker API 工具
规则:pkg/ 是稳定的公共 API,版本语义化管理;internal/ 可随时修改,不对外暴露。
设计精华
核心设计模式总结与完整数据流转示例
核心设计模式总结
十大设计模式
| 模式 | 机制 | 应用场景 |
|---|---|---|
| Factory 工厂 | NewFactory() 统一注册 | 所有 206 个组件 |
| Consumer Chain 链 | nextConsumer 参数传递 | 管道数据流转 |
| Helper 封装 | processorhelper / exporterhelper | 通用能力注入 (重试/队列/超时) |
| BoolExpr 泛型 | BoolExpr[K] 接口 | 跨组件过滤/路由表达式 |
| Encoding Extension | Extension 提供编解码 | Receiver/Exporter 可插拔编码 |
| Context Cancel | context.WithCancel | Goroutine 生命周期管理 |
| WaitGroup 协调 | sync.WaitGroup | 确保优雅关闭 |
| metadata 代码生成 | mdatagen + metadata.yaml | 类型常量/自定义指标自动生成 |
| mapstructure 配置 | YAML → Go struct | 统一配置反序列化 |
| RemoveIf 零拷贝 | pdata 原地删除 | Filter/Sampling 高效过滤 |
组件开发四步法
Step 1: 定义元数据
────────────────
创建 metadata.yaml:
type, stability, distributions, codeowners
Step 2: 定义配置
────────────────
创建 config.go:
type Config struct { ... }
func (c *Config) Validate() error { ... }
Step 3: 实现工厂
────────────────
创建 factory.go:
func NewFactory() xxx.Factory {
return xxx.NewFactory(metadata.Type, createDefaultConfig, ...)
}
Step 4: 实现核心逻辑
────────────────
创建 receiver.go / processor.go / exporter.go:
Start(ctx, host) error — 初始化
processData(ctx, data) — 处理 (Processor)
pushData(ctx, data) error — 发送 (Exporter)
Shutdown(ctx) error — 清理
完整数据流转示例
场景:Kafka Traces → 过滤 → ES + Prometheus Metrics
# 配置
receivers:
kafka:
brokers: ["kafka:9092"]
topic: otel-traces
encoding: otlp_proto
processors:
attributes:
actions:
- { key: env, value: prod, action: upsert }
filter:
traces:
span_conditions:
- 'name == "health-check"'
connectors:
spanmetrics:
dimensions: [{ name: http.method }]
exporters:
elasticsearch:
endpoints: ["https://es:9200"]
prometheus:
endpoint: "0.0.0.0:8889"
service:
pipelines:
traces:
receivers: [kafka]
processors: [attributes, filter]
exporters: [elasticsearch, spanmetrics]
metrics/spanmetrics:
receivers: [spanmetrics]
exporters: [prometheus]
数据流转全链路
Step 1: Kafka Receiver 消费消息
─────────────────────────────────
kafkaTracesConsumer.consumeLoop():
message := <-claim.Messages()
traces := unmarshaler.UnmarshalTraces(message.Value)
→ 产出: ptrace.Traces (10 个 Span)
Step 2: Attributes Processor 注入属性
─────────────────────────────────
attributesProcessor.processTraces(ctx, traces):
for each span:
span.Attributes().PutStr("env", "prod") // upsert
→ 产出: ptrace.Traces (10 个 Span, 每个多了 env=prod)
Step 3: Filter Processor 过滤健康检查
─────────────────────────────────
filterSpanProcessor.processTraces(ctx, traces):
spans.RemoveIf(span.Name() == "health-check")
→ 产出: ptrace.Traces (8 个 Span, 2 个 health-check 被移除)
Step 4a: Elasticsearch Exporter 批量写入
─────────────────────────────────
elasticsearchExporter.pushTracesData(ctx, traces):
session := bulkIndexer.StartSession()
for each span:
session.Add(index: "traces-2026.04.03", body: spanJSON)
session.Flush()
→ ES 写入 8 条 Trace 文档
Step 4b: spanmetrics Connector 生成指标 (并行)
─────────────────────────────────
spanmetricsConnector.ConsumeTraces(ctx, traces):
for each span:
计算延迟直方图 bucket
按 http.method 分组计数
定时器触发:
metrics := generateMetrics()
metricsConsumer.ConsumeMetrics(ctx, metrics)
Step 5: Prometheus Exporter 暴露指标
─────────────────────────────────
prometheusExporter.ConsumeMetrics(ctx, metrics):
collector.processMetrics(metrics) // 缓存到 Registry
Prometheus Server 抓取 GET /metrics:
→ traces_spanmetrics_latency_bucket{http_method="GET",le="100"} 42
→ traces_spanmetrics_calls_total{http_method="GET"} 150
总结与启示
OTel Collector Contrib 的核心设计哲学
通读整个源码,OTel Collector Contrib 的设计哲学可以概括为三个原则:
- 一切皆插件 — 206 个组件通过统一的 Factory 模式注册,OCB 按需组装,用户可构建最小化发行版
- 信号统一 — Traces、Metrics、Logs 共享相同的管道模型和 Consumer 接口,Connector 实现跨信号桥接
- 关注点分离 — 组件只需实现核心逻辑,重试/队列/超时/观测性由 Helper 自动注入
关键源码速查表
| 功能 | 核心文件 | 路径 |
|---|---|---|
| Receiver 消息驱动 | kafka_receiver.go | receiver/kafkareceiver/ |
| Receiver 抓取模式 | metrics_receiver.go | receiver/prometheusreceiver/ |
| Receiver Scraper | factory.go | receiver/hostmetricsreceiver/ |
| Processor 无状态 | attributes_trace.go | processor/attributesprocessor/ |
| Processor 过滤 | traces.go | processor/filterprocessor/ |
| Processor 有状态 | processor.go | processor/tailsamplingprocessor/ |
| Exporter 消息队列 | kafka_exporter.go | exporter/kafkaexporter/ |
| Exporter HTTP 拉取 | prometheus.go | exporter/prometheusexporter/ |
| Exporter 批量索引 | exporter.go | exporter/elasticsearchexporter/ |
| Connector 跨信号 | connector.go | connector/spanmetricsconnector/ |
| OTTL 语言 | parser.go | pkg/ottl/ |
| 属性操作引擎 | attraction.go | internal/coreinternal/attraction/ |
| 过滤表达式 | matcher.go | internal/filter/expr/ |
| 构建配置 | builder-config.yaml | cmd/otelcontribcol/ |
可借鉴的架构思想
Factory + Builder
统一 Factory 注册 + OCB 按需构建,适用于任何需要可插拔组件的大型系统
Consumer Chain
链式 Consumer 接口串联管道,每个节点只需关注处理逻辑,框架负责串联和错误传播
Helper 能力注入
通用能力(重试/队列/超时/观测性)通过 Helper 包装注入,避免每个组件重复实现
DSL 表达式引擎
OTTL 统一表达式语言被多个组件复用,避免每个组件自建过滤/转换语法
基于 OpenTelemetry Collector Contrib 源码分析 · 仅供研究学习
源码版权归 OpenTelemetry / CNCF 所有