源码深度解析

SkyWalking Java Agent

从 premain 入口到分布式追踪的完整链路,逐行拆解 APM 探针的设计精髓

01

什么是 SkyWalking Java Agent

无侵入的 APM 探针

Apache SkyWalking Java Agent 是一个基于 Java Agent 机制 + ByteBuddy 字节码增强的 APM(Application Performance Monitoring)探针。它在 不修改任何业务代码 的前提下,自动采集分布式链路追踪、性能指标和日志关联数据。

核心能力一览

🔍

分布式链路追踪

自动追踪跨服务调用链,支持 HTTP、RPC、MQ、数据库等 100+ 中间件

零侵入字节码增强

基于 ByteBuddy 在类加载时动态修改字节码,业务代码无需任何改动

🔌

插件化架构

所有中间件支持通过插件实现,SPI 机制自动发现,热插拔

🚀

高性能低开销

ThreadLocal 上下文、DataCarrier 异步缓冲、采样策略,对业务影响极小

使用方式

只需在 JVM 启动参数中加入一行:

java -javaagent:/path/to/skywalking-agent.jar \
     -Dskywalking.agent.service_name=my-service \
     -Dskywalking.collector.backend_service=127.0.0.1:11800 \
     -jar my-application.jar

Agent 会在 premain 阶段完成所有初始化,之后对目标类进行字节码增强,自动采集链路数据并上报到 OAP 后端。

02

整体架构

架构全景图

┌─────────────────────────────────────────────────────────────────┐
│                        JVM 启动                                 │
│  java -javaagent:skywalking-agent.jar -jar app.jar              │
└───────────────────────────┬─────────────────────────────────────┘
                            │ premain()
                            ▼
┌─────────────────────────────────────────────────────────────────┐
│                   SkyWalkingAgent.premain()                      │
│  1. SnifferConfigInitializer  ──  加载配置                       │
│  2. PluginBootstrap           ──  发现 & 加载插件               │
│  3. installClassTransformer   ──  安装 ByteBuddy 转换器          │
│  4. ServiceManager.boot()     ──  启动核心服务                   │
└───────────────────────────┬─────────────────────────────────────┘
                            │
        ┌───────────────────┼───────────────────┐
        ▼                   ▼                   ▼
┌──────────────┐  ┌─────────────────┐  ┌──────────────────┐
│  ByteBuddy   │  │  ServiceManager │  │   Plugin System  │
│  Transformer │  │                 │  │                  │
│              │  │  - GRPCChannel  │  │  skywalking-     │
│  类匹配       │  │  - TraceReport  │  │  plugin.def      │
│  字节码增强    │  │  - Sampling     │  │                  │
│  拦截器注入    │  │  - JVM Metrics  │  │  100+ 插件        │
└──────┬───────┘  └────────┬────────┘  └──────────────────┘
       │                   │
       ▼                   ▼
┌──────────────────────────────────────────┐
│           Tracing Context                │
│  ContextManager (ThreadLocal)            │
│  TracingContext → TraceSegment → Span    │
│  ContextCarrier (跨进程) / Snapshot (跨线程)│
└──────────────────────┬───────────────────┘
                       │
                       ▼
┌──────────────────────────────────────────┐
│       DataCarrier (异步缓冲)              │
│  多 Channel + 环形 Buffer + 消费者线程     │
└──────────────────────┬───────────────────┘
                       │ gRPC Stream
                       ▼
┌──────────────────────────────────────────┐
│          OAP Server (后端)                │
│  接收 SegmentObject,分析 & 存储          │
└──────────────────────────────────────────┘
03

模块结构

核心模块拆解

模块路径职责
apm-agentapm-sniffer/apm-agentAgent 入口,包含 SkyWalkingAgent.premain()
apm-agent-coreapm-sniffer/apm-agent-core核心引擎:插件加载、字节码增强、上下文管理、数据上报
apm-sdk-pluginapm-sniffer/apm-sdk-plugin标准插件集:Spring、Dubbo、MySQL、Redis 等 100+
bootstrap-pluginsapm-sniffer/bootstrap-pluginsJDK 级插件:Thread、HttpURLConnection 等
optional-pluginsapm-sniffer/optional-plugins可选插件:需手动启用
apm-commonsapm-commons公共组件:DataCarrier 异步缓冲、工具类
apm-protocolapm-protocol/apm-networkgRPC 协议定义,与 OAP 通信的 Protobuf
apm-toolkitapm-application-toolkit应用层 API:手动追踪、日志关联

apm-agent-core 内部结构(核心中的核心)

apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/
├── boot/              # ServiceManager、BootService 生命周期
├── conf/              # Config 配置类、动态配置
├── context/           # ContextManager、TracingContext、Span 模型
│   ├── ids/           # TraceId、SegmentId 生成器
│   └── trace/         # TraceSegment、EntrySpan、ExitSpan、LocalSpan
├── plugin/            # 插件体系
│   ├── interceptor/   # 拦截器接口 & 增强实现
│   │   └── enhance/   # ClassEnhancePluginDefine、InstMethodsInter
│   ├── match/         # 类匹配策略
│   ├── loader/        # AgentClassLoader
│   └── bootstrap/     # Bootstrap 类增强
├── remote/            # gRPC 通道管理 & 数据上报
├── sampling/          # 采样策略
├── profile/           # 性能剖析
├── jvm/               # JVM 指标采集
└── logging/           # Agent 内部日志
第一部分

启动流程

从 JVM premain 到所有服务就绪的完整初始化链路

04

premain 入口

Java Agent 机制

Java Agent 是 JDK 提供的一种在 main() 方法执行之前介入 JVM 的机制。通过在 JAR 的 MANIFEST.MF 中声明 Premain-Class,JVM 会在加载应用类之前调用指定类的 premain() 方法,这就是 SkyWalking 实现"零侵入"的基础。

# META-INF/MANIFEST.MF
Premain-Class: org.apache.skywalking.apm.agent.SkyWalkingAgent
Can-Redefine-Classes: true
Can-Retransform-Classes: true

SkyWalkingAgent.premain() 完整流程

这是整个 Agent 的总入口,源码位于 SkyWalkingAgent.java:75

public static void premain(String agentArgs, Instrumentation instrumentation)
        throws PluginException {
    final PluginFinder pluginFinder;

    // 第一步:加载配置
    try {
        SnifferConfigInitializer.initializeCoreConfig(agentArgs);
    } catch (Exception e) {
        LogManager.getLogger(SkyWalkingAgent.class)
                .error(e, "SkyWalking agent initialized failure. Shutting down.");
        return;
    }

    // 第二步:检查 Agent 是否启用
    if (!Config.Agent.ENABLE) {
        LOGGER.warn("SkyWalking agent is disabled.");
        return;
    }

    // 第三步:加载所有插件
    try {
        pluginFinder = new PluginFinder(new PluginBootstrap().loadPlugins());
    } catch (AgentPackageNotFoundException ape) {
        LOGGER.error(ape, "Locate agent.jar failure. Shutting down.");
        return;
    }

    // 第四步:安装 ByteBuddy 类转换器
    try {
        installClassTransformer(instrumentation, pluginFinder);
    } catch (Exception e) {
        LOGGER.error(e, "Skywalking agent installed class transformer failure.");
    }

    // 第五步:启动核心服务
    try {
        ServiceManager.INSTANCE.boot();
    } catch (Exception e) {
        LOGGER.error(e, "Skywalking agent boot failure.");
    }

    // 第六步:注册 JVM 关闭钩子
    Runtime.getRuntime().addShutdownHook(
        new Thread(ServiceManager.INSTANCE::shutdown,
                   "skywalking service shutdown thread"));
}
设计亮点:每个步骤都用 try-catch 隔离,单个步骤失败不会导致整个应用启动失败。Agent 的健壮性设计原则——永远不能因为探针问题影响业务。
05

配置加载

四层配置覆盖机制

SnifferConfigInitializer.initializeCoreConfig() 实现了一个优雅的多层配置体系,优先级从低到高:

优先级来源示例
1 (最低)Config 类字段默认值Config.Agent.SERVICE_NAME = ""
2agent.config 配置文件agent.service_name=my-svc
3JVM 系统属性-Dskywalking.agent.service_name=my-svc
4 (最高)Agent 参数-javaagent:agent.jar=agent.service_name=my-svc

配置通过反射映射到 Config 类的静态字段上。Config.java 使用嵌套静态内部类组织配置项:

public class Config {
    public static class Agent {
        public static String SERVICE_NAME = "";
        public static boolean ENABLE = true;
        public static int SAMPLE_N_PER_3_SECS = -1;  // -1 表示全采样
        public static int SPAN_LIMIT_PER_SEGMENT = 300;
    }

    public static class Collector {
        public static String BACKEND_SERVICE = "";
        public static int GRPC_CHANNEL_CHECK_INTERVAL = 30;
        public static int GRPC_UPSTREAM_TIMEOUT = 30;  // 秒
    }

    public static class Buffer {
        public static int CHANNEL_SIZE = 5;
        public static int BUFFER_SIZE = 300;
    }
}
06

ServiceManager 生命周期

枚举单例 + SPI 服务发现

ServiceManager 是整个 Agent 的服务容器,采用枚举单例模式,通过 Java SPI(ServiceLoader)自动发现所有 BootService 实现:

public enum ServiceManager {
    INSTANCE;  // 枚举单例,天然线程安全

    private Map<Class, BootService> bootedServices = Collections.emptyMap();

    public void boot() {
        bootedServices = loadAllServices();  // SPI 加载
        prepare();    // 第一阶段:准备
        startup();    // 第二阶段:启动
        onComplete(); // 第三阶段:完成
    }

    // 通过 Java SPI 加载所有 BootService 实现
    void load(List<BootService> allServices) {
        for (final BootService bootService : ServiceLoader.load(
                BootService.class, AgentClassLoader.getDefault())) {
            allServices.add(bootService);
        }
    }
}

三阶段生命周期

每个 BootService 都经历标准的三阶段生命周期,按 priority() 排序执行:

