Skip to content

Commit

Permalink
- Added Metrics to extract metrics for micrometer (https://issues.red…
Browse files Browse the repository at this point in the history
…hat.com/browse/JGRP-2796)

- ResourceDMBean now accepts filters (JGRP-2796)
- Added AttributeType.SCALAR to selected attributes
- JChannelProbeHandler now supports "metrics" key to dump metrics
  • Loading branch information
belaban committed Apr 29, 2024
1 parent 891c45e commit 956886a
Show file tree
Hide file tree
Showing 17 changed files with 387 additions and 243 deletions.
17 changes: 16 additions & 1 deletion src/org/jgroups/JChannelProbeHandler.java
Expand Up @@ -5,6 +5,7 @@
import org.jgroups.logging.LogFactory;
import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Metrics;
import org.jgroups.util.Util;

import java.lang.management.ManagementFactory;
Expand Down Expand Up @@ -55,6 +56,20 @@ public Map<String, String> handleProbe(String... keys) {
}
continue;
}
if(key.startsWith("metrics")) {
Map<String,Map<String,Metrics.Entry<Object>>> m=Metrics.extract(ch, Metrics.IS_NUMBER);
Map<String,Map<String,Metrics.Entry<Number>>> metrics=Metrics.convert(m);
StringBuilder sb=new StringBuilder();
for(Map.Entry<String,Map<String,Metrics.Entry<Number>>> e: metrics.entrySet()) {
sb.append(String.format("%s:\n", e.getKey()));
for(Map.Entry<String,Metrics.Entry<Number>> e2: e.getValue().entrySet()) {
Metrics.Entry<Number> val=e2.getValue();
sb.append(String.format(" %s: %s\n", e2.getKey(), e2.getValue().supplier().get()));
}
}
map.put(key, sb.toString());
continue;
}
if(key.startsWith("threads")) {
ThreadMXBean bean=ManagementFactory.getThreadMXBean();
boolean cpu_supported=bean.isThreadCpuTimeSupported();
Expand Down Expand Up @@ -178,7 +193,7 @@ else if(val.startsWith("wtime"))
}

