环境:
spring-cloud-dependencies: Hoxton.SR9
spring-boot-dependencies:2.3.2.RELEASE
spring-cloud-alibaba-dependencies:2.2.5.RELEASE
spring-cloud-starter-bootstrap:3.0.2
rocketmq-spring-boot-starter:2.2.2
dubbo:2.7.8
rocketMQ:4.9.4
skywalking:8.8.0
实现的日志(带 traceId):HTTP 请求(request)、RocketMQ、Dubbo
文档中的代码不太全,只放了部分关键代码
1 logback配置
首先项目添加依赖
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
<version>${apache.skywalking.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
<version>${apache.skywalking.version}</version>
</dependency>
日志配置文件:主要是在日志格式中输出 tid
<property name="DEFAULT_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%tid] [%thread] %-5level %logger{36} -%msg%n"/>
本地日志:appender
<appender name="${APPENDER_NAME}" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_HOME}/${LOG_FILENAME}</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/${LOG_FILENAME}.%d{yyyy-MM-dd}.gz</fileNamePattern>
<MaxHistory>30</MaxHistory>
</rollingPolicy>
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
<pattern>${LOG_PATTERN}</pattern>
</layout>
</encoder>
<!-- <encoder>-->
<!-- <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>-->
<!-- </encoder>-->
</appender>
上传到 SkyWalking 的日志:appender
<appender name="${TRACE_APPENDER_NAME}" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
<pattern>${LOG_PATTERN}</pattern>
</layout>
<charset>utf-8</charset>
</encoder>
</appender>
2. 请求日志
添加requesterFilter
// OncePerRequestFilter 保证只处理一次
public class HttpRequestLogFilter extends OncePerRequestFilter{
//这里用了两个logger 一个只写成功的请求日志,一个写异常
private static final Logger LOGGER = LoggerFactory.getLogger(HttpRequestLogFilter.class);
// 使用logger记录日志
Logger logger = LoggerFactory.getLogger(DEFAULT_LOGGER_NAME);
/**
 * Passes the request down the filter chain using the caching request/response wrappers and,
 * once the chain has returned or thrown, hands both wrappers to {@code saveLogData}
 * together with the elapsed time in milliseconds.
 *
 * @param request     the incoming HTTP request
 * @param response    the outgoing HTTP response
 * @param filterChain the remaining filter chain
 * @throws ServletException declared by the filter contract; propagated from the chain
 * @throws IOException      declared by the filter contract; propagated from the chain
 */
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
// NOTE(review): the elided section below presumably creates httpServletRequestWrapper /
// httpServletResponseWrapper and records 'start' — confirm against the full source.
....省略一部分
try {
filterChain.doFilter(httpServletRequestWrapper, httpServletResponseWrapper);
} finally {
long end = System.currentTimeMillis();
// Write the access log once request processing has finished (also when the chain throws).
saveLogData(request, httpServletRequestWrapper, httpServletResponseWrapper, end - start);
}
}
/**
 * Writes one access-log line (processing time, URI with parameters, request body,
 * response body) for a completed request.
 *
 * <p>Any failure while building the line is logged as a warning and never propagated,
 * and both cached streams are always closed.
 *
 * @param request                    the original request; used for parameters, content type and URI
 * @param httpServletRequestWrapper  wrapper holding the cached request body
 * @param httpServletResponseWrapper wrapper holding the cached response body
 * @param time                       elapsed processing time in milliseconds
 */
