package org.opendaylight.openflowplugin.openflow.md.queue;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.slf4j.Logger;
* message counter (by type)
*/
public class MessageSpyCounterImpl implements MessageObservatory<DataContainer> {
-
+
private static final Logger LOG = LoggerFactory
.getLogger(MessageSpyCounterImpl.class);
-
- private Map<STATISTIC_GROUP, Map<Class<? extends DataContainer>, AtomicLong[]>> inputStats = new ConcurrentHashMap<>();
-
+
+ private static final class MessageCounters {
+ private static final AtomicLongFieldUpdater<MessageCounters> UPDATER = AtomicLongFieldUpdater.newUpdater(MessageCounters.class, "current");
+ private volatile long current;
+ private long cumulative;
+
+ public synchronized long accumulate() {
+ final long inc = UPDATER.getAndSet(this, 0);
+ cumulative += inc;
+ return inc;
+ }
+
+ public synchronized long getCumulative() {
+ return cumulative;
+ }
+
+ public long increment() {
+ return UPDATER.incrementAndGet(this);
+ }
+ }
+
+ private final ConcurrentMap<STATISTIC_GROUP, ConcurrentMap<Class<? extends DataContainer>, MessageCounters>> inputStats = new ConcurrentHashMap<>();
+
@Override
- public void spyIn(DataContainer message) {
- AtomicLong[] counters = getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_IN_SUCCESS);
- counters[0].incrementAndGet();
+ public void spyIn(final DataContainer message) {
+ getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_IN_SUCCESS).increment();
+ }
+
+ @Override
+ public void spyOut(final DataContainer message) {
+ getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS).increment();
+ }
+
+ @Override
+ public void spyMessage(final DataContainer message, final STATISTIC_GROUP statGroup) {
+ getCounters(message, statGroup).increment();
}
/**
* @param statGroup TODO
* @return
*/
- private AtomicLong[] getCounters(DataContainer message, STATISTIC_GROUP statGroup) {
+ private MessageCounters getCounters(final DataContainer message, final STATISTIC_GROUP statGroup) {
Class<? extends DataContainer> msgType = message.getImplementedInterface();
- Map<Class<? extends DataContainer>, AtomicLong[]> groupData = getOrCreateGroupData(statGroup);
- AtomicLong[] counters = getOrCreateCountersPair(msgType, groupData);
+ ConcurrentMap<Class<? extends DataContainer>, MessageCounters> groupData = getOrCreateGroupData(statGroup);
+ MessageCounters counters = getOrCreateCountersPair(msgType, groupData);
return counters;
}
-
- private static AtomicLong[] getOrCreateCountersPair(Class<? extends DataContainer> msgType, Map<Class<? extends DataContainer>, AtomicLong[]> groupData) {
- AtomicLong[] counters = groupData.get(msgType);
- synchronized(groupData) {
- if (counters == null) {
- counters = new AtomicLong[] {new AtomicLong(), new AtomicLong()};
- groupData.put(msgType, counters);
- }
+
+ private static MessageCounters getOrCreateCountersPair(final Class<? extends DataContainer> msgType, final ConcurrentMap<Class<? extends DataContainer>,MessageCounters> groupData) {
+ final MessageCounters lookup = groupData.get(msgType);
+ if (lookup != null) {
+ return lookup;
}
- return counters;
+
+ final MessageCounters newCounters = new MessageCounters();
+ final MessageCounters check = groupData.putIfAbsent(msgType, newCounters);
+ return check == null ? newCounters : check;
+
}
- private Map<Class<? extends DataContainer>, AtomicLong[]> getOrCreateGroupData(STATISTIC_GROUP statGroup) {
- Map<Class<? extends DataContainer>, AtomicLong[]> groupData = null;
- synchronized(inputStats) {
- groupData = inputStats.get(statGroup);
- if (groupData == null) {
- groupData = new HashMap<>();
- inputStats.put(statGroup, groupData);
- }
+ private ConcurrentMap<Class<? extends DataContainer>, MessageCounters> getOrCreateGroupData(final STATISTIC_GROUP statGroup) {
+ final ConcurrentMap<Class<? extends DataContainer>, MessageCounters> lookup = inputStats.get(statGroup);
+ if (lookup != null) {
+ return lookup;
}
- return groupData;
- }
- @Override
- public void spyOut(DataContainer message) {
- AtomicLong[] counters = getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
- counters[0].incrementAndGet();
+ final ConcurrentMap<Class<? extends DataContainer>, MessageCounters> newmap = new ConcurrentHashMap<>();
+ final ConcurrentMap<Class<? extends DataContainer>, MessageCounters> check = inputStats.putIfAbsent(statGroup, newmap);
+
+ return check == null ? newmap : check;
}
@Override
@Override
public List<String> dumpMessageCounts() {
List<String> dump = new ArrayList<>();
+
for (STATISTIC_GROUP statGroup : STATISTIC_GROUP.values()) {
- Map<Class<? extends DataContainer>, AtomicLong[]> groupData = inputStats.get(statGroup);
+ Map<Class<? extends DataContainer>, MessageCounters> groupData = inputStats.get(statGroup);
if (groupData != null) {
- for (Entry<Class<? extends DataContainer>, AtomicLong[]> statEntry : groupData.entrySet()) {
- long amountPerInterval = statEntry.getValue()[0].getAndSet(0);
- long cumulativeAmount = statEntry.getValue()[1].addAndGet(amountPerInterval);
+ for (Entry<Class<? extends DataContainer>, MessageCounters> statEntry : groupData.entrySet()) {
+ long amountPerInterval = statEntry.getValue().accumulate();
+ long cumulativeAmount = statEntry.getValue().getCumulative();
dump.add(String.format("%s: MSG[%s] -> +%d | %d",
statGroup,
- statEntry.getKey().getSimpleName(), amountPerInterval, cumulativeAmount));
+ statEntry.getKey().getSimpleName(),
+ amountPerInterval, cumulativeAmount));
}
-
} else {
dump.add(String.format("%s: no activity detected", statGroup));
}
}
return dump;
}
-
- @Override
- public void spyMessage(DataContainer message, STATISTIC_GROUP statGroup) {
- AtomicLong[] counters = getCounters(message, statGroup);
- counters[0].incrementAndGet();
- }
}