监控总体图:
红色:监控中心 - dubbo-simple-monitor
黄色:provider
蓝色:consumer
统计总体流程:
- MonitorFilter向DubboMonitor发送数据
- DubboMonitor将数据进行聚合后(默认聚合1min内的统计数据)暂存到ConcurrentMap<Statistics, AtomicReference<long[]>> statisticsMap,然后使用一个含有3个线程(线程名字:DubboMonitorSendTimer)的线程池每隔1min调用SimpleMonitorService,遍历发送statisticsMap中的统计数据,每发送完毕一个,就重置当前Statistics对应的AtomicReference<long[]>
- SimpleMonitorService将这些聚合数据塞入BlockingQueue<URL> queue中(队列大小为100000)
- SimpleMonitorService使用一个后台线程(线程名为:DubboMonitorAsyncWriteLogThread)将queue中的数据写入文件(该线程以死循环的形式来写)
- SimpleMonitorService还会使用一个含有1个线程(线程名字:DubboMonitorTimer)的线程池每隔5min钟,将文件中的统计数据画成图表
注意:
- SimpleMonitorService理解为一个服务提供者;而provider和consumer都是一个服务消费者,所以二者的DubboMonitor中的MonitorService实例都是一个代理实例。
- dubbo-monitor计数监控不支持异步调用下的数据监控
一、dubbo-monitor使用
在配置文件中添加:
<dubbo:monitor address="10.211.55.5:9090" />
即开启了monitor监控,并且指定了监控中心服务器为“10.211.55.5:9090”。
9090端口是Prometheus的默认端口,dubbo提供的监控中心比较简陋,我们后续会使用Prometheus作为监控中心来存储监控数据。
二、服务端加载monitor配置
doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)中:
1 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { 2 ... 3 if (registryURLs != null && registryURLs.size() > 0) { 4 for (URL registryURL : registryURLs) { 5 ... 6 URL monitorUrl = loadMonitor(registryURL); 7 if (monitorUrl != null) { 8 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); 9 }10 ...11 }12 } else {13 ...14 }15 }
其中loadMonitor(URL registryURL)方法主要用于创建MonitorConfig对象(如果monitor配置在dubbo.properties中的话),并且设置属性,之后设置到数据总线Url中。
1 protected URL loadMonitor(URL registryURL) { 2 if (monitor == null) { 3 String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address"); 4 String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol"); 5 if ((monitorAddress == null || monitorAddress.length() == 0) && (monitorProtocol == null || monitorProtocol.length() == 0)) { 6 return null; 7 } 8 9 monitor = new MonitorConfig();10 if (monitorAddress != null && monitorAddress.length() > 0) {11 monitor.setAddress(monitorAddress);12 }13 if (monitorProtocol != null && monitorProtocol.length() > 0) {14 monitor.setProtocol(monitorProtocol);15 }16 }17 appendProperties(monitor);18 ...19 }
三、消费端加载monitor配置
createProxy(Map<String, String> map)中:
1 Listus = loadRegistries(false); 2 if (us != null && us.size() > 0) { 3 for (URL u : us) { 4 URL monitorUrl = loadMonitor(u); 5 if (monitorUrl != null) { 6 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); 7 } 8 ... 9 }10 }
四、MonitorFilter收集监控数据
consumer端在发起调用之前会先走filter链;provider端在接收到请求时也是先走filter链,然后才进行真正的业务逻辑处理。默认情况下,在consumer和provider的filter链中都会有MonitorFilter。
1 /** 2 * MonitorFilter. (SPI, Singleton, ThreadSafe) 3 */ 4 @Activate(group = {Constants.PROVIDER, Constants.CONSUMER}) 5 public class MonitorFilter implements Filter { 6 7 private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class); 8 9 // key: 接口名.方法名 value: 当前的并发数 10 private final ConcurrentMapconcurrents = new ConcurrentHashMap (); 11 12 private MonitorFactory monitorFactory;// MonitorFactory$Adaptive 13 14 public void setMonitorFactory(MonitorFactory monitorFactory) { 15 this.monitorFactory = monitorFactory; 16 } 17 18 // intercepting invocation 19 public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { 20 if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { // 开启了monitor监控 21 RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called 22 String remoteHost = context.getRemoteHost(); 23 long start = System.currentTimeMillis(); // record start timestamp 24 getConcurrent(invoker, invocation).incrementAndGet(); // 并发数+1 25 try { 26 Result result = invoker.invoke(invocation); // proceed invocation chain 27 collect(invoker, invocation, result, remoteHost, start, false);// 收集统计数据 28 return result; 29 } catch (RpcException e) { 30 collect(invoker, invocation, null, remoteHost, start, true);// 发生异常时收集统计数据 31 throw e; 32 } finally { 33 getConcurrent(invoker, invocation).decrementAndGet(); // 并发数-1 34 } 35 } else { 36 return invoker.invoke(invocation); 37 } 38 } 39 40 // collect info 41 private void collect(Invoker invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { 42 try { 43 // ---- service statistics ---- 44 long elapsed = System.currentTimeMillis() - start; // 此次调用花费的时间 45 int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count 46 String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY); 47 String service = invoker.getInterface().getName(); // service name 48 String method 
= RpcUtils.getMethodName(invocation); // method name 49 URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY); 50 Monitor monitor = monitorFactory.getMonitor(url);//根据monitorUrl获取Monitor实现(默认使用DubboMonitor) 51 if (monitor == null) { 52 return; 53 } 54 int localPort; 55 String remoteKey; 56 String remoteValue; 57 if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) { 58 // ---- for service consumer ---- 59 localPort = 0; 60 remoteKey = MonitorService.PROVIDER; 61 remoteValue = invoker.getUrl().getAddress(); 62 } else { 63 // ---- for service provider ---- 64 localPort = invoker.getUrl().getPort(); 65 remoteKey = MonitorService.CONSUMER; 66 remoteValue = remoteHost; 67 } 68 String input = "", output = ""; 69 if (invocation.getAttachment(Constants.INPUT_KEY) != null) { 70 input = invocation.getAttachment(Constants.INPUT_KEY); 71 } 72 if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) { 73 output = result.getAttachment(Constants.OUTPUT_KEY); 74 } 75 monitor.collect(new URL(Constants.COUNT_PROTOCOL, 76 NetUtils.getLocalHost(), localPort, 77 service + "/" + method, 78 MonitorService.APPLICATION, application, 79 MonitorService.INTERFACE, service, 80 MonitorService.METHOD, method, 81 remoteKey, remoteValue, 82 error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",// 成功失败数 83 MonitorService.ELAPSED, String.valueOf(elapsed),// 调用消耗的时间 84 MonitorService.CONCURRENT, String.valueOf(concurrent),// 并发数 85 Constants.INPUT_KEY, input, 86 Constants.OUTPUT_KEY, output)); 87 } catch (Throwable t) { 88 logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t); 89 } 90 } 91 92 // concurrent counter 93 private AtomicInteger getConcurrent(Invoker invoker, Invocation invocation) { 94 String key = invoker.getInterface().getName() + "." 
+ invocation.getMethodName(); 95 AtomicInteger concurrent = concurrents.get(key); 96 if (concurrent == null) { 97 concurrents.putIfAbsent(key, new AtomicInteger()); 98 concurrent = concurrents.get(key); 99 }100 return concurrent;101 }102 103 }
调用之前,记录调用开始时间、并发数,之后进行调用,最后进行统计数据收集:
- 获取并计算各种统计数据(调用消耗时间、调用成功/错误数等)
- 使用MonitorFactory获取Monitor
- 将统计数据构造成url
- 使用Monitor收集这些统计数据
获取Monitor的源码后续再说。这里获取到的是DubboMonitor实例。
五、DubboMonitor聚合监控数据
1 private static final int LENGTH = 10; 2 private final ConcurrentMap> statisticsMap = new ConcurrentHashMap >(); 3 4 // 聚合统计数据 5 public void collect(URL url) { 6 // data to collect from url 7 int success = url.getParameter(MonitorService.SUCCESS, 0); 8 int failure = url.getParameter(MonitorService.FAILURE, 0); 9 int input = url.getParameter(MonitorService.INPUT, 0);10 int output = url.getParameter(MonitorService.OUTPUT, 0);11 int elapsed = url.getParameter(MonitorService.ELAPSED, 0);12 int concurrent = url.getParameter(MonitorService.CONCURRENT, 0);13 // init atomic reference14 Statistics statistics = new Statistics(url);15 AtomicReference reference = statisticsMap.get(statistics);16 if (reference == null) {17 statisticsMap.putIfAbsent(statistics, new AtomicReference ());18 reference = statisticsMap.get(statistics);19 }20 // use CompareAndSet to sum21 long[] current;22 long[] update = new long[LENGTH];23 do {24 current = reference.get();25 if (current == null) {26 update[0] = success;27 update[1] = failure;28 update[2] = input;29 update[3] = output;30 update[4] = elapsed;31 update[5] = concurrent;32 update[6] = input;33 update[7] = output;34 update[8] = elapsed;35 update[9] = concurrent;36 } else {37 update[0] = current[0] + success;38 update[1] = current[1] + failure;39 update[2] = current[2] + input;40 update[3] = current[3] + output;41 update[4] = current[4] + elapsed;42 update[5] = (current[5] + concurrent) / 2;43 update[6] = current[6] > input ? current[6] : input;44 update[7] = current[7] > output ? current[7] : output;45 update[8] = current[8] > elapsed ? current[8] : elapsed;46 update[9] = current[9] > concurrent ? current[9] : concurrent;47 }48 } while (!reference.compareAndSet(current, update));49 }
实际上这里聚合了1min钟的统计数据到statisticsMap中。
六、Monitor使用MonitorService存储数据到队列
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("DubboMonitorSendTimer", true));
private final ScheduledFuture<?> sendFuture;
private final long monitorInterval;