public String[] supportedKeys() {
return new String[]{"reset-stats", "jmx", "op=<operation>[<args>]", "ops",
return new String[]{"reset-stats", "jmx", "op=<operation>[<args>]", "ops", "metrics",
"threads[=<filter>[=<limit>]]", "enable-cpu", "enable-contention", "disable-cpu", "disable-contention"};
}

Expand Down
83 changes: 51 additions & 32 deletions src/org/jgroups/jmx/ResourceDMBean.java
Expand Up @@ -7,6 +7,7 @@
import org.jgroups.conf.AttributeType;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.util.Average;
import org.jgroups.util.Util;

import javax.management.*;
Expand Down Expand Up @@ -43,7 +44,7 @@ public class ResourceDMBean implements DynamicMBean {
protected final boolean expose_all;
protected final Log log=LogFactory.getLog(ResourceDMBean.class);
protected final Object obj;
protected List<Object> objs;
protected List<Object> components;
protected final MBeanAttributeInfo[] attrInfo;
protected final MBeanOperationInfo[] opInfo;
protected final HashMap<String,AttributeEntry> atts=new HashMap<>();
Expand All @@ -54,27 +55,31 @@ public class ResourceDMBean implements DynamicMBean {
protected static final Predicate<AccessibleObject> FILTER=obj -> obj.isAnnotationPresent(ManagedAttribute.class) ||
(obj.isAnnotationPresent(Property.class) && obj.getAnnotation(Property.class).exposeAsManagedAttribute());


public ResourceDMBean(Object instance) {
this(instance, null);
}

public ResourceDMBean(Object instance, Predicate<AccessibleObject> filter) {
if(instance == null)
throw new NullPointerException("Cannot make an MBean wrapper for null instance");
this.obj=instance;
Class<? extends Object> c=obj.getClass();
expose_all=c.isAnnotationPresent(MBean.class) && c.getAnnotation(MBean.class).exposeAll();

findFields(instance);
findMethods(instance);
findFields(instance, filter, null);
findMethods(instance, filter, null);
fixFields(instance);

List<Object> objects=Util.getComponents(instance);
if(objects != null) {
for(Object inst: objects) {
if(inst != null) {
if(objs == null)
objs=new ArrayList<>();
objs.add(inst);
findFields(inst);
findMethods(inst);
if(components == null)
components=new ArrayList<>();
String prefix=Util.methodNameToAttributeName(inst.getClass().getSimpleName());
components.add(inst);
findFields(inst, filter, prefix);
findMethods(inst, filter, prefix);
fixFields(inst);
}
}
Expand Down Expand Up @@ -150,8 +155,8 @@ public Object invoke(String name, Object[] args, String[] sig) throws MBeanExcep
for(int i=0;i < classes.length;i++)
classes[i]=getClassForName(sig[i]);
Method method=null;
if(objs != null) {
for(Object o: objs) {
if(components != null) {
for(Object o: components) {
try {
method=o.getClass().getMethod(name, classes);
}
Expand Down Expand Up @@ -273,6 +278,10 @@ public static boolean isFractional(Class<?> cl) {
return cl.equals(float.class) || cl.equals(Float.class) || cl.equals(double.class) || cl.equals(Double.class);
}

public static boolean isNumber(Class<?> cl) {
return isNumeric(cl) || isFractional(cl) || Number.class.isAssignableFrom(cl) || Average.class.isAssignableFrom(cl);
}


protected static AttributeType getType(AccessibleObject ao) {
Property prop=ao.getAnnotation(Property.class);
Expand Down Expand Up @@ -306,15 +315,18 @@ protected static Class<?> getClassForName(String name) throws ClassNotFoundExcep
throw new ClassNotFoundException("Class " + name + " cannot be found");
}

protected void findMethods(Object instance) {
protected void findMethods(Object instance, Predicate<AccessibleObject> filter, String prefix) {
// find all methods but don't include methods from Object class
List<Method> methods = new ArrayList<>(Arrays.asList(instance.getClass().getMethods()));
methods.removeAll(OBJECT_METHODS);

for(Method method: methods) {

// does method have @ManagedAttribute annotation?
if(method.isAnnotationPresent(ManagedAttribute.class) || method.isAnnotationPresent(Property.class)) {
exposeManagedAttribute(method, instance);
if(filter != null && !filter.test(method))
continue;
exposeManagedAttribute(method, instance, prefix);
}
//or @ManagedOperation
else if (method.isAnnotationPresent(ManagedOperation.class) || expose_all){
Expand All @@ -338,7 +350,7 @@ protected void fixFields(Object instance) {



protected void exposeManagedAttribute(Method method, Object instance) {
protected void exposeManagedAttribute(Method method, Object instance, String prefix) {
String methodName=method.getName();
ManagedAttribute attr_annotation=method.getAnnotation(ManagedAttribute.class);
Property prop=method.getAnnotation(Property.class);
Expand Down Expand Up @@ -367,7 +379,7 @@ protected void exposeManagedAttribute(Method method, Object instance) {
}

String descr=attr_annotation != null ? attr_annotation.description() : prop != null? prop.description() : null;
AttributeEntry attr=atts.get(attr_name);
AttributeEntry attr=atts.get(prefix(prefix,attr_name));
if(attr != null) {
if(isSetMethod(method)) {
if(attr.setter != null) {
Expand All @@ -389,17 +401,19 @@ protected void exposeManagedAttribute(Method method, Object instance) {
else { // create a new entry in atts
boolean is_setter=isSetMethod(method);
String type=is_setter? method.getParameterTypes()[0].getCanonicalName() : method.getReturnType().getCanonicalName();
MBeanAttributeInfo info=new MBeanAttributeInfo(attr_name, type, descr, true, writable, methodName.startsWith("is"));
AttributeEntry entry=new AttributeEntry(Util.methodNameToAttributeName(methodName), info);
MBeanAttributeInfo info=new MBeanAttributeInfo(prefix(attr_name, prefix), type, descr, true, writable, methodName.startsWith("is"));
AttributeEntry entry=new AttributeEntry(method, Util.methodNameToAttributeName(methodName), info);
if(is_setter)
entry.setter(new MethodAccessor(method, instance));
else
entry.getter(new MethodAccessor(method, instance));
atts.put(attr_name, entry);
atts.put(prefix(attr_name, prefix), entry);
}
}


protected static String prefix(String s, String prefix) {
return prefix == null? s : prefix + "." + s;
}

/** Finds an accessor for an attribute. Tries to find getAttrName(), isAttrName(), attrName() methods. If not
* found, tries to use reflection to get the value of attr_name. If still not found, creates a NullAccessor. */
Expand All @@ -416,7 +430,6 @@ protected static Accessor findGetter(Object target, String attr_name) {
Field field=Util.getField(clazz, attr_name);
if(field != null)
return new FieldAccessor(field, target);

return new NoopAccessor();
}

Expand Down Expand Up @@ -461,24 +474,28 @@ protected static String toLowerCase(String input) {
}


protected void findFields(Object instance) {
protected void findFields(Object instance, Predicate<AccessibleObject> filter, String prefix) {
// traverse class hierarchy and find all annotated fields
for(Class<?> clazz=instance.getClass(); clazz != null && clazz != Object.class; clazz=clazz.getSuperclass()) {

Field[] fields=clazz.getDeclaredFields();
for(Field field: fields) {
ManagedAttribute attr=field.getAnnotation(ManagedAttribute.class);
ManagedAttribute annotation=field.getAnnotation(ManagedAttribute.class);
Property prop=field.getAnnotation(Property.class);
boolean expose_prop=prop != null && prop.exposeAsManagedAttribute();
boolean expose=attr != null || expose_prop;
boolean expose=annotation != null || expose_prop;

if(expose) {
String fieldName=attr != null? attr.name() : (prop != null? prop.name() : null);
if(filter != null && !filter.test(field))
continue;

String fieldName=annotation != null? annotation.name() : (prop != null? prop.name() : null);
if(fieldName != null && fieldName.trim().isEmpty())
fieldName=field.getName();
if(prefix != null)
fieldName=prefix + "." + fieldName;

String descr=attr != null? attr.description() : prop.description();
boolean writable=attr != null? attr.writable() : prop.writable();
String descr=annotation != null? annotation.description() : prop.description();
boolean writable=annotation != null? annotation.writable() : prop.writable();

MBeanAttributeInfo info=new MBeanAttributeInfo(fieldName,
field.getType().getCanonicalName(),
Expand All @@ -487,7 +504,7 @@ protected void findFields(Object instance) {
!Modifier.isFinal(field.getModifiers()) && writable,
false);

atts.put(fieldName, new AttributeEntry(field.getName(), info));
atts.put(fieldName, new AttributeEntry(field, field.getName(), info));
}
}
}
Expand Down Expand Up @@ -531,25 +548,27 @@ protected boolean setNamedAttribute(Attribute attribute) {


public static class AttributeEntry {

protected final AccessibleObject type; // method of field
/** The name of the field or method. Can be different from the key in atts when name in @Property or
* @ManagedAttribute was used */
protected final String name;
protected final MBeanAttributeInfo info;
protected Accessor getter;
protected Accessor setter;

public AttributeEntry(String name, MBeanAttributeInfo info) {
this(name, info, null, null);
public AttributeEntry(AccessibleObject type, String name, MBeanAttributeInfo info) {
this(type, name, info, null, null);
}

public AttributeEntry(String name, MBeanAttributeInfo info, Accessor getter, Accessor setter) {
public AttributeEntry(AccessibleObject type, String name, MBeanAttributeInfo info, Accessor getter, Accessor setter) {
this.type=type;
this.name=name;
this.info=info;
this.getter=getter;
this.setter=setter;
}

public AccessibleObject type() {return type;}
public String name() {return name;}
public MBeanAttributeInfo info() {return info;}
public Accessor getter() {return getter;}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/BARRIER.java
Expand Up @@ -86,7 +86,7 @@ public int getNumberOfInFlightThreads() {
return in_flight_threads.size();
}

@ManagedAttribute
@ManagedAttribute(description="Number of threads in flight",type=AttributeType.SCALAR)
public int getInFlightThreadsCount() {
return getNumberOfInFlightThreads();
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/Discovery.java
Expand Up @@ -87,7 +87,7 @@ public abstract class Discovery extends Protocol {

/* --------------------------------------------- JMX ------------------------------------------------------ */

@ManagedAttribute(description="Total number of discovery requests sent ")
@ManagedAttribute(description="Total number of discovery requests sent",type=AttributeType.SCALAR)
protected int num_discovery_requests;

/* --------------------------------------------- Fields ------------------------------------------------------ */
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/FD_SOCK2.java
Expand Up @@ -75,7 +75,7 @@ public class FD_SOCK2 extends Protocol implements Receiver, ConnectionListener,
@Property(description="SO_LINGER in seconds. Default of -1 disables it")
protected int linger=-1; // SO_LINGER (number of seconds, -1 disables it)

@ManagedAttribute(description="Number of suspect events emitted")
@ManagedAttribute(description="Number of suspect events emitted",type=AttributeType.SCALAR)
protected int num_suspect_events;

@ManagedAttribute(description="True when this member is leaving the cluster, set to false when joining")
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/MERGE3.java
Expand Up @@ -89,7 +89,7 @@ public class MERGE3 extends Protocol {
@ManagedAttribute(description="Whether or not the current member is the coordinator")
protected volatile boolean is_coord;

@ManagedAttribute(description="Number of times a MERGE event was sent up the stack")
@ManagedAttribute(description="Number of times a MERGE event was sent up the stack",type=AttributeType.SCALAR)
protected int num_merge_events;


Expand Down
6 changes: 2 additions & 4 deletions src/org/jgroups/protocols/MsgStats.java
Expand Up @@ -45,6 +45,7 @@ public class MsgStats {
protected final LongAdder num_batches_received=new LongAdder();

/** The average number of messages in a received {@link MessageBatch} */
@ManagedAttribute(description="Returns the average batch size of received batches")
protected final AverageMinMax avg_batch_size=new AverageMinMax();

@ManagedAttribute(description="Number of multicast bytes sent",type=BYTES)
Expand All @@ -66,10 +67,7 @@ public class MsgStats {
@ManagedAttribute(description="Number of messages received (mcasts and ucasts received)",type=SCALAR)
public long getNumMsgsReceived() {return num_mcasts_received.sum() + num_ucasts_received.sum();}

@ManagedAttribute(description="Returns the average batch size of received batches")
public String getAvgBatchSize() {return avg_batch_size.toString();}

public AverageMinMax avgBatchSize() {return avg_batch_size;}
public AverageMinMax getAvgBatchSize() {return avg_batch_size;}

@ManagedAttribute(description="Total number of bytes sent (unicast + multicast bytes)",type=BYTES)
public long getNumBytesSent() {return num_mcast_bytes_sent.sum() + num_ucast_bytes_sent.sum();}
Expand Down
15 changes: 5 additions & 10 deletions src/org/jgroups/protocols/RED.java
Expand Up @@ -48,7 +48,9 @@ public class RED extends Protocol {
"longer to reflect the current queue size.")
protected double weight_factor=1;

@ManagedAttribute(description="The number of dropped messages",type=AttributeType.SCALAR)
protected final LongAdder dropped_msgs=new LongAdder(); // dropped messages
@ManagedAttribute(description="Total number of messages processed",type=AttributeType.SCALAR)
protected final LongAdder total_msgs=new LongAdder(); // total messages looked at

protected Bundler bundler;
Expand All @@ -60,17 +62,10 @@ public class RED extends Protocol {
public boolean isEnabled() {return enabled;}
public RED setEnabled(boolean e) {enabled=e; return this;}
public double getMinThreshold() {return min_threshold;}



@ManagedAttribute(description="The number of dropped messages",type=AttributeType.SCALAR)
public long getDroppedMessages() {return dropped_msgs.sum();}

@ManagedAttribute(description="Total number of messages processed",type=AttributeType.SCALAR)
public long getTotalMessages() {return total_msgs.sum();}

public long getDroppedMessages() {return dropped_msgs.sum();}
public long getTotalMessages() {return total_msgs.sum();}
@ManagedAttribute(description="Percentage of all messages that were dropped")
public double getDropRate() {return dropped_msgs.sum() / (double)total_msgs.sum();}
public double getDropRate() {return dropped_msgs.sum() / (double)total_msgs.sum();}


public void start() throws Exception {
Expand Down

0 comments on commit 956886a

Please sign in to comment.