public interface BootService {
    void prepare();     // 准备阶段:注册监听器、初始化资源
    void boot();        // 启动阶段:开始工作(建立连接、启动线程等)
    void onComplete();  // 完成阶段:所有服务就绪后的回调
    void shutdown();    // 关闭阶段:优雅停机(倒序执行)
    default int priority() { return 0; }
}
核心 BootService 列表:
  • GRPCChannelManager — 管理到 OAP 的 gRPC 连接
  • TraceSegmentServiceClient — 链路数据上报
  • ContextManager — 上下文管理(ThreadLocal)
  • SamplingService — 采样策略
  • JVMService — JVM 指标采集
  • ProfileTaskExecutionService — 性能剖析
  • CommandService — 下发命令处理

@DefaultImplementor 与 @OverrideImplementor

SkyWalking 设计了一套优雅的服务覆盖机制,允许不同场景替换默认实现:

// 默认实现
@DefaultImplementor
public class TraceSegmentServiceClient implements BootService { ... }

// Kafka 上报覆盖 gRPC 上报(在 optional-reporter-plugins 中)
@OverrideImplementor(TraceSegmentServiceClient.class)
public class KafkaTraceSegmentServiceClient implements BootService { ... }

加载逻辑在 ServiceManager.loadAllServices() 中:如果检测到 @OverrideImplementor 注解,会替换对应的 @DefaultImplementor 服务,实现可插拔的服务替换。

第二部分

插件体系

SPI 自动发现、插件定义与增强规则

07

插件发现与加载

PluginBootstrap.loadPlugins()

插件加载的完整流程,源码位于 PluginBootstrap.java:41

public List<AbstractClassEnhancePluginDefine> loadPlugins()
        throws AgentPackageNotFoundException {
    // 1. 初始化 Agent 专用的 ClassLoader
    AgentClassLoader.initDefaultLoader();

    // 2. 扫描所有 JAR 中的 skywalking-plugin.def 文件
    PluginResourcesResolver resolver = new PluginResourcesResolver();
    List<URL> resources = resolver.getResources();

    // 3. 解析每个 def 文件
    for (URL pluginUrl : resources) {
        PluginCfg.INSTANCE.load(pluginUrl.openStream());
    }

    // 4. 通过反射实例化每个插件类
    List<PluginDefine> pluginClassList = PluginCfg.INSTANCE.getPluginClassList();
    List<AbstractClassEnhancePluginDefine> plugins = new ArrayList<>();
    for (PluginDefine pluginDefine : pluginClassList) {
        AbstractClassEnhancePluginDefine plugin =
            (AbstractClassEnhancePluginDefine) Class.forName(
                pluginDefine.getDefineClass(),
                true,
                AgentClassLoader.getDefault()
            ).newInstance();
        plugins.add(plugin);
    }

    // 5. 加载动态插件
    plugins.addAll(DynamicPluginLoader.INSTANCE.load(AgentClassLoader.getDefault()));

    return plugins;
}

skywalking-plugin.def 文件格式

每个插件 JAR 中都包含一个 skywalking-plugin.def 文件,格式极其简洁:

# 格式:插件名称=插件定义类全限定名
feign-default-http-9.x=org.apache.skywalking.apm.plugin.feign.http.v9.define.DefaultHttpClientInstrumentation
spring-mvc-annotation-5.x=org.apache.skywalking.apm.plugin.spring.mvc.v5.define.Spring5Instrumentation
mysql-8.x=org.apache.skywalking.apm.plugin.jdbc.mysql.v8.define.ConnectionInstrumentation
设计亮点:插件声明采用简单的 key=value 文本文件而非 XML/JSON,保证了极快的解析速度。在 Agent premain 阶段,每一毫秒都很宝贵。

PluginFinder — 插件索引

加载完所有插件后,PluginFinder 将它们按匹配方式分类索引:

public class PluginFinder {
    // 精确类名匹配:className → List<Plugin>
    private final Map<String, LinkedList<AbstractClassEnhancePluginDefine>>
        nameMatchDefine = new HashMap<>();

    // 间接匹配(接口、继承、注解等)
    private final List<AbstractClassEnhancePluginDefine>
        signatureMatchDefine = new ArrayList<>();

    // Bootstrap 类增强(JDK 核心类)
    private final List<AbstractClassEnhancePluginDefine>
        bootstrapClassMatchDefine = new ArrayList<>();

    // 当某个类被加载时,查找所有适用的插件
    public List<AbstractClassEnhancePluginDefine> find(TypeDescription typeDescription) {
        // 先查精确匹配,再查签名匹配
        ...
    }

    // 构建 ByteBuddy 的 ElementMatcher,用于全局类匹配
    public ElementMatcher<? super TypeDescription> buildMatch() {
        // 合并所有插件的匹配规则
        ...
    }
}
08

插件定义体系

插件类继承体系

AbstractClassEnhancePluginDefine          # 抽象基类
├── ClassEnhancePluginDefine              # 实现字节码增强逻辑
│   ├── ClassInstanceMethodsEnhancePluginDefine  # 只增强实例方法
│   └── ClassStaticMethodsEnhancePluginDefine    # 只增强静态方法
└── v2/ClassEnhancePluginDefineV2         # V2 版本(优化性能)

AbstractClassEnhancePluginDefine — 插件基类

每个插件都必须实现以下核心方法:

public abstract class AbstractClassEnhancePluginDefine {
    // 动态注入的字段名(用于存储 Agent 上下文)
    public static final String CONTEXT_ATTR_NAME = "_$EnhancedClassField_ws";

    // 定义要增强的目标类
    protected abstract ClassMatch enhanceClass();

    // 定义构造器拦截点
    public abstract ConstructorInterceptPoint[] getConstructorsInterceptPoints();

    // 定义实例方法拦截点
    public abstract InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints();

    // 定义静态方法拦截点
    public abstract StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints();

    // 主入口:define() 方法
    public DynamicType.Builder<?> define(TypeDescription typeDescription,
            DynamicType.Builder<?> builder, ClassLoader classLoader,
            EnhanceContext context) throws PluginException {
        // 1. 版本检测(Witness 机制)
        String[] witnessClasses = witnessClasses();
        if (witnessClasses != null) {
            for (String witnessClass : witnessClasses) {
                if (!WitnessFinder.INSTANCE.exist(witnessClass, classLoader)) {
                    return null;  // 版本不匹配,跳过增强
                }
            }
        }
        // 2. 执行增强
        return this.enhance(typeDescription, builder, classLoader, context);
    }
}

Witness 版本检测机制

同一个框架的不同版本可能有相同的类名但不同的方法签名。SkyWalking 通过 Witness 类解决这个问题:

// 只在 Spring 5.x 中存在的类,作为版本标记
@Override
protected String[] witnessClasses() {
    return new String[] {
        "org.springframework.web.servlet.resource.HttpResource"
    };
}

如果指定的 Witness 类在当前 ClassLoader 中不存在,说明版本不匹配,该插件会被安全地跳过。这确保了针对同一框架不同版本的多个插件可以共存而不冲突。

09

插件实战示例

Feign HTTP 客户端插件

DefaultHttpClientInstrumentation 为例,看看一个完整插件的定义:

public class DefaultHttpClientInstrumentation
        extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String ENHANCE_CLASS = "feign.Client$Default";
    private static final String INTERCEPT_CLASS =
        "org.apache.skywalking.apm.plugin.feign.http.v9.DefaultHttpClientInterceptor";

    // 1. 指定要增强的目标类
    @Override
    protected ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);  // 精确匹配 feign.Client$Default
    }

    // 2. 不拦截构造器
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    // 3. 拦截 execute() 方法
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named("execute");  // 匹配方法名
                }

                @Override
                public String getMethodsInterceptor() {
                    return INTERCEPT_CLASS;  // 拦截器类名(延迟加载)
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;  // 不修改方法参数
                }
            }
        };
    }
}
关键设计:拦截器类名是一个字符串而非类引用。这是延迟加载策略——只有当目标类真正被加载时,才会实例化拦截器,避免不必要的类加载开销。
第三部分

字节码增强

ByteBuddy 集成、类转换与拦截器注入的实现细节

10

ByteBuddy 集成

installClassTransformer()

这是将 ByteBuddy 与 Java Instrumentation API 桥接的关键方法:

static void installClassTransformer(Instrumentation instrumentation,
        PluginFinder pluginFinder) throws Exception {
    // 1. 创建定制化的 ByteBuddy 实例
    AgentBuilder agentBuilder = newAgentBuilder()
        .ignore(
            nameStartsWith("net.bytebuddy.")     // 排除 ByteBuddy 自身
            .or(nameStartsWith("org.slf4j."))     // 排除日志框架
            .or(nameContains("javassist"))         // 排除其他字节码工具
            .or(nameContains(".asm."))
            .or(nameStartsWith("sun.reflect"))     // 排除反射相关
            .or(allSkyWalkingAgentExcludeToolkit()) // 排除 Agent 自身(保留 Toolkit)
            .or(ElementMatchers.isSynthetic())      // 排除合成类
        );

    // 2. 注入 Bootstrap 类增强(增强 JDK 核心类)
    agentBuilder = BootstrapInstrumentBoost.inject(
        pluginFinder, instrumentation, agentBuilder, edgeClasses);

    // 3. JDK 9+ 模块系统兼容
    agentBuilder = JDK9ModuleExporter.openReadEdge(
        instrumentation, agentBuilder, edgeClasses);

    // 4. 注册类匹配 + Transformer + 安装
    agentBuilder
        .type(pluginFinder.buildMatch())           // 全局类匹配规则
        .transform(new Transformer(pluginFinder))  // 类转换器
        .with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
        .with(new Listener())                      // 转换事件监听
        .installOn(instrumentation);               // 安装到 JVM
}

定制化 ByteBuddy 实例

SkyWalking 对 ByteBuddy 进行了多项定制,避免与业务代码冲突:

private static AgentBuilder newAgentBuilder() {
    final ByteBuddy byteBuddy = new ByteBuddy()
        .with(TypeValidation.of(Config.Agent.IS_OPEN_DEBUGGING_CLASS))
        // 定制命名策略:生成的辅助类带 "_sw" 后缀
        .with(new SWAuxiliaryTypeNamingStrategy(NAME_TRAIT))
        // 定制实现上下文工厂
        .with(new SWImplementationContextFactory(NAME_TRAIT))
        // 定制方法图编译器
        .with(new SWMethodGraphCompilerDelegate(MethodGraph.Compiler.DEFAULT));

    return new SWAgentBuilderDefault(byteBuddy,
                new SWNativeMethodStrategy(NAME_TRAIT))
            .with(new SWDescriptionStrategy(NAME_TRAIT));
}
为什么要定制? ByteBuddy 默认生成的辅助类名可能与业务代码冲突。通过添加 _sw 后缀,确保所有 SkyWalking 生成的类都有唯一的命名空间。
11

