环境:
spring-cloud-dependencies: Hoxton.SR9
spring-boot-dependencies:2.3.2.RELEASE
spring-cloud-alibaba-dependencies:2.2.5.RELEASE
spring-cloud-starter-bootstrap:3.0.2
rocketmq-spring-boot-starter:2.2.2
dubbo:2.7.8
rocketMQ:4.9.4
skywalking:8.8.0
实现以下三类日志(日志中均输出 traceId):HTTP 请求(request)、RocketMQ、Dubbo
文档中的代码不太全,只放了部分关键代码
1 logback配置
首先项目添加依赖
<!-- SkyWalking Logback 1.x toolkit; the appenders in this document reference classes from its
     org.apache.skywalking.apm.toolkit.log.logback.v1.x package -->
<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-logback-1.x</artifactId>
    <version>${apache.skywalking.version}</version>
</dependency>
<!-- SkyWalking trace toolkit; shares the ${apache.skywalking.version} property with the artifact above -->
<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-trace</artifactId>
    <version>${apache.skywalking.version}</version>
</dependency>
日志配置文件 主要是tid输出
<!-- Default log line pattern: timestamp, [%tid] (trace id placeholder), thread, level, logger, message.
     NOTE(review): the appenders in this document reference ${LOG_PATTERN}, which is not defined in this
     excerpt; presumably it falls back to DEFAULT_LOG_PATTERN somewhere else in the configuration. -->
<property name="DEFAULT_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%tid] [%thread] %-5level %logger{36} -%msg%n"/>
本地日志:appender
<!-- Local log file. Rolls over daily (%d{yyyy-MM-dd}), gzips rolled files and keeps a history of 30.
     Lines are formatted by SkyWalking's TraceIdPatternLogbackLayout. -->
<appender name="${APPENDER_NAME}" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${LOG_HOME}/${LOG_FILENAME}</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <fileNamePattern>${LOG_HOME}/${LOG_FILENAME}.%d{yyyy-MM-dd}.gz</fileNamePattern>
        <maxHistory>30</maxHistory>
    </rollingPolicy>
    <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
        <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
            <pattern>${LOG_PATTERN}</pattern>
        </layout>
        <!-- Explicit charset: without it the encoder uses the JVM's platform default, which can garble
             the non-ASCII (Chinese) log messages. Same value as the SkyWalking gRPC appender. -->
        <charset>utf-8</charset>
    </encoder>
    <!-- Plain Logback encoder without trace id, kept for reference: -->
    <!-- <encoder>-->
    <!-- <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>-->
    <!-- </encoder>-->
</appender>
上传skywalking 日志:appender
<!-- Appender that hands log events to SkyWalking's GRPCLogClientAppender (used here to upload logs to
     SkyWalking). Uses the same trace-id aware layout and ${LOG_PATTERN} as the local file appender;
     output is encoded as utf-8. -->
<appender name="${TRACE_APPENDER_NAME}" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
    <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
        <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
            <pattern>${LOG_PATTERN}</pattern>
        </layout>
        <charset>utf-8</charset>
    </encoder>
