SkyWalking Java Agent
从 premain 入口到分布式追踪的完整链路,逐行拆解 APM 探针的设计精髓
什么是 SkyWalking Java Agent
无侵入的 APM 探针
Apache SkyWalking Java Agent 是一个基于 Java Agent 机制 + ByteBuddy 字节码增强的 APM(Application Performance Monitoring)探针。它在 不修改任何业务代码 的前提下,自动采集分布式链路追踪、性能指标和日志关联数据。
核心能力一览
分布式链路追踪
自动追踪跨服务调用链,支持 HTTP、RPC、MQ、数据库等 100+ 中间件
零侵入字节码增强
基于 ByteBuddy 在类加载时动态修改字节码,业务代码无需任何改动
插件化架构
所有中间件支持通过插件实现,SPI 机制自动发现,热插拔
高性能低开销
ThreadLocal 上下文、DataCarrier 异步缓冲、采样策略,对业务影响极小
使用方式
只需在 JVM 启动参数中加入一行:
java -javaagent:/path/to/skywalking-agent.jar \
-Dskywalking.agent.service_name=my-service \
-Dskywalking.collector.backend_service=127.0.0.1:11800 \
-jar my-application.jar
Agent 会在 premain 阶段完成所有初始化,之后对目标类进行字节码增强,自动采集链路数据并上报到 OAP 后端。
整体架构
架构全景图
┌─────────────────────────────────────────────────────────────────┐
│ JVM 启动 │
│ java -javaagent:skywalking-agent.jar -jar app.jar │
└───────────────────────────┬─────────────────────────────────────┘
│ premain()
▼
┌─────────────────────────────────────────────────────────────────┐
│ SkyWalkingAgent.premain() │
│ 1. SnifferConfigInitializer ── 加载配置 │
│ 2. PluginBootstrap ── 发现 & 加载插件 │
│ 3. installClassTransformer ── 安装 ByteBuddy 转换器 │
│ 4. ServiceManager.boot() ── 启动核心服务 │
└───────────────────────────┬─────────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌──────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ ByteBuddy │ │ ServiceManager │ │ Plugin System │
│ Transformer │ │ │ │ │
│ │ │ - GRPCChannel │ │ skywalking- │
│ 类匹配 │ │ - TraceReport │ │ plugin.def │
│ 字节码增强 │ │ - Sampling │ │ │
│ 拦截器注入 │ │ - JVM Metrics │ │ 100+ 插件 │
└──────┬───────┘ └────────┬────────┘ └──────────────────┘
│ │
▼ ▼
┌──────────────────────────────────────────┐
│ Tracing Context │
│ ContextManager (ThreadLocal) │
│ TracingContext → TraceSegment → Span │
│ ContextCarrier (跨进程) / Snapshot (跨线程)│
└──────────────────────┬───────────────────┘
│
▼
┌──────────────────────────────────────────┐
│ DataCarrier (异步缓冲) │
│ 多 Channel + 环形 Buffer + 消费者线程 │
└──────────────────────┬───────────────────┘
│ gRPC Stream
▼
┌──────────────────────────────────────────┐
│ OAP Server (后端) │
│ 接收 SegmentObject,分析 & 存储 │
└──────────────────────────────────────────┘
模块结构
核心模块拆解
| 模块 | 路径 | 职责 |
|---|---|---|
| apm-agent | apm-sniffer/apm-agent | Agent 入口,包含 SkyWalkingAgent.premain() |
| apm-agent-core | apm-sniffer/apm-agent-core | 核心引擎:插件加载、字节码增强、上下文管理、数据上报 |
| apm-sdk-plugin | apm-sniffer/apm-sdk-plugin | 标准插件集:Spring、Dubbo、MySQL、Redis 等 100+ |
| bootstrap-plugins | apm-sniffer/bootstrap-plugins | JDK 级插件:Thread、HttpURLConnection 等 |
| optional-plugins | apm-sniffer/optional-plugins | 可选插件:需手动启用 |
| apm-commons | apm-commons | 公共组件:DataCarrier 异步缓冲、工具类 |
| apm-protocol | apm-protocol/apm-network | gRPC 协议定义,与 OAP 通信的 Protobuf |
| apm-toolkit | apm-application-toolkit | 应用层 API:手动追踪、日志关联 |
apm-agent-core 内部结构(核心中的核心)
apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/
├── boot/ # ServiceManager、BootService 生命周期
├── conf/ # Config 配置类、动态配置
├── context/ # ContextManager、TracingContext、Span 模型
│ ├── ids/ # TraceId、SegmentId 生成器
│ └── trace/ # TraceSegment、EntrySpan、ExitSpan、LocalSpan
├── plugin/ # 插件体系
│ ├── interceptor/ # 拦截器接口 & 增强实现
│ │ └── enhance/ # ClassEnhancePluginDefine、InstMethodsInter
│ ├── match/ # 类匹配策略
│ ├── loader/ # AgentClassLoader
│ └── bootstrap/ # Bootstrap 类增强
├── remote/ # gRPC 通道管理 & 数据上报
├── sampling/ # 采样策略
├── profile/ # 性能剖析
├── jvm/ # JVM 指标采集
└── logging/ # Agent 内部日志
启动流程
从 JVM premain 到所有服务就绪的完整初始化链路
premain 入口
Java Agent 机制
Java Agent 是 JDK 提供的一种在 main() 方法执行之前介入 JVM 的机制。通过在 JAR 的 MANIFEST.MF 中声明 Premain-Class,JVM 会在加载应用类之前调用指定类的 premain() 方法,这就是 SkyWalking 实现"零侵入"的基础。
# META-INF/MANIFEST.MF
Premain-Class: org.apache.skywalking.apm.agent.SkyWalkingAgent
Can-Redefine-Classes: true
Can-Retransform-Classes: true
SkyWalkingAgent.premain() 完整流程
这是整个 Agent 的总入口,源码位于 SkyWalkingAgent.java:75:
public static void premain(String agentArgs, Instrumentation instrumentation)
throws PluginException {
final PluginFinder pluginFinder;
// 第一步:加载配置
try {
SnifferConfigInitializer.initializeCoreConfig(agentArgs);
} catch (Exception e) {
LogManager.getLogger(SkyWalkingAgent.class)
.error(e, "SkyWalking agent initialized failure. Shutting down.");
return;
}
// 第二步:检查 Agent 是否启用
if (!Config.Agent.ENABLE) {
LOGGER.warn("SkyWalking agent is disabled.");
return;
}
// 第三步:加载所有插件
try {
pluginFinder = new PluginFinder(new PluginBootstrap().loadPlugins());
} catch (AgentPackageNotFoundException ape) {
LOGGER.error(ape, "Locate agent.jar failure. Shutting down.");
return;
}
// 第四步:安装 ByteBuddy 类转换器
try {
installClassTransformer(instrumentation, pluginFinder);
} catch (Exception e) {
LOGGER.error(e, "Skywalking agent installed class transformer failure.");
}
// 第五步:启动核心服务
try {
ServiceManager.INSTANCE.boot();
} catch (Exception e) {
LOGGER.error(e, "Skywalking agent boot failure.");
}
// 第六步:注册 JVM 关闭钩子
Runtime.getRuntime().addShutdownHook(
new Thread(ServiceManager.INSTANCE::shutdown,
"skywalking service shutdown thread"));
}
配置加载
四层配置覆盖机制
SnifferConfigInitializer.initializeCoreConfig() 实现了一个优雅的多层配置体系,优先级从低到高:
| 优先级 | 来源 | 示例 |
|---|---|---|
| 1 (最低) | Config 类字段默认值 | Config.Agent.SERVICE_NAME = "" |
| 2 | agent.config 配置文件 | agent.service_name=my-svc |
| 3 | JVM 系统属性 | -Dskywalking.agent.service_name=my-svc |
| 4 (最高) | Agent 参数 | -javaagent:agent.jar=agent.service_name=my-svc |
配置通过反射映射到 Config 类的静态字段上。Config.java 使用嵌套静态内部类组织配置项:
public class Config {
public static class Agent {
public static String SERVICE_NAME = "";
public static boolean ENABLE = true;
public static int SAMPLE_N_PER_3_SECS = -1; // -1 表示全采样
public static int SPAN_LIMIT_PER_SEGMENT = 300;
}
public static class Collector {
public static String BACKEND_SERVICE = "";
public static int GRPC_CHANNEL_CHECK_INTERVAL = 30;
public static int GRPC_UPSTREAM_TIMEOUT = 30; // 秒
}
public static class Buffer {
public static int CHANNEL_SIZE = 5;
public static int BUFFER_SIZE = 300;
}
}
ServiceManager 生命周期
枚举单例 + SPI 服务发现
ServiceManager 是整个 Agent 的服务容器,采用枚举单例模式,通过 Java SPI(ServiceLoader)自动发现所有 BootService 实现:
public enum ServiceManager {
INSTANCE; // 枚举单例,天然线程安全
private Map<Class, BootService> bootedServices = Collections.emptyMap();
public void boot() {
bootedServices = loadAllServices(); // SPI 加载
prepare(); // 第一阶段:准备
startup(); // 第二阶段:启动
onComplete(); // 第三阶段:完成
}
// 通过 Java SPI 加载所有 BootService 实现
void load(List<BootService> allServices) {
for (final BootService bootService : ServiceLoader.load(
BootService.class, AgentClassLoader.getDefault())) {
allServices.add(bootService);
}
}
}
三阶段生命周期
每个 BootService 都经历标准的三阶段生命周期,按 priority() 排序执行:
public interface BootService {
void prepare(); // 准备阶段:注册监听器、初始化资源
void boot(); // 启动阶段:开始工作(建立连接、启动线程等)
void onComplete(); // 完成阶段:所有服务就绪后的回调
void shutdown(); // 关闭阶段:优雅停机(倒序执行)
default int priority() { return 0; }
}
GRPCChannelManager— 管理到 OAP 的 gRPC 连接TraceSegmentServiceClient— 链路数据上报ContextManager— 上下文管理(ThreadLocal)SamplingService— 采样策略JVMService— JVM 指标采集ProfileTaskExecutionService— 性能剖析CommandService— 下发命令处理
@DefaultImplementor 与 @OverrideImplementor
SkyWalking 设计了一套优雅的服务覆盖机制,允许不同场景替换默认实现:
// 默认实现
@DefaultImplementor
public class TraceSegmentServiceClient implements BootService { ... }
// Kafka 上报覆盖 gRPC 上报(在 optional-reporter-plugins 中)
@OverrideImplementor(TraceSegmentServiceClient.class)
public class KafkaTraceSegmentServiceClient implements BootService { ... }
加载逻辑在 ServiceManager.loadAllServices() 中:如果检测到 @OverrideImplementor 注解,会替换对应的 @DefaultImplementor 服务,实现可插拔的服务替换。
插件体系
SPI 自动发现、插件定义与增强规则
插件发现与加载
PluginBootstrap.loadPlugins()
插件加载的完整流程,源码位于 PluginBootstrap.java:41:
public List<AbstractClassEnhancePluginDefine> loadPlugins()
throws AgentPackageNotFoundException {
// 1. 初始化 Agent 专用的 ClassLoader
AgentClassLoader.initDefaultLoader();
// 2. 扫描所有 JAR 中的 skywalking-plugin.def 文件
PluginResourcesResolver resolver = new PluginResourcesResolver();
List<URL> resources = resolver.getResources();
// 3. 解析每个 def 文件
for (URL pluginUrl : resources) {
PluginCfg.INSTANCE.load(pluginUrl.openStream());
}
// 4. 通过反射实例化每个插件类
List<PluginDefine> pluginClassList = PluginCfg.INSTANCE.getPluginClassList();
List<AbstractClassEnhancePluginDefine> plugins = new ArrayList<>();
for (PluginDefine pluginDefine : pluginClassList) {
AbstractClassEnhancePluginDefine plugin =
(AbstractClassEnhancePluginDefine) Class.forName(
pluginDefine.getDefineClass(),
true,
AgentClassLoader.getDefault()
).newInstance();
plugins.add(plugin);
}
// 5. 加载动态插件
plugins.addAll(DynamicPluginLoader.INSTANCE.load(AgentClassLoader.getDefault()));
return plugins;
}
skywalking-plugin.def 文件格式
每个插件 JAR 中都包含一个 skywalking-plugin.def 文件,格式极其简洁:
# 格式:插件名称=插件定义类全限定名
feign-default-http-9.x=org.apache.skywalking.apm.plugin.feign.http.v9.define.DefaultHttpClientInstrumentation
spring-mvc-annotation-5.x=org.apache.skywalking.apm.plugin.spring.mvc.v5.define.Spring5Instrumentation
mysql-8.x=org.apache.skywalking.apm.plugin.jdbc.mysql.v8.define.ConnectionInstrumentation
PluginFinder — 插件索引
加载完所有插件后,PluginFinder 将它们按匹配方式分类索引:
public class PluginFinder {
// 精确类名匹配:className → List<Plugin>
private final Map<String, LinkedList<AbstractClassEnhancePluginDefine>>
nameMatchDefine = new HashMap<>();
// 间接匹配(接口、继承、注解等)
private final List<AbstractClassEnhancePluginDefine>
signatureMatchDefine = new ArrayList<>();
// Bootstrap 类增强(JDK 核心类)
private final List<AbstractClassEnhancePluginDefine>
bootstrapClassMatchDefine = new ArrayList<>();
// 当某个类被加载时,查找所有适用的插件
public List<AbstractClassEnhancePluginDefine> find(TypeDescription typeDescription) {
// 先查精确匹配,再查签名匹配
...
}
// 构建 ByteBuddy 的 ElementMatcher,用于全局类匹配
public ElementMatcher<? super TypeDescription> buildMatch() {
// 合并所有插件的匹配规则
...
}
}
插件定义体系
插件类继承体系
AbstractClassEnhancePluginDefine # 抽象基类
├── ClassEnhancePluginDefine # 实现字节码增强逻辑
│ ├── ClassInstanceMethodsEnhancePluginDefine # 只增强实例方法
│ └── ClassStaticMethodsEnhancePluginDefine # 只增强静态方法
└── v2/ClassEnhancePluginDefineV2 # V2 版本(优化性能)
AbstractClassEnhancePluginDefine — 插件基类
每个插件都必须实现以下核心方法:
public abstract class AbstractClassEnhancePluginDefine {
// 动态注入的字段名(用于存储 Agent 上下文)
public static final String CONTEXT_ATTR_NAME = "_$EnhancedClassField_ws";
// 定义要增强的目标类
protected abstract ClassMatch enhanceClass();
// 定义构造器拦截点
public abstract ConstructorInterceptPoint[] getConstructorsInterceptPoints();
// 定义实例方法拦截点
public abstract InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints();
// 定义静态方法拦截点
public abstract StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints();
// 主入口:define() 方法
public DynamicType.Builder<?> define(TypeDescription typeDescription,
DynamicType.Builder<?> builder, ClassLoader classLoader,
EnhanceContext context) throws PluginException {
// 1. 版本检测(Witness 机制)
String[] witnessClasses = witnessClasses();
if (witnessClasses != null) {
for (String witnessClass : witnessClasses) {
if (!WitnessFinder.INSTANCE.exist(witnessClass, classLoader)) {
return null; // 版本不匹配,跳过增强
}
}
}
// 2. 执行增强
return this.enhance(typeDescription, builder, classLoader, context);
}
}
Witness 版本检测机制
同一个框架的不同版本可能有相同的类名但不同的方法签名。SkyWalking 通过 Witness 类解决这个问题:
// 只在 Spring 5.x 中存在的类,作为版本标记
@Override
protected String[] witnessClasses() {
return new String[] {
"org.springframework.web.servlet.resource.HttpResource"
};
}
如果指定的 Witness 类在当前 ClassLoader 中不存在,说明版本不匹配,该插件会被安全地跳过。这确保了针对同一框架不同版本的多个插件可以共存而不冲突。
插件实战示例
Feign HTTP 客户端插件
以 DefaultHttpClientInstrumentation 为例,看看一个完整插件的定义:
public class DefaultHttpClientInstrumentation
extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "feign.Client$Default";
private static final String INTERCEPT_CLASS =
"org.apache.skywalking.apm.plugin.feign.http.v9.DefaultHttpClientInterceptor";
// 1. 指定要增强的目标类
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS); // 精确匹配 feign.Client$Default
}
// 2. 不拦截构造器
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
// 3. 拦截 execute() 方法
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("execute"); // 匹配方法名
}
@Override
public String getMethodsInterceptor() {
return INTERCEPT_CLASS; // 拦截器类名(延迟加载)
}
@Override
public boolean isOverrideArgs() {
return false; // 不修改方法参数
}
}
};
}
}
字节码增强
ByteBuddy 集成、类转换与拦截器注入的实现细节
ByteBuddy 集成
installClassTransformer()
这是将 ByteBuddy 与 Java Instrumentation API 桥接的关键方法:
static void installClassTransformer(Instrumentation instrumentation,
PluginFinder pluginFinder) throws Exception {
// 1. 创建定制化的 ByteBuddy 实例
AgentBuilder agentBuilder = newAgentBuilder()
.ignore(
nameStartsWith("net.bytebuddy.") // 排除 ByteBuddy 自身
.or(nameStartsWith("org.slf4j.")) // 排除日志框架
.or(nameContains("javassist")) // 排除其他字节码工具
.or(nameContains(".asm."))
.or(nameStartsWith("sun.reflect")) // 排除反射相关
.or(allSkyWalkingAgentExcludeToolkit()) // 排除 Agent 自身(保留 Toolkit)
.or(ElementMatchers.isSynthetic()) // 排除合成类
);
// 2. 注入 Bootstrap 类增强(增强 JDK 核心类)
agentBuilder = BootstrapInstrumentBoost.inject(
pluginFinder, instrumentation, agentBuilder, edgeClasses);
// 3. JDK 9+ 模块系统兼容
agentBuilder = JDK9ModuleExporter.openReadEdge(
instrumentation, agentBuilder, edgeClasses);
// 4. 注册类匹配 + Transformer + 安装
agentBuilder
.type(pluginFinder.buildMatch()) // 全局类匹配规则
.transform(new Transformer(pluginFinder)) // 类转换器
.with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
.with(new Listener()) // 转换事件监听
.installOn(instrumentation); // 安装到 JVM
}
定制化 ByteBuddy 实例
SkyWalking 对 ByteBuddy 进行了多项定制,避免与业务代码冲突:
private static AgentBuilder newAgentBuilder() {
final ByteBuddy byteBuddy = new ByteBuddy()
.with(TypeValidation.of(Config.Agent.IS_OPEN_DEBUGGING_CLASS))
// 定制命名策略:生成的辅助类带 "_sw" 后缀
.with(new SWAuxiliaryTypeNamingStrategy(NAME_TRAIT))
// 定制实现上下文工厂
.with(new SWImplementationContextFactory(NAME_TRAIT))
// 定制方法图编译器
.with(new SWMethodGraphCompilerDelegate(MethodGraph.Compiler.DEFAULT));
return new SWAgentBuilderDefault(byteBuddy,
new SWNativeMethodStrategy(NAME_TRAIT))
.with(new SWDescriptionStrategy(NAME_TRAIT));
}
_sw 后缀,确保所有 SkyWalking 生成的类都有唯一的命名空间。
类增强流程
Transformer — 类转换核心
当 JVM 加载一个匹配插件规则的类时,Transformer.transform() 被调用:
private static class Transformer implements AgentBuilder.Transformer {
private PluginFinder pluginFinder;
@Override
public DynamicType.Builder<?> transform(
final DynamicType.Builder<?> builder,
final TypeDescription typeDescription,
final ClassLoader classLoader,
final JavaModule javaModule,
final ProtectionDomain protectionDomain) {
// 1. 查找所有匹配的插件
List<AbstractClassEnhancePluginDefine> pluginDefines =
pluginFinder.find(typeDescription);
if (pluginDefines.size() > 0) {
DynamicType.Builder<?> newBuilder = builder;
EnhanceContext context = new EnhanceContext();
// 2. 链式应用所有插件的增强逻辑
for (AbstractClassEnhancePluginDefine define : pluginDefines) {
DynamicType.Builder<?> possibleNewBuilder = define.define(
typeDescription, newBuilder, classLoader, context);
if (possibleNewBuilder != null) {
newBuilder = possibleNewBuilder;
}
}
return newBuilder;
}
return builder;
}
}
define() 返回新的 Builder,作为下一个插件的输入,实现链式增强。
ClassEnhancePluginDefine.enhanceInstance() — 实例增强
这是字节码操作的核心,源码位于 ClassEnhancePluginDefine.java:67:
protected DynamicType.Builder<?> enhanceInstance(
TypeDescription typeDescription,
DynamicType.Builder<?> newClassBuilder,
ClassLoader classLoader,
EnhanceContext context) throws PluginException {
// 步骤 1:注入上下文字段
// 为目标类添加一个 Object 类型的 private volatile 字段
// 并实现 EnhancedInstance 接口
if (!typeDescription.isAssignableTo(EnhancedInstance.class)) {
if (!context.isObjectExtended()) {
newClassBuilder = newClassBuilder
.defineField(CONTEXT_ATTR_NAME, Object.class,
ACC_PRIVATE | ACC_VOLATILE)
.implement(EnhancedInstance.class)
.intercept(FieldAccessor.ofField(CONTEXT_ATTR_NAME));
context.extendObjectCompleted();
}
}
// 步骤 2:增强构造器
for (ConstructorInterceptPoint point : constructorInterceptPoints) {
newClassBuilder = newClassBuilder
.constructor(point.getConstructorMatcher())
.intercept(SuperMethodCall.INSTANCE.andThen(
MethodDelegation.withDefaultConfiguration()
.to(new ConstructorInter(
point.getConstructorInterceptor(), classLoader))));
}
// 步骤 3:增强实例方法
for (InstanceMethodsInterceptPoint point : instanceMethodsInterceptPoints) {
newClassBuilder = newClassBuilder
.method(not(isStatic()).and(point.getMethodsMatcher()))
.intercept(MethodDelegation.withDefaultConfiguration()
.to(new InstMethodsInter(
point.getMethodsInterceptor(), classLoader)));
}
return newClassBuilder;
}
_$EnhancedClassField_ws 字段注入
增强后的类等价于:
// 增强前
public class FeignClient implements Client {
public Response execute(Request request, Options options) { ... }
}
// 增强后(概念等价,实际由字节码操作完成)
public class FeignClient implements Client, EnhancedInstance {
// Agent 注入的上下文字段
private volatile Object _$EnhancedClassField_ws;
@Override
public Object getSkyWalkingDynamicField() {
return _$EnhancedClassField_ws;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
_$EnhancedClassField_ws = value;
}
// execute() 方法被拦截器包装
public Response execute(Request request, Options options) {
// → beforeMethod()
// → 原始方法调用
// → afterMethod() / handleMethodException()
}
}
拦截器机制
InstanceMethodsAroundInterceptor 接口
所有实例方法拦截器都实现这个接口,遵循 AOP 的 Around Advice 模式:
public interface InstanceMethodsAroundInterceptor {
// 方法执行前
void beforeMethod(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable;
// 方法执行后
Object afterMethod(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
Object ret) throws Throwable;
// 方法异常时
void handleMethodException(EnhancedInstance objInst,
Method method,
Object[] allArguments,
Class<?>[] argumentsTypes,
Throwable t);
}
InstMethodsInter — 方法委托桥梁
InstMethodsInter 是 ByteBuddy 的 MethodDelegation 目标,它在运行时加载实际的拦截器并调用:
public class InstMethodsInter {
private InstanceMethodsAroundInterceptor interceptor;
public InstMethodsInter(String interceptorClassName, ClassLoader classLoader) {
// 延迟加载拦截器实例(带缓存)
this.interceptor = InterceptorInstanceLoader.load(
interceptorClassName, classLoader);
}
@RuntimeType
public Object intercept(@This Object obj,
@AllArguments Object[] allArguments,
@SuperCall Callable<?> zupiCall,
@Origin Method method) throws Throwable {
EnhancedInstance targetObject = (EnhancedInstance) obj;
MethodInterceptResult result = new MethodInterceptResult();
// 1. beforeMethod
interceptor.beforeMethod(targetObject, method, allArguments,
method.getParameterTypes(), result);
if (result.isContinue()) {
// 2. 调用原始方法
ret = zupiCall.call();
}
// 3. afterMethod
ret = interceptor.afterMethod(targetObject, method, allArguments,
method.getParameterTypes(), ret);
return ret;
}
}
ConcurrentHashMap 缓存拦截器实例,保证每个拦截器类只实例化一次。同时为每个目标 ClassLoader 创建独立的 AgentClassLoader,确保类加载隔离。
分布式追踪核心
Trace/Segment/Span 数据模型与上下文传播机制
Trace / Segment / Span 模型
三层数据模型
Trace(分布式追踪)
├── TraceSegment A(Service-A,Thread-1)
│ ├── EntrySpan: POST /api/order ← 入口
│ ├── LocalSpan: OrderService.create ← 本地调用
│ └── ExitSpan: MySQL INSERT ← 出口(数据库)
│
├── TraceSegment B(Service-A,Thread-2,异步)
│ └── LocalSpan: sendNotification
│
└── TraceSegment C(Service-B,跨进程)
├── EntrySpan: POST /api/payment ← 入口
└── ExitSpan: Redis SET ← 出口(缓存)
关系:
- 同一个 Trace 由 traceId 关联
- 每个线程一个 TraceSegment(由 traceSegmentId 标识)
- Segment 内的 Span 通过 spanId(0, 1, 2...)编号
- 跨进程通过 ContextCarrier 传递 parentSegmentId + parentSpanId
- 跨线程通过 ContextSnapshot 传递
TraceSegment — 线程级追踪片段
源码位于 TraceSegment.java:34:
public class TraceSegment {
private String traceSegmentId; // 全局唯一 Segment ID
private TraceSegmentRef ref; // 父 Segment 引用(跨进程/线程)
private List<AbstractTracingSpan> spans; // 已完成的 Span 列表
private DistributedTraceId relatedGlobalTraceId; // 所属 Trace ID
private boolean ignore = false; // 是否被采样策略忽略
private boolean isSizeLimited = false; // Span 数量是否超限
private final long createTime;
public TraceSegment() {
this.traceSegmentId = GlobalIdGenerator.generate();
this.spans = new LinkedList<>();
this.relatedGlobalTraceId = new NewDistributedTraceId();
this.createTime = System.currentTimeMillis();
}
// Span 完成后归档到 Segment
public void archive(AbstractTracingSpan finishedSpan) {
spans.add(finishedSpan);
}
// 转换为 gRPC Protobuf 对象
public SegmentObject transform() {
SegmentObject.Builder builder = SegmentObject.newBuilder();
builder.setTraceId(getRelatedGlobalTrace().getId());
builder.setTraceSegmentId(this.traceSegmentId);
for (AbstractTracingSpan span : this.spans) {
builder.addSpans(span.transform());
}
builder.setService(Config.Agent.SERVICE_NAME);
builder.setServiceInstance(Config.Agent.INSTANCE_NAME);
return builder.build();
}
}
Span 类型体系
| Span 类型 | 用途 | 创建时机 |
|---|---|---|
| EntrySpan | 服务入口边界 | 收到 HTTP 请求、RPC 调用、MQ 消费 |
| ExitSpan | 服务出口边界 | 发起 HTTP 请求、数据库调用、缓存操作 |
| LocalSpan | 本地方法调用 | 内部方法追踪(无网络交互) |
| NoopSpan | 空操作 | 采样策略决定不追踪时 |
createEntrySpan() 检查栈顶是否已有 EntrySpan,如果有则复用并更新操作名,避免冗余 Span。ExitSpan 同理。
ContextManager 上下文管理
ThreadLocal 上下文
ContextManager 是分布式追踪的门面类,所有 Span 操作都通过它:
public class ContextManager implements BootService {
// 核心:每个线程一个 TracingContext
private static ThreadLocal<AbstractTracerContext> CONTEXT =
new ThreadLocal<>();
// 运行时上下文(存储自定义数据)
private static ThreadLocal<RuntimeContext> RUNTIME_CONTEXT =
new ThreadLocal<>();
// 懒创建上下文
private static AbstractTracerContext getOrCreate(
String operationName, boolean forceSampling) {
AbstractTracerContext context = CONTEXT.get();
if (context == null) {
if (StringUtil.isEmpty(operationName)) {
context = new IgnoredTracerContext(); // 忽略
} else {
context = EXTEND_SERVICE.createTraceContext(
operationName, forceSampling); // 创建真实上下文
}
CONTEXT.set(context);
}
return context;
}
}
核心 API
// ===== 创建 Span =====
// 创建入口 Span(带跨进程上下文提取)
AbstractSpan span = ContextManager.createEntrySpan(operationName, carrier);
// 创建出口 Span(带跨进程上下文注入)
AbstractSpan span = ContextManager.createExitSpan(operationName, carrier, remotePeer);
// 创建本地 Span
AbstractSpan span = ContextManager.createLocalSpan(operationName);
// ===== 停止 Span =====
ContextManager.stopSpan(span);
// 当所有 Span 停止后,TracingContext 自动关闭,触发 Segment 上报
// ===== 跨进程传播 =====
ContextManager.inject(carrier); // 注入上下文到 Carrier
ContextManager.extract(carrier); // 从 Carrier 提取上下文
// ===== 跨线程传播 =====
ContextSnapshot snapshot = ContextManager.capture(); // 线程 A 捕获
ContextManager.continued(snapshot); // 线程 B 恢复
TracingContext — 活跃 Span 栈
源码位于 TracingContext.java:61:
public class TracingContext implements AbstractTracerContext {
private TraceSegment segment; // 所属 Segment
private LinkedList<AbstractSpan> activeSpanStack; // 活跃 Span 栈
private int spanIdGenerator; // Span ID 计数器
private volatile int asyncSpanCounter; // 异步 Span 计数
private volatile boolean isRunningInAsyncMode;
private volatile ReentrantLock asyncFinishLock;
private volatile boolean running;
private final ProfileStatusContext profileStatus; // 性能剖析状态
private final CorrelationContext correlationContext; // 用户自定义上下文
private final ExtensionContext extensionContext; // 扩展上下文
// Span 栈操作(LIFO)
// push: createEntrySpan/createExitSpan/createLocalSpan
// pop: stopSpan → 当栈空时,finish Segment → 通知 Listener
}
createXxxSpan() 把 Span push 到栈中,stopSpan() 把 Span pop 出来并 archive 到 Segment。当栈为空时,说明当前线程的所有操作都已完成,Segment 被标记为 finished 并通知所有 TracingContextListener(主要是 TraceSegmentServiceClient)。
跨进程/线程传播
ContextCarrier — 跨进程传播
当服务 A 调用服务 B 时,需要将追踪上下文通过 HTTP Header / RPC 附加信息传递:
public class ContextCarrier implements Serializable {
private String traceId; // 分布式 Trace ID
private String traceSegmentId; // 父 Segment ID
private int spanId = -1; // 父 Span ID
private String parentService; // 父服务名
private String parentServiceInstance; // 父服务实例
private String parentEndpoint; // 父端点名
private String addressUsedAtClient; // 客户端访问地址
private ExtensionContext extensionContext;
private CorrelationContext correlationContext;
// 序列化为 SW8 协议头
// 格式: 1-BASE64(traceId)-BASE64(segmentId)-spanId-BASE64(service)-...
String serialize(HeaderVersion version) {
return StringUtil.join('-',
"1",
Base64.encode(this.getTraceId()),
Base64.encode(this.getTraceSegmentId()),
this.getSpanId() + "",
Base64.encode(this.getParentService()),
Base64.encode(this.getParentServiceInstance()),
Base64.encode(this.getParentEndpoint()),
Base64.encode(this.getAddressUsedAtClient())
);
}
}
跨进程传播流程
# 服务 A(发送方)
ExitSpan 插件:
1. ContextCarrier carrier = new ContextCarrier()
2. ContextManager.inject(carrier) ← 将当前上下文写入 Carrier
3. HTTP Header: sw8 = carrier.serialize() ← 序列化为 HTTP Header
─── HTTP 请求 ──→
# 服务 B(接收方)
EntrySpan 插件:
1. String sw8Header = request.getHeader("sw8")
2. carrier.deserialize(sw8Header) ← 反序列化
3. ContextManager.createEntrySpan(op, carrier)
→ context.extract(carrier) ← 恢复父上下文
→ segment.ref(parentRef) ← 建立 Segment 引用关系
sw8 传输所有上下文信息,通过 - 分隔 8 个字段,值使用 Base64 编码。相比 v6 的多 Header 方案,减少了网络开销。
ContextSnapshot — 跨线程传播
当业务代码使用异步线程(线程池、CompletableFuture 等)时,ThreadLocal 上下文无法自动传递。SkyWalking 通过快照机制解决:
// ===== 线程 A(主线程)=====
// 捕获当前上下文快照
ContextSnapshot snapshot = ContextManager.capture();
// 将 snapshot 传递给子线程(通过参数、队列等)
executor.submit(() -> {
// ===== 线程 B(子线程)=====
// 恢复上下文快照
ContextManager.continued(snapshot);
// 现在线程 B 的 TracingContext 通过 TraceSegmentRef
// 关联到了线程 A 的 Segment,同属一个 Trace
// ... 执行业务逻辑 ...
ContextManager.stopSpan();
});
jdk-threading-plugin,自动增强 Runnable、Callable、ForkJoinTask 等,在提交时自动 capture snapshot,在 run/call 时自动 continued,对业务完全透明。
数据上报
从 Segment 完成到 gRPC 发送的异步数据管线
DataCarrier 异步缓冲
高性能生产者-消费者模型
DataCarrier 是 SkyWalking 自研的高性能异步数据传输组件,位于 apm-commons/apm-datacarrier:
┌───────────────────────────────────────────────────────┐
│ DataCarrier │
│ │
│ Producer Threads Channels Consumer │
│ ┌──────────┐ ┌──────────┐ │
│ │ Thread-1 │──┐ │Channel-0 │ [buf][buf][buf] │
│ └──────────┘ │ ├──────────┤ │
│ ┌──────────┐ ├──→ │Channel-1 │ [buf][buf][buf] ──→ Consumer Thread
│ │ Thread-2 │──┤ ├──────────┤ │ (批量消费)
│ └──────────┘ │ │Channel-2 │ [buf][buf][buf] │
│ ┌──────────┐ │ ├──────────┤ │
│ │ Thread-N │──┘ │Channel-N │ [buf][buf][buf] │
│ └──────────┘ └──────────┘ │
│ │
│ 路由策略:ThreadId % ChannelSize → 无锁分区 │
│ 缓冲策略:IF_POSSIBLE(满则丢弃,不阻塞业务线程) │
└───────────────────────────────────────────────────────┘
核心设计
// 初始化:5 个 Channel,每个 Channel 300 个 Buffer 槽位
DataCarrier<TraceSegment> carrier = new DataCarrier<>(
CHANNEL_SIZE, // 默认 5
BUFFER_SIZE, // 默认 300
BufferStrategy.IF_POSSIBLE // 满则丢弃
);
carrier.consume(consumer, 1); // 1 个消费者线程
// 生产者(业务线程,无锁)
carrier.produce(traceSegment);
// 内部:partitioner.partition(channels.length, data)
// 默认按 Thread ID 取模路由到对应 Channel
BlockingQueue 在满时会阻塞生产者线程——这对 APM 探针是不可接受的,因为不能因为数据上报而阻塞业务请求。DataCarrier 采用 IF_POSSIBLE 策略:缓冲区满时直接丢弃数据,保证业务零影响。
gRPC 数据上报
TraceSegmentServiceClient — 上报核心
源码位于 TraceSegmentServiceClient.java:49,它同时实现了三个接口:
@DefaultImplementor
public class TraceSegmentServiceClient
implements BootService, // 服务生命周期
IConsumer<TraceSegment>, // DataCarrier 消费者
TracingContextListener, // Segment 完成监听
GRPCChannelListener { // gRPC 连接状态监听
private volatile DataCarrier<TraceSegment> carrier;
private volatile TraceSegmentReportServiceGrpc
.TraceSegmentReportServiceStub serviceStub;
private volatile GRPCChannelStatus status = DISCONNECT;
// 生命周期回调
@Override
public void prepare() {
// 监听 gRPC 连接状态
ServiceManager.INSTANCE.findService(GRPCChannelManager.class)
.addChannelListener(this);
}
@Override
public void boot() {
// 创建 DataCarrier 并启动消费者线程
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE,
BufferStrategy.IF_POSSIBLE);
carrier.consume(this, 1);
}
@Override
public void onComplete() {
// 注册为 Segment 完成的监听者
TracingContext.ListenerManager.add(this);
}
}
数据流转全链路
// ① Segment 完成 → 生产到 DataCarrier
@Override
public void afterFinished(TraceSegment traceSegment) {
if (traceSegment.isIgnore()) return; // 采样策略忽略的不上报
if (!carrier.produce(traceSegment)) {
// Buffer 满,丢弃(但不影响业务)
LOGGER.debug("One trace segment has been abandoned, cause by buffer is full.");
}
}
// ② 消费者线程批量消费 → gRPC 流式上报
@Override
public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) {
// 创建 gRPC 双向流
StreamObserver<SegmentObject> upstreamObserver =
serviceStub
.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT,
TimeUnit.SECONDS)
.collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
// 处理 OAP 下发的命令
ServiceManager.INSTANCE.findService(CommandService.class)
.receiveCommand(commands);
}
@Override
public void onError(Throwable t) { ... }
@Override
public void onCompleted() { ... }
});
// 逐个发送 Segment
for (TraceSegment segment : data) {
SegmentObject segmentObject = segment.transform();
upstreamObserver.onNext(segmentObject);
}
upstreamObserver.onCompleted();
segmentUplinkedCounter += data.size();
} else {
// gRPC 未连接,丢弃
segmentAbandonedCounter += data.size();
}
}
GRPCChannelManager — 连接管理
GRPCChannelManager 负责维护到 OAP 后端的 gRPC 长连接:
- 多后端负载均衡:支持配置多个 OAP 地址,随机选择连接
- 定时心跳检测:默认每 30 秒检查连接状态
- 自动重连:连接断开后自动重建
- TLS 支持:可选 TLS 加密通信
- 认证支持:通过
AuthenticationDecorator添加认证 Header - 状态通知:连接状态变化时通知所有
GRPCChannelListener
OneAgent 服务治理
基于 etcd 的流量管理、黑名单过滤与四层优雅下线机制
OneAgent 功能概览
什么是 OneAgent
OneAgent 是 SkyWalking Java Agent 中的服务治理增强模块,在 APM 探针的基础上,利用 etcd 作为分布式协调中心,实现了自动摘流(黑名单过滤)和优雅下线(Graceful Shutdown)能力。它以可选插件(optional-plugin)的形式集成,通过 OneAgentConfig.ENABLE_ONEAGENT 开关控制是否加载。
核心能力
服务注册发现
拦截 Eureka 注册/发现流程,将 Provider/Consumer 信息同步写入 etcd,构建统一服务拓扑
流量黑名单
实例下线时自动加入黑名单,Consumer 侧 Ribbon/LoadBalancer 拦截器实时过滤,秒级摘流
优雅下线
四层防护:PreStop 文件监听 → Signal Handler → JVM Shutdown Hook → etcd Lease 过期,确保零流量损失
动态配置
三级配置体系(静态/全局/服务级),支持 etcd Watch 实时热更新,Service 级覆盖 Global 级
模块结构
apm-sniffer/
├── apm-agent-core/src/.../oneagent/ # 核心模块
│ ├── OneAgentConfig.java # 统一配置门面
│ ├── EtcdClientService.java # BootService 生命周期管理
│ ├── EtcdClient.java # 纯 JDK HTTP etcd 客户端
│ ├── ServiceContext.java # 全局服务上下文(volatile)
│ ├── TrafficManager.java # 黑名单管理 & 流量排空
│ ├── ProviderDeregistrationManager.java # Provider 注销 & 多层清理
│ ├── PreStopFileShutdownTrigger.java # K8s PreStop 文件监听
│ ├── config/ # 配置子系统
│ │ ├── StaticConfig.java # 静态配置(启动时确定)
│ │ ├── ConfigDefinitions.java # 配置字段定义(含约束)
│ │ ├── ConfigLoader.java # 配置加载器
│ │ ├── ConfigSnapshotManager.java # 配置快照管理
│ │ └── ConfigWatcher.java # etcd Watch 热更新
│ ├── model/ # etcd 协议模型(17 个类)
│ └── reporter/ # 数据上报子系统
│ ├── OneAgentDataReporter.java # HTTP 上报服务
│ ├── EventCollector.java # 事件采集
│ └── MetricCollector.java # 指标采集
│
└── optional-plugins/optional-oneagent-plugins/
└── eureka-2.x-plugin/ # Eureka 注册中心插件
├── RegisterInterceptor.java # 注册拦截 → etcd 写入
├── DiscoveryInterceptor.java # 发现拦截 → Consumer 注册
├── RibbonServerListInterceptor.java # Ribbon 黑名单过滤
└── SpringCloudLoadBalancerInterceptor.java # SCL 黑名单过滤
整体架构设计
架构全景图
┌──────────────────────────────────────────────────────────────────────┐
│ JVM (微服务实例) │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ SkyWalking Agent │ │
│ │ │ │
│ │ ┌───────────────┐ ┌────────────────┐ ┌────────────────┐ │ │
│ │ │ EtcdClient │ │ TrafficManager │ │ ProviderDereg │ │ │
│ │ │ Service │ │ │ │ Manager │ │ │
│ │ │ (BootService) │ │ - Blacklist │ │ │ │ │
│ │ │ │──▶│ - Watch │ │ - 4-layer │ │ │
│ │ │ priority=15 │ │ - Drain │ │ cleanup │ │ │
│ │ └───────┬───────┘ └───────┬────────┘ └───────┬────────┘ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ ▼ │ │
│ │ ┌──────────────────────────────────────────────────────────┐ │ │
│ │ │ EtcdClient (纯 JDK HTTP) │ │ │
│ │ │ put / get / delete / watch / lease / keepalive / auth │ │ │
│ │ └──────────────────────────┬───────────────────────────────┘ │ │
│ └─────────────────────────────┼───────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────┼───────────────────────────────────┐ │
│ │ Eureka Plugin (字节码增强) │ │
│ │ │ │ │
│ │ register() ──▶ RegisterInterceptor ──▶ etcd put provider │ │
│ │ discover() ──▶ DiscoveryInterceptor ──▶ etcd put consumer │ │
│ │ getServers()──▶ RibbonInterceptor ──▶ blacklist filter │ │
│ └─────────────────────────────┼───────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────┐
│ etcd Cluster │
│ │
│ /oneagent/ │
│ ├── configs (全局动态配置 JSON) │
│ │ └── {serviceName} (服务级配置 JSON,覆盖全局) │
│ ├── services/ │
│ │ └── {serviceName}/ │
│ │ ├── providers/{instanceId} (Lease + Keepalive) │
│ │ └── consumers/{instanceId} (Lease + Keepalive) │
│ └── traffic_manager/blacklist/ │
│ └── {serviceName}/{instanceId} (TTL 自动过期) │
└──────────────────────────────────────────────────────────────────────┘
启动流程:从 premain 到 OneAgent 就绪
JVM 启动 ─▶ SkyWalkingAgent.premain()
│
├── 1. SnifferConfigInitializer
│ └── 加载 StaticConfig (ENABLE_ONEAGENT, ETCD_ENDPOINTS)
│
├── 2. PluginBootstrap.loadPlugins()
│ └── PluginSelector.select()
│ ├── 排除 EXCLUDE_PLUGINS 中的插件
│ ├── 排除 gray-* (如果灰度关闭)
│ └── 排除 oneagent-* (如果 ENABLE_ONEAGENT=false)
│
├── 3. installClassTransformer()
│ └── ByteBuddy 注册 Eureka 拦截点
│
└── 4. ServiceManager.boot()
└── EtcdClientService (priority=15)
├── prepare() → 校验 etcd 配置
├── boot() → 创建 EtcdClient, 启动 PreStop 监听
└── onComplete()→ 初始化 ConfigWatcher & BlacklistWatch
PluginSelector 中的 OneAgent 过滤
PluginSelector.select() 在加载插件时,根据 OneAgentConfig.ENABLE_ONEAGENT 决定是否过滤 oneagent- 前缀的插件:
// PluginSelector.java 核心逻辑
public List<AbstractClassEnhancePluginDefine> select(
List<AbstractClassEnhancePluginDefine> plugins) {
// 1. 排除用户配置的 EXCLUDE_PLUGINS
// 2. 排除灰度插件 (gray-*)
// 3. 排除 OneAgent 插件 (如果功能关闭)
if (!OneAgentConfig.ENABLE_ONEAGENT) {
// 过滤所有 pluginName 以 "oneagent-" 开头的插件
removePlugins(plugins, "oneagent-");
}
return plugins;
}
三级配置体系
配置分层设计
┌──────────────────────────────────────────────────────────────┐
│ 配置优先级(高 → 低) │
│ │
│ Level 1: 静态配置 (StaticConfig) │
│ ├── 来源: -D 系统属性 > 环境变量 > 默认值 │
│ ├── 特点: 启动时确定,不可动态修改 │
│ └── 示例: ENABLE_ONEAGENT, ETCD_ENDPOINTS │
│ │
│ Level 2: 全局动态配置 (GLOBAL) │
│ ├── 来源: etcd /oneagent/configs (JSON) │
│ ├── 特点: 运行时 Watch 热更新,影响全部服务 │
│ └── 示例: ENABLE_ONEAGENT_FEATURE, ETCD_LEASE_TTL │
│ │
│ Level 3: 服务级动态配置 (SERVICE) │
│ ├── 来源: etcd /oneagent/configs/{serviceName} (JSON) │
│ ├── 特点: 覆盖 GLOBAL 配置,实现细粒度管控 │
│ └── 示例: DRAIN_WAIT_TIME_MS, ENABLE_BLACKLIST │
└──────────────────────────────────────────────────────────────┘
StaticConfig — 启动时静态配置
| 配置项 | 系统属性 (-D) | 环境变量 | 默认值 | 说明 |
|---|---|---|---|---|
| ENABLE_ONEAGENT | oneagent.enable | ONEAGENT_ENABLE | true | 总开关 |
| ETCD_ENDPOINTS | oneagent.etcd.endpoints | ONEAGENT_ETCD_ENDPOINTS | "" | etcd 地址 |
| ETCD_SERVICE_PREFIX | oneagent.etcd.service.prefix | ONEAGENT_ETCD_SERVICE_PREFIX | /oneagent | etcd 路径前缀 |
| ETCD_USERNAME | oneagent.etcd.username | ONEAGENT_ETCD_USERNAME | "" | 认证用户 |
| ETCD_PASSWORD | oneagent.etcd.password | ONEAGENT_ETCD_PASSWORD | "" | 认证密码 |
// StaticConfig.java — 加载优先级:系统属性 > 环境变量 > 默认值
public class StaticConfig {
public static final boolean ENABLE_ONEAGENT =
loadBoolean("oneagent.enable", "ONEAGENT_ENABLE", true);
public static final String ETCD_ENDPOINTS =
loadString("oneagent.etcd.endpoints", "ONEAGENT_ETCD_ENDPOINTS", "");
private static String loadString(String sysProp, String envVar, String def) {
String val = System.getProperty(sysProp);
if (val != null) return val;
val = System.getenv(envVar);
if (val != null) return val;
return def;
}
}
ConfigDefinitions — 动态配置字段定义
每个配置字段定义为枚举值,包含 key、默认值、配置级别、是否支持动态更新、取值范围:
| 字段 | 级别 | 默认值 | 取值范围 | 动态 | 说明 |
|---|---|---|---|---|---|
| ENABLE_ONEAGENT_FEATURE | GLOBAL | true | - | Y | 运行时功能开关 |
| ETCD_LEASE_TTL | GLOBAL | 60s | [10, 3600] | Y | Lease TTL |
| DRAIN_WAIT_TIME_MS | SERVICE | 1000ms | [0, 30000] | Y | 流量排空等待 |
| ENABLE_BLACKLIST | SERVICE | false | - | Y | 黑名单开关 |
| BLACKLIST_TTL_MS | SERVICE | 60000ms | [5000, 432000000] | Y | 黑名单条目 TTL |
| BLACKLIST_MAX_COUNT | SERVICE | 1 | [1, 1000] | Y | 每服务最大黑名单数 |
| PRESTOP_ENABLED | GLOBAL | true | - | N | PreStop 监听 |
| PRESTOP_FILE_PATH | GLOBAL | /tmp/prestop | - | N | PreStop 文件路径 |
// ConfigDefinitions.java — 类型安全的配置定义
public enum ConfigDefinitions {
ENABLE_ONEAGENT_FEATURE("oneagent.feature.enable", "true",
ConfigLevel.GLOBAL, true),
DRAIN_WAIT_TIME_MS("oneagent.drain.wait.time.ms", "1000",
ConfigLevel.SERVICE, true, 0L, 30000L),
ENABLE_BLACKLIST("oneagent.blacklist.enable", "false",
ConfigLevel.SERVICE, true),
BLACKLIST_TTL_MS("oneagent.blacklist.ttl.ms", "60000",
ConfigLevel.SERVICE, true, 5000L, 432000000L),
ETCD_LEASE_TTL("oneagent.etcd.lease.ttl", "60",
ConfigLevel.GLOBAL, true, 10L, 3600L);
// ...
}
配置热更新流程
ConfigWatcher 启动
│
├── Watch: /oneagent/configs (全局配置)
│ └── PUT 事件 → JSON 解析 → ConfigValidator 校验
│ │
│ ┌─────────────┴──────────────┐
│ │ 检查 minValue / maxValue │
│ │ 超出范围 → 使用边界值 + 告警 │
│ └─────────────┬──────────────┘
│ │
│ ConfigSnapshotManager.update()
│ │
│ 通知所有 ConfigChangeListener
│
└── Watch: /oneagent/configs/{serviceName} (服务级配置)
└── 同上流程,SERVICE 级配置覆盖 GLOBAL 级
查询时: OneAgentConfig.getConfigValue(field, serviceName)
→ 先查 SERVICE 级 → 无则 fallback GLOBAL 级 → 无则用默认值
etcd 数据结构与客户端
etcd Key 设计
/oneagent/ # 前缀 (StaticConfig.ETCD_SERVICE_PREFIX)
│
├── configs # 全局配置
│ {"oneagent.feature.enable":"true",
│ "oneagent.etcd.lease.ttl":"60"}
│
├── configs/{serviceName} # 服务级配置 (覆盖全局)
│ {"oneagent.blacklist.enable":"true",
│ "oneagent.drain.wait.time.ms":"5000"}
│
├── services/{serviceName}/
│ ├── providers/{instanceId} # Provider 节点
│ │ ├── Lease: TTL=60s + Keepalive # 心跳续租
│ │ └── Value: { # 实例元数据
│ │ "ip": "10.0.1.5",
│ │ "port": 8080,
│ │ "serviceName": "order-service",
│ │ "instanceId": "10.0.1.5:8080",
│ │ "registeredTime": 1712000000000
│ │ }
│ │
│ └── consumers/{consumerInstanceId} # Consumer 节点
│ ├── Lease: TTL=60s + Keepalive
│ └── Value: consumer 元数据
│
└── traffic_manager/blacklist/
└── {serviceName}/{instanceId} # 黑名单条目
├── Lease: TTL 由 BLACKLIST_TTL_MS 控制
└── Value: {
"ttlMs": 60000,
"expireTimeMs": 1712000060000
}
EtcdClient — 零依赖 HTTP 客户端
OneAgent 实现了一个纯 JDK HttpURLConnection 的 etcd v3 HTTP 客户端(约 69KB),不引入任何第三方 etcd SDK 依赖:
// EtcdClient.java — 核心 API
public class EtcdClient {
// 基础 KV 操作
void put(String key, String value);
void putWithLease(String key, String value, long leaseId);
String get(String key);
void delete(String key);
void deleteRange(String prefix); // 前缀批量删除
// Lease 管理
long leaseGrant(long ttl); // 创建租约
void leaseKeepAlive(long leaseId); // 续租心跳
void leaseRevoke(long leaseId); // 撤销租约
// Watch (HTTP 长连接流式)
void watch(String prefix, WatchCallback callback);
// Auth (Token 认证 + 自动刷新)
void authenticate(String user, String password);
}
设计亮点:
- 零外部依赖:使用 JDK 内置 HttpURLConnection,避免 Jar 冲突
- HTTP 流式 Watch:通过长连接读取 etcd Watch 事件流,而非轮询
- Lease 续租:后台定时 Keepalive,实例异常退出时 Lease 自动过期
- Token 认证:自动获取和刷新 etcd Auth Token
ServiceContext — 全局服务信息注册
插件拦截器将服务信息写入 ServiceContext,供核心模块统一读取:
// ServiceContext.java — volatile 保证线程可见性
public class ServiceContext {
private static volatile String serviceName;
private static volatile String instanceId;
public static void setServiceInfo(String svcName, String instId) {
serviceName = svcName.toLowerCase(); // 统一小写
instanceId = instId;
}
public static boolean isAvailable() {
return serviceName != null && instanceId != null;
}
}
Eureka 插件拦截
插件定义
Eureka 2.x 插件通过 skywalking-plugin.def 声明 4 个拦截点:
# skywalking-plugin.def
oneagent-eureka-2.x=...RegisterInstrumentation
oneagent-eureka-2.x=...DiscoveryInstrumentation
oneagent-eureka-2.x=...RibbonServerListInstrumentation
oneagent-eureka-2.x=...SpringCloudLoadBalancerInstrumentation
RegisterInterceptor — 注册拦截
拦截 Eureka 的 register() 方法,在注册完成后将 Provider 信息写入 etcd:
// RegisterInterceptor.java — afterMethod()
public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) {
// 1. 功能开关检查
if (!OneAgentConfig.getBoolean(ENABLE_ONEAGENT_FEATURE)) return ret;
if (!ServiceContext.isAvailable()) return ret;
// 2. 从 Eureka InstanceInfo 提取元数据
InstanceInfo instanceInfo = (InstanceInfo) allArguments[0];
String serviceName = instanceInfo.getAppName().toLowerCase();
String instanceId = instanceInfo.getIPAddr() + ":" + instanceInfo.getPort();
// 3. 注册服务上下文
ServiceContext.setServiceInfo(serviceName, instanceId);
// 4. 写入 etcd (带 Lease 续租)
String providerPath = prefix + "/services/" + serviceName
+ "/providers/" + instanceId;
String value = buildProviderMetadata(instanceInfo); // JSON
etcdClient.putWithLease(providerPath, value, leaseId);
// 5. 注册到 ProviderDeregistrationManager (下线时自动清理)
deregManager.registerProvider(providerPath, serviceName, instanceId);
return ret;
}
DiscoveryInterceptor — 发现拦截
拦截服务发现流程,将 Consumer 注册到 etcd:
应用调用 discoveryClient.getInstances("order-service")
│
▼
DiscoveryInterceptor.beforeMethod()
│
├── 检查功能开关 & ServiceContext
│
├── 注册 Consumer 到 etcd (幂等)
│ key: /oneagent/services/order-service/consumers/{consumerInstanceId}
│ value: consumer 元数据 JSON
│ lease: TTL + Keepalive
│
└── 返回原始结果(不修改发现结果)
RibbonServerListInterceptor — 负载均衡黑名单过滤
拦截 Ribbon 的 getAllServers() / getReachableServers(),过滤掉黑名单中的实例:
// RibbonServerListInterceptor.java — afterMethod()
public Object afterMethod(EnhancedInstance objInst, Method method,
Object[] allArguments, Class<?>[] argumentsTypes, Object ret) {
List<Server> servers = (List<Server>) ret;
if (servers == null || servers.isEmpty()) return ret;
// 1. 功能开关 & 黑名单开关检查 (SERVICE 级)
String serviceName = extractServiceName(servers);
if (!OneAgentConfig.getBoolean(ENABLE_BLACKLIST, serviceName)) return ret;
// 2. 快速路径: 黑名单为空则直接返回
if (TrafficManager.isBlacklistEmpty()) return ret;
// 3. 单次遍历过滤 (延迟分配 ArrayList)
List<Server> filtered = null;
for (int i = 0; i < servers.size(); i++) {
String instanceId = RibbonInstanceUtils.getInstanceId(servers.get(i));
if (TrafficManager.isInBlacklist(instanceId)) {
if (filtered == null) {
filtered = new ArrayList<>(servers.subList(0, i)); // 延迟拷贝
}
} else if (filtered != null) {
filtered.add(servers.get(i));
}
}
// 4. 安全兜底: 全部被过滤则返回原列表
if (filtered != null && filtered.isEmpty()) return ret;
return filtered != null ? filtered : ret;
}
设计亮点:
- 延迟分配:只有命中黑名单时才创建新列表,零开销快速路径
- 安全兜底:如果所有实例都在黑名单,返回原列表防止服务完全不可用
- SERVICE 级开关:每个服务可独立控制是否启用黑名单
完整请求流转图
Service-A (Consumer) Service-B (Provider)
┌───────────────────┐ ┌───────────────────┐
│ Feign 调用 │ │ Eureka register() │
│ order-service │ │ │ │
│ │ │ │ ▼ │
│ ▼ │ │ RegisterInterceptor│
│ Ribbon.getServers()│ │ → etcd put │
│ │ │ │ /providers/B │
│ ▼ │ └───────────────────┘
│ RibbonInterceptor │
│ → check blacklist│──── TrafficManager ────── etcd Watch ──── /providers/*
│ → filter B (下线) │ (DELETE event)
│ → return [C, D] │
│ │ │
│ ▼ │
│ 发往 C 或 D │
└───────────────────┘
流量黑名单机制
TrafficManager 双层索引设计
TrafficManager 内存数据结构:
blacklistEntries (ConcurrentHashMap):
┌──────────────────┬────────────────────────────────────────┐
│ instanceId │ BlacklistEntry │
├──────────────────┼────────────────────────────────────────┤
│ 10.0.1.5:8080 │ {serviceName:"order-svc", expireAt:T1}│
│ 10.0.1.6:8080 │ {serviceName:"order-svc", expireAt:T2}│
│ 10.0.2.3:9090 │ {serviceName:"pay-svc", expireAt:T3}│
└──────────────────┴────────────────────────────────────────┘
serviceBlacklistIndex (ConcurrentHashMap):
┌──────────────────┬──────────────────────────────┐
│ serviceName │ Set<instanceId> │
├──────────────────┼──────────────────────────────┤
│ order-svc │ {10.0.1.5:8080, 10.0.1.6:..} │
│ pay-svc │ {10.0.2.3:9090} │
└──────────────────┴──────────────────────────────┘
查询: isInBlacklist("10.0.1.5:8080")
→ O(1) 查 blacklistEntries
→ 检查 expireAt,过期则 CAS 删除(懒删除)
黑名单 Watch 事件处理
// TrafficManager.java — Watch 回调
private void handleBlacklistWatchEvent(WatchEvent event) {
String key = event.getKey(); // /oneagent/traffic_manager/blacklist/{svc}/{inst}
String instanceId = extractInstanceId(key);
String serviceName = extractServiceName(key);
switch (event.getType()) {
case PUT:
// 解析 BlacklistValue: {ttlMs, expireTimeMs}
BlacklistValue bv = deserialize(event.getValue());
long expireAt = System.currentTimeMillis() + bv.getTtlMs();
BlacklistEntry entry = new BlacklistEntry(serviceName, expireAt);
blacklistEntries.put(instanceId, entry);
serviceBlacklistIndex
.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet())
.add(instanceId);
break;
case DELETE:
blacklistEntries.remove(instanceId);
Set<String> set = serviceBlacklistIndex.get(serviceName);
if (set != null) set.remove(instanceId);
break;
}
}
黑名单查询 — 懒删除策略
// TrafficManager.java — 快速路径 + 懒过期
public static boolean isInBlacklist(String instanceId) {
BlacklistEntry entry = blacklistEntries.get(instanceId);
if (entry == null) return false;
// 检查本地 TTL 是否过期
if (System.currentTimeMillis() > entry.getExpireAt()) {
// CAS 删除:防止并发重复删除
blacklistEntries.remove(instanceId, entry);
Set<String> set = serviceBlacklistIndex.get(entry.getServiceName());
if (set != null) set.remove(instanceId);
return false;
}
return true;
}
设计要点:
- 双重过期:etcd 侧 Lease TTL + 本地 expireAt,双保险
- CAS 懒删除:查询时检测过期并原子移除,无需后台清理线程
- O(1) 查询:ConcurrentHashMap 直接定位,对 Ribbon 调用零延迟影响
四层优雅下线
四层防护体系
OneAgent 设计了四层递进式下线防护,确保在各种场景下都能实现零流量损失的优雅退出:
Layer 1: PreStop 文件监听 (最早触发, K8s 场景)
│
▼
Layer 2: Signal Handler (SIGTERM/SIGINT)
│
▼
Layer 3: JVM Shutdown Hook (ServiceManager.shutdown)
│
▼
Layer 4: etcd Lease 自动过期 (兜底, kill -9 场景)
Layer 1: PreStopFileShutdownTrigger
K8s PreStop Hook 通过写文件触发下线流程,比 SIGTERM 更早:
// PreStopFileShutdownTrigger.java
public class PreStopFileShutdownTrigger {
// 守护线程轮询检测 prestop 文件
private final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(
r -> new Thread(r, "oneagent-prestop-watcher"));
public void start() {
// 启动时清理残留文件,防止误触发
Files.deleteIfExists(Paths.get(prestopFilePath));
executor.scheduleWithFixedDelay(() -> {
String content = readFirstLine(prestopFilePath);
if ("stop".equals(content)) {
// 触发下线!
providerDeregManager.cleanupAllProviders();
trafficManager.waitForTrafficDrain();
}
}, 0, checkIntervalMs, TimeUnit.MILLISECONDS);
}
}
K8s Pod YAML 配置示例:
lifecycle:
preStop:
exec:
command:
- /bin/sh
- -c
- |
echo 'stop' > /tmp/prestop # 触发 OneAgent 下线
sleep 15 # 等待流量排空
# SIGTERM 将在 sleep 结束后发出
Layer 2 & 3: Signal Handler + Shutdown Hook
SIGTERM / SIGINT 信号到达
│
▼
Signal Handler (RegisterInterceptor 中注册)
├── ProviderDeregistrationManager.cleanupAllProviders()
│ ├── 删除 etcd Provider 节点
│ ├── 加入黑名单 (如果条件满足)
│ └── 上报 INSTANCE_OFFLINE 事件
└── TrafficManager.waitForTrafficDrain()
└── sleep(DRAIN_WAIT_TIME_MS) // 等待在途请求完成
│
▼
JVM Shutdown Hook (ServiceManager.shutdown → EtcdClientService.shutdown)
├── 检查 cleaned 标志 (AtomicBoolean CAS)
│ └── 如果已被 PreStop/Signal 处理 → 跳过
├── 未处理则执行完整清理流程
├── 关闭 OneAgentDataReporter
└── 关闭 EtcdClient (撤销 Lease, 断开连接)
Layer 4: etcd Lease 自动过期
异常退出场景 (kill -9, OOM, 物理机宕机):
│
├── Keepalive 心跳停止
├── etcd 等待 Lease TTL 过期 (默认 60s)
├── Provider 节点自动删除
│
▼
Consumer 侧的 Watch 检测到 DELETE 事件
└── TrafficManager 将该实例加入黑名单
└── 后续请求自动绕过该实例
ProviderDeregistrationManager — 核心清理逻辑
// ProviderDeregistrationManager.java
public void cleanupAllProviders() {
// AtomicBoolean CAS: 确保只执行一次
if (!cleaned.compareAndSet(false, true)) return;
for (Map.Entry<String, ProviderInfo> entry : registeredProviders.entrySet()) {
String providerPath = entry.getKey();
ProviderInfo info = entry.getValue();
// 1. 删除 etcd Provider 节点
boolean deleted = etcdClient.delete(providerPath);
if (deleted) {
// 2. 加入黑名单 (条件: 该服务有 >1 个 Provider && 黑名单未满)
if (providerCount > 1 && blacklistCount < maxCount) {
String blacklistKey = prefix + "/traffic_manager/blacklist/"
+ info.getServiceName() + "/" + info.getInstanceId();
String blacklistValue = "{\"ttlMs\":" + ttlMs
+ ",\"expireTimeMs\":" + (now + ttlMs) + "}";
etcdClient.putWithLease(blacklistKey, blacklistValue, leaseId);
}
// 3. 上报 INSTANCE_OFFLINE 事件
eventCollector.reportEvent(AgentEvent.instanceOffline(info));
registeredProviders.remove(providerPath);
} else {
// 删除失败: 重置标志允许重试
cleaned.set(false);
}
}
}
完整下线时序图 (K8s 场景)
时间轴 K8s PreStop Trigger Agent Core
────────────────────────────────────────────────────────────────────────────
T+0s Pod 终止信号 ─────▶ preStop Hook
echo 'stop' ─▶ 检测到文件 ──────▶ 删除 Provider
加入黑名单
上报事件
T+1s waitForDrain ────▶ sleep 10s
(等待在途请求)
T+5s sleep 结束
发送 SIGTERM ───────────────────▶ Signal Handler
cleaned=true → 跳过 (已处理)
T+5s SIGTERM ──────────────────────▶ JVM Shutdown Hook
cleaned=true → 跳过 (已处理)
关闭 Reporter
关闭 EtcdClient
T+11s 排空完成, 进程退出
────────────────────────────────────────────────────────────────────────────
Consumer 侧: Watch 收到 DELETE → 黑名单生效 → 后续请求绕过下线实例
数据上报与事件采集
上报子系统架构
┌──────────────────────────────────────────────────────────┐
│ OneAgentDataReporter │
│ (HTTP POST 定时上报) │
│ │
│ ┌────────────────────┐ ┌────────────────────────┐ │
│ │ EventCollector │ │ MetricCollector │ │
│ │ │ │ │ │
│ │ - INSTANCE_ONLINE │ │ - 方法调用计数 │ │
│ │ - INSTANCE_OFFLINE│ │ - etcd 操作延迟 │ │
│ │ - BLACKLIST_ADD │ │ - 黑名单过滤统计 │ │
│ │ - BLACKLIST_FAIL │ │ - Keepalive 成功/失败 │ │
│ │ - CONFIG_CHANGE │ │ │ │
│ └────────────────────┘ └────────────────────────┘ │
│ │
│ 定时器: 每 N 秒收集一次 → 合并 → HTTP POST → 后端 │
└──────────────────────────────────────────────────────────┘
关键事件类型
| 事件 | 触发时机 | 携带信息 |
|---|---|---|
| INSTANCE_ONLINE | Provider 首次注册到 etcd | serviceName, instanceId, 元数据 |
| INSTANCE_OFFLINE | Provider 主动下线清理 | serviceName, instanceId, 运行时长 |
| BLACKLIST_ADD | 实例加入黑名单 | targetService, instanceId, ttlMs |
| BLACKLIST_FAIL | 黑名单写入失败 | 失败原因, targetService |
| CONFIG_CHANGE | 动态配置变更 | 变更字段, 旧值, 新值 |
设计精华
核心设计模式与完整链路追踪示例
核心设计模式
1. ThreadLocal 上下文管理(零锁追踪)
每个线程通过 ThreadLocal<TracingContext> 持有自己的追踪上下文,Span 的创建、停止、归档全部是线程本地操作,无需任何同步。这是 Agent 高性能的关键:
// 无锁操作链:
// 1. CONTEXT.get() → ThreadLocal 读取
// 2. activeSpanStack.addLast() → LinkedList 本地操作
// 3. span.tag(key, value) → 对象字段赋值
// 4. activeSpanStack.removeLast() → LinkedList 本地操作
// 5. segment.archive(span) → LinkedList.add()
// 全程无锁!
2. SPI + 注解的服务覆盖机制
通过 @DefaultImplementor 和 @OverrideImplementor 注解,配合 Java SPI 的 ServiceLoader,实现了一套灵活的服务替换体系:
优势:
├── 默认实现开箱即用(gRPC 上报)
├── 只需加入对应 JAR 即可替换(Kafka 上报)
├── 不需要修改任何配置文件
└── 编译时检查覆盖关系(注解的 value 指定被覆盖的类)
3. Witness 版本兼容机制
通过检查特定类是否存在来判断框架版本,而非读取 pom.xml 或 MANIFEST 版本号:
优势:
├── 运行时检测,不依赖构建信息
├── 同一框架不同版本的插件可以共存
├── 版本不匹配时安全跳过,不抛异常
└── 支持检测类和方法两个粒度
4. ClassLoader 隔离策略
SkyWalking 设计了 AgentClassLoader 体系来避免类加载冲突:
Bootstrap ClassLoader
└── App ClassLoader
├── AgentClassLoader (默认) ← Agent 核心类
└── AgentClassLoader (per-app) ← 每个目标 ClassLoader 对应一个
└── 拦截器实例缓存
为什么需要隔离?
├── Agent 依赖的 Guava/gRPC 版本可能与业务不同
├── 插件拦截器需要在目标 ClassLoader 的上下文中运行
└── 防止 Agent 内部类泄露到业务代码空间
5. 异步缓冲 + 优雅降级
整个数据上报链路遵循"不能影响业务"的核心原则:
降级策略:
├── DataCarrier 缓冲满 → 丢弃 Segment(不阻塞业务线程)
├── gRPC 未连接 → 丢弃 Segment(计数 + 日志)
├── gRPC 发送超时 → 中断上报(不影响下次)
├── Span 数量超限 → 标记 isSizeLimited,停止创建新 Span
├── Agent 初始化失败 → 静默退出,不影响应用启动
└── 任何插件异常 → catch 后继续,不传播到业务代码
6. 延迟加载(Lazy Loading)
贯穿整个 Agent 的懒加载策略,最小化启动开销:
| 组件 | 延迟时机 |
|---|---|
| 拦截器实例 | 目标方法首次被调用时才加载 |
| gRPC Channel | 首次需要上报时才建立连接 |
| TracingContext | 首次创建 Span 时才实例化 |
| AgentClassLoader | 对应目标 ClassLoader 首次出现时 |
完整链路追踪示例
一次 HTTP 请求的完整追踪之旅
假设:Service-A 收到一个 HTTP 请求,调用数据库后再通过 Feign 调用 Service-B。
Step 1: HTTP 请求到达 Service-A
────────────────────────────────
Tomcat/Spring MVC 插件触发:
→ ContextManager.createEntrySpan("POST /api/order", carrier)
→ 从 HTTP Header 中提取 sw8(如果存在父上下文)
→ ThreadLocal 中创建 TracingContext + TraceSegment
→ 压入 EntrySpan (spanId=0)
Step 2: 业务逻辑调用数据库
────────────────────────────────
MySQL 插件触发:
→ ContextManager.createExitSpan("MySQL/INSERT", "mysql:3306")
→ 压入 ExitSpan (spanId=1)
→ ExitSpan.tag("db.type", "MySQL")
→ ExitSpan.tag("db.statement", "INSERT INTO orders ...")
→ SQL 执行完毕
→ ContextManager.stopSpan()
→ ExitSpan pop 出栈,archive 到 Segment
Step 3: 通过 Feign 调用 Service-B
────────────────────────────────
Feign 插件触发:
→ ContextManager.createExitSpan("GET /api/payment", carrier, "svc-b:8080")
→ 压入 ExitSpan (spanId=2)
→ ContextManager.inject(carrier) // 注入上下文
→ HTTP Header: sw8 = "1-BASE64(traceId)-BASE64(segmentId)-2-..."
→ Feign 发起 HTTP 请求
→ 收到响应
→ ContextManager.stopSpan()
→ ExitSpan pop 出栈,archive 到 Segment
Step 4: HTTP 请求返回
────────────────────────────────
Tomcat/Spring MVC 插件触发:
→ ContextManager.stopSpan() // 停止 EntrySpan
→ EntrySpan pop 出栈,archive 到 Segment
→ activeSpanStack 为空!
→ TracingContext.finish()
→ 通知 TracingContextListener
Step 5: 数据上报
────────────────────────────────
TraceSegmentServiceClient.afterFinished(segment):
→ carrier.produce(segment) // 放入 DataCarrier
Consumer Thread (异步):
→ consume([segment1, segment2, ...])
→ segment.transform() // → SegmentObject (Protobuf)
→ gRPC stream.onNext(segmentObject)
→ stream.onCompleted()
→ OAP Server 接收并存储
Step 6: 同时在 Service-B
────────────────────────────────
Service-B 的 Spring MVC 插件:
→ 从 sw8 Header 反序列化 ContextCarrier
→ ContextManager.createEntrySpan("GET /api/payment", carrier)
→ context.extract(carrier) // 恢复父上下文
→ segment.ref(parentRef) // 关联到 Service-A 的 Segment
→ ... 执行业务逻辑 ...
→ 相同的上报流程
最终在 SkyWalking UI 上看到的拓扑
Trace Timeline (traceId: abc123)
┌─────────────────────────────────────────────────────────┐
│ Service-A │
│ ├─ EntrySpan: POST /api/order [0ms ─── 150ms] │
│ │ ├─ ExitSpan: MySQL INSERT [10ms ── 30ms] │
│ │ └─ ExitSpan: GET /api/payment [35ms ── 140ms] │
│ │
│ Service-B │
│ └─ EntrySpan: GET /api/payment [40ms ── 135ms] │
│ └─ ExitSpan: Redis SET [50ms ── 60ms] │
└─────────────────────────────────────────────────────────┘
总结与启示
SkyWalking Java Agent 的核心设计哲学
通读整个源码,SkyWalking Agent 的设计哲学可以概括为三个原则:
- 绝对不影响业务 — 所有可能阻塞或抛异常的操作都被 try-catch 包裹,数据丢弃优于业务卡顿
- 极致的性能优化 — ThreadLocal 无锁上下文、DataCarrier 无锁分区、延迟加载、对象复用
- 高度可扩展 — SPI 插件机制、服务覆盖注解、Witness 版本兼容,100+ 中间件支持
关键源码文件速查表
| 功能 | 核心类 | 路径 |
|---|---|---|
| Agent 入口 | SkyWalkingAgent | apm-agent/.../SkyWalkingAgent.java |
| 服务管理 | ServiceManager | apm-agent-core/.../boot/ServiceManager.java |
| 插件加载 | PluginBootstrap | apm-agent-core/.../plugin/PluginBootstrap.java |
| 插件索引 | PluginFinder | apm-agent-core/.../plugin/PluginFinder.java |
| 插件基类 | AbstractClassEnhancePluginDefine | apm-agent-core/.../plugin/AbstractClassEnhancePluginDefine.java |
| 字节码增强 | ClassEnhancePluginDefine | apm-agent-core/.../enhance/ClassEnhancePluginDefine.java |
| 方法拦截 | InstMethodsInter | apm-agent-core/.../enhance/InstMethodsInter.java |
| 上下文管理 | ContextManager | apm-agent-core/.../context/ContextManager.java |
| 追踪上下文 | TracingContext | apm-agent-core/.../context/TracingContext.java |
| 线程追踪片段 | TraceSegment | apm-agent-core/.../context/trace/TraceSegment.java |
| 跨进程传播 | ContextCarrier | apm-agent-core/.../context/ContextCarrier.java |
| 数据上报 | TraceSegmentServiceClient | apm-agent-core/.../remote/TraceSegmentServiceClient.java |
| 连接管理 | GRPCChannelManager | apm-agent-core/.../remote/GRPCChannelManager.java |
| 异步缓冲 | DataCarrier | apm-commons/.../datacarrier/DataCarrier.java |
| 配置定义 | Config | apm-agent-core/.../conf/Config.java |
可借鉴的架构思想
SPI 插件化
通过 ServiceLoader + def 文件实现零耦合插件发现,适用于任何需要扩展点的系统
ThreadLocal 上下文
线程本地存储实现无锁操作,适用于高并发场景下的请求级数据传递
异步缓冲
生产者-消费者 + 可丢弃策略,适用于不能阻塞主线程的旁路数据采集
字节码增强
ByteBuddy 运行时修改类,适用于不修改源码就需要增加横切关注点的场景
基于 Apache SkyWalking Java Agent 源码分析 · 仅供研究学习
源码版权归 Apache SkyWalking 所有