private void saveLogData(HttpServletRequest request,
        CachedHttpServletRequestWrapper httpServletRequestWrapper,
        CachedHttpServletResponseWrapper httpServletResponseWrapper, long time) {
    try {
        // Flush first: if a Writer was used, buffered data has to reach the cache before we read it.
        httpServletRequestWrapper.flushStream();
        httpServletResponseWrapper.flushStream();
        byte[] requestData = httpServletRequestWrapper.getCachedStream().getCached();
        byte[] responseData = httpServletResponseWrapper.getCachedStream().getCached();
        // Decode explicitly as UTF-8; new String(byte[]) would use the platform default charset
        // and garble non-ASCII payloads on hosts whose default is not UTF-8.
        String requestString = requestData == null
                ? StringUtils.EMPTY
                : new String(requestData, java.nio.charset.StandardCharsets.UTF_8);
        String responseString = responseData == null
                ? StringUtils.EMPTY
                : new String(responseData, java.nio.charset.StandardCharsets.UTF_8);
        // Hide non-text content.
        if (!isTextContentType(httpServletRequestWrapper.getContentType())) {
            requestString = DEFAULT_IGNORE_TEXT;
        }
        if (!isTextContentType(httpServletResponseWrapper.getContentType())) {
            responseString = DEFAULT_IGNORE_TEXT;
        }
        // Flatten the parameter map into name=value pairs, one pair per value.
        Map<String, String[]> params = request.getParameterMap();
        String paramString = StringUtils.EMPTY;
        if (MapUtils.isNotEmpty(params)) {
            List<String> pairs = Lists.newArrayList();
            for (Map.Entry<String, String[]> entry : params.entrySet()) {
                String[] values = entry.getValue();
                if (values == null) {
                    continue;
                }
                for (String v : values) {
                    pairs.add(entry.getKey() + "=" + StringUtils.trimToEmpty(v));
                }
            }
            paramString = String.join("&", pairs);
        }
        if (StringUtils.equals(request.getContentType(), MediaType.APPLICATION_FORM_URLENCODED_VALUE)) {
            // NOTE(review): servlet parameters are normally already URL-decoded by the container,
            // so this second decode looks redundant — confirm whether it is still needed.
            try {
                paramString = URLDecoder.decode(paramString, "UTF-8");
            } catch (IllegalArgumentException malformedEscape) {
                // A literal '%' in a value is not a valid escape; keep the undecoded text
                // instead of losing the whole log line.
            }
        }
        // Assemble the line and emit it through the dedicated request logger.
        StringBuilder buffer = new StringBuilder();
        buffer.append(logString("process_time", time + " ms"));
        buffer.append(logString("uri", request.getRequestURI() + "?" + paramString));
        buffer.append(logString("request", requestString.replaceAll("\n|\r", "")));
        buffer.append(logString("response", responseString.replaceAll("\n|\r", "")));
        logger.info(buffer.toString());
    } catch (Exception e) {
        LOGGER.warn("log request data error", e);
    } finally {
        closeQuietly(httpServletRequestWrapper.getCachedStream());
        closeQuietly(httpServletResponseWrapper.getCachedStream());
    }
}
}
3. RocketMQ 日志记录
使用RocketMQ的hook来记录日志
这里会发现有个TRACE_HANDLER -- 之前rocketMQ一直打不出来traceId
所以自己写了个TraceHandler 来处理,后来把版本对应好后,使用现成的plugins插件,就没问题了,也就不需要使用自定义的TraceHandler 了。
3.1 RocketConsumeMessageHook
public class RocketConsumeMessageHook implements ConsumeMessageHook {
private final Logger logger = LoggerFactory.getLogger("rocketmq");
/**
 * Runs before a batch of messages is consumed: puts the first message's id into the MDC
 * under {@code msgId} and logs one line per message.
 *
 * @param context consume context carrying the message batch and the consumer group
 */
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
    // Guard against a null/empty batch; get(0) would otherwise throw before anything is logged.
    if (context.getMsgList() == null || context.getMsgList().isEmpty()) {
        return;
    }
    MDC.put("msgId", context.getMsgList().get(0).getMsgId());
    // TRACE_HANDLER.beforeMethod(context.getMsgList());
    for (MessageExt messageExt : context.getMsgList()) {
        logger.info("准备消费消息,topic:{}, group:{}", messageExt.getTopic(), context.getConsumerGroup());
    }
}
/**
 * Runs after a batch of messages has been consumed: logs the outcome for every message
 * and then removes the {@code msgId} MDC entry.
 *
 * @param context consume context carrying the message batch, success flag and status
 */
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
    try {
        for (MessageExt messageExt : context.getMsgList()) {
            logger.info("消息消费完毕,topic:{}, group:{}, success:{}, status:{}", messageExt.getTopic(),
                    context.getConsumerGroup(), context.isSuccess(), context.getStatus());
        }
        // TRACE_HANDLER.afterMethod(context.getStatus());
    } finally {
        // Remove the id so it does not leak into unrelated log lines on this reused thread.
        MDC.remove("msgId");
    }
}
}
3.2 RocketMqSendMessageHook:发送端的 hook
@Component
public class RocketMqSendMessageHook implements SendMessageHook {
Logger logger = LoggerFactory.getLogger("rocketmq");
/** Name reported by {@link #hookName()}. */
private static final String HOOK_NAME = "rocket_sendMessage_Hook";

/**
 * Returns the name of this send-message hook.
 *
 * @return the constant hook name
 */
@Override
public String hookName() {
    return HOOK_NAME;
}
/**
 * Runs before a message is sent: stores the message's unique client id in the MDC
 * under {@code msgId} and logs the target topic.
 *
 * @param context send context carrying the outgoing message
 */
@Override
public void sendMessageBefore(SendMessageContext context) {
    // Expose the unique client message id to the log pattern through the MDC.
    final String uniqueClientId =
            context.getMessage().getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
    MDC.put("msgId", uniqueClientId);

    final String topic = context.getMessage().getTopic();
    logger.info("准备发送消息,topic:{}", topic);
}
/**
 * Runs after a send attempt: logs success or failure for the topic and then removes
 * the {@code msgId} MDC entry.
 *
 * <p>The send result may be absent (the send threw, or no result is available yet for
 * async/oneway sends). In that case the exception recorded on the context, if any, is
 * logged as the failure reason; with neither result nor exception nothing is logged.
 *
 * @param context send context carrying the message, the send result and/or the exception
 */
@Override
public void sendMessageAfter(SendMessageContext context) {
    try {
        final String topic = context.getMessage().getTopic();
        final SendResult sendResult = context.getSendResult();
        if (sendResult == null) {
            // No result: dereferencing it would throw and the real failure would never be logged.
            final Exception failure = context.getException();
            if (failure != null) {
                logger.error("消息发送失败,topic:{}, reason:{}", topic, failure.getMessage(), failure);
            }
            return;
        }
        if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("消息发送成功,topic:{}", topic);
        } else {
            logger.error("消息发送失败,topic:{}, reason:{}", topic, sendResult.getSendStatus());
        }
    } finally {
        // The id belongs to this send only; do not let it tag later log lines on this thread.
        MDC.remove("msgId");
    }
}
}
4. Dubbo:这里比较恶心
插件是有的,但是在provider端接收不到consumer端的traceId,只能自己来处理下了。
agent 的主要原理就是在加载 class 的时候写入对应的拦截处理代码(基于 Byte Buddy)。
可以看下dubbo的plugin源码
主要实现三个方法 前置拦截,后置拦截,与异常处理,使用dubbo的RpcContext来隐式传递对应的参数
拦截中在consumer中向RpcContext中添加参数,传递到provider中。
在provider中 从RpcContext中获取参数 设置到ContextCarrier中。
public class DubboInterceptor implements InstanceMethodsAroundInterceptor {
public static final String ARGUMENTS = "arguments";
/**
 * Opens the span for one Dubbo invocation before the intercepted method runs.
 *
 * <p><b>Consumer:</b> an exit span is created and every carrier item is written into the
 * {@link RpcContext} attachments so it travels to the provider; a key of the same name is
 * removed from the invocation's own attachments.
 *
 * <p><b>Provider:</b> every carrier item is filled from the {@link RpcContext} attachments
 * and an entry span is created from that carrier.
 *
 * @param objInst        the enhanced instance being intercepted
 * @param method         the intercepted method
 * @param allArguments   intercepted arguments; index 0 is cast to {@code Invoker}, index 1 to {@code Invocation}
 * @param argumentsTypes declared argument types
 * @param result         intercept result holder (not used here)
 * @throws Throwable declared by the interceptor contract
 */
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        MethodInterceptResult result) throws Throwable {
    final Invoker invoker = (Invoker) allArguments[0];
    final Invocation invocation = (Invocation) allArguments[1];
    final RpcContext rpcContext = RpcContext.getContext();
    final boolean providerSide = !rpcContext.isConsumerSide();
    final URL requestURL = invoker.getUrl();

    final AbstractSpan span;
    final boolean collect;
    final int lengthLimit;
    final ContextCarrier carrier = new ContextCarrier();

    if (providerSide) {
        // Fill the carrier from the attachments sent by the consumer.
        for (CarrierItem item = carrier.items(); item.hasNext(); ) {
            item = item.next();
            item.setHeadValue(rpcContext.getAttachment(item.getHeadKey()));
        }
        span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), carrier);
        span.setPeer(rpcContext.getRemoteAddressString());
        collect = DubboPluginConfig.Plugin.Dubbo.COLLECT_PROVIDER_ARGUMENTS;
        lengthLimit = DubboPluginConfig.Plugin.Dubbo.PROVIDER_ARGUMENTS_LENGTH_THRESHOLD;
    } else {
        final String peer = requestURL.getHost() + ":" + requestURL.getPort();
        span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), carrier, peer);
        //invocation.getAttachments().put("contextData", contextDataStr);
        //@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161
        for (CarrierItem item = carrier.items(); item.hasNext(); ) {
            item = item.next();
            final String headKey = item.getHeadKey();
            rpcContext.getAttachments().put(headKey, item.getHeadValue());
            if (invocation.getAttachments().containsKey(headKey)) {
                invocation.getAttachments().remove(headKey);
            }
        }
        collect = DubboPluginConfig.Plugin.Dubbo.COLLECT_CONSUMER_ARGUMENTS;
        lengthLimit = DubboPluginConfig.Plugin.Dubbo.CONSUMER_ARGUMENTS_LENGTH_THRESHOLD;
    }

    Tags.URL.set(span, generateRequestURL(requestURL, invocation));
    collectArguments(collect, lengthLimit, span, invocation);
    span.setComponent(ComponentsDefine.DUBBO);
    SpanLayer.asRPCFramework(span);
}
/**
 * Runs after the intercepted method returns: logs the exception carried by the Dubbo
 * result (if any) on the active span, then stops the span.
 *
 * @param objInst        the enhanced instance being intercepted
 * @param method         the intercepted method
 * @param allArguments   intercepted arguments
 * @param argumentsTypes declared argument types
 * @param ret            the intercepted return value, cast to {@code Result}; may be null
 * @return {@code ret}, unchanged
 * @throws Throwable declared by the interceptor contract
 */
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
        Object ret) throws Throwable {
    final Result rpcResult = (Result) ret;
    try {
        final Throwable failure = rpcResult == null ? null : rpcResult.getException();
        if (failure != null) {
            dealException(failure);
        }
    } catch (RpcException rpcError) {
        dealException(rpcError);
    }
    ContextManager.stopSpan();
    return ret;
}
/**
 * Runs when the intercepted method throws: logs the throwable on the active span.
 *
 * @param objInst        the enhanced instance being intercepted
 * @param method         the intercepted method
 * @param allArguments   intercepted arguments
 * @param argumentsTypes declared argument types
 * @param t              the throwable raised by the intercepted method
 */
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
    ContextManager.activeSpan().log(t);
}
/**
 * Logs a throwable raised by the Dubbo RPC call on the currently active span.
 *
 * @param throwable the throwable to log
 */
