X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow_netty%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FTrafficStatisticsHandler.java;fp=opendaylight%2Fprotocol_plugins%2Fopenflow_netty%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FTrafficStatisticsHandler.java;h=540ec39e57e295ca883813f8aa6fd9c16167cf7b;hb=85073423c6069e4b58fffde7cf19c806b2b52dd5;hp=0000000000000000000000000000000000000000;hpb=c5630f2945eb5370f9829514ef72de41d41eb2be;p=controller.git diff --git a/opendaylight/protocol_plugins/openflow_netty/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/TrafficStatisticsHandler.java b/opendaylight/protocol_plugins/openflow_netty/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/TrafficStatisticsHandler.java new file mode 100644 index 0000000000..540ec39e57 --- /dev/null +++ b/opendaylight/protocol_plugins/openflow_netty/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/TrafficStatisticsHandler.java @@ -0,0 +1,336 @@ +package org.opendaylight.controller.protocol_plugin.openflow.core.internal; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + + +public class TrafficStatisticsHandler { + + + private static final Logger logger = LoggerFactory + .getLogger(EnhancedController.class); + + private Timeout statsTaskHandle = null; + + private Map currentCounterMap = new ConcurrentHashMap(); + private Map lastCounterMap = new ConcurrentHashMap(); + private Map lastMeasurementTStamp = new ConcurrentHashMap(); + private List rawRateMeasurementData = new ArrayList(); + + private ConcurrentHashMap msgRcvEntityCounter = + new ConcurrentHashMap(); + private ConcurrentHashMap msgSndEntityCounter = + new ConcurrentHashMap(); + + + + private static final long STATISTICS_RATE_INTERVAL = 25000; + private static final int STATISTICS_PRINT_INTREVAL = 180; + private static List packetInProcessingTimeList = new ArrayList(); + private static List pendingTaskCountList = new ArrayList(); + + + public static final String ENTITY_COUNTER_RCV_MSG = "SWITCHWISE_RCV_MSG_COUNT"; + public static final String ENTITY_COUNTER_SND_MSG = "SWITCHWISE_SND_MSG_COUNT"; + + private HashedWheelTimer hashedWheelTimer = null; + + public static final String ADDED_SWITCHES = "ADDED_SWITCHES"; + public static final String CONNECTED_SWITCHES = "CONNECTED_SWITCHES"; + public static final String DELETED_SWITCHES = "DELETED_SWITCHES"; + public static final String DISCONNECTED_SWITCHES = "DISCONNECTED_SWITCHES"; + public static final String SWITCH_ERROR = "SWITCH_ERROR"; + + + + public static final String EXCEPTION_CAUGHT = "EXCEPTION_CAUGHT"; + public static final String MESSAGE_RECEIVED = "MESSAGE_RECEIVED"; // DO RATE-MEASUREMENTS + + public static final String MSG_LISTENER_INVOCATION = "MSG_LISTENER_INVOCATION"; + public static final String HELLO_RECEIVED = "HELLO_RECEIVED"; + public static final String HELLO_SENT = "HELLO_SENT"; + public static final String ECHO_REQUEST_SENT = "ECHO_REQUEST_SENT"; + public static final String ECHO_REQUEST_RECEIVED = "ECHO_REQUEST_RECEIVED"; + public static final String ECHO_REPLY_SENT = "ECHO_REPLY_SENT"; + public static final String ECHO_REPLY_RECEIVED = "ECHO_REPLY_RECEIVED"; + public static final String FEATURES_REQUEST_SENT = "FEATURES_REQUEST_SENT"; + public static final String FEATURES_REQUEST_RECEIVED = "FEATURES_REQUEST_RECEIVED"; + public static final String FEATURES_REPLY_SENT = "FEATURES_REPLY_SENT"; + public static final String FEATURES_REPLY_RECEIVED = "FEATURES_REPLY_RECEIVED"; + public static final String CONFIG_REQUEST_SENT = "CONFIG_REQUEST_SENT"; + public static final String CONFIG_REQUEST_RECEIVED = "CONFIG_REQUEST_RECEIVED"; + public static final String CONFIG_REPLY_SENT = "CONFIG_REPLY_SENT"; + public static final String CONFIG_REPLY_RECEIVED = "CONFIG_REPLY_RECEIVED"; + public static final String BARRIER_REQUEST_SENT = "BARRIER_REQUEST_SENT"; + public static final String BARRIER_REPLY_RECEIVED = "BARRIER_REPLY_RECEIVED"; + public static final String ERROR_MSG_RECEIVED = "ERROR_MSG_RECEIVED"; + public static final String PORT_STATUS_RECEIVED = "PORT_STATUS"; + public static final String PACKET_IN_RECEIVED = "PACKET_IN"; // DO RATE-MEASUREMENTS + public static final String FLOW_MOD_SENT = "FLOW_MOD_SENT"; // DO RATE-MEASUREMENTS ==> To be determined as to where to collect this data from + public static final String STATS_REQUEST_SENT = "STATS_REQUEST_SENT"; // DO RATE-MEASUREMENTS ==> To be determined as to where to collect this data from + public static final String STATS_RESPONSE_RECEIVED = "STATS_RESPONSE_RECEIVED"; + + public static final String UPDATE_PHYSICAL_PORT = "UPDATE_PHYSICAL_PORT"; + + private static final int TASK_SCHEDULE_INITIAL_DELAY_IN_SECONDS = 10; + + private int trackPktInProcessing = 0; + private static final int PKT_IN_PROCESSING_DURATION_SAMPLING_COUNT = 100000; + + + public TrafficStatisticsHandler(HashedWheelTimer timer){ + this.hashedWheelTimer = timer; + } + + + public void init(){ + + currentCounterMap.put(MSG_LISTENER_INVOCATION, new AtomicLong(0)); + currentCounterMap.put(ADDED_SWITCHES, new AtomicLong(0)); + currentCounterMap.put(DELETED_SWITCHES, new AtomicLong(0)); + currentCounterMap.put(CONNECTED_SWITCHES, new AtomicLong(0)); + currentCounterMap.put(DISCONNECTED_SWITCHES, new AtomicLong(0)); + currentCounterMap.put(SWITCH_ERROR, new AtomicLong(0)); + currentCounterMap.put(HELLO_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(HELLO_SENT, new AtomicLong(0)); + currentCounterMap.put(ECHO_REQUEST_SENT, new AtomicLong(0)); + currentCounterMap.put(ECHO_REQUEST_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(ECHO_REPLY_SENT, new AtomicLong(0)); + currentCounterMap.put(ECHO_REPLY_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(EXCEPTION_CAUGHT, new AtomicLong(0)); + currentCounterMap.put(MESSAGE_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(FEATURES_REQUEST_SENT, new AtomicLong(0)); + currentCounterMap.put(FEATURES_REQUEST_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(FEATURES_REPLY_SENT, new AtomicLong(0)); + currentCounterMap.put(FEATURES_REPLY_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(CONFIG_REQUEST_SENT, new AtomicLong(0)); + currentCounterMap.put(CONFIG_REQUEST_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(CONFIG_REPLY_SENT, new AtomicLong(0)); + currentCounterMap.put(CONFIG_REPLY_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(BARRIER_REQUEST_SENT, new AtomicLong(0)); + currentCounterMap.put(BARRIER_REPLY_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(ERROR_MSG_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(PORT_STATUS_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(PACKET_IN_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(FLOW_MOD_SENT, new AtomicLong(0)); + currentCounterMap.put(STATS_REQUEST_SENT, new AtomicLong(0)); + currentCounterMap.put(STATS_RESPONSE_RECEIVED, new AtomicLong(0)); + currentCounterMap.put(UPDATE_PHYSICAL_PORT, new AtomicLong(0)); + + lastCounterMap.put(MSG_LISTENER_INVOCATION, new AtomicLong(0)); + lastCounterMap.put(ADDED_SWITCHES, new AtomicLong(0)); + lastCounterMap.put(DELETED_SWITCHES, new AtomicLong(0)); + lastCounterMap.put(CONNECTED_SWITCHES, new AtomicLong(0)); + lastCounterMap.put(DISCONNECTED_SWITCHES, new AtomicLong(0)); + lastCounterMap.put(SWITCH_ERROR, new AtomicLong(0)); + lastCounterMap.put(HELLO_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(HELLO_SENT, new AtomicLong(0)); + lastCounterMap.put(FEATURES_REQUEST_SENT, new AtomicLong(0)); + lastCounterMap.put(FEATURES_REQUEST_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(ECHO_REQUEST_SENT, new AtomicLong(0)); + lastCounterMap.put(ECHO_REQUEST_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(ECHO_REPLY_SENT, new AtomicLong(0)); + lastCounterMap.put(ECHO_REPLY_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(EXCEPTION_CAUGHT, new AtomicLong(0)); + lastCounterMap.put(MESSAGE_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(FEATURES_REPLY_SENT, new AtomicLong(0)); + lastCounterMap.put(FEATURES_REPLY_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(CONFIG_REQUEST_SENT, new AtomicLong(0)); + lastCounterMap.put(CONFIG_REQUEST_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(CONFIG_REPLY_SENT, new AtomicLong(0)); + lastCounterMap.put(CONFIG_REPLY_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(BARRIER_REQUEST_SENT, new AtomicLong(0)); + lastCounterMap.put(BARRIER_REPLY_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(ERROR_MSG_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(PORT_STATUS_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(PACKET_IN_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(FLOW_MOD_SENT, new AtomicLong(0)); + lastCounterMap.put(STATS_REQUEST_SENT, new AtomicLong(0)); + lastCounterMap.put(STATS_RESPONSE_RECEIVED, new AtomicLong(0)); + lastCounterMap.put(UPDATE_PHYSICAL_PORT, new AtomicLong(0)); + + lastMeasurementTStamp.put(MSG_LISTENER_INVOCATION, new Long(0)); + lastMeasurementTStamp.put(ADDED_SWITCHES, new Long(0)); + lastMeasurementTStamp.put(DELETED_SWITCHES, new Long(0)); + lastMeasurementTStamp.put(CONNECTED_SWITCHES, new Long(0)); + lastMeasurementTStamp.put(DISCONNECTED_SWITCHES, new Long(0)); + lastMeasurementTStamp.put(SWITCH_ERROR, new Long(0)); + lastMeasurementTStamp.put(HELLO_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(HELLO_SENT, new Long(0)); + lastMeasurementTStamp.put(ECHO_REQUEST_SENT, new Long(0)); + lastMeasurementTStamp.put(ECHO_REQUEST_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(ECHO_REPLY_SENT, new Long(0)); + lastMeasurementTStamp.put(ECHO_REPLY_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(EXCEPTION_CAUGHT, new Long(0)); + lastMeasurementTStamp.put(MESSAGE_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(FEATURES_REQUEST_SENT, new Long(0)); + lastMeasurementTStamp.put(FEATURES_REQUEST_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(FEATURES_REPLY_SENT, new Long(0)); + lastMeasurementTStamp.put(FEATURES_REPLY_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(CONFIG_REQUEST_SENT, new Long(0)); + lastMeasurementTStamp.put(CONFIG_REQUEST_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(CONFIG_REPLY_SENT, new Long(0)); + lastMeasurementTStamp.put(CONFIG_REPLY_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(BARRIER_REQUEST_SENT, new Long(0)); + lastMeasurementTStamp.put(BARRIER_REPLY_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(ERROR_MSG_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(PORT_STATUS_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(PACKET_IN_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(FLOW_MOD_SENT, new Long(0)); + lastMeasurementTStamp.put(STATS_REQUEST_SENT, new Long(0)); + lastMeasurementTStamp.put(STATS_RESPONSE_RECEIVED, new Long(0)); + lastMeasurementTStamp.put(UPDATE_PHYSICAL_PORT, new Long(0)); + + /* + rateMap.put(HELLO_SENT, new Double(0.00000000)); + rateMap.put(FEATURES_REQUEST, new Double(0.00000000)); + rateMap.put(FEATURES_REPLY, new Double(0.00000000)); + rateMap.put(CONFIG_REQUEST, new Double(0.00000000)); + rateMap.put(CONFIG_REPLY, new Double(0.00000000)); + rateMap.put(PORT_STATUS, new Double(0.00000000)); + rateMap.put(PACKET_IN, new Double(0.00000000)); + rateMap.put(FLOW_MOD_SENT, new Double(0.00000000)); + + + history.put(HELLO_SENT, new ArrayList()); + history.put(FEATURES_REQUEST, new ArrayList()); + history.put(FEATURES_REPLY, new ArrayList()); + history.put(CONFIG_REQUEST, new ArrayList()); + history.put(CONFIG_REPLY, new ArrayList()); + history.put(PORT_STATUS, new ArrayList()); + history.put(PACKET_IN, new ArrayList()); + history.put(FLOW_MOD_SENT, new ArrayList()); + */ + + statsTaskHandle = this.hashedWheelTimer.newTimeout(new StatsOutTask(), + TASK_SCHEDULE_INITIAL_DELAY_IN_SECONDS, TimeUnit.SECONDS); + + } + + public void stopStatsHandler(){ + if (statsTaskHandle != null){ + statsTaskHandle.cancel(); + } + } + + public void reportPacketInProcessingTime(long duration){ + trackPktInProcessing++; + if (trackPktInProcessing > PKT_IN_PROCESSING_DURATION_SAMPLING_COUNT){ + packetInProcessingTimeList.add(new Long(duration)); + trackPktInProcessing = 0; + } + } + + public void addEntityForCounter(Integer entityID, String counterType){ + if (counterType.equalsIgnoreCase(ENTITY_COUNTER_RCV_MSG)){ + msgRcvEntityCounter.put(entityID, new AtomicLong(0)); + } + else{ + msgSndEntityCounter.put(entityID, new AtomicLong(0)); + } + } + + public void countForEntitySimpleMeasurement(Integer entityID, String counterType){ + if (counterType.equalsIgnoreCase(ENTITY_COUNTER_RCV_MSG)){ + //msgRcvEntityCounter.get(entityID).incrementAndGet(); + } + else{ + //msgSndEntityCounter.get(entityID).incrementAndGet(); + } + } + + + public void countForSimpleMeasurement(String counterName){ + currentCounterMap.get(counterName).incrementAndGet(); + } + + public void countForRateMeasurement(String counterName){ + + long currCntr = currentCounterMap.get(counterName).incrementAndGet(); + if (lastMeasurementTStamp.get(counterName) == 0){ + lastMeasurementTStamp.put(counterName, System.currentTimeMillis()); + } + + Long currentCount = new Long(currCntr); + Long lastCount = lastCounterMap.get(counterName).get(); + + //Double rate = 0.00000000000; + if ((currentCount - lastCount) == STATISTICS_RATE_INTERVAL){ + Long currentTime = System.currentTimeMillis(); + Long lastTime = lastMeasurementTStamp.get(counterName); + //rate = new Double((STATISTICS_RATE_INTERVAL/(currentTime-lastTime))*1000); //convert to count/sec + rawRateMeasurementData.add("CN:" + counterName + + ",CC:" + currentCount + + ",LC:" + lastCount + + ",CT:" + currentTime + + ",LT:" + lastTime + + ",CV:" + ((STATISTICS_RATE_INTERVAL/(currentTime-lastTime))*1000)); + lastCounterMap.put(counterName, new AtomicLong(currentCount)); + lastMeasurementTStamp.put(counterName, currentTime); + //history.get(counterName).add(String.valueOf(rate.doubleValue())); + //rateMap.put(counterName, rate); + } + + } + + + private class StatsOutTask implements TimerTask { + + @Override + public void run(Timeout timeout) throws Exception { + + statsTaskHandle = timeout; + logger.warn(">>>>>>Raw Counter values at controller BEGIN<<<<<<<<"); + + for (Entry entry : currentCounterMap.entrySet()){ + logger.warn("{} {}", entry.getKey(), entry.getValue()); + } + logger.warn(">>>>>>Counter values at controller END <<<<<<<<"); + + logger.warn(">>>>>>Entity Counter values at controller BEGIN<<<<<<<<"); + + for (Entry entry : msgRcvEntityCounter.entrySet()){ + logger.warn("SwitchID {} : Rcv Msg Count {}", entry.getKey(), entry.getValue()); + } + logger.warn(">>>>>>Entity Counter values at controller END <<<<<<<<"); + + logger.warn(">>>>>>Raw data rate values at controller BEGIN<<<<<<<<"); + + for (String str : rawRateMeasurementData ){ + logger.warn("{}", str); + } + logger.warn(">>>>>>Raw data rate values at controller END <<<<<<<<"); + + + if (packetInProcessingTimeList.size() > 0){ + logger.warn("================ MAX PACKET_IN PROC TIME in microseconds : {}", + Collections.max(packetInProcessingTimeList)/1000); + logger.warn("================ MIN PACKET_IN PROC TIME in microseconds : {}", + Collections.min(packetInProcessingTimeList)/1000); + long v = 0L; + int track = 0; + for (Long val : packetInProcessingTimeList){ + v = v + val.longValue(); + track++; + } + logger.warn("================ AVG PACKET_IN PROC TIME in microseconds : {}", + ((double)(v/track))/1000); + } + hashedWheelTimer.newTimeout(this, STATISTICS_PRINT_INTREVAL, TimeUnit.SECONDS); + } + } + +}