博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
第十八章 dubbo-monitor计数监控
阅读量:6234 次
发布时间:2019-06-22

本文共 23955 字,大约阅读时间需要 79 分钟。

监控总体图:

红色:监控中心 -  dubbo-simple-monitor

黄色:provider

蓝色:consumer

统计总体流程

  • MonitorFilter向DubboMonitor发送数据
  • DubboMonitor将数据进行聚合后(默认聚合1min中的统计数据)暂存到ConcurrentMap<Statistics, AtomicReference<long[]>> statisticsMap,然后使用一个含有3个线程(线程名字:DubboMonitorSendTimer)的线程池每隔1min钟,调用SimpleMonitorService遍历发送statisticsMap中的统计数据,每发送完毕一个,就重置当前的Statistics的AtomicReference<long[]>
  • SimpleMonitorService将这些聚合数据塞入BlockingQueue<URL> queue中(队列大写为100000)
  • SimpleMonitorService使用一个后台线程(线程名为:DubboMonitorAsyncWriteLogThread)将queue中的数据写入文件(该线程以死循环的形式来写)
  • SimpleMonitorService还会使用一个含有1个线程(线程名字:DubboMonitorTimer)的线程池每隔5min钟,将文件中的统计数据画成图表

注意:

  • SimpleMonitorService理解为一个服务提供者;而provider和consumer都是一个服务消费者,所以二者的DubboMonitor中的MonitorService实例都是一个代理实例。
  • dubbo-monitor计数监控不支持异步调用下的数据监控

 

一、dubbo-monitor使用

在配置文件中添加:

1 

即开启了monitor监控,并且指定了监控中心服务器为“10.211.55.5:9090”。

9090端口是Prometheus的默认端口,dubbo提供的监控中心比较简陋,我们后续会使用Prometheus作为监控中心来存储监控数据。

 

二、服务端加载monitor配置

doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)中:

1             if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { 2                 ... 3                 if (registryURLs != null && registryURLs.size() > 0) { 4                     for (URL registryURL : registryURLs) { 5                         ... 6                         URL monitorUrl = loadMonitor(registryURL); 7                         if (monitorUrl != null) { 8                             url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); 9                         }10                        ...11                     }12                 } else {13                     ...14                 }15             }

 其中loadMonitor(URL registryURL)方法主要用于创建MonitorConfig对象(如果monitor配置在dubbo.properties中的话),并且设置属性,之后设置到数据总线Url中。

1     protected URL loadMonitor(URL registryURL) { 2         if (monitor == null) { 3             String monitorAddress = ConfigUtils.getProperty("dubbo.monitor.address"); 4             String monitorProtocol = ConfigUtils.getProperty("dubbo.monitor.protocol"); 5             if ((monitorAddress == null || monitorAddress.length() == 0) && (monitorProtocol == null || monitorProtocol.length() == 0)) { 6                 return null; 7             } 8  9             monitor = new MonitorConfig();10             if (monitorAddress != null && monitorAddress.length() > 0) {11                 monitor.setAddress(monitorAddress);12             }13             if (monitorProtocol != null && monitorProtocol.length() > 0) {14                 monitor.setProtocol(monitorProtocol);15             }16         }17         appendProperties(monitor);18         ...19     }

 

三、消费端加载monitor配置

createProxy(Map<String, String> map)中:

1                 List
us = loadRegistries(false); 2 if (us != null && us.size() > 0) { 3 for (URL u : us) { 4 URL monitorUrl = loadMonitor(u); 5 if (monitorUrl != null) { 6 map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); 7 } 8 ... 9 }10 }

 

四、MonitorFilter收集监控数据

consumer端在发起调用之前会先走filter链;provider端在接收到请求时也是先走filter链,然后才进行真正的业务逻辑处理。默认情况下,在consumer和provider的filter链中都会有Monitorfilter。