类增强流程

Transformer — 类转换核心

当 JVM 加载一个匹配插件规则的类时,Transformer.transform() 被调用:

private static class Transformer implements AgentBuilder.Transformer {
    private PluginFinder pluginFinder;

    @Override
    public DynamicType.Builder<?> transform(
            final DynamicType.Builder<?> builder,
            final TypeDescription typeDescription,
            final ClassLoader classLoader,
            final JavaModule javaModule,
            final ProtectionDomain protectionDomain) {

        // 1. 查找所有匹配的插件
        List<AbstractClassEnhancePluginDefine> pluginDefines =
            pluginFinder.find(typeDescription);

        if (pluginDefines.size() > 0) {
            DynamicType.Builder<?> newBuilder = builder;
            EnhanceContext context = new EnhanceContext();

            // 2. 链式应用所有插件的增强逻辑
            for (AbstractClassEnhancePluginDefine define : pluginDefines) {
                DynamicType.Builder<?> possibleNewBuilder = define.define(
                    typeDescription, newBuilder, classLoader, context);
                if (possibleNewBuilder != null) {
                    newBuilder = possibleNewBuilder;
                }
            }
            return newBuilder;
        }
        return builder;
    }
}
多插件叠加:同一个类可以被多个插件增强。每个插件的 define() 返回新的 Builder,作为下一个插件的输入,实现链式增强。

ClassEnhancePluginDefine.enhanceInstance() — 实例增强

这是字节码操作的核心,源码位于 ClassEnhancePluginDefine.java:67

protected DynamicType.Builder<?> enhanceInstance(
        TypeDescription typeDescription,
        DynamicType.Builder<?> newClassBuilder,
        ClassLoader classLoader,
        EnhanceContext context) throws PluginException {

    // 步骤 1:注入上下文字段
    // 为目标类添加一个 Object 类型的 private volatile 字段
    // 并实现 EnhancedInstance 接口
    if (!typeDescription.isAssignableTo(EnhancedInstance.class)) {
        if (!context.isObjectExtended()) {
            newClassBuilder = newClassBuilder
                .defineField(CONTEXT_ATTR_NAME, Object.class,
                             ACC_PRIVATE | ACC_VOLATILE)
                .implement(EnhancedInstance.class)
                .intercept(FieldAccessor.ofField(CONTEXT_ATTR_NAME));
            context.extendObjectCompleted();
        }
    }

    // 步骤 2:增强构造器
    for (ConstructorInterceptPoint point : constructorInterceptPoints) {
        newClassBuilder = newClassBuilder
            .constructor(point.getConstructorMatcher())
            .intercept(SuperMethodCall.INSTANCE.andThen(
                MethodDelegation.withDefaultConfiguration()
                    .to(new ConstructorInter(
                        point.getConstructorInterceptor(), classLoader))));
    }

    // 步骤 3:增强实例方法
    for (InstanceMethodsInterceptPoint point : instanceMethodsInterceptPoints) {
        newClassBuilder = newClassBuilder
            .method(not(isStatic()).and(point.getMethodsMatcher()))
            .intercept(MethodDelegation.withDefaultConfiguration()
                .to(new InstMethodsInter(
                    point.getMethodsInterceptor(), classLoader)));
    }

    return newClassBuilder;
}

_$EnhancedClassField_ws 字段注入

增强后的类等价于:

// 增强前
public class FeignClient implements Client {
    public Response execute(Request request, Options options) { ... }
}

// 增强后(概念等价,实际由字节码操作完成)
public class FeignClient implements Client, EnhancedInstance {
    // Agent 注入的上下文字段
    private volatile Object _$EnhancedClassField_ws;

    @Override
    public Object getSkyWalkingDynamicField() {
        return _$EnhancedClassField_ws;
    }

    @Override
    public void setSkyWalkingDynamicField(Object value) {
        _$EnhancedClassField_ws = value;
    }

    // execute() 方法被拦截器包装
    public Response execute(Request request, Options options) {
        // → beforeMethod()
        // → 原始方法调用
        // → afterMethod() / handleMethodException()
    }
}
12

拦截器机制

InstanceMethodsAroundInterceptor 接口

所有实例方法拦截器都实现这个接口,遵循 AOP 的 Around Advice 模式:

public interface InstanceMethodsAroundInterceptor {
    // 方法执行前
    void beforeMethod(EnhancedInstance objInst,
                      Method method,
                      Object[] allArguments,
                      Class<?>[] argumentsTypes,
                      MethodInterceptResult result) throws Throwable;

    // 方法执行后
    Object afterMethod(EnhancedInstance objInst,
                       Method method,
                       Object[] allArguments,
                       Class<?>[] argumentsTypes,
                       Object ret) throws Throwable;

    // 方法异常时
    void handleMethodException(EnhancedInstance objInst,
                               Method method,
                               Object[] allArguments,
                               Class<?>[] argumentsTypes,
                               Throwable t);
}

InstMethodsInter — 方法委托桥梁

InstMethodsInter 是 ByteBuddy 的 MethodDelegation 目标,它在运行时加载实际的拦截器并调用:

public class InstMethodsInter {
    private InstanceMethodsAroundInterceptor interceptor;

    public InstMethodsInter(String interceptorClassName, ClassLoader classLoader) {
        // 延迟加载拦截器实例(带缓存)
        this.interceptor = InterceptorInstanceLoader.load(
            interceptorClassName, classLoader);
    }

    @RuntimeType
    public Object intercept(@This Object obj,
                            @AllArguments Object[] allArguments,
                            @SuperCall Callable<?> zupiCall,
                            @Origin Method method) throws Throwable {
        EnhancedInstance targetObject = (EnhancedInstance) obj;
        MethodInterceptResult result = new MethodInterceptResult();

        // 1. beforeMethod
        interceptor.beforeMethod(targetObject, method, allArguments,
                                 method.getParameterTypes(), result);
        if (result.isContinue()) {
            // 2. 调用原始方法
            ret = zupiCall.call();
        }
        // 3. afterMethod
        ret = interceptor.afterMethod(targetObject, method, allArguments,
                                      method.getParameterTypes(), ret);
        return ret;
    }
}
InterceptorInstanceLoader:使用 ConcurrentHashMap 缓存拦截器实例,保证每个拦截器类只实例化一次。同时为每个目标 ClassLoader 创建独立的 AgentClassLoader,确保类加载隔离。
第四部分

分布式追踪核心

Trace/Segment/Span 数据模型与上下文传播机制

13

Trace / Segment / Span 模型

三层数据模型

Trace(分布式追踪)
 ├── TraceSegment A(Service-A,Thread-1)
 │    ├── EntrySpan: POST /api/order     ← 入口
 │    ├── LocalSpan: OrderService.create  ← 本地调用
 │    └── ExitSpan: MySQL INSERT          ← 出口(数据库)
 │
 ├── TraceSegment B(Service-A,Thread-2,异步)
 │    └── LocalSpan: sendNotification
 │
 └── TraceSegment C(Service-B,跨进程)
      ├── EntrySpan: POST /api/payment   ← 入口
      └── ExitSpan: Redis SET            ← 出口(缓存)

关系:
 - 同一个 Trace 由 traceId 关联
 - 每个线程一个 TraceSegment(由 traceSegmentId 标识)
 - Segment 内的 Span 通过 spanId(0, 1, 2...)编号
 - 跨进程通过 ContextCarrier 传递 parentSegmentId + parentSpanId
 - 跨线程通过 ContextSnapshot 传递

TraceSegment — 线程级追踪片段

源码位于 TraceSegment.java:34

public class TraceSegment {
    private String traceSegmentId;            // 全局唯一 Segment ID
    private TraceSegmentRef ref;              // 父 Segment 引用(跨进程/线程)
    private List<AbstractTracingSpan> spans;  // 已完成的 Span 列表
    private DistributedTraceId relatedGlobalTraceId; // 所属 Trace ID
    private boolean ignore = false;           // 是否被采样策略忽略
    private boolean isSizeLimited = false;    // Span 数量是否超限
    private final long createTime;

    public TraceSegment() {
        this.traceSegmentId = GlobalIdGenerator.generate();
        this.spans = new LinkedList<>();
        this.relatedGlobalTraceId = new NewDistributedTraceId();
        this.createTime = System.currentTimeMillis();
    }

    // Span 完成后归档到 Segment
    public void archive(AbstractTracingSpan finishedSpan) {
        spans.add(finishedSpan);
    }

    // 转换为 gRPC Protobuf 对象
    public SegmentObject transform() {
        SegmentObject.Builder builder = SegmentObject.newBuilder();
        builder.setTraceId(getRelatedGlobalTrace().getId());
        builder.setTraceSegmentId(this.traceSegmentId);
        for (AbstractTracingSpan span : this.spans) {
            builder.addSpans(span.transform());
        }
        builder.setService(Config.Agent.SERVICE_NAME);
        builder.setServiceInstance(Config.Agent.INSTANCE_NAME);
        return builder.build();
    }
}

Span 类型体系

Span 类型用途创建时机
EntrySpan服务入口边界收到 HTTP 请求、RPC 调用、MQ 消费
ExitSpan服务出口边界发起 HTTP 请求、数据库调用、缓存操作
LocalSpan本地方法调用内部方法追踪(无网络交互)
NoopSpan空操作采样策略决定不追踪时
EntrySpan 的栈复用机制:一个请求可能经过多个入口插件(如 Tomcat → Spring MVC),但只创建一个 EntrySpan。每次 createEntrySpan() 检查栈顶是否已有 EntrySpan,如果有则复用并更新操作名,避免冗余 Span。ExitSpan 同理。
14

ContextManager 上下文管理

ThreadLocal 上下文

ContextManager 是分布式追踪的门面类,所有 Span 操作都通过它:

