BUG-529: imrpove statistics collection 40/7540/1
authorRobert Varga <robert.varga@pantheon.sk>
Fri, 30 May 2014 15:54:31 +0000 (17:54 +0200)
committerRobert Varga <robert.varga@pantheon.sk>
Fri, 30 May 2014 15:56:41 +0000 (17:56 +0200)
This removes synchronized blocks in statistics gathering, as these are
multi-threaded codepaths and we saw contention here.

Change-Id: Ie99ea3fd2e16b66af533e773af04d7978c2c0ac1
Signed-off-by: Robert Varga <robert.varga@pantheon.sk>
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpyCounterImpl.java

index 6af42b9065b52fd2c670903fa8ec23ef6237de2d..6890f169cadef194eb3c609c311990d3477f19b2 100644 (file)
@@ -9,12 +9,12 @@
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.slf4j.Logger;
@@ -24,16 +24,45 @@ import org.slf4j.LoggerFactory;
  * message counter (by type)
  */
 public class MessageSpyCounterImpl implements MessageObservatory<DataContainer> {
-    
+
     private static final Logger LOG = LoggerFactory
             .getLogger(MessageSpyCounterImpl.class);
-    
-    private Map<STATISTIC_GROUP, Map<Class<? extends DataContainer>, AtomicLong[]>> inputStats = new ConcurrentHashMap<>();
-    
+
+    private static final class MessageCounters {
+        private static final AtomicLongFieldUpdater<MessageCounters> UPDATER = AtomicLongFieldUpdater.newUpdater(MessageCounters.class, "current");
+        private volatile long current;
+        private long cumulative;
+
+        public synchronized long accumulate() {
+            final long inc = UPDATER.getAndSet(this, 0);
+            cumulative += inc;
+            return inc;
+        }
+
+        public synchronized long getCumulative() {
+            return cumulative;
+        }
+
+        public long increment() {
+            return UPDATER.incrementAndGet(this);
+        }
+    }
+
+    private final ConcurrentMap<STATISTIC_GROUP, ConcurrentMap<Class<? extends DataContainer>, MessageCounters>> inputStats = new ConcurrentHashMap<>();
+
     @Override
-    public void spyIn(DataContainer message) {
-        AtomicLong[] counters = getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_IN_SUCCESS);
-        counters[0].incrementAndGet();  
+    public void spyIn(final DataContainer message) {
+        getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_IN_SUCCESS).increment();
+    }
+
+    @Override
+    public void spyOut(final DataContainer message) {
+        getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS).increment();
+    }
+
+    @Override
+    public void spyMessage(final DataContainer message, final STATISTIC_GROUP statGroup) {
+        getCounters(message, statGroup).increment();
     }
 
     /**
@@ -41,40 +70,35 @@ public class MessageSpyCounterImpl implements MessageObservatory<DataContainer>
      * @param statGroup TODO
      * @return
      */
-    private AtomicLong[] getCounters(DataContainer message, STATISTIC_GROUP statGroup) {
+    private MessageCounters getCounters(final DataContainer message, final STATISTIC_GROUP statGroup) {
         Class<? extends DataContainer> msgType = message.getImplementedInterface();
-        Map<Class<? extends DataContainer>, AtomicLong[]> groupData = getOrCreateGroupData(statGroup);
-        AtomicLong[] counters = getOrCreateCountersPair(msgType, groupData);
+        ConcurrentMap<Class<? extends DataContainer>, MessageCounters> groupData = getOrCreateGroupData(statGroup);
+        MessageCounters counters = getOrCreateCountersPair(msgType, groupData);
         return counters;
     }
-    
-    private static AtomicLong[] getOrCreateCountersPair(Class<? extends DataContainer> msgType, Map<Class<? extends DataContainer>, AtomicLong[]> groupData) {
-        AtomicLong[] counters = groupData.get(msgType);
-        synchronized(groupData) {
-            if (counters == null) {
-                counters = new AtomicLong[] {new AtomicLong(), new AtomicLong()};
-                groupData.put(msgType, counters);
-            } 
+
+    private static MessageCounters getOrCreateCountersPair(final Class<? extends DataContainer> msgType, final ConcurrentMap<Class<? extends DataContainer>,MessageCounters> groupData) {
+        final MessageCounters lookup = groupData.get(msgType);
+        if (lookup != null) {
+            return lookup;
         }
-        return counters;
+
+        final MessageCounters newCounters = new MessageCounters();
+        final MessageCounters check = groupData.putIfAbsent(msgType, newCounters);
+        return check == null ? newCounters : check;
+
     }
 
-    private Map<Class<? extends DataContainer>, AtomicLong[]> getOrCreateGroupData(STATISTIC_GROUP statGroup) {
-        Map<Class<? extends DataContainer>, AtomicLong[]> groupData = null;
-        synchronized(inputStats) {
-            groupData = inputStats.get(statGroup);
-            if (groupData == null) {
-                groupData = new HashMap<>();
-                inputStats.put(statGroup, groupData);
-            }
+    private ConcurrentMap<Class<? extends DataContainer>, MessageCounters> getOrCreateGroupData(final STATISTIC_GROUP statGroup) {
+        final ConcurrentMap<Class<? extends DataContainer>, MessageCounters> lookup = inputStats.get(statGroup);
+        if (lookup != null) {
+            return lookup;
         }
-        return groupData;
-    }
 
-    @Override
-    public void spyOut(DataContainer message) {
-        AtomicLong[] counters = getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
-        counters[0].incrementAndGet();  
+        final ConcurrentMap<Class<? extends DataContainer>, MessageCounters> newmap = new ConcurrentHashMap<>();
+        final ConcurrentMap<Class<? extends DataContainer>, MessageCounters> check = inputStats.putIfAbsent(statGroup, newmap);
+
+        return check == null ? newmap : check;
     }
 
     @Override
@@ -90,27 +114,22 @@ public class MessageSpyCounterImpl implements MessageObservatory<DataContainer>
     @Override
     public List<String> dumpMessageCounts() {
         List<String> dump = new ArrayList<>();
+
         for (STATISTIC_GROUP statGroup : STATISTIC_GROUP.values()) {
-            Map<Class<? extends DataContainer>, AtomicLong[]> groupData = inputStats.get(statGroup);
+            Map<Class<? extends DataContainer>, MessageCounters> groupData = inputStats.get(statGroup);
             if (groupData != null) {
-                for (Entry<Class<? extends DataContainer>, AtomicLong[]> statEntry : groupData.entrySet()) {
-                    long amountPerInterval = statEntry.getValue()[0].getAndSet(0);
-                    long cumulativeAmount = statEntry.getValue()[1].addAndGet(amountPerInterval);
+                for (Entry<Class<? extends DataContainer>, MessageCounters> statEntry : groupData.entrySet()) {
+                    long amountPerInterval = statEntry.getValue().accumulate();
+                    long cumulativeAmount = statEntry.getValue().getCumulative();
                     dump.add(String.format("%s: MSG[%s] -> +%d | %d",
                             statGroup,
-                            statEntry.getKey().getSimpleName(), amountPerInterval, cumulativeAmount));
+                            statEntry.getKey().getSimpleName(),
+                            amountPerInterval, cumulativeAmount));
                 }
-                
             } else {
                 dump.add(String.format("%s: no activity detected", statGroup));
             }
         }
         return dump;
     }
-
-    @Override
-    public void spyMessage(DataContainer message, STATISTIC_GROUP statGroup) {
-        AtomicLong[] counters = getCounters(message, statGroup);
-        counters[0].incrementAndGet();
-    }
 }