diff --git a/collector/src/main/java/io/prometheus/jmx/JmxCollector.java b/collector/src/main/java/io/prometheus/jmx/JmxCollector.java index 450083da7..a0e48f307 100644 --- a/collector/src/main/java/io/prometheus/jmx/JmxCollector.java +++ b/collector/src/main/java/io/prometheus/jmx/JmxCollector.java @@ -88,9 +88,25 @@ private static class Config { long lastUpdate = 0L; MatchedRulesCache rulesCache; + + private JmxScraper _jmxScraper; + + JmxScraper jmxScraper() { + if (_jmxScraper == null) { + _jmxScraper = + new JmxScraper( + jmxUrl, + username, + password, + ssl, + includeObjectNames, + excludeObjectNames, + objectNameAttributeFilter); + } + return _jmxScraper; + } } - private PrometheusRegistry prometheusRegistry; private Config config; private File configFile; private long createTimeNanoSecs = System.nanoTime(); @@ -101,8 +117,6 @@ private static class Config { private Gauge jmxScrapeError; private Gauge jmxScrapeCachedBeans; - private final JmxMBeanPropertyCache jmxMBeanPropertyCache = new JmxMBeanPropertyCache(); - public JmxCollector(File in) throws IOException, MalformedObjectNameException { this(in, null); } @@ -130,8 +144,6 @@ public JmxCollector register() { } public JmxCollector register(PrometheusRegistry prometheusRegistry) { - this.prometheusRegistry = prometheusRegistry; - configReloadSuccess = Counter.builder() .name("jmx_config_reload_success_total") @@ -193,6 +205,8 @@ private void reloadConfig() { try { Map newYamlConfig = new Yaml().load(fr); + // TODO: call config.Close() on the old config. But need to make sure that it's + // not used by other threads. config = loadConfig(newYamlConfig); config.lastUpdate = configFile.lastModified(); configReloadSuccess.inc(); @@ -716,18 +730,6 @@ public MetricSnapshots collect() { Receiver receiver = new Receiver(config, stalenessTracker); - JmxScraper scraper = - new JmxScraper( - config.jmxUrl, - config.username, - config.password, - config.ssl, - config.includeObjectNames, - config.excludeObjectNames, - config.objectNameAttributeFilter, - receiver, - jmxMBeanPropertyCache); - long start = System.nanoTime(); double error = 0; @@ -736,7 +738,7 @@ public MetricSnapshots collect() { throw new IllegalStateException("JMXCollector waiting for startDelaySeconds"); } try { - scraper.doScrape(); + config.jmxScraper().doScrape(receiver); } catch (Exception e) { error = 1; StringWriter sw = new StringWriter(); diff --git a/collector/src/main/java/io/prometheus/jmx/JmxMBeanPropertyCache.java b/collector/src/main/java/io/prometheus/jmx/JmxMBeanPropertyCache.java index 0b738daf2..8f640752d 100644 --- a/collector/src/main/java/io/prometheus/jmx/JmxMBeanPropertyCache.java +++ b/collector/src/main/java/io/prometheus/jmx/JmxMBeanPropertyCache.java @@ -64,6 +64,13 @@ public JmxMBeanPropertyCache() { this.keyPropertiesPerBean = new ConcurrentHashMap<>(); } + public JmxMBeanPropertyCache(Set mBeanNames) { + this(); + for (ObjectName mBeanName : mBeanNames) { + getKeyPropertyList(mBeanName); + } + } + Map> getKeyPropertiesPerBean() { return keyPropertiesPerBean; } diff --git a/collector/src/main/java/io/prometheus/jmx/JmxScraper.java b/collector/src/main/java/io/prometheus/jmx/JmxScraper.java index 926c764fb..7182d306f 100644 --- a/collector/src/main/java/io/prometheus/jmx/JmxScraper.java +++ b/collector/src/main/java/io/prometheus/jmx/JmxScraper.java @@ -28,6 +28,7 @@ import javax.management.openmbean.CompositeType; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularType; +import javax.management.relation.MBeanServerNotificationFilter; import javax.management.remote.JMXConnector; import javax.management.remote.JMXConnectorFactory; import javax.management.remote.JMXServiceURL; @@ -35,7 +36,7 @@ import javax.naming.Context; import javax.rmi.ssl.SslRMIClientSocketFactory; -class JmxScraper { +class JmxScraper implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(JmxScraper.class); @@ -50,14 +51,35 @@ void recordBean( Object value); } - private final MBeanReceiver receiver; private final String jmxUrl; private final String username; private final String password; private final boolean ssl; private final List includeObjectNames, excludeObjectNames; - private final ObjectNameAttributeFilter objectNameAttributeFilter; - private final JmxMBeanPropertyCache jmxMBeanPropertyCache; + // TODO: accept lists of attributes rather than filter object + private final ObjectNameAttributeFilter defaultObjectNameAttributeFilter; + + // Values cached per connection. + private MBeanServerConnection cachedBeanConn; + private JMXConnector jmxc; + private NotificationListener beanListener; + private Cache cache; + private boolean cacheIsStale = false; + + private class Cache { + private final Set mBeanNames; + private final ObjectNameAttributeFilter objectNameAttributeFilter; + private final JmxMBeanPropertyCache jmxMBeanPropertyCache; + + private Cache( + Set mBeanNames, + ObjectNameAttributeFilter objectNameAttributeFilter, + JmxMBeanPropertyCache jmxMBeanPropertyCache) { + this.mBeanNames = mBeanNames; + this.objectNameAttributeFilter = objectNameAttributeFilter; + this.jmxMBeanPropertyCache = jmxMBeanPropertyCache; + } + } public JmxScraper( String jmxUrl, @@ -66,87 +88,141 @@ public JmxScraper( boolean ssl, List includeObjectNames, List excludeObjectNames, - ObjectNameAttributeFilter objectNameAttributeFilter, - MBeanReceiver receiver, - JmxMBeanPropertyCache jmxMBeanPropertyCache) { + ObjectNameAttributeFilter objectNameAttributeFilter) { this.jmxUrl = jmxUrl; - this.receiver = receiver; this.username = username; this.password = password; this.ssl = ssl; this.includeObjectNames = includeObjectNames; this.excludeObjectNames = excludeObjectNames; - this.objectNameAttributeFilter = objectNameAttributeFilter; - this.jmxMBeanPropertyCache = jmxMBeanPropertyCache; + this.defaultObjectNameAttributeFilter = objectNameAttributeFilter; } - /** - * Get a list of mbeans on host_port and scrape their values. - * - *

Values are passed to the receiver in a single thread. - */ - public void doScrape() throws Exception { - MBeanServerConnection beanConn; - JMXConnector jmxc = null; + @Override + public synchronized void close() throws Exception { + if (cachedBeanConn != null && beanListener != null) { + // This one needs to be explicitly removed because in the case of a + // in-process JMX connection MBeanServerConnection is a singleton + // and all listeners are added to the same object. + cachedBeanConn.removeNotificationListener( + MBeanServerDelegate.DELEGATE_NAME, beanListener); + } + cachedBeanConn = null; + beanListener = null; + + if (jmxc != null) { + jmxc.close(); + } + jmxc = null; + + cache = null; + } + + private MBeanServerConnection connectToMBeanServer() throws Exception { if (jmxUrl.isEmpty()) { - beanConn = ManagementFactory.getPlatformMBeanServer(); - } else { - Map environment = new HashMap<>(); - if (username != null - && username.length() != 0 - && password != null - && password.length() != 0) { - String[] credent = new String[] {username, password}; - environment.put(javax.management.remote.JMXConnector.CREDENTIALS, credent); - } - if (ssl) { - environment.put(Context.SECURITY_PROTOCOL, "ssl"); - SslRMIClientSocketFactory clientSocketFactory = new SslRMIClientSocketFactory(); - environment.put( - RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, - clientSocketFactory); - - if (!"true".equalsIgnoreCase(System.getenv("RMI_REGISTRY_SSL_DISABLED"))) { - environment.put("com.sun.jndi.rmi.factory.socket", clientSocketFactory); - } + return ManagementFactory.getPlatformMBeanServer(); + } + + Map environment = new HashMap<>(); + if (username != null + && username.length() != 0 + && password != null + && password.length() != 0) { + String[] credent = new String[] {username, password}; + environment.put(javax.management.remote.JMXConnector.CREDENTIALS, credent); + } + if (ssl) { + environment.put(Context.SECURITY_PROTOCOL, "ssl"); + SslRMIClientSocketFactory clientSocketFactory = new SslRMIClientSocketFactory(); + environment.put( + RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, clientSocketFactory); + + if (!"true".equalsIgnoreCase(System.getenv("RMI_REGISTRY_SSL_DISABLED"))) { + environment.put("com.sun.jndi.rmi.factory.socket", clientSocketFactory); } + } + jmxc = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment); + return jmxc.getMBeanServerConnection(); + } - jmxc = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), environment); - beanConn = jmxc.getMBeanServerConnection(); + private synchronized MBeanServerConnection getMBeanServerConnection() throws Exception { + if (cachedBeanConn == null) { + cacheIsStale = true; + cachedBeanConn = connectToMBeanServer(); + // Subscribe to MBeans register/unregister events to invalidate cache + beanListener = + (notification, handback) -> { + String type = notification.getType(); + if (MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(type) + || MBeanServerNotification.UNREGISTRATION_NOTIFICATION.equals( + type)) { + LOGGER.log(FINE, "Marking cache as stale due to %s", type); + // Mark cache as stale instead of refreshing it immediately + // to debounce multiple notifications. + synchronized (this) { + cacheIsStale = true; + } + } + }; + MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter(); + filter.enableAllObjectNames(); + cachedBeanConn.addNotificationListener( + MBeanServerDelegate.DELEGATE_NAME, beanListener, filter, null); } - try { - // Query MBean names, see #89 for reasons queryMBeans() is used instead of queryNames() - Set mBeanNames = new HashSet<>(); - for (ObjectName name : includeObjectNames) { - for (ObjectInstance instance : beanConn.queryMBeans(name, null)) { - mBeanNames.add(instance.getObjectName()); - } + if (cacheIsStale) { + cache = fetchCache(cachedBeanConn); + cacheIsStale = false; + } + return cachedBeanConn; + } + + private Cache fetchCache(MBeanServerConnection beanConn) throws Exception { + // Query MBean names, see #89 for reasons queryMBeans() is used instead of queryNames() + Set mBeanNames = new HashSet<>(); + for (ObjectName name : includeObjectNames) { + for (ObjectInstance instance : beanConn.queryMBeans(name, null)) { + mBeanNames.add(instance.getObjectName()); } + } - for (ObjectName name : excludeObjectNames) { - for (ObjectInstance instance : beanConn.queryMBeans(name, null)) { - mBeanNames.remove(instance.getObjectName()); - } + for (ObjectName name : excludeObjectNames) { + for (ObjectInstance instance : beanConn.queryMBeans(name, null)) { + mBeanNames.remove(instance.getObjectName()); } + } + + ObjectNameAttributeFilter attributeFilter = defaultObjectNameAttributeFilter.dup(); + attributeFilter.onlyKeepMBeans(mBeanNames); + + return new Cache(mBeanNames, attributeFilter, new JmxMBeanPropertyCache(mBeanNames)); + } - // Now that we have *only* the whitelisted mBeans, remove any old ones from the cache - // and dynamic attribute filter: - jmxMBeanPropertyCache.onlyKeepMBeans(mBeanNames); - objectNameAttributeFilter.onlyKeepMBeans(mBeanNames); + /** + * Get a list of mbeans on host_port and scrape their values. + * + *

Values are passed to the receiver in a single thread. + */ + public synchronized void doScrape(MBeanReceiver receiver) throws Exception { + // Method is synchronized to avoid multiple scrapes running concurrently + // and let one of them refresh the cache in the middle of the scrape. - for (ObjectName objectName : mBeanNames) { + try { + MBeanServerConnection beanConn = getMBeanServerConnection(); + + for (ObjectName objectName : cache.mBeanNames) { long start = System.nanoTime(); - scrapeBean(beanConn, objectName); + scrapeBean(receiver, beanConn, objectName); LOGGER.log(FINE, "TIME: %d ns for %s", System.nanoTime() - start, objectName); } } finally { - if (jmxc != null) { - jmxc.close(); - } + // reconnect to resolve connection issues + // TODO: should it make a single retry with a new connection? + close(); } } - private void scrapeBean(MBeanServerConnection beanConn, ObjectName mBeanName) { + private void scrapeBean( + MBeanReceiver receiver, MBeanServerConnection beanConn, ObjectName mBeanName) { MBeanInfo mBeanInfo; try { @@ -168,7 +244,7 @@ private void scrapeBean(MBeanServerConnection beanConn, ObjectName mBeanName) { continue; } - if (objectNameAttributeFilter.exclude(mBeanName, mBeanAttributeInfo.getName())) { + if (cache.objectNameAttributeFilter.exclude(mBeanName, mBeanAttributeInfo.getName())) { continue; } @@ -198,7 +274,7 @@ private void scrapeBean(MBeanServerConnection beanConn, ObjectName mBeanName) { e.getMessage()); // couldn't get them all in one go, try them 1 by 1 - processAttributesOneByOne(beanConn, mBeanName, name2MBeanAttributeInfo); + processAttributesOneByOne(receiver, beanConn, mBeanName, name2MBeanAttributeInfo); return; } @@ -228,9 +304,10 @@ private void scrapeBean(MBeanServerConnection beanConn, ObjectName mBeanName) { name2MBeanAttributeInfo.get(attribute.getName()); LOGGER.log(FINE, "%s_%s process", mBeanName, mBeanAttributeInfo.getName()); processBeanValue( + receiver, mBeanName, mBeanDomain, - jmxMBeanPropertyCache.getKeyPropertyList(mBeanName), + cache.jmxMBeanPropertyCache.getKeyPropertyList(mBeanName), new LinkedList<>(), mBeanAttributeInfo.getName(), mBeanAttributeInfo.getType(), @@ -252,6 +329,7 @@ private void scrapeBean(MBeanServerConnection beanConn, ObjectName mBeanName) { } private void processAttributesOneByOne( + MBeanReceiver receiver, MBeanServerConnection beanConn, ObjectName mbeanName, Map name2AttrInfo) { @@ -266,9 +344,10 @@ private void processAttributesOneByOne( LOGGER.log(FINE, "%s_%s process", mbeanName, attr.getName()); processBeanValue( + receiver, mbeanName, mbeanName.getDomain(), - jmxMBeanPropertyCache.getKeyPropertyList(mbeanName), + cache.jmxMBeanPropertyCache.getKeyPropertyList(mbeanName), new LinkedList<>(), attr.getName(), attr.getType(), @@ -283,6 +362,7 @@ private void processAttributesOneByOne( * pass of getting the values/names out in a way it can be processed elsewhere easily. */ private void processBeanValue( + MBeanReceiver receiver, ObjectName objectName, String domain, LinkedHashMap beanProperties, @@ -302,7 +382,7 @@ private void processBeanValue( value = ((java.util.Date) value).getTime() / 1000.0; } LOGGER.log(FINE, "%s%s%s scrape: %s", domain, beanProperties, attrName, value); - this.receiver.recordBean( + receiver.recordBean( domain, beanProperties, attrKeys, attrName, attrType, attrDescription, value); } else if (value instanceof CompositeData) { LOGGER.log(FINE, "%s%s%s scrape: compositedata", domain, beanProperties, attrName); @@ -314,6 +394,7 @@ private void processBeanValue( String typ = type.getType(key).getTypeName(); Object valu = composite.get(key); processBeanValue( + receiver, objectName, domain, beanProperties, @@ -380,6 +461,7 @@ private void processBeanValue( name = attrName; } processBeanValue( + receiver, objectName, domain, l2s, @@ -400,6 +482,7 @@ private void processBeanValue( Optional optional = (Optional) value; if (optional.isPresent()) { processBeanValue( + receiver, objectName, domain, beanProperties, @@ -412,6 +495,7 @@ private void processBeanValue( } else if (value.getClass().isEnum()) { LOGGER.log(FINE, "%s%s%s scrape: %s", domain, beanProperties, attrName, value); processBeanValue( + receiver, objectName, domain, beanProperties, @@ -421,7 +505,7 @@ private void processBeanValue( attrDescription, value.toString()); } else { - objectNameAttributeFilter.add(objectName, attrName); + cache.objectNameAttributeFilter.add(objectName, attrName); LOGGER.log(FINE, "%s%s scrape: %s not exported", domain, beanProperties, attrType); } } @@ -453,10 +537,8 @@ public static void main(String[] args) throws Exception { (args.length > 3 && "ssl".equalsIgnoreCase(args[3])), objectNames, new LinkedList<>(), - objectNameAttributeFilter, - new StdoutWriter(), - new JmxMBeanPropertyCache()) - .doScrape(); + objectNameAttributeFilter) + .doScrape(new StdoutWriter()); } else if (args.length > 0) { new JmxScraper( args[0], @@ -465,10 +547,8 @@ public static void main(String[] args) throws Exception { false, objectNames, new LinkedList<>(), - objectNameAttributeFilter, - new StdoutWriter(), - new JmxMBeanPropertyCache()) - .doScrape(); + objectNameAttributeFilter) + .doScrape(new StdoutWriter()); } else { new JmxScraper( "", @@ -477,10 +557,8 @@ public static void main(String[] args) throws Exception { false, objectNames, new LinkedList<>(), - objectNameAttributeFilter, - new StdoutWriter(), - new JmxMBeanPropertyCache()) - .doScrape(); + objectNameAttributeFilter) + .doScrape(new StdoutWriter()); } } } diff --git a/collector/src/main/java/io/prometheus/jmx/ObjectNameAttributeFilter.java b/collector/src/main/java/io/prometheus/jmx/ObjectNameAttributeFilter.java index f8317853a..d4395a4cd 100644 --- a/collector/src/main/java/io/prometheus/jmx/ObjectNameAttributeFilter.java +++ b/collector/src/main/java/io/prometheus/jmx/ObjectNameAttributeFilter.java @@ -52,6 +52,13 @@ private ObjectNameAttributeFilter() { dynamicExcludeObjectNameAttributesMap = new ConcurrentHashMap<>(); } + public ObjectNameAttributeFilter dup() { + ObjectNameAttributeFilter copy = new ObjectNameAttributeFilter(); + copy.configExcludeObjectNameAttributesMap.putAll(configExcludeObjectNameAttributesMap); + copy.dynamicExcludeObjectNameAttributesMap.putAll(dynamicExcludeObjectNameAttributesMap); + return copy; + } + /** * Method to initialize the ObjectNameAttributeFilter *