public class ContextManager implements BootService {
    // 核心:每个线程一个 TracingContext
    private static ThreadLocal<AbstractTracerContext> CONTEXT =
        new ThreadLocal<>();
    // 运行时上下文(存储自定义数据)
    private static ThreadLocal<RuntimeContext> RUNTIME_CONTEXT =
        new ThreadLocal<>();

    // 懒创建上下文
    private static AbstractTracerContext getOrCreate(
            String operationName, boolean forceSampling) {
        AbstractTracerContext context = CONTEXT.get();
        if (context == null) {
            if (StringUtil.isEmpty(operationName)) {
                context = new IgnoredTracerContext();  // 忽略
            } else {
                context = EXTEND_SERVICE.createTraceContext(
                    operationName, forceSampling);     // 创建真实上下文
            }
            CONTEXT.set(context);
        }
        return context;
    }
}

核心 API

// ===== 创建 Span =====
// 创建入口 Span(带跨进程上下文提取)
AbstractSpan span = ContextManager.createEntrySpan(operationName, carrier);

// 创建出口 Span(带跨进程上下文注入)
AbstractSpan span = ContextManager.createExitSpan(operationName, carrier, remotePeer);

// 创建本地 Span
AbstractSpan span = ContextManager.createLocalSpan(operationName);

// ===== 停止 Span =====
ContextManager.stopSpan(span);
// 当所有 Span 停止后,TracingContext 自动关闭,触发 Segment 上报

// ===== 跨进程传播 =====
ContextManager.inject(carrier);   // 注入上下文到 Carrier
ContextManager.extract(carrier);  // 从 Carrier 提取上下文

// ===== 跨线程传播 =====
ContextSnapshot snapshot = ContextManager.capture();    // 线程 A 捕获
ContextManager.continued(snapshot);                      // 线程 B 恢复

TracingContext — 活跃 Span 栈

源码位于 TracingContext.java:61

public class TracingContext implements AbstractTracerContext {
    private TraceSegment segment;                           // 所属 Segment
    private LinkedList<AbstractSpan> activeSpanStack;       // 活跃 Span 栈
    private int spanIdGenerator;                            // Span ID 计数器
    private volatile int asyncSpanCounter;                  // 异步 Span 计数
    private volatile boolean isRunningInAsyncMode;
    private volatile ReentrantLock asyncFinishLock;
    private volatile boolean running;
    private final ProfileStatusContext profileStatus;       // 性能剖析状态
    private final CorrelationContext correlationContext;     // 用户自定义上下文
    private final ExtensionContext extensionContext;         // 扩展上下文

    // Span 栈操作(LIFO)
    // push: createEntrySpan/createExitSpan/createLocalSpan
    // pop:  stopSpan → 当栈空时,finish Segment → 通知 Listener
}
Span 栈生命周期:createXxxSpan() 把 Span push 到栈中,stopSpan() 把 Span pop 出来并 archive 到 Segment。当栈为空时,说明当前线程的所有操作都已完成,Segment 被标记为 finished 并通知所有 TracingContextListener(主要是 TraceSegmentServiceClient)。
15

跨进程/线程传播

ContextCarrier — 跨进程传播

当服务 A 调用服务 B 时,需要将追踪上下文通过 HTTP Header / RPC 附加信息传递:

public class ContextCarrier implements Serializable {
    private String traceId;                // 分布式 Trace ID
    private String traceSegmentId;         // 父 Segment ID
    private int spanId = -1;               // 父 Span ID
    private String parentService;          // 父服务名
    private String parentServiceInstance;  // 父服务实例
    private String parentEndpoint;         // 父端点名
    private String addressUsedAtClient;    // 客户端访问地址
    private ExtensionContext extensionContext;
    private CorrelationContext correlationContext;

    // 序列化为 SW8 协议头
    // 格式: 1-BASE64(traceId)-BASE64(segmentId)-spanId-BASE64(service)-...
    String serialize(HeaderVersion version) {
        return StringUtil.join('-',
            "1",
            Base64.encode(this.getTraceId()),
            Base64.encode(this.getTraceSegmentId()),
            this.getSpanId() + "",
            Base64.encode(this.getParentService()),
            Base64.encode(this.getParentServiceInstance()),
            Base64.encode(this.getParentEndpoint()),
            Base64.encode(this.getAddressUsedAtClient())
        );
    }
}

跨进程传播流程

# 服务 A(发送方)
ExitSpan 插件:
  1. ContextCarrier carrier = new ContextCarrier()
  2. ContextManager.inject(carrier)        ← 将当前上下文写入 Carrier
  3. HTTP Header: sw8 = carrier.serialize() ← 序列化为 HTTP Header

─── HTTP 请求 ──→

# 服务 B(接收方)
EntrySpan 插件:
  1. String sw8Header = request.getHeader("sw8")
  2. carrier.deserialize(sw8Header)         ← 反序列化
  3. ContextManager.createEntrySpan(op, carrier)
     → context.extract(carrier)            ← 恢复父上下文
     → segment.ref(parentRef)              ← 建立 Segment 引用关系
SW8 协议头:SkyWalking v8 使用单个 HTTP Header sw8 传输所有上下文信息,通过 - 分隔 8 个字段,值使用 Base64 编码。相比 v6 的多 Header 方案,减少了网络开销。

ContextSnapshot — 跨线程传播

当业务代码使用异步线程(线程池、CompletableFuture 等)时,ThreadLocal 上下文无法自动传递。SkyWalking 通过快照机制解决:

// ===== 线程 A(主线程)=====
// 捕获当前上下文快照
ContextSnapshot snapshot = ContextManager.capture();

// 将 snapshot 传递给子线程(通过参数、队列等)
executor.submit(() -> {
    // ===== 线程 B(子线程)=====
    // 恢复上下文快照
    ContextManager.continued(snapshot);

    // 现在线程 B 的 TracingContext 通过 TraceSegmentRef
    // 关联到了线程 A 的 Segment,同属一个 Trace

    // ... 执行业务逻辑 ...

    ContextManager.stopSpan();
});
JDK Threading Plugin:SkyWalking 提供了 jdk-threading-plugin,自动增强 RunnableCallableForkJoinTask 等,在提交时自动 capture snapshot,在 run/call 时自动 continued,对业务完全透明。
第五部分

数据上报

从 Segment 完成到 gRPC 发送的异步数据管线

16

DataCarrier 异步缓冲

高性能生产者-消费者模型

DataCarrier 是 SkyWalking 自研的高性能异步数据传输组件,位于 apm-commons/apm-datacarrier

┌───────────────────────────────────────────────────────┐
│                    DataCarrier                         │
│                                                       │
│  Producer Threads         Channels        Consumer    │
│  ┌──────────┐         ┌──────────┐                   │
│  │ Thread-1 │──┐      │Channel-0 │ [buf][buf][buf]   │
│  └──────────┘  │      ├──────────┤                   │
│  ┌──────────┐  ├──→   │Channel-1 │ [buf][buf][buf] ──→ Consumer Thread
│  │ Thread-2 │──┤      ├──────────┤                   │    (批量消费)
│  └──────────┘  │      │Channel-2 │ [buf][buf][buf]   │
│  ┌──────────┐  │      ├──────────┤                   │
│  │ Thread-N │──┘      │Channel-N │ [buf][buf][buf]   │
│  └──────────┘         └──────────┘                   │
│                                                       │
│  路由策略:ThreadId % ChannelSize → 无锁分区            │
│  缓冲策略:IF_POSSIBLE(满则丢弃,不阻塞业务线程)        │
└───────────────────────────────────────────────────────┘

核心设计

// 初始化:5 个 Channel,每个 Channel 300 个 Buffer 槽位
DataCarrier<TraceSegment> carrier = new DataCarrier<>(
    CHANNEL_SIZE,   // 默认 5
    BUFFER_SIZE,    // 默认 300
    BufferStrategy.IF_POSSIBLE  // 满则丢弃
);
carrier.consume(consumer, 1);  // 1 个消费者线程

// 生产者(业务线程,无锁)
carrier.produce(traceSegment);
// 内部:partitioner.partition(channels.length, data)
// 默认按 Thread ID 取模路由到对应 Channel
为什么不用 BlockingQueue? BlockingQueue 在满时会阻塞生产者线程——这对 APM 探针是不可接受的,因为不能因为数据上报而阻塞业务请求。DataCarrier 采用 IF_POSSIBLE 策略:缓冲区满时直接丢弃数据,保证业务零影响。
17

gRPC 数据上报

TraceSegmentServiceClient — 上报核心

源码位于 TraceSegmentServiceClient.java:49,它同时实现了三个接口:

@DefaultImplementor
public class TraceSegmentServiceClient
    implements BootService,              // 服务生命周期
               IConsumer<TraceSegment>,  // DataCarrier 消费者
               TracingContextListener,   // Segment 完成监听
               GRPCChannelListener {     // gRPC 连接状态监听

    private volatile DataCarrier<TraceSegment> carrier;
    private volatile TraceSegmentReportServiceGrpc
        .TraceSegmentReportServiceStub serviceStub;
    private volatile GRPCChannelStatus status = DISCONNECT;

    // 生命周期回调
    @Override
    public void prepare() {
        // 监听 gRPC 连接状态
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class)
            .addChannelListener(this);
    }

    @Override
    public void boot() {
        // 创建 DataCarrier 并启动消费者线程
        carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE,
                                     BufferStrategy.IF_POSSIBLE);
        carrier.consume(this, 1);
    }

    @Override
    public void onComplete() {
        // 注册为 Segment 完成的监听者
        TracingContext.ListenerManager.add(this);
    }
}

数据流转全链路

// ① Segment 完成 → 生产到 DataCarrier
@Override
public void afterFinished(TraceSegment traceSegment) {
    if (traceSegment.isIgnore()) return;  // 采样策略忽略的不上报
    if (!carrier.produce(traceSegment)) {
        // Buffer 满,丢弃(但不影响业务)
        LOGGER.debug("One trace segment has been abandoned, cause by buffer is full.");
    }
}

