Performacne improvements via adding a netty-based openflowj and openflow plugin;...
[controller.git] / opendaylight / protocol_plugins / openflow_netty / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / TrafficStatisticsHandler.java
diff --git a/opendaylight/protocol_plugins/openflow_netty/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/TrafficStatisticsHandler.java b/opendaylight/protocol_plugins/openflow_netty/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/TrafficStatisticsHandler.java
new file mode 100644 (file)
index 0000000..540ec39
--- /dev/null
@@ -0,0 +1,336 @@
+package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+public class TrafficStatisticsHandler {
+
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(EnhancedController.class);
+
+    private Timeout statsTaskHandle = null;
+
+    private Map<String, AtomicLong> currentCounterMap = new ConcurrentHashMap<String, AtomicLong>();
+    private Map<String, AtomicLong> lastCounterMap = new ConcurrentHashMap<String, AtomicLong>();
+    private Map<String, Long> lastMeasurementTStamp = new ConcurrentHashMap<String, Long>();
+    private List<String> rawRateMeasurementData = new ArrayList<String>();
+
+    private ConcurrentHashMap<Integer, AtomicLong> msgRcvEntityCounter =
+            new ConcurrentHashMap<Integer, AtomicLong>();
+    private ConcurrentHashMap<Integer, AtomicLong> msgSndEntityCounter =
+            new ConcurrentHashMap<Integer, AtomicLong>();
+
+
+
+    private static final long STATISTICS_RATE_INTERVAL = 25000;
+    private static final int STATISTICS_PRINT_INTREVAL = 180;
+    private static List<Long> packetInProcessingTimeList = new ArrayList<Long>();
+    private static List<Integer> pendingTaskCountList = new ArrayList<Integer>();
+
+
+    public static final String ENTITY_COUNTER_RCV_MSG = "SWITCHWISE_RCV_MSG_COUNT";
+    public static final String ENTITY_COUNTER_SND_MSG = "SWITCHWISE_SND_MSG_COUNT";
+
+    private HashedWheelTimer hashedWheelTimer = null;
+
+    public static final String ADDED_SWITCHES = "ADDED_SWITCHES";
+    public static final String CONNECTED_SWITCHES = "CONNECTED_SWITCHES";
+    public static final String DELETED_SWITCHES = "DELETED_SWITCHES";
+    public static final String DISCONNECTED_SWITCHES = "DISCONNECTED_SWITCHES";
+    public static final String SWITCH_ERROR = "SWITCH_ERROR";
+
+
+
+    public static final String EXCEPTION_CAUGHT = "EXCEPTION_CAUGHT";
+    public static final String MESSAGE_RECEIVED = "MESSAGE_RECEIVED";  // DO RATE-MEASUREMENTS
+
+    public static final String MSG_LISTENER_INVOCATION = "MSG_LISTENER_INVOCATION";
+    public static final String HELLO_RECEIVED = "HELLO_RECEIVED";
+    public static final String HELLO_SENT = "HELLO_SENT";
+    public static final String ECHO_REQUEST_SENT = "ECHO_REQUEST_SENT";
+    public static final String ECHO_REQUEST_RECEIVED = "ECHO_REQUEST_RECEIVED";
+    public static final String ECHO_REPLY_SENT = "ECHO_REPLY_SENT";
+    public static final String ECHO_REPLY_RECEIVED = "ECHO_REPLY_RECEIVED";
+    public static final String FEATURES_REQUEST_SENT = "FEATURES_REQUEST_SENT";
+    public static final String FEATURES_REQUEST_RECEIVED = "FEATURES_REQUEST_RECEIVED";
+    public static final String FEATURES_REPLY_SENT = "FEATURES_REPLY_SENT";
+    public static final String FEATURES_REPLY_RECEIVED = "FEATURES_REPLY_RECEIVED";
+    public static final String CONFIG_REQUEST_SENT = "CONFIG_REQUEST_SENT";
+    public static final String CONFIG_REQUEST_RECEIVED = "CONFIG_REQUEST_RECEIVED";
+    public static final String CONFIG_REPLY_SENT = "CONFIG_REPLY_SENT";
+    public static final String CONFIG_REPLY_RECEIVED = "CONFIG_REPLY_RECEIVED";
+    public static final String BARRIER_REQUEST_SENT = "BARRIER_REQUEST_SENT";
+    public static final String BARRIER_REPLY_RECEIVED = "BARRIER_REPLY_RECEIVED";
+    public static final String ERROR_MSG_RECEIVED = "ERROR_MSG_RECEIVED";
+    public static final String PORT_STATUS_RECEIVED = "PORT_STATUS";
+    public static final String PACKET_IN_RECEIVED = "PACKET_IN";        // DO RATE-MEASUREMENTS
+    public static final String FLOW_MOD_SENT = "FLOW_MOD_SENT";            // DO RATE-MEASUREMENTS ==> To be determined as to where to collect this data from
+    public static final String STATS_REQUEST_SENT = "STATS_REQUEST_SENT";     // DO RATE-MEASUREMENTS ==> To be determined as to where to collect this data from
+    public static final String STATS_RESPONSE_RECEIVED = "STATS_RESPONSE_RECEIVED";
+
+    public static final String UPDATE_PHYSICAL_PORT = "UPDATE_PHYSICAL_PORT";
+
+    private static final int TASK_SCHEDULE_INITIAL_DELAY_IN_SECONDS = 10;
+
+    private int trackPktInProcessing = 0;
+    private static final int PKT_IN_PROCESSING_DURATION_SAMPLING_COUNT = 100000;
+
+
+    public TrafficStatisticsHandler(HashedWheelTimer timer){
+        this.hashedWheelTimer = timer;
+    }
+
+
+    public void init(){
+
+        currentCounterMap.put(MSG_LISTENER_INVOCATION, new AtomicLong(0));
+        currentCounterMap.put(ADDED_SWITCHES, new AtomicLong(0));
+        currentCounterMap.put(DELETED_SWITCHES, new AtomicLong(0));
+        currentCounterMap.put(CONNECTED_SWITCHES, new AtomicLong(0));
+        currentCounterMap.put(DISCONNECTED_SWITCHES, new AtomicLong(0));
+        currentCounterMap.put(SWITCH_ERROR, new AtomicLong(0));
+        currentCounterMap.put(HELLO_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(HELLO_SENT, new AtomicLong(0));
+        currentCounterMap.put(ECHO_REQUEST_SENT, new AtomicLong(0));
+        currentCounterMap.put(ECHO_REQUEST_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(ECHO_REPLY_SENT, new AtomicLong(0));
+        currentCounterMap.put(ECHO_REPLY_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(EXCEPTION_CAUGHT, new AtomicLong(0));
+        currentCounterMap.put(MESSAGE_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(FEATURES_REQUEST_SENT, new AtomicLong(0));
+        currentCounterMap.put(FEATURES_REQUEST_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(FEATURES_REPLY_SENT, new AtomicLong(0));
+        currentCounterMap.put(FEATURES_REPLY_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(CONFIG_REQUEST_SENT, new AtomicLong(0));
+        currentCounterMap.put(CONFIG_REQUEST_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(CONFIG_REPLY_SENT, new AtomicLong(0));
+        currentCounterMap.put(CONFIG_REPLY_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(BARRIER_REQUEST_SENT, new AtomicLong(0));
+        currentCounterMap.put(BARRIER_REPLY_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(ERROR_MSG_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(PORT_STATUS_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(PACKET_IN_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(FLOW_MOD_SENT, new AtomicLong(0));
+        currentCounterMap.put(STATS_REQUEST_SENT, new AtomicLong(0));
+        currentCounterMap.put(STATS_RESPONSE_RECEIVED, new AtomicLong(0));
+        currentCounterMap.put(UPDATE_PHYSICAL_PORT, new AtomicLong(0));
+
+        lastCounterMap.put(MSG_LISTENER_INVOCATION, new AtomicLong(0));
+        lastCounterMap.put(ADDED_SWITCHES, new AtomicLong(0));
+        lastCounterMap.put(DELETED_SWITCHES, new AtomicLong(0));
+        lastCounterMap.put(CONNECTED_SWITCHES, new AtomicLong(0));
+        lastCounterMap.put(DISCONNECTED_SWITCHES, new AtomicLong(0));
+        lastCounterMap.put(SWITCH_ERROR, new AtomicLong(0));
+        lastCounterMap.put(HELLO_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(HELLO_SENT, new AtomicLong(0));
+        lastCounterMap.put(FEATURES_REQUEST_SENT, new AtomicLong(0));
+        lastCounterMap.put(FEATURES_REQUEST_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(ECHO_REQUEST_SENT, new AtomicLong(0));
+        lastCounterMap.put(ECHO_REQUEST_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(ECHO_REPLY_SENT, new AtomicLong(0));
+        lastCounterMap.put(ECHO_REPLY_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(EXCEPTION_CAUGHT, new AtomicLong(0));
+        lastCounterMap.put(MESSAGE_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(FEATURES_REPLY_SENT, new AtomicLong(0));
+        lastCounterMap.put(FEATURES_REPLY_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(CONFIG_REQUEST_SENT, new AtomicLong(0));
+        lastCounterMap.put(CONFIG_REQUEST_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(CONFIG_REPLY_SENT, new AtomicLong(0));
+        lastCounterMap.put(CONFIG_REPLY_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(BARRIER_REQUEST_SENT, new AtomicLong(0));
+        lastCounterMap.put(BARRIER_REPLY_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(ERROR_MSG_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(PORT_STATUS_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(PACKET_IN_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(FLOW_MOD_SENT, new AtomicLong(0));
+        lastCounterMap.put(STATS_REQUEST_SENT, new AtomicLong(0));
+        lastCounterMap.put(STATS_RESPONSE_RECEIVED, new AtomicLong(0));
+        lastCounterMap.put(UPDATE_PHYSICAL_PORT, new AtomicLong(0));
+
+        lastMeasurementTStamp.put(MSG_LISTENER_INVOCATION, new Long(0));
+        lastMeasurementTStamp.put(ADDED_SWITCHES, new Long(0));
+        lastMeasurementTStamp.put(DELETED_SWITCHES, new Long(0));
+        lastMeasurementTStamp.put(CONNECTED_SWITCHES, new Long(0));
+        lastMeasurementTStamp.put(DISCONNECTED_SWITCHES, new Long(0));
+        lastMeasurementTStamp.put(SWITCH_ERROR, new Long(0));
+        lastMeasurementTStamp.put(HELLO_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(HELLO_SENT, new Long(0));
+        lastMeasurementTStamp.put(ECHO_REQUEST_SENT, new Long(0));
+        lastMeasurementTStamp.put(ECHO_REQUEST_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(ECHO_REPLY_SENT, new Long(0));
+        lastMeasurementTStamp.put(ECHO_REPLY_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(EXCEPTION_CAUGHT, new Long(0));
+        lastMeasurementTStamp.put(MESSAGE_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(FEATURES_REQUEST_SENT, new Long(0));
+        lastMeasurementTStamp.put(FEATURES_REQUEST_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(FEATURES_REPLY_SENT, new Long(0));
+        lastMeasurementTStamp.put(FEATURES_REPLY_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(CONFIG_REQUEST_SENT, new Long(0));
+        lastMeasurementTStamp.put(CONFIG_REQUEST_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(CONFIG_REPLY_SENT, new Long(0));
+        lastMeasurementTStamp.put(CONFIG_REPLY_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(BARRIER_REQUEST_SENT, new Long(0));
+        lastMeasurementTStamp.put(BARRIER_REPLY_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(ERROR_MSG_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(PORT_STATUS_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(PACKET_IN_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(FLOW_MOD_SENT, new Long(0));
+        lastMeasurementTStamp.put(STATS_REQUEST_SENT, new Long(0));
+        lastMeasurementTStamp.put(STATS_RESPONSE_RECEIVED, new Long(0));
+        lastMeasurementTStamp.put(UPDATE_PHYSICAL_PORT, new Long(0));
+
+        /*
+        rateMap.put(HELLO_SENT, new Double(0.00000000));
+        rateMap.put(FEATURES_REQUEST, new Double(0.00000000));
+        rateMap.put(FEATURES_REPLY, new Double(0.00000000));
+        rateMap.put(CONFIG_REQUEST, new Double(0.00000000));
+        rateMap.put(CONFIG_REPLY, new Double(0.00000000));
+        rateMap.put(PORT_STATUS, new Double(0.00000000));
+        rateMap.put(PACKET_IN, new Double(0.00000000));
+        rateMap.put(FLOW_MOD_SENT, new Double(0.00000000));
+
+
+        history.put(HELLO_SENT, new ArrayList());
+        history.put(FEATURES_REQUEST, new ArrayList());
+        history.put(FEATURES_REPLY, new ArrayList());
+        history.put(CONFIG_REQUEST, new ArrayList());
+        history.put(CONFIG_REPLY, new ArrayList());
+        history.put(PORT_STATUS, new ArrayList());
+        history.put(PACKET_IN, new ArrayList());
+        history.put(FLOW_MOD_SENT, new ArrayList());
+        */
+
+        statsTaskHandle = this.hashedWheelTimer.newTimeout(new StatsOutTask(),
+                TASK_SCHEDULE_INITIAL_DELAY_IN_SECONDS, TimeUnit.SECONDS);
+
+    }
+
+    public void stopStatsHandler(){
+        if (statsTaskHandle != null){
+            statsTaskHandle.cancel();
+        }
+    }
+
+    public void reportPacketInProcessingTime(long duration){
+        trackPktInProcessing++;
+        if (trackPktInProcessing > PKT_IN_PROCESSING_DURATION_SAMPLING_COUNT){
+            packetInProcessingTimeList.add(new Long(duration));
+            trackPktInProcessing = 0;
+        }
+    }
+
+    public void addEntityForCounter(Integer entityID, String counterType){
+        if (counterType.equalsIgnoreCase(ENTITY_COUNTER_RCV_MSG)){
+            msgRcvEntityCounter.put(entityID, new AtomicLong(0));
+        }
+        else{
+            msgSndEntityCounter.put(entityID, new AtomicLong(0));
+        }
+    }
+
+    public void countForEntitySimpleMeasurement(Integer entityID, String counterType){
+        if (counterType.equalsIgnoreCase(ENTITY_COUNTER_RCV_MSG)){
+            //msgRcvEntityCounter.get(entityID).incrementAndGet();
+        }
+        else{
+            //msgSndEntityCounter.get(entityID).incrementAndGet();
+        }
+    }
+
+
+    public void countForSimpleMeasurement(String counterName){
+        currentCounterMap.get(counterName).incrementAndGet();
+    }
+
+    public void countForRateMeasurement(String counterName){
+
+        long currCntr = currentCounterMap.get(counterName).incrementAndGet();
+        if (lastMeasurementTStamp.get(counterName) == 0){
+            lastMeasurementTStamp.put(counterName, System.currentTimeMillis());
+        }
+
+        Long currentCount = new Long(currCntr);
+        Long lastCount = lastCounterMap.get(counterName).get();
+
+        //Double rate = 0.00000000000;
+        if ((currentCount - lastCount) == STATISTICS_RATE_INTERVAL){
+            Long currentTime = System.currentTimeMillis();
+            Long lastTime = lastMeasurementTStamp.get(counterName);
+            //rate = new Double((STATISTICS_RATE_INTERVAL/(currentTime-lastTime))*1000); //convert to count/sec
+            rawRateMeasurementData.add("CN:" + counterName +
+                    ",CC:" + currentCount +
+                    ",LC:" + lastCount +
+                    ",CT:" + currentTime +
+                    ",LT:" + lastTime +
+                    ",CV:" + ((STATISTICS_RATE_INTERVAL/(currentTime-lastTime))*1000));
+            lastCounterMap.put(counterName, new AtomicLong(currentCount));
+            lastMeasurementTStamp.put(counterName, currentTime);
+            //history.get(counterName).add(String.valueOf(rate.doubleValue()));
+            //rateMap.put(counterName, rate);
+        }
+
+    }
+
+
+    private class StatsOutTask implements TimerTask {
+
+        @Override
+        public void run(Timeout timeout) throws Exception {
+
+            statsTaskHandle = timeout;
+            logger.warn(">>>>>>Raw Counter values at controller BEGIN<<<<<<<<");
+
+            for (Entry<String, AtomicLong> entry : currentCounterMap.entrySet()){
+                logger.warn("{} {}", entry.getKey(), entry.getValue());
+            }
+            logger.warn(">>>>>>Counter values at controller END  <<<<<<<<");
+
+            logger.warn(">>>>>>Entity Counter values at controller BEGIN<<<<<<<<");
+
+            for (Entry<Integer, AtomicLong> entry : msgRcvEntityCounter.entrySet()){
+                logger.warn("SwitchID {} : Rcv Msg Count {}", entry.getKey(), entry.getValue());
+            }
+            logger.warn(">>>>>>Entity Counter values at controller END  <<<<<<<<");
+
+            logger.warn(">>>>>>Raw data rate values at controller BEGIN<<<<<<<<");
+
+            for (String str : rawRateMeasurementData ){
+                logger.warn("{}", str);
+            }
+            logger.warn(">>>>>>Raw data rate values at controller END  <<<<<<<<");
+
+
+            if (packetInProcessingTimeList.size() > 0){
+                logger.warn("================ MAX PACKET_IN PROC TIME in microseconds : {}",
+                        Collections.max(packetInProcessingTimeList)/1000);
+                logger.warn("================ MIN PACKET_IN PROC TIME in microseconds : {}",
+                        Collections.min(packetInProcessingTimeList)/1000);
+                long v = 0L;
+                int track = 0;
+                for (Long val : packetInProcessingTimeList){
+                    v = v + val.longValue();
+                    track++;
+                }
+                logger.warn("================ AVG PACKET_IN PROC TIME in microseconds : {}",
+                        ((double)(v/track))/1000);
+            }
+            hashedWheelTimer.newTimeout(this, STATISTICS_PRINT_INTREVAL, TimeUnit.SECONDS);
+        }
+    }
+
+}