private void dealException(Throwable throwable) {
    ContextManager.activeSpan().log(throwable);
}
}
既然插件不起作用,那么就自己来实现。
我们在dubbo的Filter中来扩展
首先引入对应的包
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-agent-core</artifactId>
<version>${apache.skywalking.version}</version>
</dependency>
filter 中:
@Activate(group = {CommonConstants.PROVIDER, CommonConstants.CONSUMER})
public class DubboLogFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger("dubbo");
private static final DubboTraceHandler traceHandler = new DubboTraceHandler();
/**
 * Wraps one Dubbo invocation with span handling and access logging.
 *
 * <p>The span opened by {@code traceHandler.beforeMethod} is always closed through
 * {@code traceHandler.afterMethod}, including when the invocation throws; in that case
 * the exception is first reported through {@code traceHandler.handleMethodException}
 * and then rethrown unchanged.
 *
 * @param invoker the invoker being called
 * @param inv     the invocation
 * @return the result returned by the invoker
 * @throws RpcException propagated from the invoker
 */
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    AccessLogData requestLogDate = buildAccessLogData(invoker, inv);
    traceHandler.beforeMethod(invoker, inv);
    Result result = null;
    try {
        result = invoker.invoke(inv);
        // ... (a large part omitted) ...
        LOGGER.info("[cast={}][request={}][response={}]", finish - start, requestContent, respContent);
        return result;
    } catch (RuntimeException e) {
        // Record the failure on the active span before it propagates to the caller.
        traceHandler.handleMethodException(e);
        throw e;
    } finally {
        // Always stop the span; without this a throwing invocation leaves it open on the thread.
        traceHandler.afterMethod(result);
    }
}
}
这里的核心就是 DubboTraceHandler:与源码中对应,添加三个方法,直接 copy plugin 中的代码,改吧改吧就好了。
public class DubboTraceHandler {
public static final String ARGUMENTS = "arguments";
/**
 * Opens the span for one Dubbo invocation.
 *
 * <p>On the consumer side an exit span is created and every carrier item is written into the
 * {@link RpcContext} attachments (a key of the same name is removed from the invocation's own
 * attachments). On the provider side every carrier item is filled from the
 * {@link RpcContext} attachments and an entry span is created from that carrier.
 *
 * @param invoker    the invoker being called; supplies the request URL
 * @param invocation the invocation being executed
 */
public final void beforeMethod(Invoker<?> invoker, Invocation invocation) {
    final RpcContext rpcContext = RpcContext.getContext();
    final boolean providerSide = !rpcContext.isConsumerSide();
    final URL requestURL = invoker.getUrl();

    final AbstractSpan span;
    final boolean collect;
    final int lengthLimit;
    final ContextCarrier carrier = new ContextCarrier();

    if (providerSide) {
        // Fill the carrier from the attachments sent by the consumer.
        for (CarrierItem item = carrier.items(); item.hasNext(); ) {
            item = item.next();
            item.setHeadValue(rpcContext.getAttachment(item.getHeadKey()));
        }
        span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), carrier);
        span.setPeer(rpcContext.getRemoteAddressString());
        collect = DubboPluginConfig.Plugin.Dubbo.COLLECT_PROVIDER_ARGUMENTS;
        lengthLimit = DubboPluginConfig.Plugin.Dubbo.PROVIDER_ARGUMENTS_LENGTH_THRESHOLD;
    } else {
        final String peer = requestURL.getHost() + ":" + requestURL.getPort();
        span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), carrier, peer);
        //invocation.getAttachments().put("contextData", contextDataStr);
        //@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161
        for (CarrierItem item = carrier.items(); item.hasNext(); ) {
            item = item.next();
            final String headKey = item.getHeadKey();
            rpcContext.getAttachments().put(headKey, item.getHeadValue());
            if (invocation.getAttachments().containsKey(headKey)) {
                invocation.getAttachments().remove(headKey);
            }
        }
        collect = DubboPluginConfig.Plugin.Dubbo.COLLECT_CONSUMER_ARGUMENTS;
        lengthLimit = DubboPluginConfig.Plugin.Dubbo.CONSUMER_ARGUMENTS_LENGTH_THRESHOLD;
    }

    Tags.URL.set(span, generateRequestURL(requestURL, invocation));
    collectArguments(collect, lengthLimit, span, invocation);
    span.setComponent(ComponentsDefine.DUBBO);
    SpanLayer.asRPCFramework(span);
}
/**
 * Closes the span for one Dubbo invocation: passes the exception carried by the result
 * (if any) to {@code dealException}, then stops the span.
 *
 * @param result the invocation result; may be null
 * @return {@code result}, unchanged
 */
public Result afterMethod(Result result) {
    try {
        final Throwable failure = result == null ? null : result.getException();
        if (failure != null) {
            dealException(failure);
        }
    } catch (RpcException rpcError) {
        dealException(rpcError);
    }
    ContextManager.stopSpan();
    return result;
}
/**
 * Forwards a throwable raised by the invocation to {@code dealException}.
 * NOTE(review): {@code dealException} is not shown in this excerpt; presumably it logs the
 * throwable on the active span like the plugin's version — confirm against the full source.
 *
 * @param t the throwable raised during the invocation
 */
public final void handleMethodException(Throwable t) {
dealException(t);
}
}
乐享:知识积累,快乐无限。