// ② 消费者线程批量消费 → gRPC 流式上报
@Override
public void consume(List<TraceSegment> data) {
    if (CONNECTED.equals(status)) {
        // 创建 gRPC 双向流
        StreamObserver<SegmentObject> upstreamObserver =
            serviceStub
                .withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT,
                                   TimeUnit.SECONDS)
                .collect(new StreamObserver<Commands>() {
                    @Override
                    public void onNext(Commands commands) {
                        // 处理 OAP 下发的命令
                        ServiceManager.INSTANCE.findService(CommandService.class)
                            .receiveCommand(commands);
                    }
                    @Override
                    public void onError(Throwable t) { ... }
                    @Override
                    public void onCompleted() { ... }
                });

        // 逐个发送 Segment
        for (TraceSegment segment : data) {
            SegmentObject segmentObject = segment.transform();
            upstreamObserver.onNext(segmentObject);
        }
        upstreamObserver.onCompleted();

        segmentUplinkedCounter += data.size();
    } else {
        // gRPC 未连接,丢弃
        segmentAbandonedCounter += data.size();
    }
}

GRPCChannelManager — 连接管理

GRPCChannelManager 负责维护到 OAP 后端的 gRPC 长连接:

  • 多后端负载均衡:支持配置多个 OAP 地址,随机选择连接
  • 定时心跳检测:默认每 30 秒检查连接状态
  • 自动重连:连接断开后自动重建
  • TLS 支持:可选 TLS 加密通信
  • 认证支持:通过 AuthenticationDecorator 添加认证 Header
  • 状态通知:连接状态变化时通知所有 GRPCChannelListener
第六部分

OneAgent 服务治理

基于 etcd 的流量管理、黑名单过滤与四层优雅下线机制

18

OneAgent 功能概览

什么是 OneAgent

OneAgent 是 SkyWalking Java Agent 中的服务治理增强模块,在 APM 探针的基础上,利用 etcd 作为分布式协调中心,实现了自动摘流(黑名单过滤)和优雅下线(Graceful Shutdown)能力。它以可选插件(optional-plugin)的形式集成,通过 OneAgentConfig.ENABLE_ONEAGENT 开关控制是否加载。

核心能力

🔗

服务注册发现

拦截 Eureka 注册/发现流程,将 Provider/Consumer 信息同步写入 etcd,构建统一服务拓扑

🚫

流量黑名单

实例下线时自动加入黑名单,Consumer 侧 Ribbon/LoadBalancer 拦截器实时过滤,秒级摘流

🔄

优雅下线

四层防护:PreStop 文件监听 → Signal Handler → JVM Shutdown Hook → etcd Lease 过期,确保零流量损失

动态配置

三级配置体系(静态/全局/服务级),支持 etcd Watch 实时热更新,Service 级覆盖 Global 级

模块结构

apm-sniffer/
├── apm-agent-core/src/.../oneagent/         # 核心模块
│   ├── OneAgentConfig.java                  # 统一配置门面
│   ├── EtcdClientService.java               # BootService 生命周期管理
│   ├── EtcdClient.java                      # 纯 JDK HTTP etcd 客户端
│   ├── ServiceContext.java                   # 全局服务上下文(volatile)
│   ├── TrafficManager.java                   # 黑名单管理 & 流量排空
│   ├── ProviderDeregistrationManager.java    # Provider 注销 & 多层清理
│   ├── PreStopFileShutdownTrigger.java       # K8s PreStop 文件监听
│   ├── config/                              # 配置子系统
│   │   ├── StaticConfig.java                # 静态配置(启动时确定)
│   │   ├── ConfigDefinitions.java           # 配置字段定义(含约束)
│   │   ├── ConfigLoader.java                # 配置加载器
│   │   ├── ConfigSnapshotManager.java       # 配置快照管理
│   │   └── ConfigWatcher.java               # etcd Watch 热更新
│   ├── model/                               # etcd 协议模型(17 个类)
│   └── reporter/                            # 数据上报子系统
│       ├── OneAgentDataReporter.java         # HTTP 上报服务
│       ├── EventCollector.java               # 事件采集
│       └── MetricCollector.java              # 指标采集
│
└── optional-plugins/optional-oneagent-plugins/
    └── eureka-2.x-plugin/                   # Eureka 注册中心插件
        ├── RegisterInterceptor.java          # 注册拦截 → etcd 写入
        ├── DiscoveryInterceptor.java         # 发现拦截 → Consumer 注册
        ├── RibbonServerListInterceptor.java  # Ribbon 黑名单过滤
        └── SpringCloudLoadBalancerInterceptor.java  # SCL 黑名单过滤
19

整体架构设计

架构全景图

┌──────────────────────────────────────────────────────────────────────┐
│                         JVM (微服务实例)                               │
│                                                                      │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                    SkyWalking Agent                              │  │
│  │                                                                 │  │
│  │  ┌───────────────┐   ┌────────────────┐   ┌────────────────┐   │  │
│  │  │ EtcdClient    │   │ TrafficManager │   │ ProviderDereg  │   │  │
│  │  │ Service       │   │                │   │ Manager        │   │  │
│  │  │ (BootService) │   │ - Blacklist    │   │                │   │  │
│  │  │               │──▶│ - Watch        │   │ - 4-layer      │   │  │
│  │  │ priority=15   │   │ - Drain        │   │   cleanup      │   │  │
│  │  └───────┬───────┘   └───────┬────────┘   └───────┬────────┘   │  │
│  │          │                   │                     │            │  │
│  │          ▼                   ▼                     ▼            │  │
│  │  ┌──────────────────────────────────────────────────────────┐   │  │
│  │  │                    EtcdClient (纯 JDK HTTP)               │   │  │
│  │  │  put / get / delete / watch / lease / keepalive / auth   │   │  │
│  │  └──────────────────────────┬───────────────────────────────┘   │  │
│  └─────────────────────────────┼───────────────────────────────────┘  │
│                                │                                      │
│  ┌─────────────────────────────┼───────────────────────────────────┐  │
│  │          Eureka Plugin (字节码增强)                               │  │
│  │                             │                                   │  │
│  │  register()  ──▶  RegisterInterceptor  ──▶  etcd put provider  │  │
│  │  discover()  ──▶  DiscoveryInterceptor ──▶  etcd put consumer  │  │
│  │  getServers()──▶  RibbonInterceptor    ──▶  blacklist filter   │  │
│  └─────────────────────────────┼───────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌──────────────────────────────────────────────────────────────────────┐
│                          etcd Cluster                                │
│                                                                      │
│  /oneagent/                                                          │
│  ├── configs                    (全局动态配置 JSON)                    │
│  │   └── {serviceName}         (服务级配置 JSON,覆盖全局)              │
│  ├── services/                                                       │
│  │   └── {serviceName}/                                              │
│  │       ├── providers/{instanceId}   (Lease + Keepalive)            │
│  │       └── consumers/{instanceId}   (Lease + Keepalive)            │
│  └── traffic_manager/blacklist/                                      │
│      └── {serviceName}/{instanceId}   (TTL 自动过期)                  │
└──────────────────────────────────────────────────────────────────────┘

启动流程:从 premain 到 OneAgent 就绪

JVM 启动  ─▶  SkyWalkingAgent.premain()
                │
                ├── 1. SnifferConfigInitializer
                │      └── 加载 StaticConfig (ENABLE_ONEAGENT, ETCD_ENDPOINTS)
                │
                ├── 2. PluginBootstrap.loadPlugins()
                │      └── PluginSelector.select()
                │          ├── 排除 EXCLUDE_PLUGINS 中的插件
                │          ├── 排除 gray-* (如果灰度关闭)
                │          └── 排除 oneagent-* (如果 ENABLE_ONEAGENT=false)
                │
                ├── 3. installClassTransformer()
                │      └── ByteBuddy 注册 Eureka 拦截点
                │
                └── 4. ServiceManager.boot()
                       └── EtcdClientService (priority=15)
                           ├── prepare()   → 校验 etcd 配置
                           ├── boot()      → 创建 EtcdClient, 启动 PreStop 监听
                           └── onComplete()→ 初始化 ConfigWatcher & BlacklistWatch

PluginSelector 中的 OneAgent 过滤

PluginSelector.select() 在加载插件时,根据 OneAgentConfig.ENABLE_ONEAGENT 决定是否过滤 oneagent- 前缀的插件:

// PluginSelector.java 核心逻辑
public List<AbstractClassEnhancePluginDefine> select(
        List<AbstractClassEnhancePluginDefine> plugins) {

    // 1. 排除用户配置的 EXCLUDE_PLUGINS
    // 2. 排除灰度插件 (gray-*)
    // 3. 排除 OneAgent 插件 (如果功能关闭)
    if (!OneAgentConfig.ENABLE_ONEAGENT) {
        // 过滤所有 pluginName 以 "oneagent-" 开头的插件
        removePlugins(plugins, "oneagent-");
    }
    return plugins;
}
20

三级配置体系

配置分层设计

┌──────────────────────────────────────────────────────────────┐
│                    配置优先级(高 → 低)                        │
│                                                              │
│   Level 1: 静态配置 (StaticConfig)                            │
│   ├── 来源: -D 系统属性 > 环境变量 > 默认值                   │
│   ├── 特点: 启动时确定,不可动态修改                             │
│   └── 示例: ENABLE_ONEAGENT, ETCD_ENDPOINTS                  │
│                                                              │
│   Level 2: 全局动态配置 (GLOBAL)                               │
│   ├── 来源: etcd /oneagent/configs (JSON)                    │
│   ├── 特点: 运行时 Watch 热更新,影响全部服务                    │
│   └── 示例: ENABLE_ONEAGENT_FEATURE, ETCD_LEASE_TTL          │
│                                                              │
│   Level 3: 服务级动态配置 (SERVICE)                            │
│   ├── 来源: etcd /oneagent/configs/{serviceName} (JSON)      │
│   ├── 特点: 覆盖 GLOBAL 配置,实现细粒度管控                    │
│   └── 示例: DRAIN_WAIT_TIME_MS, ENABLE_BLACKLIST             │
└──────────────────────────────────────────────────────────────┘

StaticConfig — 启动时静态配置