1 /**  2  * MonitorFilter. (SPI, Singleton, ThreadSafe)  3  */  4 @Activate(group = {Constants.PROVIDER, Constants.CONSUMER})  5 public class MonitorFilter implements Filter {  6   7     private static final Logger logger = LoggerFactory.getLogger(MonitorFilter.class);  8   9     // key: 接口名.方法名 value: 当前的并发数 10     private final ConcurrentMap
concurrents = new ConcurrentHashMap
(); 11 12 private MonitorFactory monitorFactory;// MonitorFactory$Adaptive 13 14 public void setMonitorFactory(MonitorFactory monitorFactory) { 15 this.monitorFactory = monitorFactory; 16 } 17 18 // intercepting invocation 19 public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException { 20 if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
// 开启了monitor监控 21 RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called 22 String remoteHost = context.getRemoteHost(); 23 long start = System.currentTimeMillis(); // record start timestamp 24 getConcurrent(invoker, invocation).incrementAndGet(); // 并发数+1 25 try { 26 Result result = invoker.invoke(invocation); // proceed invocation chain 27 collect(invoker, invocation, result, remoteHost, start, false);// 收集统计数据 28 return result; 29 } catch (RpcException e) { 30 collect(invoker, invocation, null, remoteHost, start, true);// 发生异常时收集统计数据 31 throw e; 32 } finally { 33 getConcurrent(invoker, invocation).decrementAndGet(); // 并发数-1 34 } 35 } else { 36 return invoker.invoke(invocation); 37 } 38 } 39 40 // collect info 41 private void collect(Invoker
invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) { 42 try { 43 // ---- service statistics ---- 44 long elapsed = System.currentTimeMillis() - start; // 此次调用花费的时间 45 int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count 46 String application = invoker.getUrl().getParameter(Constants.APPLICATION_KEY); 47 String service = invoker.getInterface().getName(); // service name 48 String method = RpcUtils.getMethodName(invocation); // method name 49 URL url = invoker.getUrl().getUrlParameter(Constants.MONITOR_KEY); 50 Monitor monitor = monitorFactory.getMonitor(url);//根据monitorUrl获取Monitor实现(默认使用DubboMonitor) 51 if (monitor == null) { 52 return; 53 } 54 int localPort; 55 String remoteKey; 56 String remoteValue; 57 if (Constants.CONSUMER_SIDE.equals(invoker.getUrl().getParameter(Constants.SIDE_KEY))) { 58 // ---- for service consumer ---- 59 localPort = 0; 60 remoteKey = MonitorService.PROVIDER; 61 remoteValue = invoker.getUrl().getAddress(); 62 } else { 63 // ---- for service provider ---- 64 localPort = invoker.getUrl().getPort(); 65 remoteKey = MonitorService.CONSUMER; 66 remoteValue = remoteHost; 67 } 68 String input = "", output = ""; 69 if (invocation.getAttachment(Constants.INPUT_KEY) != null) { 70 input = invocation.getAttachment(Constants.INPUT_KEY); 71 } 72 if (result != null && result.getAttachment(Constants.OUTPUT_KEY) != null) { 73 output = result.getAttachment(Constants.OUTPUT_KEY); 74 } 75 monitor.collect(new URL(Constants.COUNT_PROTOCOL, 76 NetUtils.getLocalHost(), localPort, 77 service + "/" + method, 78 MonitorService.APPLICATION, application, 79 MonitorService.INTERFACE, service, 80 MonitorService.METHOD, method, 81 remoteKey, remoteValue, 82 error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1",// 成功失败数 83 MonitorService.ELAPSED, String.valueOf(elapsed),// 调用消耗的时间 84 MonitorService.CONCURRENT, String.valueOf(concurrent),// 并发数 85 Constants.INPUT_KEY, input, 86 Constants.OUTPUT_KEY, output)); 87 } catch (Throwable t) { 88 logger.error("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t); 89 } 90 } 91 92 // concurrent counter 93 private AtomicInteger getConcurrent(Invoker
invoker, Invocation invocation) { 94 String key = invoker.getInterface().getName() + "." + invocation.getMethodName(); 95 AtomicInteger concurrent = concurrents.get(key); 96 if (concurrent == null) { 97 concurrents.putIfAbsent(key, new AtomicInteger()); 98 concurrent = concurrents.get(key); 99 }100 return concurrent;101 }102 103 }

