环境:

spring-cloud-dependencies: Hoxton.SR9

spring-boot-dependencies:2.3.2.RELEASE

spring-cloud-alibaba-dependencies:2.2.5.RELEASE

spring-cloud-starter-bootstrap:3.0.2

rocketmq-spring-boot-starter:2.2.2

dubbo:2.7.8

rocketMQ:4.9.4

skywalking:8.8.0


实现log: request rocketMQ dubbo


文档中的代码不太全,只放了部分关键代码


1 logback配置

首先项目添加依赖

<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-logback-1.x</artifactId>
    <version>${apache.skywalking.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-trace</artifactId>
    <version>${apache.skywalking.version}</version>
</dependency>

日志配置文件 主要是tid输出

<property name="DEFAULT_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%tid] [%thread] %-5level %logger{36} -%msg%n"/>


本地日志:appender

   <appender name="${APPENDER_NAME}" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/${LOG_FILENAME}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/${LOG_FILENAME}.%d{yyyy-MM-dd}.gz</fileNamePattern>
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
                <pattern>${LOG_PATTERN}</pattern>
            </layout>
        </encoder>
<!--        <encoder>-->
<!--            <pattern>%-4relative [%thread] %-5level %logger{35} - %msg%n</pattern>-->
<!--        </encoder>-->
    </appender>

上传skywalking 日志:appender

<appender name="${TRACE_APPENDER_NAME}" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
    <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
        <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
            <pattern>${LOG_PATTERN}</pattern>
        </layout>
        <charset>utf-8</charset>
    </encoder>
</appender>


2. 请求日志


添加requesterFilter 


// OncePerRequestFilter 保证只处理一次

public class HttpRequestLogFilter extends OncePerRequestFilter{
    //这里用了两个logger 一个只写成功的请求日志,一个写异常
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpRequestLogFilter.class);
    // 使用logger记录日志
    Logger logger = LoggerFactory.getLogger(DEFAULT_LOGGER_NAME);
    
    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
       throws ServletException, IOException {
       ....省略一部分
       
            try {
            filterChain.doFilter(httpServletRequestWrapper, httpServletResponseWrapper);
        } finally {
            long end = System.currentTimeMillis();
            //请求处理完成后写日志
            saveLogData(request, httpServletRequestWrapper, httpServletResponseWrapper, end - start);
        }
    }
    /**
     * 写日志
     */
    private void saveLogData(HttpServletRequest request,
        CachedHttpServletRequestWrapper httpServletRequestWrapper,
        CachedHttpServletResponseWrapper httpServletResponseWrapper, long time) {
    try {
        // 如果使用了Writer就需要刷新流
        httpServletRequestWrapper.flushStream();
        httpServletResponseWrapper.flushStream();
        byte[] requestData = httpServletRequestWrapper.getCachedStream().getCached();
        byte[] responseData = httpServletResponseWrapper.getCachedStream().getCached();
        String requestString = requestData == null ? StringUtils.EMPTY : new String(requestData);
        String responseString = responseData == null ? StringUtils.EMPTY : new String(responseData);
        // 非 text 内容,隐藏
        if (!isTextContentType(httpServletRequestWrapper.getContentType())) {
            requestString = DEFAULT_IGNORE_TEXT;
        }
        if (!isTextContentType(httpServletResponseWrapper.getContentType())) {
            responseString = DEFAULT_IGNORE_TEXT;
        }
        // 处理请求参数map
        Map params = request.getParameterMap();
        params = Maps.newHashMap(params);
        String paramString = StringUtils.EMPTY;
        List<String> pairs = Lists.newArrayList();
        if (MapUtils.isNotEmpty(params)) {
            for (Object name : params.keySet()) {
                Object value = params.get(name);
                if (value instanceof String) {
                    pairs.add(name + "=" + StringUtils.trimToEmpty((String) value));
                } else if (value instanceof String[]) {
                    String[] values = (String[]) value;
                    for (String v : values) {
                        pairs.add(name + "=" + StringUtils.trimToEmpty(v));
                    }
                } else if (value != null) {
                    pairs.add(name + "=" + value);
                }
            }
            paramString = Joiner.on("&").join(pairs);
        }
        if (StringUtils.equals(request.getContentType(), MediaType.APPLICATION_FORM_URLENCODED_VALUE)) {
            paramString = URLDecoder.decode(paramString, "UTF-8");
        }
        // 使用logger记录日志
        StringBuilder buffer = new StringBuilder();
        buffer.append(logString("process_time", time + " ms"));
        buffer.append(logString("uri", request.getRequestURI() + "?" + paramString));
        buffer.append(logString("request", requestString.replaceAll("\n|\r", "")));
        buffer.append(logString("response", responseString.replaceAll("\n|\r", "")));
        logger.info(buffer.toString());
    } catch (Exception e) {
        LOGGER.warn("log request data error", e);
    } finally {
        closeQuietly(httpServletRequestWrapper.getCachedStream());
        closeQuietly(httpServletResponseWrapper.getCachedStream());
    }
    }
}