配置项系统属性 (-D)环境变量默认值说明
ENABLE_ONEAGENToneagent.enableONEAGENT_ENABLEtrue总开关
ETCD_ENDPOINTSoneagent.etcd.endpointsONEAGENT_ETCD_ENDPOINTS""etcd 地址
ETCD_SERVICE_PREFIXoneagent.etcd.service.prefixONEAGENT_ETCD_SERVICE_PREFIX/oneagentetcd 路径前缀
ETCD_USERNAMEoneagent.etcd.usernameONEAGENT_ETCD_USERNAME""认证用户
ETCD_PASSWORDoneagent.etcd.passwordONEAGENT_ETCD_PASSWORD""认证密码
// StaticConfig.java — 加载优先级:系统属性 > 环境变量 > 默认值
public class StaticConfig {
    public static final boolean ENABLE_ONEAGENT =
        loadBoolean("oneagent.enable", "ONEAGENT_ENABLE", true);
    public static final String ETCD_ENDPOINTS =
        loadString("oneagent.etcd.endpoints", "ONEAGENT_ETCD_ENDPOINTS", "");

    private static String loadString(String sysProp, String envVar, String def) {
        String val = System.getProperty(sysProp);
        if (val != null) return val;
        val = System.getenv(envVar);
        if (val != null) return val;
        return def;
    }
}

ConfigDefinitions — 动态配置字段定义

每个配置字段定义为枚举值,包含 key、默认值、配置级别、是否支持动态更新、取值范围:

字段级别默认值取值范围动态说明
ENABLE_ONEAGENT_FEATUREGLOBALtrue-Y运行时功能开关
ETCD_LEASE_TTLGLOBAL60s[10, 3600]YLease TTL
DRAIN_WAIT_TIME_MSSERVICE1000ms[0, 30000]Y流量排空等待
ENABLE_BLACKLISTSERVICEfalse-Y黑名单开关
BLACKLIST_TTL_MSSERVICE60000ms[5000, 432000000]Y黑名单条目 TTL
BLACKLIST_MAX_COUNTSERVICE1[1, 1000]Y每服务最大黑名单数
PRESTOP_ENABLEDGLOBALtrue-NPreStop 监听
PRESTOP_FILE_PATHGLOBAL/tmp/prestop-NPreStop 文件路径
// ConfigDefinitions.java — 类型安全的配置定义
public enum ConfigDefinitions {
    ENABLE_ONEAGENT_FEATURE("oneagent.feature.enable", "true",
        ConfigLevel.GLOBAL, true),

    DRAIN_WAIT_TIME_MS("oneagent.drain.wait.time.ms", "1000",
        ConfigLevel.SERVICE, true, 0L, 30000L),

    ENABLE_BLACKLIST("oneagent.blacklist.enable", "false",
        ConfigLevel.SERVICE, true),

    BLACKLIST_TTL_MS("oneagent.blacklist.ttl.ms", "60000",
        ConfigLevel.SERVICE, true, 5000L, 432000000L),

    ETCD_LEASE_TTL("oneagent.etcd.lease.ttl", "60",
        ConfigLevel.GLOBAL, true, 10L, 3600L);
    // ...
}

配置热更新流程

ConfigWatcher 启动
    │
    ├── Watch: /oneagent/configs  (全局配置)
    │      └── PUT 事件 → JSON 解析 → ConfigValidator 校验
    │                                   │
    │                     ┌─────────────┴──────────────┐
    │                     │ 检查 minValue / maxValue    │
    │                     │ 超出范围 → 使用边界值 + 告警  │
    │                     └─────────────┬──────────────┘
    │                                   │
    │                         ConfigSnapshotManager.update()
    │                                   │
    │                         通知所有 ConfigChangeListener
    │
    └── Watch: /oneagent/configs/{serviceName}  (服务级配置)
           └── 同上流程,SERVICE 级配置覆盖 GLOBAL 级

查询时: OneAgentConfig.getConfigValue(field, serviceName)
         → 先查 SERVICE 级 → 无则 fallback GLOBAL 级 → 无则用默认值
21

etcd 数据结构与客户端

etcd Key 设计

/oneagent/                                          # 前缀 (StaticConfig.ETCD_SERVICE_PREFIX)
│
├── configs                                         # 全局配置
│   {"oneagent.feature.enable":"true",
│    "oneagent.etcd.lease.ttl":"60"}
│
├── configs/{serviceName}                           # 服务级配置 (覆盖全局)
│   {"oneagent.blacklist.enable":"true",
│    "oneagent.drain.wait.time.ms":"5000"}
│
├── services/{serviceName}/
│   ├── providers/{instanceId}                      # Provider 节点
│   │   ├── Lease: TTL=60s + Keepalive              # 心跳续租
│   │   └── Value: {                                # 实例元数据
│   │         "ip": "10.0.1.5",
│   │         "port": 8080,
│   │         "serviceName": "order-service",
│   │         "instanceId": "10.0.1.5:8080",
│   │         "registeredTime": 1712000000000
│   │       }
│   │
│   └── consumers/{consumerInstanceId}              # Consumer 节点
│       ├── Lease: TTL=60s + Keepalive
│       └── Value: consumer 元数据
│
└── traffic_manager/blacklist/
    └── {serviceName}/{instanceId}                  # 黑名单条目
        ├── Lease: TTL 由 BLACKLIST_TTL_MS 控制
        └── Value: {
              "ttlMs": 60000,
              "expireTimeMs": 1712000060000
            }

EtcdClient — 零依赖 HTTP 客户端

OneAgent 实现了一个纯 JDK HttpURLConnection 的 etcd v3 HTTP 客户端(约 69KB),不引入任何第三方 etcd SDK 依赖:

// EtcdClient.java — 核心 API
public class EtcdClient {
    // 基础 KV 操作
    void put(String key, String value);
    void putWithLease(String key, String value, long leaseId);
    String get(String key);
    void delete(String key);
    void deleteRange(String prefix);  // 前缀批量删除

    // Lease 管理
    long leaseGrant(long ttl);         // 创建租约
    void leaseKeepAlive(long leaseId); // 续租心跳
    void leaseRevoke(long leaseId);    // 撤销租约

    // Watch (HTTP 长连接流式)
    void watch(String prefix, WatchCallback callback);

    // Auth (Token 认证 + 自动刷新)
    void authenticate(String user, String password);
}

设计亮点:

  • 零外部依赖:使用 JDK 内置 HttpURLConnection,避免 Jar 冲突
  • HTTP 流式 Watch:通过长连接读取 etcd Watch 事件流,而非轮询
  • Lease 续租:后台定时 Keepalive,实例异常退出时 Lease 自动过期
  • Token 认证:自动获取和刷新 etcd Auth Token

ServiceContext — 全局服务信息注册

插件拦截器将服务信息写入 ServiceContext,供核心模块统一读取:

// ServiceContext.java — volatile 保证线程可见性
public class ServiceContext {
    private static volatile String serviceName;
    private static volatile String instanceId;

    public static void setServiceInfo(String svcName, String instId) {
        serviceName = svcName.toLowerCase();  // 统一小写
        instanceId = instId;
    }

    public static boolean isAvailable() {
        return serviceName != null && instanceId != null;
    }
}
22

Eureka 插件拦截

插件定义

Eureka 2.x 插件通过 skywalking-plugin.def 声明 4 个拦截点:

# skywalking-plugin.def
oneagent-eureka-2.x=...RegisterInstrumentation
oneagent-eureka-2.x=...DiscoveryInstrumentation
oneagent-eureka-2.x=...RibbonServerListInstrumentation
oneagent-eureka-2.x=...SpringCloudLoadBalancerInstrumentation

RegisterInterceptor — 注册拦截

拦截 Eureka 的 register() 方法,在注册完成后将 Provider 信息写入 etcd:

// RegisterInterceptor.java — afterMethod()
public Object afterMethod(EnhancedInstance objInst, Method method,
        Object[] allArguments, Class<?>[] argumentsTypes, Object ret) {

    // 1. 功能开关检查
    if (!OneAgentConfig.getBoolean(ENABLE_ONEAGENT_FEATURE)) return ret;
    if (!ServiceContext.isAvailable()) return ret;

    // 2. 从 Eureka InstanceInfo 提取元数据
    InstanceInfo instanceInfo = (InstanceInfo) allArguments[0];
    String serviceName = instanceInfo.getAppName().toLowerCase();
    String instanceId = instanceInfo.getIPAddr() + ":" + instanceInfo.getPort();

    // 3. 注册服务上下文
    ServiceContext.setServiceInfo(serviceName, instanceId);

    // 4. 写入 etcd (带 Lease 续租)
    String providerPath = prefix + "/services/" + serviceName
                        + "/providers/" + instanceId;
    String value = buildProviderMetadata(instanceInfo);  // JSON
    etcdClient.putWithLease(providerPath, value, leaseId);

    // 5. 注册到 ProviderDeregistrationManager (下线时自动清理)
    deregManager.registerProvider(providerPath, serviceName, instanceId);

    return ret;
}

DiscoveryInterceptor — 发现拦截

拦截服务发现流程,将 Consumer 注册到 etcd:

应用调用 discoveryClient.getInstances("order-service")
    │
    ▼
DiscoveryInterceptor.beforeMethod()
    │
    ├── 检查功能开关 & ServiceContext
    │
    ├── 注册 Consumer 到 etcd (幂等)
    │   key:   /oneagent/services/order-service/consumers/{consumerInstanceId}
    │   value: consumer 元数据 JSON
    │   lease: TTL + Keepalive
    │
    └── 返回原始结果(不修改发现结果)

RibbonServerListInterceptor — 负载均衡黑名单过滤

拦截 Ribbon 的 getAllServers() / getReachableServers(),过滤掉黑名单中的实例:

// RibbonServerListInterceptor.java — afterMethod()
public Object afterMethod(EnhancedInstance objInst, Method method,
        Object[] allArguments, Class<?>[] argumentsTypes, Object ret) {

    List<Server> servers = (List<Server>) ret;
    if (servers == null || servers.isEmpty()) return ret;

    // 1. 功能开关 & 黑名单开关检查 (SERVICE 级)
    String serviceName = extractServiceName(servers);
    if (!OneAgentConfig.getBoolean(ENABLE_BLACKLIST, serviceName)) return ret;

    // 2. 快速路径: 黑名单为空则直接返回
    if (TrafficManager.isBlacklistEmpty()) return ret;

    // 3. 单次遍历过滤 (延迟分配 ArrayList)
    List<Server> filtered = null;
    for (int i = 0; i < servers.size(); i++) {
        String instanceId = RibbonInstanceUtils.getInstanceId(servers.get(i));
        if (TrafficManager.isInBlacklist(instanceId)) {
            if (filtered == null) {
                filtered = new ArrayList<>(servers.subList(0, i));  // 延迟拷贝
            }
        } else if (filtered != null) {
            filtered.add(servers.get(i));
        }
    }

    // 4. 安全兜底: 全部被过滤则返回原列表
    if (filtered != null && filtered.isEmpty()) return ret;

    return filtered != null ? filtered : ret;
}