调用之前,记录调用开始时间、并发数,之后进行调用,最后进行统计数据收集:

  • 获取计算各种统计数据(调用消耗时间、调用成功/错误数等)
  • 使用MonitorFactory获取Monitor
  • 将统计数据构造成url
  • 使用Monitor收集这些统计数据

获取Monitor的源码后续再说。这里获取到的是DubboMonitor实例。

 

五、DubboMonitor聚合监控数据

1     private static final int LENGTH = 10; 2     private final ConcurrentMap
> statisticsMap = new ConcurrentHashMap
>(); 3 4 // 聚合统计数据 5 public void collect(URL url) { 6 // data to collect from url 7 int success = url.getParameter(MonitorService.SUCCESS, 0); 8 int failure = url.getParameter(MonitorService.FAILURE, 0); 9 int input = url.getParameter(MonitorService.INPUT, 0);10 int output = url.getParameter(MonitorService.OUTPUT, 0);11 int elapsed = url.getParameter(MonitorService.ELAPSED, 0);12 int concurrent = url.getParameter(MonitorService.CONCURRENT, 0);13 // init atomic reference14 Statistics statistics = new Statistics(url);15 AtomicReference
reference = statisticsMap.get(statistics);16 if (reference == null) {17 statisticsMap.putIfAbsent(statistics, new AtomicReference
());18 reference = statisticsMap.get(statistics);19 }20 // use CompareAndSet to sum21 long[] current;22 long[] update = new long[LENGTH];23 do {24 current = reference.get();25 if (current == null) {26 update[0] = success;27 update[1] = failure;28 update[2] = input;29 update[3] = output;30 update[4] = elapsed;31 update[5] = concurrent;32 update[6] = input;33 update[7] = output;34 update[8] = elapsed;35 update[9] = concurrent;36 } else {37 update[0] = current[0] + success;38 update[1] = current[1] + failure;39 update[2] = current[2] + input;40 update[3] = current[3] + output;41 update[4] = current[4] + elapsed;42 update[5] = (current[5] + concurrent) / 2;43 update[6] = current[6] > input ? current[6] : input;44 update[7] = current[7] > output ? current[7] : output;45 update[8] = current[8] > elapsed ? current[8] : elapsed;46 update[9] = current[9] > concurrent ? current[9] : concurrent;47 }48 } while (!reference.compareAndSet(current, update));49 }

实际上这里聚合了1min钟的统计数据到statisticsMap中。

 

六、Monitor使用MonitorService存储数据到队列