</appender>
2. 请求日志
添加请求日志过滤器 HttpRequestLogFilter(request filter)
// OncePerRequestFilter 保证只处理一次 public class HttpRequestLogFilter extends OncePerRequestFilter{ //这里用了两个logger 一个只写成功的请求日志,一个写异常 private static final Logger LOGGER = LoggerFactory.getLogger(HttpRequestLogFilter.class); // 使用logger记录日志 Logger logger = LoggerFactory.getLogger(DEFAULT_LOGGER_NAME); @Override protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException { ....省略一部分 try { filterChain.doFilter(httpServletRequestWrapper, httpServletResponseWrapper); } finally { long end = System.currentTimeMillis(); //请求处理完成后写日志 saveLogData(request, httpServletRequestWrapper, httpServletResponseWrapper, end - start); } } /** * 写日志 */ private void saveLogData(HttpServletRequest request, CachedHttpServletRequestWrapper httpServletRequestWrapper, CachedHttpServletResponseWrapper httpServletResponseWrapper, long time) { try { // 如果使用了Writer就需要刷新流 httpServletRequestWrapper.flushStream(); httpServletResponseWrapper.flushStream(); byte[] requestData = httpServletRequestWrapper.getCachedStream().getCached(); byte[] responseData = httpServletResponseWrapper.getCachedStream().getCached(); String requestString = requestData == null ? StringUtils.EMPTY : new String(requestData); String responseString = responseData == null ? 
StringUtils.EMPTY : new String(responseData); // 非 text 内容,隐藏 if (!isTextContentType(httpServletRequestWrapper.getContentType())) { requestString = DEFAULT_IGNORE_TEXT; } if (!isTextContentType(httpServletResponseWrapper.getContentType())) { responseString = DEFAULT_IGNORE_TEXT; } // 处理请求参数map Map params = request.getParameterMap(); params = Maps.newHashMap(params); String paramString = StringUtils.EMPTY; List<String> pairs = Lists.newArrayList(); if (MapUtils.isNotEmpty(params)) { for (Object name : params.keySet()) { Object value = params.get(name); if (value instanceof String) { pairs.add(name + "=" + StringUtils.trimToEmpty((String) value)); } else if (value instanceof String[]) { String[] values = (String[]) value; for (String v : values) { pairs.add(name + "=" + StringUtils.trimToEmpty(v)); } } else if (value != null) { pairs.add(name + "=" + value); } } paramString = Joiner.on("&").join(pairs); } if (StringUtils.equals(request.getContentType(), MediaType.APPLICATION_FORM_URLENCODED_VALUE)) { paramString = URLDecoder.decode(paramString, "UTF-8"); } // 使用logger记录日志 StringBuilder buffer = new StringBuilder(); buffer.append(logString("process_time", time + " ms")); buffer.append(logString("uri", request.getRequestURI() + "?" + paramString)); buffer.append(logString("request", requestString.replaceAll("\n|\r", ""))); buffer.append(logString("response", responseString.replaceAll("\n|\r", ""))); logger.info(buffer.toString()); } catch (Exception e) { LOGGER.warn("log request data error", e); } finally { closeQuietly(httpServletRequestWrapper.getCachedStream()); closeQuietly(httpServletResponseWrapper.getCachedStream()); } } }
3.RocketMQ日志记录
使用RocketMQ的hook来记录日志
这里会发现有个TRACE_HANDLER -- 之前rocketMQ一直打不出来traceId
所以自己写了个TraceHandler 来处理,后来把版本对应好后,使用现成的plugins插件,就没问题了,也就不需要使用自定义的TraceHandler 了。
3.1 RocketConsumeMessageHook
public class RocketConsumeMessageHook implements ConsumeMessageHook { private final Logger logger = LoggerFactory.getLogger("rocketmq"); @Override public void consumeMessageBefore(ConsumeMessageContext context) { MDC.put("msgId", context.getMsgList().get(0).getMsgId()); // TRACE_HANDLER.beforeMethod(context.getMsgList()); for (MessageExt messageExt : context.getMsgList()) { logger.info("准备消费消息,topic:{}, group:{}", messageExt.getTopic(), context.getConsumerGroup()); } } @Override public void consumeMessageAfter(ConsumeMessageContext context) { for (MessageExt messageExt : context.getMsgList()) { logger.info("消息消费完毕,topic:{}, group:{}, success:{}, status:{}", messageExt.getTopic(), context.getConsumerGroup(), context.isSuccess(), context.getStatus()); } // TRACE_HANDLER.afterMethod(context.getStatus()); } }
3.2 RocketMqSendMessageHook 发送端的hook
@Component public class RocketMqSendMessageHook implements SendMessageHook { Logger logger = LoggerFactory.getLogger("rocketmq"); @Override public String hookName() { return "rocket_sendMessage_Hook"; } @Override public void sendMessageBefore(SendMessageContext context) { //这里加了个msgId MDC.put("msgId", context.getMessage().getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX)); logger.info("准备发送消息,topic:{}", context.getMessage().getTopic()); } @Override public void sendMessageAfter(SendMessageContext context) { SendResult sendResult = context.getSendResult(); if (sendResult.getSendStatus() == SendStatus.SEND_OK) { logger.info("消息发送成功,topic:{}", context.getMessage().getTopic()); } else { logger.error("消息发送失败,topic:{}, reason:{}", context.getMessage().getTopic(), sendResult.getSendStatus()); } } }
4.dubbo 这里比较恶心
插件是有的,但是在provider端接收不到consumer端的traceId,只能自己来处理下了。
agent 的主要原理就是在加载 class 的时候,通过 ByteBuddy 写入对应的拦截处理代码。
可以看下dubbo的plugin源码
主要实现三个方法 前置拦截,后置拦截,与异常处理,使用dubbo的RpcContext来隐式传递对应的参数
拦截中在consumer中向RpcContext中添加参数,传递到provider中。
在provider中 从RpcContext中获取参数 设置到ContextCarrier中。
public class DubboInterceptor implements InstanceMethodsAroundInterceptor { public static final String ARGUMENTS = "arguments"; /** * <h2>Consumer:</h2> The serialized trace context data will * inject to the {@link RpcContext#attachments} for transport to provider side. * <p> * <h2>Provider:</h2> The serialized trace context data will extract from * {@link RpcContext#attachments}. current trace segment will ref if the serialize context data is not null. */ @Override public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable { Invoker invoker = (Invoker) allArguments[0]; Invocation invocation = (Invocation) allArguments[1]; RpcContext rpcContext = RpcContext.getContext(); boolean isConsumer = rpcContext.isConsumerSide(); URL requestURL = invoker.getUrl(); AbstractSpan span; final String host = requestURL.getHost(); final int port = requestURL.getPort(); boolean needCollectArguments; int argumentsLengthThreshold; if (isConsumer) { final ContextCarrier contextCarrier = new ContextCarrier(); span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port); //invocation.getAttachments().put("contextData", contextDataStr); //@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161 CarrierItem next = contextCarrier.items(); while (next.hasNext()) { next = next.next(); rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue()); if (invocation.getAttachments().containsKey(next.getHeadKey())) { invocation.getAttachments().remove(next.getHeadKey()); } } needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_CONSUMER_ARGUMENTS; argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.CONSUMER_ARGUMENTS_LENGTH_THRESHOLD; } else { ContextCarrier contextCarrier = new ContextCarrier(); CarrierItem next = 
contextCarrier.items(); while (next.hasNext()) { next = next.next(); next.setHeadValue(rpcContext.getAttachment(next.getHeadKey())); } span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier); span.setPeer(rpcContext.getRemoteAddressString()); needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_PROVIDER_ARGUMENTS; argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.PROVIDER_ARGUMENTS_LENGTH_THRESHOLD; } Tags.URL.set(span, generateRequestURL(requestURL, invocation)); collectArguments(needCollectArguments, argumentsLengthThreshold, span, invocation); span.setComponent(ComponentsDefine.DUBBO); SpanLayer.asRPCFramework(span); } @Override public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable { Result result = (Result) ret; try { if (result != null && result.getException() != null) { dealException(result.getException()); } } catch (RpcException e) { dealException(e); } ContextManager.stopSpan(); return ret; } @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) { dealException(t); } /** * Log the throwable, which occurs in Dubbo RPC service. */ private void dealException(Throwable throwable) { AbstractSpan span = ContextManager.activeSpan(); span.log(throwable); } }
既然插件不起作用那么自己来
我们在dubbo的Filter中来扩展
首先引入对应的包
<!-- SkyWalking agent core; added so that the custom Dubbo filter / DubboTraceHandler can use
     classes such as ContextManager and ContextCarrier -->
<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-agent-core</artifactId>
    <version>${apache.skywalking.version}</version>
</dependency>
filter中
@Activate(group = {CommonConstants.PROVIDER, CommonConstants.CONSUMER}) public class DubboLogFilter implements Filter { private static final Logger LOGGER = LoggerFactory.getLogger("dubbo"); private static final DubboTraceHandler traceHandler = new DubboTraceHandler(); @Override public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { AccessLogData requestLogDate = buildAccessLogData(invoker, inv); traceHandler.beforeMethod(invoker, inv); Result result = invoker.invoke(inv); //省略一堆...... LOGGER.info("[cast={}][request={}][response={}]", finish - start, requestContent, respContent); traceHandler.afterMethod(result); return result; } }
这里的核心就是 DubboTraceHandler,它与插件源码中的 DubboInterceptor 相对应,同样提供三个方法(前置处理、后置处理、异常处理)。直接 copy plugin 中的代码,稍作修改即可。
public class DubboTraceHandler { public static final String ARGUMENTS = "arguments"; public final void beforeMethod(Invoker<?> invoker, Invocation invocation) { RpcContext rpcContext = RpcContext.getContext(); boolean isConsumer = rpcContext.isConsumerSide(); URL requestURL = invoker.getUrl(); AbstractSpan span; final String host = requestURL.getHost(); final int port = requestURL.getPort(); boolean needCollectArguments; int argumentsLengthThreshold; if (isConsumer) { final ContextCarrier contextCarrier = new ContextCarrier(); span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port); //invocation.getAttachments().put("contextData", contextDataStr); //@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161 CarrierItem next = contextCarrier.items(); while (next.hasNext()) { next = next.next(); rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue()); if (invocation.getAttachments().containsKey(next.getHeadKey())) { invocation.getAttachments().remove(next.getHeadKey()); } } needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_CONSUMER_ARGUMENTS; argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.CONSUMER_ARGUMENTS_LENGTH_THRESHOLD; } else { ContextCarrier contextCarrier = new ContextCarrier(); CarrierItem next = contextCarrier.items(); while (next.hasNext()) { next = next.next(); next.setHeadValue(rpcContext.getAttachment(next.getHeadKey())); } span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier); span.setPeer(rpcContext.getRemoteAddressString()); needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_PROVIDER_ARGUMENTS; argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.PROVIDER_ARGUMENTS_LENGTH_THRESHOLD; } Tags.URL.set(span, generateRequestURL(requestURL, invocation)); collectArguments(needCollectArguments, 
argumentsLengthThreshold, span, invocation); span.setComponent(ComponentsDefine.DUBBO); SpanLayer.asRPCFramework(span); } public Result afterMethod(Result result) { try { if (result != null && result.getException() != null) { dealException(result.getException()); } } catch (RpcException e) { dealException(e); } ContextManager.stopSpan(); return result; } public final void handleMethodException(Throwable t) { dealException(t); } }
乐享:知识积累,快乐无限。