由于cat-client目录下的代码官方已经不再维护更新,所以分析lib目录下的cat客户端源码
CAT客户端的源码分析以官方的使用QuickStart入手
如上图代码所示:
通过Cat客户端开启了一个Transaction,记录整个方法的运行时间。在过程中“记录”(插入)了成功的Event事件,和两个Metric业务指标。最后设置Transaction成功 SUCCESS ,标记结束 complete() 。下面以上述代码,分析客户端源码
com.dianping.cat.Cat#newTransaction方法传入type和name作为参数,二者是CAT报表展示时的目录层级分类,type下包含name。 isEnabled检查是否启用CAT客户端后,获取 DefaultMessageProducer ,这部分逻辑上一章已经描述过。 通过 DefaultMessageProducer 执行newTransaction方法
public static Transaction newTransaction(String type, String name) { if (isEnabled()) { try { return Cat.getProducer().newTransaction(type, name); } catch (Exception e) { errorHandler(e); return NullMessage.TRANSACTION; } } else { return NullMessage.TRANSACTION; } }com.dianping.cat.message.internal.DefaultMessageProducer#newTransaction方法中, 首先通过DefaultMessageManager检查ThreadLocal中是否已有Context上下文。如果没有创建Context上下文,则执行setUp方法执行创建。 由于Cat客户端是单实例的,因此要使用ThreadLocal保证采集数据的线程安全,避免使用锁保证性能 完成Context上下文的读取/创建后,生成一个DefaultTransaction的Message。 开启Transaction,设置其中相关信息
public Transaction newTransaction(String type, String name) { // 1.获取或者创建ThreadLocal的上下文 if (!manager.hasContext()) { manager.setup(); } // 2.创建消息 DefaultTransaction transaction = new DefaultTransaction(type, name, manager); // 3.开启Transaction manager.start(transaction, false); return transaction; }获取上下文的方法主要包含在com.dianping.cat.message.internal.DefaultMessageManager类中,其主要逻辑如下 setUp时会读取采样率,默认配置为1.0即全部采样。如果设置采样率小于1,会设置标记位,在发送时再进行处理。(采样逻辑以后文章介绍)
@Override public boolean hasContext() { // 从ThreadLocal中读上下文 Context context = this.context.get(); return context != null; } @Override public void setup() { // 没读到时创建一个Context Context ctx = new Context(domain, hostName, ip); double samplingRate = configService.getSamplingRate(); // 判断是否命中采样 if (samplingRate < 1.0 && hitSample(samplingRate)) { ctx.tree.setHitSample(true); } // 保存到ThreadLocal中 context.set(ctx); } private boolean hitSample(double sampleRatio) { // sampleCount是单例Manager的一个单实例,因此不同线程调用setup时,一定会使sampleCount增加,触发采样判断 int count = sampleCount.incrementAndGet(); return count % ((int) (1.0 / sampleRatio)) == 0; }创建Transaction的逻辑较为简单,引用MessageManager,保存Transaction开始时间等,设置基础的消息内容
public DefaultTransaction(String type, String name, MessageManager manager) { super(type, name); this.manager = manager; durationStart = System.nanoTime(); } // com.dianping.cat.message.internal.AbstractMessage#AbstractMessage AbstractMessage(String type, String name) { this.type = String.valueOf(type); this.name = String.valueOf(name); timestampInMillis = System.currentTimeMillis(); }通过Manager开启Transaction是核心逻辑 com.dianping.cat.message.internal.DefaultMessageManager#start,通过上下文,开启Transaction
public void start(Transaction transaction, boolean forked) { Context ctx = getContext(); if (ctx != null) { ctx.start(transaction, forked); } else if (firstMessage) { // Cat客户端正常调用时,这部分逻辑不会进入,因为已经setUp // 如果直接调用,调用不规范,则会进入这部分 firstMessage = false; LOGGER.error("CAT client is not enabled because it's not initialized yet"); } }com.dianping.cat.message.internal.DefaultMessageManager.Context#start中: stack变量是Context保存Transaction的集合private Stack<Transaction> stack 当栈为空时,说明当前线程上下文中没有父Transaction,messageTree中直接保存当前message 当栈不为空时,从栈顶读取消息,将当前transaction设置为其子transaction。保存transaction之间的父子关系,即transaction“树”。 最后,将当前消息压入栈。对于异步Transaction暂时不解释 注:如果Transaction是子消息,那么只是消息只是入栈,并没有修改Context上下文MessageTree所引用的Message对象。Tree中的Transaction类型Message,只保存根节点
public void start(Transaction transaction, boolean forked) { if (!stack.isEmpty()) { // Do NOT make strong reference from parent transaction to forked transaction. // Instead, we create a "soft" reference to forked transaction later, via linkAsRunAway() // By doing so, there is no need for synchronization between parent and child threads. // Both threads can complete() anytime despite the other thread. if (!(transaction instanceof ForkedTransaction)) { // 读取栈顶消息,但是不取出 Transaction parent = stack.peek(); // 添加子消息 addTransactionChild(transaction, parent); } } else { tree.setMessage(transaction); } if (!forked) { // 消息入栈 stack.push(transaction); } }添加子消息时,对消息的触发时间是否是当前小时、子消息是否超过2000条进行判断。 如果不满组响应条件,则会触发截断操作,发送一次消息。之后,将消息添加到Transaction的List中
关于长度Length length保存在线程上下文中,每当添加一个子transaction时加一,累加到root transaction发送清空上下文为止; 同级transaction有多个兄弟时,也一定是前一个兄弟先end,再压入新的弟transaction。 length = 深度 + 个数
// 传入当前消息和parent private void addTransactionChild(Message message, Transaction transaction) { long treePeriod = tree.getMessage().getTimestamp() / HOUR; long messagePeriod = (message.getTimestamp() - 10 * 1000L) / HOUR; // 10 seconds extra time allowed // 判断时间周期是否还是当前小时 // 上下文中父子消息数量超过2000 if (treePeriod < messagePeriod || length >= SIZE) { // 发送 validator.truncateAndFlush(this, message.getTimestamp()); } // 添加子消息到父消息的集合中 transaction.addChild(message); // length在上下文中保存直到发送后清理,因此一个root,length = root下子节点总数 length++; } // Transaction中子消息的集合添加操作com.dianping.cat.message.internal.DefaultTransaction#addChild public DefaultTransaction addChild(Message message) { if (children == null) { children = new ArrayList<Message>(); } children.add(message); return this; }validator使用TransactionHelper实现,truncateAndFlush方法的目的是 将现有的消息复制并发送,同时记录消息ID等,通过消息ID串联两个Transaction。将过大、或者过期的Transaction消息按长度、时间分割为两个Transaction。 其逻辑如下(删除了ForkedTransaction相关逻辑)
获取当前的MessageTree,并读取/生成相应的MessageId,ParentId,RootId,用于链接消息树。将当消息Transaction拷贝到一个临时target中,并且递归source和其childMessage,将信息转移到target对象上创建一个DefaultEvent,记录串联事件拷贝当前MessageTree,将traget设置到其中的Message,将新tree的Id设置为childId,将新tree的parentId设置为当前tree的Id。完成串联 public void truncateAndFlush(Context ctx, long timestamp) { // 获取当前的Message和stack,Message为父 MessageTree tree = ctx.tree; Stack<Transaction> stack = ctx.stack; Message message = tree.getMessage(); if (message instanceof DefaultTransaction) { // 检查ID,rootId,ChildId,为发送后关联消息链做准备 String id = tree.getMessageId(); if (id == null) { id = nextMessageId(); tree.setMessageId(id); } String rootId = tree.getRootMessageId(); String childId = nextMessageId(); Transaction source = (Transaction) message; // 新建一个Transaction消息,拷贝父数据 DefaultTransaction target = new DefaultTransaction(source.getType(), source.getName(), DefaultMessageManager.this); target.setTimestamp(source.getTimestamp()); target.setDurationInMicros(source.getDurationInMicros()); target.setStatus(Message.SUCCESS); // 递归迁移原有的父信息到traget中 migrateMessage(stack, source, target, 1); if (message instanceof DefaultTransaction) { // 增加到子logview的下一个链接,向下关联 // Event报表的RemoteCall.Next可以看到该链路的整合信息 DefaultEvent next = new DefaultEvent("RemoteCall", "Next"); next.addData(childId); next.setStatus(Message.SUCCESS); target.addChild(next); } else { // 在forked情况下,最后一个messageTree需要和父亲线程上下文保持关联,在堆栈里面加入已经发送的logview的链接,这个地方是向上关联 // forked先不看 } // 拷贝当前MessageTree到一个临时tree t MessageTree t = tree.copy(); // 设置message相关信息 t.setMessage(target); ctx.tree.setMessageId(childId); ctx.tree.setParentMessageId(id); ctx.tree.setRootMessageId(rootId != null ? rootId : id); if (ctx.knownExceptions != null) { ctx.knownExceptions.clear(); } ctx.length = stack.size(); ctx.totalDurationInMicros = ctx.totalDurationInMicros + target.getDurationInMicros(); // 将t发送,但不清理当前Context上下文。 flush(t, false); } }递归算法解释如下 首先需要说明,更具Context的start方法逻辑已知,stack中只要栈顶有transaction,新建的transaction一定属于其子transaction。 那么栈内的元素父子关系可能如下图,T1是root位于栈底,T1.1.1.1是T1的第3层子节点。上下文tree中的Message始终引用根节点
递归时传入4个参数:
stack是transaction保存的堆栈,用于在递归中读取transactionsource可以理解为当前待迁移数据的节点traget是存储新的节点数据的引用树level是栈的索引,从1开始,即第一层子节点 private void migrateMessage(Stack<Transaction> stack, Transaction source, Transaction target, int level) { Transaction current = level < stack.size() ? stack.get(level) : null; boolean shouldKeep = false; final List<Message> childs = source.getChildren(); for (Message child : childs) { if (child != current) { target.addChild(child); } else { DefaultTransaction cloned = new DefaultTransaction(current.getType(), current.getName(), DefaultMessageManager.this); cloned.setTimestamp(current.getTimestamp()); cloned.setDurationInMicros(current.getDurationInMicros()); cloned.setStatus(Message.SUCCESS); target.addChild(cloned); migrateMessage(stack, current, cloned, level + 1); shouldKeep = true; } } // 清理节点,保留干净的父子关系 source.getChildren().clear(); if (shouldKeep) { // add it back source.addChild(current); } }通过递归,调整level+1,读取下一个子节点,如果子节点存在且与自身不同,继续递归。直到不符合条件弹出。 target完成了整个子树的链路引用,示意图如下
综上,完成了Transaction的开启过程。
先跳过为Transaction树添加消息的方法,研究transaction如何完成/关闭,执行发送。 DefaultTransaction的complete实现如下:
校验completed标记位,避免重复执行计算Transaction的运行时间,注意是纳秒,除以1000L,支持ms级的精确度(纳秒统计内存级别的时间)设置标记位检查是否属于Problem报表?是,则标记该消息不可丢弃,不可丢弃则保证problem不受采样控制执行end结束方法 public void complete() { try { if (isCompleted()) { // 校验标记位,是否已经标记过 } else { // 计算Transaction的运行时间 if (durationInMicro == -1) { durationInMicro = (System.nanoTime() - durationStart) / 1000L; } setCompleted(true); // 检查problem报表 if (manager != null && isProblem(this, manager, type, durationInMicro / 1000L)) { manager.getThreadLocalMessageTree().setDiscardPrivate(false); } // 执行结束方法 if (manager != null) { manager.end(this); } } } catch (Exception e) { // ignore } }校验Problem主要用于当transaction是type是long-sql等数据CAT默认检查项的类型,判断是否超限的方法。
取type,从缓存的threshold map中读取阈值未读到缓存时,从config中读取配置,并进行缓存超限时,记录每分钟type-name对应的告警次数,小于60次为true private boolean isProblem(Transaction t, MessageManager manager, String type, long duration) { boolean problem; // transaction成功状态下记录异常Problem if (t.isSuccess()) { // map,缓存每个type对应的threshold Integer value = map.get(type); if (value != null) { // 已经读取过配置,直接从内存取 problem = duration > value; } else { // 没读到,读config配置 int threshold = Integer.MAX_VALUE; ClientConfigService config = manager.getConfigService(); // 获取枚举,读取相关配置 ProblemLongType longType = ProblemLongType.findByMessageType(type); if (longType != null) { switch (longType) { // 通过配置文件读取配置 case LONG_CACHE: threshold = config.getLongConfigThreshold(ProblemLongType.LONG_CACHE.getName()); break; case LONG_CALL: threshold = config.getLongConfigThreshold(ProblemLongType.LONG_CALL.getName()); break; case LONG_SERVICE: threshold = config.getLongConfigThreshold(ProblemLongType.LONG_SERVICE.getName()); break; case LONG_SQL: threshold = config.getLongConfigThreshold(ProblemLongType.LONG_SQL.getName()); break; case LONG_URL: threshold = config.getLongConfigThreshold(ProblemLongType.LONG_URL.getName()); break; case LONG_MQ: threshold = config.getLongConfigThreshold(ProblemLongType.LONG_MQ.getName()); break; } } // 缓存配置 map.put(type, threshold); // 满足Threshold,记录problem数量,最大60条,超过时丢弃 problem = duration > threshold && recordProblem(t.getTimestamp(), t.getType() + t.getName()); } } else { problem = recordProblem(t.getTimestamp(), t.getType() + t.getName()); } return problem; } // 记录type和name对应的problem数量 private boolean recordProblem(long time, String id) { try { long minute = time / 1000 / 60; ConcurrentHashMap<String, AtomicInteger> count = DefaultTransaction.count.get(minute); if (count == null) { count = new ConcurrentHashMap<String, AtomicInteger>(); ConcurrentHashMap<String, AtomicInteger> oldCount = DefaultTransaction.count.putIfAbsent(minute, count); if (oldCount != null) { count = oldCount; } } AtomicInteger value = count.get(id); if (value == null) { value = new AtomicInteger(0); AtomicInteger oldValue = count.putIfAbsent(id, value); if (oldValue != null) { value = oldValue; } } return value.incrementAndGet() <= 60; } catch (Exception e) { return false; } }枚举中定义了会被Problem报表采集的type 我们可以自己埋点时设置type对应枚举,实现统计problem
public enum ProblemLongType { LONG_CACHE("long-cache", 25) { @Override protected boolean checkLongType(String type) { return type.startsWith("Squirrel.") || type.startsWith("Cellar.") || type.startsWith("Cache."); } }, LONG_CALL("long-call", 100) { @Override protected boolean checkLongType(String type) { return "PigeonCall".equals(type) || "OctoCall".equals(type) || "Call".equals(type); } }, LONG_SERVICE("long-service", 100) { @Override protected boolean checkLongType(String type) { return "PigeonService".equals(type) || "OctoService".equals(type) || "Service".equals(type); } }, LONG_SQL("long-sql", 100) { @Override protected boolean checkLongType(String type) { return "SQL".equals(type); } }, LONG_URL("long-url", 1000) { @Override protected boolean checkLongType(String type) { return "URL".equals(type); } }, LONG_MQ("long-mq", 100) { @Override protected boolean checkLongType(String type) { return "MtmqRecvMessage".equals(type) || "MafkaRecvMessage".equals(type); } } // 其他代码 }complete方法最后调用com.dianping.cat.message.internal.DefaultMessageManager#end,结束指定transaction消息。它通过获取当先线程上下文ctx实现end
public void end(Transaction transaction) { Context ctx = getContext(); if (ctx != null) { ctx.end(this, transaction); } }com.dianping.cat.message.internal.DefaultMessageManager.Context#end方法的实现逻辑如下:
校验transaction的stack是否有数据需要发送判断元素是否是栈顶的transaction,若不是则循环判断弹出,直到找到相等的transaction判断Stack已经完全清空,说明消息树中的根Transaction已经弹出,可以将整个tree发送。保证只有当根transaction可以结束时,才发送消息拷贝当前tree,并清空,将tree发送 public boolean end(DefaultMessageManager manager, Transaction transaction) { if (!stack.isEmpty()) { Transaction current = stack.pop(); // 判断栈顶元素是否是要结束的transaction if (transaction == current) { } else { // 将不相同的transaction全部弹出,直到相等或空栈 while (transaction != current && !stack.empty()) { current = stack.pop(); } } // 当栈为空时继续 if (stack.isEmpty()) { // 先不考虑forktransaction final List<ForkableTransaction> forkableTransactions = tree.getForkableTransactions(); if (forkableTransactions != null && !forkableTransactions.isEmpty()) { for (ForkableTransaction forkableTransaction : forkableTransactions) { forkableTransaction.complete(); } } // 拷贝树,将上下文清空 MessageTree tree = this.tree.copy(); this.tree.setMessageId(null); this.tree.setMessage(null); // totalDurationInMicros大于0,说明是transant的消息,不能采样处理 if (totalDurationInMicros > 0) { tree.setDiscardPrivate(false); } // 发送消息 manager.flush(tree, true); return true; } } return false; }com.dianping.cat.message.internal.DefaultMessageManager#flush的发送逻辑相对简单:
调用TCP客户端发送消息重置上下文数据 public void flush(MessageTree tree, boolean clearContext) { if (sender != null) { // TCPSender发送 sender.send(tree); if (clearContext) { // 重置上下文 reset(); } } else { throttleTimes++; if (throttleTimes % 10000 == 0 || throttleTimes == 1) { LOGGER.info("Cat Message is throttled! Times:" + throttleTimes); } } }LogEvent方法可以单独发送一个Event事件,或者向一个父Transaction插入一个Event事件。Cat的接口如下,主要逻辑与创建Transaction相近,最终通过Producer创建Event
/** * Log an event in one shot. * * @param type event type * @param name event name * @param status "0" means success, otherwise means error code * @param nameValuePairs name value pairs in the format of "a=1&b=2&..." */ public static void logEvent(String type, String name, String status, String nameValuePairs) { if (isEnabled()) { try { Cat.getProducer().logEvent(type, name, status, nameValuePairs); } catch (Exception e) { errorHandler(e); } } }com.dianping.cat.message.internal.DefaultMessageProducer#logEvent方法的实现相对很简单。 实例化event对象,记录type、name,data即完成了创建。由于Event是孤立的事件,没有相关关系,可以直接complete,标记完成执行发送
@Override public void logEvent(String type, String name, String status, String nameValuePairs) { // 实例化Event,设置数据 Event event = newEvent(type, name); if (nameValuePairs != null && nameValuePairs.length() > 0) { event.addData(nameValuePairs); } event.setStatus(status); // 标记完成,准备发送 event.complete(); }com.dianping.cat.message.internal.DefaultEvent#complete方法中,设置标记位,通过DefaultMessageManager中在当前线程上下文中添加当前Event消息
public void complete() { setCompleted(true); // 添加消息 if (manager != null) { manager.add(this); } } // com.dianping.cat.message.internal.DefaultMessageManager#add public void add(Message message) { Context ctx = getContext(); if (ctx != null) { ctx.add(message); } }com.dianping.cat.message.internal.DefaultMessageManager.Context#add方法中:
通过stack判断是否有父Transaction,如果没有transaction,复制tree通过flush发送,与transaction发送类似如果有父Transaction,则给该Transaciton添加一个child,但是不向stack中保存 public void add(Message message) { if (stack.isEmpty()) { MessageTree tree = this.tree.copy(); tree.setMessage(message); flush(tree, true); } else { Transaction parent = stack.peek(); addTransactionChild(message, parent); } } public void add(Message message) { if (stack.isEmpty()) { MessageTree tree = this.tree.copy(); tree.setMessage(message); flush(tree, true); } else { Transaction parent = stack.peek(); addTransactionChild(message, parent); } }Cat接口中logMetricForCount方法如下,与其他接口类似,先检查初始化,然后获取实例添加测量埋点。
public static void logMetricForCount(String name, int quantity, Map<String, String> tags) { if (isEnabled()) { // 初始化 checkAndInitialize(); try { // 获取Metric聚合实例,添加测量点 MetricTagAggregator.getInstance().addCountMetric(name, quantity, tags); } catch (Exception e) { // 记录异常 errorHandler(e); } } }com.dianping.cat.analyzer.MetricTagAggregator#addCountMetric()方法的实现如下
校验并获取MetricTagItem:MetricTagItem是针对业务指标的计数类,原始计数存储在其内部 metrics是Aggregator中的一个线程安全Map,用来保存用户指定的name与计数tag直接的关系 对MetricTagItem执行Count加一操作 public void addCountMetric(String name, int quantity, Map<String, String> tags) { MetricTagItem metricTagItem = makeSureMetricExist(name, tags, metrics); addCountMetric(quantity, metricTagItem); } private void addCountMetric(int quantity, MetricTagItem metricTagItem) { // 添加方法很简单,原子类直接自增 metricTagItem.count.addAndGet(quantity); } private MetricTagItem makeSureMetricExist(String key, Map<String, String> tags, Map<String, Map<String, MetricTagItem>> metrics) { // 从<name,<tagkey, Item>>的Map中,取<tagkey, Item> Map<String, MetricTagItem> items = metrics.get(key); // itemsmap不存在,双重检查并创建,但是个人认为这个双重检查有点多余 if (null == items) { synchronized (this) { items = metrics.get(key); if (null == items) { items = new ConcurrentHashMap<String, MetricTagItem>(); metrics.put(key, items); } } } // 根据插入的tagsMap拼装key,格式如name=zhangsan&age=20,空EMPTY String tagKey = buildTagKey(tags); // 获取计数类 MetricTagItem item = items.get(tagKey); if (null == item) { // 一个name下的metric最多不超过1000 if (items.size() >= MAX_KEY_SIZE) { Cat.logEvent("cat.TooManyTagValuesForMetric", key); tagKey = OTHERS; } // 双重检查,创建item类 item = items.get(tagKey); if (null == item) { synchronized (this) { item = items.get(tagKey); if (null == item) { item = createMetricItem(tagKey); items.put(tagKey, item); } } } } return item; } private MetricTagItem createMetricItem(String key) { // 创建一个item MetricTagItem item = new MetricTagItem(); item.setKey(key); // metric的门限是用户自定义设置的 // com.dianping.cat.analyzer.MetricTagAggregator#setMetricSlowThreshold Integer threshold = metricThresholds.get(key); if (threshold != null) { item.setSlowThreshold(threshold); } return item; }该方法用于记录时段内的平均值,与5.1的主要区别在于计数方式不同
public void addTimerMetric(String name, long durationInMillis, Map<String, String> tags) { MetricTagItem item = makeSureMetricExist(name, tags, metrics); addTimerMetric(durationInMillis, item); } private void addTimerMetric(long durationInMillis, MetricTagItem item) { // 记录数量和时间 item.count.incrementAndGet(); item.sum.addAndGet(durationInMillis); if (item.slowThreshold > 0 && durationInMillis > item.slowThreshold) { item.slowCount.incrementAndGet(); } }注: Metric以及下面的Error发送并不是通过当前线程上下文的消息树发送。而是通过统一的线程采集记录在Aggregator中的数据,拼装发送。下一节为详细介绍
Cat中的logError接口不再赘述,与其他Message接口类似
public static void logError(String message, Throwable cause) { if (isEnabled()) { try { Cat.getProducer().logError(message, cause); } catch (Exception e) { errorHandler(e); } } }直接进入producer逻辑:
判断同一个异常是否已经存在与ctx上下文中,上下文会保存一个异常的集合,记录产生的异常。该异常集合在ctx reset时完成清空重置。因为buildStackInfo需要IO操作,所以要避免重复,减少IO。根据异常构建detailMessage的内容通过logEvent记录异常事件 public void logError(String message, Throwable cause) { if (notExsitCause(cause)) { String detailMessage = buildStackInfo(message, cause); final String name = cause.getClass().getName(); if (cause instanceof Error) { logEvent("Error", name, ERROR, detailMessage); } else if (cause instanceof RuntimeException) { logEvent("RuntimeException", name, ERROR, detailMessage); } else { logEvent("Exception", name, ERROR, detailMessage); } } }buildStackInfo方法中,通过流写入数据。并记录异常的次数,以分钟为单位,最多记录80条。超过80条,则无法记录。 这个缓存记录会通过StatusTask管理
private String buildStackInfo(String message, Throwable cause) { StringWriter writer = new StringWriter(2048); if (message != null) { // 把message写到writer中,message是异常描述 writer.write(message); writer.write(' '); } if (recordStackTrace(cause.getClass().getName())) { // when build stack, cat will report the message tree. Cat.getManager().getThreadLocalMessageTree().setDiscardPrivate(false); cause.printStackTrace(new PrintWriter(writer)); } else { writer.write("The exception with same name will print stack eighty times in one minute, discard exception stack to avoid abnormal performance bottleneck"); } return writer.toString(); } // 这个逻辑类似complete标记transaction时的recordProblem逻辑 private boolean recordStackTrace(String exception) { try { long minute = System.currentTimeMillis() / 1000 / 60; ConcurrentHashMap<String, AtomicInteger> stack = DefaultMessageProducer.stack.get(minute); if (stack == null) { stack = new ConcurrentHashMap<String, AtomicInteger>(); ConcurrentHashMap<String, AtomicInteger> oldStack = DefaultMessageProducer.stack.putIfAbsent(minute, stack); if (oldStack != null) { stack = oldStack; } } AtomicInteger value = stack.get(exception); if (value == null) { value = new AtomicInteger(0); AtomicInteger oldValue = stack.putIfAbsent(exception, value); if (oldValue != null) { value = oldValue; } } // 大于当前分钟80不记录 return value.incrementAndGet() <= 80; } catch (Exception e) { return true; } }errorHandler方法在Cat客户端中用于记录客户端自身的报错。方法中会判断报错次数,限制次数小于3时记录。 且,errorCount是Cat中的静态变量,没有重置方法,因此一个客户端最多记录3次自身异常。
private static void errorHandler(Exception e) { if (isEnabled() && errorCount < 3) { // errorCount是 errorCount++; CatLogger.getInstance().error(e.getMessage(), e); } }CatLogger的error方法,最终会调用out方法。 该方法会落盘,修改/data/applogs/cat 目录下的cat_client_{0,date,yyyyMMdd}.log 文件。 因此,考虑要控制次数,并判断是否是开发模式
private void out(String severity, String message, Throwable throwable) { lock.lock(); try { String timedMessage = formatMessage(severity, message); if (devMode) { console(timedMessage); if (throwable != null) { throwable.printStackTrace(System.out); } } else { try { BufferedWriter writer = getWriter(); if (writer != null) { writer.write(timedMessage); writer.newLine(); if (throwable != null) { throwable.printStackTrace(new PrintWriter(writer)); } writer.flush(); } } catch (Exception e) { console(formatMessage("ERROR", timedMessage + e.toString())); } } } finally { lock.unlock(); } }目前,互联网服务大多是分布式的,因此跨服务的远程调用,Cat也提供了响应的监控方式。对于分布式追踪的原理,可以参考Google Dapper论文,此处不再赘述。 只需要了解,Cat的MessageTree支持设置
RootMessageId 消息入口服务产生的Id,整个链路透传,可以标记一整个链路ParentMessageId 上游服务/调用者产生的IdMessageId 下游服务/服务提供者产生的Id,当前消息Id通过三种MessageId即可串联整个链路 例如, 一个调用方(client)通过Rest方式调用一个服务提供者(Server)的方法式,可能需要设置一个拦截器,记录相关Id,进行埋点
/** * 调用方伪代码 */ // 创建Transaction,准备记录调用时间 Transaction t = Cat.newTransaction(CatConstants.TYPE_CALL, request.getURI().toString()); try { HttpHeaders headers = request.getHeaders(); // 通过HttpHeader,保存和传递CAT调用链上下文 Context ctx = new CatContext(); // 执行客户端埋点 Cat.logRemoteCallClient(ctx); headers.add(CatHttpConstants.CAT_HTTP_HEADER_ROOT_MESSAGE_ID, ctx.getProperty(Cat.Context.ROOT)); headers.add(CatHttpConstants.CAT_HTTP_HEADER_PARENT_MESSAGE_ID, ctx.getProperty(Cat.Context.PARENT)); headers.add(CatHttpConstants.CAT_HTTP_HEADER_CHILD_MESSAGE_ID, ctx.getProperty(Cat.Context.CHILD)); // 保证请求继续被执行 ClientHttpResponse response = execution.execute(request, body); t.setStatus(Transaction.SUCCESS); return response; } catch (Exception e) { Cat.getProducer().logError(e); t.setStatus(e); throw e; } finally { // 标记Transaction完成 t.complete(); }服务端对消息的记录伪代码如下:
// 从header中读数据 CatContext catContext = new CatContext(); catContext.addProperty(Cat.Context.ROOT, request.getHeader(CatHttpConstants.CAT_HTTP_HEADER_ROOT_MESSAGE_ID)); catContext.addProperty(Cat.Context.PARENT, request.getHeader(CatHttpConstants.CAT_HTTP_HEADER_PARENT_MESSAGE_ID)); catContext.addProperty(Cat.Context.CHILD, request.getHeader(CatHttpConstants.CAT_HTTP_HEADER_CHILD_MESSAGE_ID)); // 记录server端信息 Cat.logRemoteCallServer(catContext); Transaction t = Cat.newTransaction(CatConstants.TYPE_SERVICE, url); try { // 执行servletFilter,完成调用链 Cat.logEvent("Service.method", request.getMethod(), Message.SUCCESS, request.getRequestURL().toString()); Cat.logEvent("Service.client", request.getRemoteHost()); filterChain.doFilter(servletRequest, servletResponse); t.setStatus(Transaction.SUCCESS); } catch (Exception ex) { t.setStatus(ex); Cat.logError(ex); throw ex; } finally { t.complete(); }com.dianping.cat.Cat#logRemoteCallClient()用于在客户端侧记录埋点数据,主要逻辑为
从线程上下文读取对应的MessageTree获取/创建rootId,MessageId创建RpcServerId时,要传入服务端的domain,不传默认default。但是,若同一ip的client机器上存在多个应用,都调用了一个远程调用服务。那么此时id可能重复设置Cat.Context上下文,保存Id public static void logRemoteCallClient(Context ctx, String domain) { if (isEnabled()) { try { MessageTree tree = Cat.getManager().getThreadLocalMessageTree(); String messageId = tree.getMessageId(); if (messageId == null) { messageId = Cat.getProducer().createMessageId(); tree.setMessageId(messageId); } // 根据server的domain创建MessageId,MessageId的创建之前介绍过了,默认default String childId = Cat.getProducer().createRpcServerId(domain); Cat.logEvent(CatConstants.TYPE_REMOTE_CALL, "", Event.SUCCESS, childId); String root = tree.getRootMessageId(); if (root == null) { root = messageId; } ctx.addProperty(Context.ROOT, root); ctx.addProperty(Context.PARENT, messageId); ctx.addProperty(Context.CHILD, childId); } catch (Exception e) { errorHandler(e); } } }Cat中包含一个内部接口Context,用来跨进程时保存MessageId,为tree串联远程调用的消息。 Context接口的两个方法可以自定义实现来决定如何保存三种Id。,例如最简单的用Map来存储
public interface Context { String ROOT = "_catRootMessageId"; String PARENT = "_catParentMessageId"; String CHILD = "_catChildMessageId"; String DISCARD = "_catDiscard"; void addProperty(String key, String value); String getProperty(String key); } public class CatContext implements Cat.Context { private Map<String, String> properties = new HashMap<>(); @Override public void addProperty(String key, String value) { properties.put(key, value); } @Override public String getProperty(String key) { return properties.get(key); } }在服务端记录埋点信息,将ctx中的三种Id拿出并设置到MessageTree中
public static void logRemoteCallServer(Context ctx) { if (isEnabled()) { try { MessageTree tree = Cat.getManager().getThreadLocalMessageTree(); String childId = ctx.getProperty(Context.CHILD); String rootId = ctx.getProperty(Context.ROOT); String parentId = ctx.getProperty(Context.PARENT); if (parentId != null) { tree.setParentMessageId(parentId); } if (rootId != null) { tree.setRootMessageId(rootId); } if (childId != null) { tree.setMessageId(childId); } } catch (Exception e) { errorHandler(e); } } }以上是CAT客户端API的源码介绍,下一篇将介绍客户端与服务端的通信、消息上报逻辑