1     private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3, new NamedThreadFactory("DubboMonitorSendTimer", true)); 2     private final ScheduledFuture
sendFuture; 3 private final long monitorInterval; 4 5 public DubboMonitor(Invoker
monitorInvoker, MonitorService monitorService) { 6 this.monitorInvoker = monitorInvoker; 7 this.monitorService = monitorService; 8 this.monitorInterval = monitorInvoker.getUrl().getPositiveParameter("interval", 60000); 9 // collect timer for collecting statistics data10 sendFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {11 public void run() {12 // collect data13 try {14 send();15 } catch (Throwable t) {16 logger.error("Unexpected error occur at send statistic, cause: " + t.getMessage(), t);17 }18 }19 }, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);20 }
1     public void send() { 2         if (logger.isInfoEnabled()) { 3             logger.info("Send statistics to monitor " + getUrl()); 4         } 5         String timestamp = String.valueOf(System.currentTimeMillis()); 6         for (Map.Entry
> entry : statisticsMap.entrySet()) { 7 // get statistics data 8 Statistics statistics = entry.getKey(); 9 AtomicReference
reference = entry.getValue();10 long[] numbers = reference.get();11 long success = numbers[0];12 long failure = numbers[1];13 long input = numbers[2];14 long output = numbers[3];15 long elapsed = numbers[4];16 long concurrent = numbers[5];17 long maxInput = numbers[6];18 long maxOutput = numbers[7];19 long maxElapsed = numbers[8];20 long maxConcurrent = numbers[9];21 22 // send statistics data23 URL url = statistics.getUrl()24 .addParameters(MonitorService.TIMESTAMP, timestamp,25 MonitorService.SUCCESS, String.valueOf(success),26 MonitorService.FAILURE, String.valueOf(failure),27 MonitorService.INPUT, String.valueOf(input),28 MonitorService.OUTPUT, String.valueOf(output),29 MonitorService.ELAPSED, String.valueOf(elapsed),30 MonitorService.CONCURRENT, String.valueOf(concurrent),31 MonitorService.MAX_INPUT, String.valueOf(maxInput),32 MonitorService.MAX_OUTPUT, String.valueOf(maxOutput),33 MonitorService.MAX_ELAPSED, String.valueOf(maxElapsed),34 MonitorService.MAX_CONCURRENT, String.valueOf(maxConcurrent)35 );36 monitorService.collect(url);37 38 // reset39 long[] current;40 long[] update = new long[LENGTH];41 do {42 current = reference.get();43 if (current == null) {44 update[0] = 0;45 update[1] = 0;46 update[2] = 0;47 update[3] = 0;48 update[4] = 0;49 update[5] = 0;50 } else {51 update[0] = current[0] - success;52 update[1] = current[1] - failure;53 update[2] = current[2] - input;54 update[3] = current[3] - output;55 update[4] = current[4] - elapsed;56 update[5] = current[5] - concurrent;57 }58 } while (!reference.compareAndSet(current, update));59 }60 }
  • 首先从聚合数据存储器statisticsMap中获取相关统计数据并存储到数据总线Url中
  • 之后调用MonitorService(这里是SimpleMonitorService),将统计数据存储到一个BlockingQueue中

注意:这里有一个改进点

  • 由于monitorService.collect(url)是远程调用,这里在for循环体中执行远程调用,实际上是不合适的,我们可以将所有的url先暂存在一个List<URL>中,最后,使用一次monitorService.collect(urlList)即可 - 此时,可适当缩短数据发送时间。

SimpleMonitorService:

1     private final BlockingQueue
queue;2 3 public void collect(URL statistics) {4 queue.offer(statistics);5 if (logger.isInfoEnabled()) {6 logger.info("collect statistics: " + statistics);7 }8 }

 

七、MonitorService将数据写入本地文件

1     private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboMonitorTimer", true)); 2     private final ScheduledFuture
chartFuture; 3 private final Thread writeThread; 4 private final BlockingQueue
queue; 5 private String statisticsDirectory = "statistics"; 6 private String chartsDirectory = "charts"; 7 private volatile boolean running = true; 8 9 public SimpleMonitorService() {10 queue = new LinkedBlockingQueue
(Integer.parseInt(ConfigUtils.getProperty("dubbo.monitor.queue", "100000")));11 writeThread = new Thread(new Runnable() {12 public void run() {13 while (running) {14 try {15 write(); // write statistics16 } catch (Throwable t) {17 logger.error("Unexpected error occur at write stat log, cause: " + t.getMessage(), t);18 try {19 Thread.sleep(5000); // retry after 5 secs20 } catch (Throwable t2) {21 }22 }23 }24 }25 });26 writeThread.setDaemon(true);27 writeThread.setName("DubboMonitorAsyncWriteLogThread");28 writeThread.start();29 chartFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {30 public void run() {31 try {32 draw(); // draw chart33 } catch (Throwable t) {34 logger.error("Unexpected error occur at draw stat chart, cause: " + t.getMessage(), t);35 }36 }37 }, 1, 300, TimeUnit.SECONDS);38 statisticsDirectory = ConfigUtils.getProperty("dubbo.statistics.directory");39 chartsDirectory = ConfigUtils.getProperty("dubbo.charts.directory");40 }

write()将统计数据写入文件,draw()将统计数据画成图片。这两种方式在实际使用中都不会用到。

 

最后来看一下获取Monitor实例的过程(帮助我们开发自定义的Monitor):

1 Monitor monitor = monitorFactory.getMonitor(url);

MonitorFilter中的monitorFactory实例是:MonitorFactory$Adaptive。

1 package com.alibaba.dubbo.monitor; 2  3 import com.alibaba.dubbo.common.extension.ExtensionLoader; 4  5 public class MonitorFactory$Adaptive implements com.alibaba.dubbo.monitor.MonitorFactory { 6     public com.alibaba.dubbo.monitor.Monitor getMonitor(com.alibaba.dubbo.common.URL arg0) { 7         if (arg0 == null) throw new IllegalArgumentException("url == null"); 8         com.alibaba.dubbo.common.URL url = arg0; 9         String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());10         if (extName == null)11             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.monitor.MonitorFactory) name from url(" + url.toString() + ") use keys([protocol])");12         com.alibaba.dubbo.monitor.MonitorFactory extension = (com.alibaba.dubbo.monitor.MonitorFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.monitor.MonitorFactory.class).getExtension(extName);13         return extension.getMonitor(arg0);14     }15 }

首先,根据输入的url中的protocol来获取相关的MonitorFactory(这里protocol默认是dubbo,所以获取的是DubboMonitorFactory,可以通过指定dubbo:monitor标签中的protocol属性来自定义获取XxxMonitorFactory),之后调用DubboMonitorFactory.getMonitor(arg0)。该方法在其父类AbstractMonitorFactory中:

1 /** 2  * AbstractMonitorFactory. (SPI, Singleton, ThreadSafe) 3  */ 4 public abstract class AbstractMonitorFactory implements MonitorFactory { 5     private static final Logger logger = LoggerFactory.getLogger(AbstractMonitorFactory.class); 6  7     // lock for getting monitor center 8     private static final ReentrantLock LOCK = new ReentrantLock(); 9 10     // monitor centers Map
11 private static final Map
MONITORS = new ConcurrentHashMap
();12 13 private static final Map
> FUTURES = new ConcurrentHashMap
>();14 15 private static final ExecutorService executor = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue
(), new NamedThreadFactory("DubboMonitorCreator", true));16 17 public static Collection
getMonitors() {18 return Collections.unmodifiableCollection(MONITORS.values());19 }20 21 public Monitor getMonitor(URL url) {22 url = url.setPath(MonitorService.class.getName()).addParameter(Constants.INTERFACE_KEY, MonitorService.class.getName());23 String key = url.toServiceStringWithoutResolving();// dubbo://10.211.55.5:9090/com.alibaba.dubbo.monitor.MonitorService24 Monitor monitor = MONITORS.get(key);25 Future
future = FUTURES.get(key);26 if (monitor != null || future != null) {27 return monitor;28 }29 30 LOCK.lock();31 try {32 monitor = MONITORS.get(key);33 future = FUTURES.get(key);34 if (monitor != null || future != null) {35 return monitor;36 }37 38 final URL monitorUrl = url;39 // 使用另外的线程MonitorCreator来创建Monitor实例(原因是:即使Monitor创建失败,也不会影响主流程)40 final ListenableFutureTask
listenableFutureTask = ListenableFutureTask.create(new MonitorCreator(monitorUrl));41 listenableFutureTask.addListener(new MonitorListener(key));42 executor.execute(listenableFutureTask);43 FUTURES.put(key, listenableFutureTask);44 45 return null;46 } finally {47 // unlock48 LOCK.unlock();49 }50 }51 52 protected abstract Monitor createMonitor(URL url);53 54 // Callable和Runnable一样,也是创建一个线程去执行,只是Callable有返回值(T call()),而Runnable无返回值(void run())55 class MonitorCreator implements Callable
{56 57 private URL url;58 59 public MonitorCreator(URL url) {60 this.url = url;61 }62 63 @Override64 public Monitor call() throws Exception {65 Monitor monitor = AbstractMonitorFactory.this.createMonitor(url);//调用子类的createMonitor方法创建Monitor66 return monitor;67 }68 }69 70 class MonitorListener implements Runnable {71 72 private String key;73 74 public MonitorListener(String key) {75 this.key = key;76 }77 // listenableFutureTask一旦isDone()完成(正常完成、抛出异常、被中断等),就会立即执行该方法78 @Override79 public void run() {80 try {81 ListenableFuture
listenableFuture = AbstractMonitorFactory.FUTURES.get(key);82 AbstractMonitorFactory.MONITORS.put(key, listenableFuture.get());83 AbstractMonitorFactory.FUTURES.remove(key);84 } catch (InterruptedException e) {85 logger.warn("Thread was interrupted unexpectedly, monitor will never be got.");86 AbstractMonitorFactory.FUTURES.remove(key);87 } catch (ExecutionException e) {88 logger.warn("Create monitor failed, monitor data will not be collected until you fix this problem. ", e);89 }90 }91 }92 }

来看DubboMonitorFactory.createMonitor(url):

1 /** 2  * DefaultMonitorFactory 3  */ 4 public class DubboMonitorFactory extends AbstractMonitorFactory { 5  6     private Protocol protocol; 7  8     private ProxyFactory proxyFactory; 9 10     public void setProtocol(Protocol protocol) {11         this.protocol = protocol;12     }13 14     public void setProxyFactory(ProxyFactory proxyFactory) {15         this.proxyFactory = proxyFactory;16     }17 18     @Override19     protected Monitor createMonitor(URL url) {20         url = url.setProtocol(url.getParameter(Constants.PROTOCOL_KEY, "dubbo"));21         if (url.getPath() == null || url.getPath().length() == 0) {22             url = url.setPath(MonitorService.class.getName());23         }24         String filter = url.getParameter(Constants.REFERENCE_FILTER_KEY);25         if (filter == null || filter.length() == 0) {26             filter = "";27         } else {28             filter = filter + ",";29         }30         url = url.addParameters(Constants.CLUSTER_KEY, "failsafe", Constants.CHECK_KEY, String.valueOf(false),31                 Constants.REFERENCE_FILTER_KEY, filter + "-monitor");32         // 创建Invoker
,内部会构造与MonitorService实现类SimpleMonitorService所在的检测中心dubbo-simple-monitor的长连接33 Invoker
monitorInvoker = protocol.refer(MonitorService.class, url);34 // 获取MonitorService的代理35 MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);36 return new DubboMonitor(monitorInvoker, monitorService);37 }38 39 }

注意:这里的SimpleMonitorService其实会部署在dubbo-simple-monitor中,被provider和consumer中的DubboMonitor调用。

转载地址:http://jpqna.baihongyu.com/

你可能感兴趣的文章
URAL 1009 K-based Numbers
查看>>
android 知识点汇总
查看>>
android之Notification通知
查看>>
C# 生成等比缩略图的类
查看>>
安利 : プログラミングで彼女をつくる 全攻略~
查看>>
1022. Digital Library (30)
查看>>
Canvas入门(2):图形渐变和图像形变换
查看>>
DataAccess通用数据库访问类,简单易用,功能强悍
查看>>
启动MYSQL密码审计插件
查看>>
spring的事务操作
查看>>
Extensions for Spatial Data
查看>>
Hadoop HDFS 用户指南
查看>>
primefaces 查询 点击按钮 加载 动画 ajax loader
查看>>
Java单例模式——并非看起来那么简单
查看>>
curl库pycurl实例及参数详解
查看>>
actor中!(tell)与forward的差别
查看>>
Android - Activity定制横屏(landscape)显示
查看>>
SQL中 EXCEPT、INTERSECT用法
查看>>
基于Token的WEB后台认证机制
查看>>
[Python] Reuse Code in Multiple Projects with Python Modules
查看>>