3.RocketMQ日志记录

使用RocketMQ的hook来记录日志

这里会发现有个TRACE_HANDLER  -- 之前rocketMQ一直打不出来traceId

所以自己写了个TraceHandler 来处理,后来把版本对应好后,使用现成的plugins插件,就没问题了,也就不需要使用自定义的TraceHandler 了。


3.1 RocketConsumeMessageHook


public class RocketConsumeMessageHook implements ConsumeMessageHook {
    private final Logger logger = LoggerFactory.getLogger("rocketmq");
    @Override
    public void consumeMessageBefore(ConsumeMessageContext context) {
        MDC.put("msgId", context.getMsgList().get(0).getMsgId());
//        TRACE_HANDLER.beforeMethod(context.getMsgList());
        for (MessageExt messageExt : context.getMsgList()) {
            logger.info("准备消费消息,topic:{}, group:{}", messageExt.getTopic(), context.getConsumerGroup());
        }
    }
    @Override
    public void consumeMessageAfter(ConsumeMessageContext context) {
        for (MessageExt messageExt : context.getMsgList()) {
            logger.info("消息消费完毕,topic:{}, group:{}, success:{}, status:{}", messageExt.getTopic(),
                    context.getConsumerGroup(), context.isSuccess(), context.getStatus());
        }
//        TRACE_HANDLER.afterMethod(context.getStatus());
    }
}


3.2 RocketMqSendMessageHook  发送端的hook


@Component
public class RocketMqSendMessageHook implements SendMessageHook {
    Logger logger = LoggerFactory.getLogger("rocketmq");
    @Override
    public String hookName() {
        return "rocket_sendMessage_Hook";
    }
    @Override
    public void sendMessageBefore(SendMessageContext context) {
    //这里加了个msgId
        MDC.put("msgId",
                context.getMessage().getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        logger.info("准备发送消息,topic:{}", context.getMessage().getTopic());
    }
    @Override
    public void sendMessageAfter(SendMessageContext context) {
        SendResult sendResult = context.getSendResult();
        if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
            logger.info("消息发送成功,topic:{}", context.getMessage().getTopic());
        } else {
            logger.error("消息发送失败,topic:{}, reason:{}", context.getMessage().getTopic(), sendResult.getSendStatus());
        }
    }
}



4.dubbo 这里比较恶心

插件是有的,但是在provider端接收不到consumer端的traceId,只能自己来处理下了。

agent主要原理就是在加载class的时候写入对应的拦截处理代码。ByteBudder

可以看下dubbo的plugin源码

主要实现三个方法 前置拦截,后置拦截,与异常处理,使用dubbo的RpcContext来隐式传递对应的参数


拦截中在consumer中向RpcContext中添加参数,传递到provider中。

在provider中 从RpcContext中获取参数 设置到ContextCarrier中。