/**
 * Creates the monitor and schedules the periodic task that calls {@code send()}.
 *
 * @param monitorInvoker invoker for the monitor service; its URL supplies the "interval"
 *                       parameter in milliseconds (60000 when absent)
 * @param monitorService the monitor service the statistics are delivered to
 */
public DubboMonitor(Invoker<MonitorService> monitorInvoker, MonitorService monitorService) {
    this.monitorInvoker = monitorInvoker;
    this.monitorService = monitorService;
    this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000);

    // periodic task: push the aggregated statistics; a failure is logged so the schedule keeps running
    Runnable sendTask = new Runnable() {
        public void run() {
            try {
                send();
            } catch (Throwable t) {
                logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);
            }
        }
    };
    // the first run happens one interval after construction, later runs one interval after the previous one ends
    sendFuture = scheduledExecutorService.scheduleWithFixedDelay(sendTask, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
}
1 public void send() { 2 if (logger.isInfoEnabled()) { 3 logger.info("Send statistics to monitor " + getUrl()); 4 } 5 String timestamp = String.valueOf(System.currentTimeMillis()); 6 for (Map.Entry> entry : statisticsMap.entrySet()) { 7 // get statistics data 8 Statistics statistics = entry.getKey(); 9 AtomicReference reference = entry.getValue();10 long[] numbers = reference.get();11 long success = numbers[0];12 long failure = numbers[1];13 long input = numbers[2];14 long output = numbers[3];15 long elapsed = numbers[4];16 long concurrent = numbers[5];17 long maxInput = numbers[6];18 long maxOutput = numbers[7];19 long maxElapsed = numbers[8];20 long maxConcurrent = numbers[9];21 22 // send statistics data23 URL url = statistics.getUrl()24 .addParameters(MonitorService.TIMESTAMP, timestamp,25 MonitorService.SUCCESS, String.valueOf(success),26 MonitorService.FAILURE, String.valueOf(failure),27 MonitorService.INPUT, String.valueOf(input),28 MonitorService.OUTPUT, String.valueOf(output),29 MonitorService.ELAPSED, String.valueOf(elapsed),30 MonitorService.CONCURRENT, String.valueOf(concurrent),31 MonitorService.MAX_INPUT, String.valueOf(maxInput),32 MonitorService.MAX_OUTPUT, String.valueOf(maxOutput),33 MonitorService.MAX_ELAPSED, String.valueOf(maxElapsed),34 MonitorService.MAX_CONCURRENT, String.valueOf(maxConcurrent)35 );36 monitorService.collect(url);37 38 // reset39 long[] current;40 long[] update = new long[LENGTH];41 do {42 current = reference.get();43 if (current == null) {44 update[0] = 0;45 update[1] = 0;46 update[2] = 0;47 update[3] = 0;48 update[4] = 0;49 update[5] = 0;50 } else {51 update[0] = current[0] - success;52 update[1] = current[1] - failure;53 update[2] = current[2] - input;54 update[3] = current[3] - output;55 update[4] = current[4] - elapsed;56 update[5] = current[5] - concurrent;57 }58 } while (!reference.compareAndSet(current, update));59 }60 }
- 首先从聚合数据存储器statisticsMap中获取相关统计数据并存储到数据总线Url中
- 之后调用MonitorService(这里是SimpleMonitorService),将统计数据存储到一个BlockingQueue中
注意:这里有一个改进点
- 由于monitorService.collect(url)是远程调用,这里在for循环体中执行远程调用,实际上是不合适的,我们可以将所有的url先暂存在一个List<URL>中,最后,使用一次monitorService.collect(urlList)即可 - 此时,可适当缩短数据发送时间。
SimpleMonitorService:
1 private final BlockingQueuequeue;2 3 public void collect(URL statistics) {4 queue.offer(statistics);5 if (logger.isInfoEnabled()) {6 logger.info("collect statistics: " + statistics);7 }8 }
七、MonitorService将数据写入本地文件
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboMonitorTimer", true));
private final ScheduledFuture<?> chartFuture;
private final Thread writeThread;
private final BlockingQueue<URL> queue;
private String statisticsDirectory = "statistics";
private String chartsDirectory = "charts";
private volatile boolean running = true;

/**
 * Sets up the bounded statistics queue, resolves the output directories, then starts the
 * daemon thread that repeatedly calls {@code write()} and the scheduled task that calls
 * {@code draw()} (first run after 1 second, then 300 seconds after each run ends).
 *
 * The directories are resolved before any background work starts, and fall back to the
 * field defaults when the corresponding property is not configured.
 */
public SimpleMonitorService() {
    queue = new LinkedBlockingQueue<URL>(Integer.parseInt(ConfigUtils.getProperty("dubbo.monitor.queue", "100000")));

    // Resolve the directories first: an unset property must not replace the default with
    // null, and the background tasks must not start before these fields hold their final value.
    statisticsDirectory = ConfigUtils.getProperty("dubbo.statistics.directory", statisticsDirectory);
    chartsDirectory = ConfigUtils.getProperty("dubbo.charts.directory", chartsDirectory);

    writeThread = new Thread(new Runnable() {
        public void run() {
            while (running) {
                try {
                    write(); // write statistics
                } catch (Throwable t) {
                    logger.error("Unexpected error occur at write stat log, cause: " + t.getMessage(), t);
                    try {
                        Thread.sleep(5000); // retry after 5 secs
                    } catch (InterruptedException ignored) {
                        // deliberately ignored: the loop is controlled by the running flag only
                    }
                }
            }
        }
    });
    writeThread.setDaemon(true);
    writeThread.setName("DubboMonitorAsyncWriteLogThread");
    writeThread.start();

    chartFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
        public void run() {
            try {
                draw(); // draw chart
            } catch (Throwable t) {
                logger.error("Unexpected error occur at draw stat chart, cause: " + t.getMessage(), t);
            }
        }
    }, 1, 300, TimeUnit.SECONDS);
}
write()将统计数据写入文件,draw()将统计数据画成图片。这两种方式在实际使用中都不会用到。
最后来看一下获取Monitor实例的过程(帮助我们开发自定义的Monitor):
1 Monitor monitor = monitorFactory.getMonitor(url);
MonitorFilter中的monitorFactory实例是:MonitorFactory$Adaptive。
1 package com.alibaba.dubbo.monitor; 2 3 import com.alibaba.dubbo.common.extension.ExtensionLoader; 4 5 public class MonitorFactory$Adaptive implements com.alibaba.dubbo.monitor.MonitorFactory { 6 public com.alibaba.dubbo.monitor.Monitor getMonitor(com.alibaba.dubbo.common.URL arg0) { 7 if (arg0 == null) throw new IllegalArgumentException("url == null"); 8 com.alibaba.dubbo.common.URL url = arg0; 9 String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());10 if (extName == null)11 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.monitor.MonitorFactory) name from url(" + url.toString() + ") use keys([protocol])");12 com.alibaba.dubbo.monitor.MonitorFactory extension = (com.alibaba.dubbo.monitor.MonitorFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.monitor.MonitorFactory.class).getExtension(extName);13 return extension.getMonitor(arg0);14 }15 }
首先,根据输入的url中的protocol来获取相关的MonitorFactory(这里protocol默认是dubbo,所以获取的是DubboMonitorFactory,可以通过指定dubbo:monitor标签中的protocol属性来自定义获取XxxMonitorFactory),之后调用DubboMonitorFactory.getMonitor(arg0)。该方法在其父类AbstractMonitorFactory中:
1 /** 2 * AbstractMonitorFactory. (SPI, Singleton, ThreadSafe) 3 */ 4 public abstract class AbstractMonitorFactory implements MonitorFactory { 5 private static final Logger logger = LoggerFactory.getLogger(AbstractMonitorFactory.class); 6 7 // lock for getting monitor center 8 private static final ReentrantLock LOCK = new ReentrantLock(); 9 10 // monitor centers Map11 private static final Map MONITORS = new ConcurrentHashMap ();12 13 private static final Map > FUTURES = new ConcurrentHashMap >();14 15 private static final ExecutorService executor = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue (), new NamedThreadFactory("DubboMonitorCreator", true));16 17 public static Collection getMonitors() {18 return Collections.unmodifiableCollection(MONITORS.values());19 }20 21 public Monitor getMonitor(URL url) {22 url = url.setPath(MonitorService.class.getName()).addParameter(Constants.INTERFACE_KEY, MonitorService.class.getName());23 String key = url.toServiceStringWithoutResolving();// dubbo://10.211.55.5:9090/com.alibaba.dubbo.monitor.MonitorService24 Monitor monitor = MONITORS.get(key);25 Future future = FUTURES.get(key);26 if (monitor != null || future != null) {27 return monitor;28 }29 30 LOCK.lock();31 try {32 monitor = MONITORS.get(key);33 future = FUTURES.get(key);34 if (monitor != null || future != null) {35 return monitor;36 }37 38 final URL monitorUrl = url;39 // 使用另外的线程MonitorCreator来创建Monitor实例(原因是:即使Monitor创建失败,也不会影响主流程)40 final ListenableFutureTask listenableFutureTask = ListenableFutureTask.create(new MonitorCreator(monitorUrl));41 listenableFutureTask.addListener(new MonitorListener(key));42 executor.execute(listenableFutureTask);43 FUTURES.put(key, listenableFutureTask);44 45 return null;46 } finally {47 // unlock48 LOCK.unlock();49 }50 }51 52 protected abstract Monitor createMonitor(URL url);53 54 // Callable和Runnable一样,也是创建一个线程去执行,只是Callable有返回值(T call()),而Runnable无返回值(void run())55 class MonitorCreator implements Callable 
{56 57 private URL url;58 59 public MonitorCreator(URL url) {60 this.url = url;61 }62 63 @Override64 public Monitor call() throws Exception {65 Monitor monitor = AbstractMonitorFactory.this.createMonitor(url);//调用子类的createMonitor方法创建Monitor66 return monitor;67 }68 }69 70 class MonitorListener implements Runnable {71 72 private String key;73 74 public MonitorListener(String key) {75 this.key = key;76 }77 // listenableFutureTask一旦isDone()完成(正常完成、抛出异常、被中断等),就会立即执行该方法78 @Override79 public void run() {80 try {81 ListenableFuture listenableFuture = AbstractMonitorFactory.FUTURES.get(key);82 AbstractMonitorFactory.MONITORS.put(key, listenableFuture.get());83 AbstractMonitorFactory.FUTURES.remove(key);84 } catch (InterruptedException e) {85 logger.warn("Thread was interrupted unexpectedly, monitor will never be got.");86 AbstractMonitorFactory.FUTURES.remove(key);87 } catch (ExecutionException e) {88 logger.warn("Create monitor failed, monitor data will not be collected until you fix this problem. ", e);89 }90 }91 }92 }
来看DubboMonitorFactory.createMonitor(url):
1 /** 2 * DefaultMonitorFactory 3 */ 4 public class DubboMonitorFactory extends AbstractMonitorFactory { 5 6 private Protocol protocol; 7 8 private ProxyFactory proxyFactory; 9 10 public void setProtocol(Protocol protocol) {11 this.protocol = protocol;12 }13 14 public void setProxyFactory(ProxyFactory proxyFactory) {15 this.proxyFactory = proxyFactory;16 }17 18 @Override19 protected Monitor createMonitor(URL url) {20 url = url.setProtocol(url.getParameter(Constants.PROTOCOL_KEY, "dubbo"));21 if (url.getPath() == null || url.getPath().length() == 0) {22 url = url.setPath(MonitorService.class.getName());23 }24 String filter = url.getParameter(Constants.REFERENCE_FILTER_KEY);25 if (filter == null || filter.length() == 0) {26 filter = "";27 } else {28 filter = filter + ",";29 }30 url = url.addParameters(Constants.CLUSTER_KEY, "failsafe", Constants.CHECK_KEY, String.valueOf(false),31 Constants.REFERENCE_FILTER_KEY, filter + "-monitor");32 // 创建Invoker,内部会构造与MonitorService实现类SimpleMonitorService所在的检测中心dubbo-simple-monitor的长连接33 Invoker monitorInvoker = protocol.refer(MonitorService.class, url);34 // 获取MonitorService的代理35 MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);36 return new DubboMonitor(monitorInvoker, monitorService);37 }38 39 }
注意:这里的SimpleMonitorService其实会部署在dubbo-simple-monitor中,被provider和consumer中的DubboMonitor调用。