设计亮点:

  • 延迟分配:只有命中黑名单时才创建新列表,零开销快速路径
  • 安全兜底:如果所有实例都在黑名单,返回原列表防止服务完全不可用
  • SERVICE 级开关:每个服务可独立控制是否启用黑名单

完整请求流转图

Service-A (Consumer)                              Service-B (Provider)
┌───────────────────┐                             ┌───────────────────┐
│ Feign 调用         │                             │ Eureka register() │
│ order-service      │                             │        │          │
│      │             │                             │        ▼          │
│      ▼             │                             │ RegisterInterceptor│
│ Ribbon.getServers()│                             │   → etcd put      │
│      │             │                             │     /providers/B  │
│      ▼             │                             └───────────────────┘
│ RibbonInterceptor  │
│   → check blacklist│──── TrafficManager ────── etcd Watch ──── /providers/*
│   → filter B (下线) │                                           (DELETE event)
│   → return [C, D]  │
│      │             │
│      ▼             │
│ 发往 C 或 D        │
└───────────────────┘
23

流量黑名单机制

TrafficManager 双层索引设计

TrafficManager 内存数据结构:

blacklistEntries (ConcurrentHashMap):
┌──────────────────┬────────────────────────────────────────┐
│   instanceId     │          BlacklistEntry                │
├──────────────────┼────────────────────────────────────────┤
│ 10.0.1.5:8080    │ {serviceName:"order-svc", expireAt:T1}│
│ 10.0.1.6:8080    │ {serviceName:"order-svc", expireAt:T2}│
│ 10.0.2.3:9090    │ {serviceName:"pay-svc",   expireAt:T3}│
└──────────────────┴────────────────────────────────────────┘

serviceBlacklistIndex (ConcurrentHashMap):
┌──────────────────┬──────────────────────────────┐
│   serviceName    │      Set<instanceId>          │
├──────────────────┼──────────────────────────────┤
│ order-svc        │ {10.0.1.5:8080, 10.0.1.6:..} │
│ pay-svc          │ {10.0.2.3:9090}               │
└──────────────────┴──────────────────────────────┘

查询: isInBlacklist("10.0.1.5:8080")
      → O(1) 查 blacklistEntries
      → 检查 expireAt,过期则 CAS 删除(懒删除)

黑名单 Watch 事件处理

// TrafficManager.java — Watch 回调
private void handleBlacklistWatchEvent(WatchEvent event) {
    String key = event.getKey();   // /oneagent/traffic_manager/blacklist/{svc}/{inst}
    String instanceId = extractInstanceId(key);
    String serviceName = extractServiceName(key);

    switch (event.getType()) {
        case PUT:
            // 解析 BlacklistValue: {ttlMs, expireTimeMs}
            BlacklistValue bv = deserialize(event.getValue());
            long expireAt = System.currentTimeMillis() + bv.getTtlMs();
            BlacklistEntry entry = new BlacklistEntry(serviceName, expireAt);

            blacklistEntries.put(instanceId, entry);
            serviceBlacklistIndex
                .computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet())
                .add(instanceId);
            break;

        case DELETE:
            blacklistEntries.remove(instanceId);
            Set<String> set = serviceBlacklistIndex.get(serviceName);
            if (set != null) set.remove(instanceId);
            break;
    }
}

黑名单查询 — 懒删除策略

// TrafficManager.java — 快速路径 + 懒过期
public static boolean isInBlacklist(String instanceId) {
    BlacklistEntry entry = blacklistEntries.get(instanceId);
    if (entry == null) return false;

    // 检查本地 TTL 是否过期
    if (System.currentTimeMillis() > entry.getExpireAt()) {
        // CAS 删除:防止并发重复删除
        blacklistEntries.remove(instanceId, entry);
        Set<String> set = serviceBlacklistIndex.get(entry.getServiceName());
        if (set != null) set.remove(instanceId);
        return false;
    }
    return true;
}

设计要点:

  • 双重过期:etcd 侧 Lease TTL + 本地 expireAt,双保险
  • CAS 懒删除:查询时检测过期并原子移除,无需后台清理线程
  • O(1) 查询:ConcurrentHashMap 直接定位,对 Ribbon 调用零延迟影响
24

四层优雅下线

四层防护体系

OneAgent 设计了四层递进式下线防护,确保在各种场景下都能实现零流量损失的优雅退出:

Layer 1: PreStop 文件监听 (最早触发, K8s 场景)
    │
    ▼
Layer 2: Signal Handler (SIGTERM/SIGINT)
    │
    ▼
Layer 3: JVM Shutdown Hook (ServiceManager.shutdown)
    │
    ▼
Layer 4: etcd Lease 自动过期 (兜底, kill -9 场景)

Layer 1: PreStopFileShutdownTrigger

K8s PreStop Hook 通过写文件触发下线流程,比 SIGTERM 更早

// PreStopFileShutdownTrigger.java
public class PreStopFileShutdownTrigger {
    // 守护线程轮询检测 prestop 文件
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(
            r -> new Thread(r, "oneagent-prestop-watcher"));

    public void start() {
        // 启动时清理残留文件,防止误触发
        Files.deleteIfExists(Paths.get(prestopFilePath));

        executor.scheduleWithFixedDelay(() -> {
            String content = readFirstLine(prestopFilePath);
            if ("stop".equals(content)) {
                // 触发下线!
                providerDeregManager.cleanupAllProviders();
                trafficManager.waitForTrafficDrain();
            }
        }, 0, checkIntervalMs, TimeUnit.MILLISECONDS);
    }
}

K8s Pod YAML 配置示例:

lifecycle:
  preStop:
    exec:
      command:
        - /bin/sh
        - -c
        - |
          echo 'stop' > /tmp/prestop    # 触发 OneAgent 下线
          sleep 15                        # 等待流量排空
          # SIGTERM 将在 sleep 结束后发出

Layer 2 & 3: Signal Handler + Shutdown Hook

SIGTERM / SIGINT 信号到达
    │
    ▼
Signal Handler (RegisterInterceptor 中注册)
    ├── ProviderDeregistrationManager.cleanupAllProviders()
    │   ├── 删除 etcd Provider 节点
    │   ├── 加入黑名单 (如果条件满足)
    │   └── 上报 INSTANCE_OFFLINE 事件
    └── TrafficManager.waitForTrafficDrain()
        └── sleep(DRAIN_WAIT_TIME_MS)  // 等待在途请求完成
    │
    ▼
JVM Shutdown Hook (ServiceManager.shutdown → EtcdClientService.shutdown)
    ├── 检查 cleaned 标志 (AtomicBoolean CAS)
    │   └── 如果已被 PreStop/Signal 处理 → 跳过
    ├── 未处理则执行完整清理流程
    ├── 关闭 OneAgentDataReporter
    └── 关闭 EtcdClient (撤销 Lease, 断开连接)

Layer 4: etcd Lease 自动过期

异常退出场景 (kill -9, OOM, 物理机宕机):
    │
    ├── Keepalive 心跳停止
    ├── etcd 等待 Lease TTL 过期 (默认 60s)
    ├── Provider 节点自动删除
    │
    ▼
Consumer 侧的 Watch 检测到 DELETE 事件
    └── TrafficManager 将该实例加入黑名单
        └── 后续请求自动绕过该实例

ProviderDeregistrationManager — 核心清理逻辑

// ProviderDeregistrationManager.java
public void cleanupAllProviders() {
    // AtomicBoolean CAS: 确保只执行一次
    if (!cleaned.compareAndSet(false, true)) return;

    for (Map.Entry<String, ProviderInfo> entry : registeredProviders.entrySet()) {
        String providerPath = entry.getKey();
        ProviderInfo info = entry.getValue();

        // 1. 删除 etcd Provider 节点
        boolean deleted = etcdClient.delete(providerPath);

        if (deleted) {
            // 2. 加入黑名单 (条件: 该服务有 >1 个 Provider && 黑名单未满)
            if (providerCount > 1 && blacklistCount < maxCount) {
                String blacklistKey = prefix + "/traffic_manager/blacklist/"
                    + info.getServiceName() + "/" + info.getInstanceId();
                String blacklistValue = "{\"ttlMs\":" + ttlMs
                    + ",\"expireTimeMs\":" + (now + ttlMs) + "}";
                etcdClient.putWithLease(blacklistKey, blacklistValue, leaseId);
            }

            // 3. 上报 INSTANCE_OFFLINE 事件
            eventCollector.reportEvent(AgentEvent.instanceOffline(info));
            registeredProviders.remove(providerPath);
        } else {
            // 删除失败: 重置标志允许重试
            cleaned.set(false);
        }
    }
}

完整下线时序图 (K8s 场景)

时间轴                     K8s              PreStop Trigger      Agent Core
────────────────────────────────────────────────────────────────────────────
 T+0s    Pod 终止信号 ─────▶ preStop Hook
                             echo 'stop' ─▶ 检测到文件 ──────▶ 删除 Provider
                                                              加入黑名单
                                                              上报事件
 T+1s                                       waitForDrain ────▶ sleep 10s
                                                              (等待在途请求)
 T+5s                        sleep 结束
         发送 SIGTERM ───────────────────▶ Signal Handler
                                           cleaned=true → 跳过 (已处理)
 T+5s    SIGTERM ──────────────────────▶ JVM Shutdown Hook
                                           cleaned=true → 跳过 (已处理)
                                           关闭 Reporter
                                           关闭 EtcdClient
 T+11s                                     排空完成, 进程退出
────────────────────────────────────────────────────────────────────────────
 Consumer 侧: Watch 收到 DELETE → 黑名单生效 → 后续请求绕过下线实例
25

数据上报与事件采集

上报子系统架构

┌──────────────────────────────────────────────────────────┐
│                OneAgentDataReporter                       │
│                (HTTP POST 定时上报)                         │
│                                                          │
│  ┌────────────────────┐    ┌────────────────────────┐    │
│  │  EventCollector    │    │  MetricCollector        │    │
│  │                    │    │                         │    │
│  │  - INSTANCE_ONLINE │    │  - 方法调用计数          │    │
│  │  - INSTANCE_OFFLINE│    │  - etcd 操作延迟         │    │
│  │  - BLACKLIST_ADD   │    │  - 黑名单过滤统计        │    │
│  │  - BLACKLIST_FAIL  │    │  - Keepalive 成功/失败   │    │
│  │  - CONFIG_CHANGE   │    │                         │    │
│  └────────────────────┘    └────────────────────────┘    │
│                                                          │
│  定时器: 每 N 秒收集一次 → 合并 → HTTP POST → 后端         │
└──────────────────────────────────────────────────────────┘

关键事件类型

事件触发时机携带信息
INSTANCE_ONLINEProvider 首次注册到 etcdserviceName, instanceId, 元数据
INSTANCE_OFFLINEProvider 主动下线清理serviceName, instanceId, 运行时长
BLACKLIST_ADD实例加入黑名单targetService, instanceId, ttlMs
BLACKLIST_FAIL黑名单写入失败失败原因, targetService
CONFIG_CHANGE动态配置变更变更字段, 旧值, 新值
第七部分

设计精华

核心设计模式与完整链路追踪示例

26

核心设计模式

1. ThreadLocal 上下文管理(零锁追踪)

每个线程通过 ThreadLocal<TracingContext> 持有自己的追踪上下文,Span 的创建、停止、归档全部是线程本地操作,无需任何同步。这是 Agent 高性能的关键:

// 无锁操作链:
// 1. CONTEXT.get()              → ThreadLocal 读取
// 2. activeSpanStack.addLast()  → LinkedList 本地操作
// 3. span.tag(key, value)       → 对象字段赋值
// 4. activeSpanStack.removeLast() → LinkedList 本地操作
// 5. segment.archive(span)      → LinkedList.add()
// 全程无锁!

2. SPI + 注解的服务覆盖机制

通过 @DefaultImplementor@OverrideImplementor 注解,配合 Java SPI 的 ServiceLoader,实现了一套灵活的服务替换体系:

优势:
├── 默认实现开箱即用(gRPC 上报)
├── 只需加入对应 JAR 即可替换(Kafka 上报)
├── 不需要修改任何配置文件
└── 编译时检查覆盖关系(注解的 value 指定被覆盖的类)

3. Witness 版本兼容机制

通过检查特定类是否存在来判断框架版本,而非读取 pom.xml 或 MANIFEST 版本号:

优势:
├── 运行时检测,不依赖构建信息
├── 同一框架不同版本的插件可以共存
├── 版本不匹配时安全跳过,不抛异常
└── 支持检测类和方法两个粒度

4. ClassLoader 隔离策略

SkyWalking 设计了 AgentClassLoader 体系来避免类加载冲突:

Bootstrap ClassLoader
    └── App ClassLoader
         ├── AgentClassLoader (默认)    ← Agent 核心类
         └── AgentClassLoader (per-app) ← 每个目标 ClassLoader 对应一个
              └── 拦截器实例缓存

为什么需要隔离?
├── Agent 依赖的 Guava/gRPC 版本可能与业务不同
├── 插件拦截器需要在目标 ClassLoader 的上下文中运行
└── 防止 Agent 内部类泄露到业务代码空间

5. 异步缓冲 + 优雅降级

整个数据上报链路遵循"不能影响业务"的核心原则:

降级策略:
├── DataCarrier 缓冲满 → 丢弃 Segment(不阻塞业务线程)
├── gRPC 未连接       → 丢弃 Segment(计数 + 日志)
├── gRPC 发送超时     → 中断上报(不影响下次)
├── Span 数量超限     → 标记 isSizeLimited,停止创建新 Span
├── Agent 初始化失败   → 静默退出,不影响应用启动
└── 任何插件异常      → catch 后继续,不传播到业务代码

6. 延迟加载(Lazy Loading)

贯穿整个 Agent 的懒加载策略,最小化启动开销:

组件延迟时机
拦截器实例目标方法首次被调用时才加载
gRPC Channel首次需要上报时才建立连接
TracingContext首次创建 Span 时才实例化
AgentClassLoader对应目标 ClassLoader 首次出现时
27

完整链路追踪示例

一次 HTTP 请求的完整追踪之旅

假设:Service-A 收到一个 HTTP 请求,调用数据库后再通过 Feign 调用 Service-B。

Step 1: HTTP 请求到达 Service-A
────────────────────────────────
Tomcat/Spring MVC 插件触发:
  → ContextManager.createEntrySpan("POST /api/order", carrier)
  → 从 HTTP Header 中提取 sw8(如果存在父上下文)
  → ThreadLocal 中创建 TracingContext + TraceSegment
  → 压入 EntrySpan (spanId=0)

Step 2: 业务逻辑调用数据库
────────────────────────────────
MySQL 插件触发:
  → ContextManager.createExitSpan("MySQL/INSERT", "mysql:3306")
  → 压入 ExitSpan (spanId=1)
  → ExitSpan.tag("db.type", "MySQL")
  → ExitSpan.tag("db.statement", "INSERT INTO orders ...")
  → SQL 执行完毕
  → ContextManager.stopSpan()
  → ExitSpan pop 出栈,archive 到 Segment

Step 3: 通过 Feign 调用 Service-B
────────────────────────────────
Feign 插件触发:
  → ContextManager.createExitSpan("GET /api/payment", carrier, "svc-b:8080")
  → 压入 ExitSpan (spanId=2)
  → ContextManager.inject(carrier)  // 注入上下文
  → HTTP Header: sw8 = "1-BASE64(traceId)-BASE64(segmentId)-2-..."
  → Feign 发起 HTTP 请求
  → 收到响应
  → ContextManager.stopSpan()
  → ExitSpan pop 出栈,archive 到 Segment

Step 4: HTTP 请求返回
────────────────────────────────
Tomcat/Spring MVC 插件触发:
  → ContextManager.stopSpan()  // 停止 EntrySpan
  → EntrySpan pop 出栈,archive 到 Segment
  → activeSpanStack 为空!
  → TracingContext.finish()
  → 通知 TracingContextListener

Step 5: 数据上报
────────────────────────────────
TraceSegmentServiceClient.afterFinished(segment):
  → carrier.produce(segment)  // 放入 DataCarrier

Consumer Thread (异步):
  → consume([segment1, segment2, ...])
  → segment.transform()  // → SegmentObject (Protobuf)
  → gRPC stream.onNext(segmentObject)
  → stream.onCompleted()
  → OAP Server 接收并存储

Step 6: 同时在 Service-B
────────────────────────────────
Service-B 的 Spring MVC 插件:
  → 从 sw8 Header 反序列化 ContextCarrier
  → ContextManager.createEntrySpan("GET /api/payment", carrier)
  → context.extract(carrier)  // 恢复父上下文
  → segment.ref(parentRef)    // 关联到 Service-A 的 Segment
  → ... 执行业务逻辑 ...
  → 相同的上报流程

最终在 SkyWalking UI 上看到的拓扑

  Trace Timeline (traceId: abc123)
  ┌─────────────────────────────────────────────────────────┐
  │ Service-A                                                │
  │ ├─ EntrySpan:  POST /api/order          [0ms ─── 150ms] │
  │ │  ├─ ExitSpan: MySQL INSERT            [10ms ── 30ms]  │
  │ │  └─ ExitSpan: GET /api/payment        [35ms ── 140ms] │
  │                                                          │
  │ Service-B                                                │
  │ └─ EntrySpan:  GET /api/payment         [40ms ── 135ms] │
  │    └─ ExitSpan: Redis SET               [50ms ── 60ms]  │
  └─────────────────────────────────────────────────────────┘
28

总结与启示

SkyWalking Java Agent 的核心设计哲学

通读整个源码,SkyWalking Agent 的设计哲学可以概括为三个原则:

  1. 绝对不影响业务 — 所有可能阻塞或抛异常的操作都被 try-catch 包裹,数据丢弃优于业务卡顿
  2. 极致的性能优化 — ThreadLocal 无锁上下文、DataCarrier 无锁分区、延迟加载、对象复用
  3. 高度可扩展 — SPI 插件机制、服务覆盖注解、Witness 版本兼容,100+ 中间件支持

关键源码文件速查表

功能核心类路径
Agent 入口SkyWalkingAgentapm-agent/.../SkyWalkingAgent.java
服务管理ServiceManagerapm-agent-core/.../boot/ServiceManager.java
插件加载PluginBootstrapapm-agent-core/.../plugin/PluginBootstrap.java
插件索引PluginFinderapm-agent-core/.../plugin/PluginFinder.java
插件基类AbstractClassEnhancePluginDefineapm-agent-core/.../plugin/AbstractClassEnhancePluginDefine.java
字节码增强ClassEnhancePluginDefineapm-agent-core/.../enhance/ClassEnhancePluginDefine.java
方法拦截InstMethodsInterapm-agent-core/.../enhance/InstMethodsInter.java
上下文管理ContextManagerapm-agent-core/.../context/ContextManager.java
追踪上下文TracingContextapm-agent-core/.../context/TracingContext.java
线程追踪片段TraceSegmentapm-agent-core/.../context/trace/TraceSegment.java
跨进程传播ContextCarrierapm-agent-core/.../context/ContextCarrier.java
数据上报TraceSegmentServiceClientapm-agent-core/.../remote/TraceSegmentServiceClient.java
连接管理GRPCChannelManagerapm-agent-core/.../remote/GRPCChannelManager.java
异步缓冲DataCarrierapm-commons/.../datacarrier/DataCarrier.java
配置定义Configapm-agent-core/.../conf/Config.java

可借鉴的架构思想

🎯

SPI 插件化

通过 ServiceLoader + def 文件实现零耦合插件发现,适用于任何需要扩展点的系统

🔒

ThreadLocal 上下文

线程本地存储实现无锁操作,适用于高并发场景下的请求级数据传递

📦

异步缓冲

生产者-消费者 + 可丢弃策略,适用于不能阻塞主线程的旁路数据采集

🧰

字节码增强

ByteBuddy 运行时修改类,适用于不修改源码就需要增加横切关注点的场景

基于 Apache SkyWalking Java Agent 源码分析 · 仅供研究学习
源码版权归 Apache SkyWalking 所有