public class DubboInterceptor implements InstanceMethodsAroundInterceptor {
    public static final String ARGUMENTS = "arguments";
    /**
     * <h2>Consumer:</h2> The serialized trace context data will
     * inject to the {@link RpcContext#attachments} for transport to provider side.
     * <p>
     * <h2>Provider:</h2> The serialized trace context data will extract from
     * {@link RpcContext#attachments}. current trace segment will ref if the serialize context data is not null.
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        Invoker invoker = (Invoker) allArguments[0];
        Invocation invocation = (Invocation) allArguments[1];
        RpcContext rpcContext = RpcContext.getContext();
        boolean isConsumer = rpcContext.isConsumerSide();
        URL requestURL = invoker.getUrl();
        AbstractSpan span;
        final String host = requestURL.getHost();
        final int port = requestURL.getPort();
        boolean needCollectArguments;
        int argumentsLengthThreshold;
        if (isConsumer) {
            final ContextCarrier contextCarrier = new ContextCarrier();
            span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
            //invocation.getAttachments().put("contextData", contextDataStr);
            //@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
                if (invocation.getAttachments().containsKey(next.getHeadKey())) {
                    invocation.getAttachments().remove(next.getHeadKey());
                }
            }
            needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_CONSUMER_ARGUMENTS;
            argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.CONSUMER_ARGUMENTS_LENGTH_THRESHOLD;
        } else {
            ContextCarrier contextCarrier = new ContextCarrier();
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
            }
            span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
            span.setPeer(rpcContext.getRemoteAddressString());
            needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_PROVIDER_ARGUMENTS;
            argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.PROVIDER_ARGUMENTS_LENGTH_THRESHOLD;
        }
        Tags.URL.set(span, generateRequestURL(requestURL, invocation));
        collectArguments(needCollectArguments, argumentsLengthThreshold, span, invocation);
        span.setComponent(ComponentsDefine.DUBBO);
        SpanLayer.asRPCFramework(span);
    }
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {
        Result result = (Result) ret;
        try {
            if (result != null && result.getException() != null) {
                dealException(result.getException());
            }
        } catch (RpcException e) {
            dealException(e);
        }
        ContextManager.stopSpan();
        return ret;
    }
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
        dealException(t);
    }
    /**
     * Log the throwable, which occurs in Dubbo RPC service.
     */
    private void dealException(Throwable throwable) {
        AbstractSpan span = ContextManager.activeSpan();
        span.log(throwable);
    }
    
 }


既然插件不起作用那么自己来

我们在dubbo的Filter中来扩展

首先引入对应的包


<dependency>    
    <groupId>org.apache.skywalking</groupId>    
    <artifactId>apm-agent-core</artifactId>    
    <version>${apache.skywalking.version}</version>
</dependency>


filter中

@Activate(group = {CommonConstants.PROVIDER, CommonConstants.CONSUMER})
public class DubboLogFilter implements Filter {
  private static final Logger LOGGER = LoggerFactory.getLogger("dubbo");
  private static final DubboTraceHandler traceHandler = new DubboTraceHandler();
    @Override
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        AccessLogData requestLogDate = buildAccessLogData(invoker, inv);
        traceHandler.beforeMethod(invoker, inv);
    
        Result result = invoker.invoke(inv);
        
        //省略一堆......
        
        LOGGER.info("[cast={}][request={}][response={}]", finish - start, requestContent, respContent);
        
        traceHandler.afterMethod(result);
        return result;   
    }
    
}


这里核心就是 DubboTraceHandler  与源码中对应,添加三个方法 直接copy plugin中的代码,改吧改吧就好了。

public class DubboTraceHandler {

    public static final String ARGUMENTS = "arguments";

    public final void beforeMethod(Invoker<?> invoker, Invocation invocation) {
        RpcContext rpcContext = RpcContext.getContext();
        boolean isConsumer = rpcContext.isConsumerSide();
        URL requestURL = invoker.getUrl();

        AbstractSpan span;

        final String host = requestURL.getHost();
        final int port = requestURL.getPort();

        boolean needCollectArguments;
        int argumentsLengthThreshold;
        if (isConsumer) {
            final ContextCarrier contextCarrier = new ContextCarrier();
            span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier,
                    host + ":" + port);
            //invocation.getAttachments().put("contextData", contextDataStr);
            //@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
                if (invocation.getAttachments().containsKey(next.getHeadKey())) {
                    invocation.getAttachments().remove(next.getHeadKey());
                }
            }
            needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_CONSUMER_ARGUMENTS;
            argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.CONSUMER_ARGUMENTS_LENGTH_THRESHOLD;
        } else {
            ContextCarrier contextCarrier = new ContextCarrier();
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
            }

            span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
            span.setPeer(rpcContext.getRemoteAddressString());
            needCollectArguments = DubboPluginConfig.Plugin.Dubbo.COLLECT_PROVIDER_ARGUMENTS;
            argumentsLengthThreshold = DubboPluginConfig.Plugin.Dubbo.PROVIDER_ARGUMENTS_LENGTH_THRESHOLD;
        }

        Tags.URL.set(span, generateRequestURL(requestURL, invocation));
        collectArguments(needCollectArguments, argumentsLengthThreshold, span, invocation);
        span.setComponent(ComponentsDefine.DUBBO);
        SpanLayer.asRPCFramework(span);
    }

    public Result afterMethod(Result result) {
        try {
            if (result != null && result.getException() != null) {
                dealException(result.getException());
            }
        } catch (RpcException e) {
            dealException(e);
        }

        ContextManager.stopSpan();
        return result;
    }

    public final void handleMethodException(Throwable t) {
        dealException(t);
    }

}






乐享:知识积累,快乐无限。