package org.opendaylight.controller.protocol_plugin.openflow.core.internal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects named event counters and periodically dumps them to the log.
 *
 * <p>Counters are keyed by the public {@code String} constants of this class.
 * {@link #init()} registers every counter at zero and schedules a recurring
 * report task on the {@link HashedWheelTimer} passed to the constructor;
 * {@link #stopStatsHandler()} cancels that task.
 *
 * <p>Two kinds of counting are offered: {@link #countForSimpleMeasurement(String)}
 * only increments a counter, while {@link #countForRateMeasurement(String)}
 * additionally records a raw rate sample each time the counter has advanced by
 * {@code STATISTICS_RATE_INTERVAL} events since the previous sample.
 */
public class TrafficStatisticsHandler {
    private static final Logger logger = LoggerFactory.getLogger(TrafficStatisticsHandler.class);

    public static final String ENTITY_COUNTER_RCV_MSG = "SWITCHWISE_RCV_MSG_COUNT";
    public static final String ENTITY_COUNTER_SND_MSG = "SWITCHWISE_SND_MSG_COUNT";

    public static final String ADDED_SWITCHES = "ADDED_SWITCHES";
    public static final String CONNECTED_SWITCHES = "CONNECTED_SWITCHES";
    public static final String DELETED_SWITCHES = "DELETED_SWITCHES";
    public static final String DISCONNECTED_SWITCHES = "DISCONNECTED_SWITCHES";
    public static final String SWITCH_ERROR = "SWITCH_ERROR";
    public static final String EXCEPTION_CAUGHT = "EXCEPTION_CAUGHT";
    public static final String MESSAGE_RECEIVED = "MESSAGE_RECEIVED"; // DO RATE-MEASUREMENTS
    public static final String MSG_LISTENER_INVOCATION = "MSG_LISTENER_INVOCATION";
    public static final String HELLO_RECEIVED = "HELLO_RECEIVED";
    public static final String HELLO_SENT = "HELLO_SENT";
    public static final String ECHO_REQUEST_SENT = "ECHO_REQUEST_SENT";
    public static final String ECHO_REQUEST_RECEIVED = "ECHO_REQUEST_RECEIVED";
    public static final String ECHO_REPLY_SENT = "ECHO_REPLY_SENT";
    public static final String ECHO_REPLY_RECEIVED = "ECHO_REPLY_RECEIVED";
    public static final String FEATURES_REQUEST_SENT = "FEATURES_REQUEST_SENT";
    public static final String FEATURES_REQUEST_RECEIVED = "FEATURES_REQUEST_RECEIVED";
    public static final String FEATURES_REPLY_SENT = "FEATURES_REPLY_SENT";
    public static final String FEATURES_REPLY_RECEIVED = "FEATURES_REPLY_RECEIVED";
    public static final String CONFIG_REQUEST_SENT = "CONFIG_REQUEST_SENT";
    public static final String CONFIG_REQUEST_RECEIVED = "CONFIG_REQUEST_RECEIVED";
    public static final String CONFIG_REPLY_SENT = "CONFIG_REPLY_SENT";
    public static final String CONFIG_REPLY_RECEIVED = "CONFIG_REPLY_RECEIVED";
    public static final String BARRIER_REQUEST_SENT = "BARRIER_REQUEST_SENT";
    public static final String BARRIER_REPLY_RECEIVED = "BARRIER_REPLY_RECEIVED";
    public static final String ERROR_MSG_RECEIVED = "ERROR_MSG_RECEIVED";
    public static final String PORT_STATUS_RECEIVED = "PORT_STATUS";
    public static final String PACKET_IN_RECEIVED = "PACKET_IN"; // DO RATE-MEASUREMENTS
    // DO RATE-MEASUREMENTS ==> To be determined as to where to collect this data from
    public static final String FLOW_MOD_SENT = "FLOW_MOD_SENT";
    // DO RATE-MEASUREMENTS ==> To be determined as to where to collect this data from
    public static final String STATS_REQUEST_SENT = "STATS_REQUEST_SENT";
    public static final String STATS_RESPONSE_RECEIVED = "STATS_RESPONSE_RECEIVED";
    public static final String UPDATE_PHYSICAL_PORT = "UPDATE_PHYSICAL_PORT";

    /** Every counter name registered by {@link #init()}. */
    private static final String[] COUNTER_NAMES = {
        MSG_LISTENER_INVOCATION, ADDED_SWITCHES, DELETED_SWITCHES, CONNECTED_SWITCHES,
        DISCONNECTED_SWITCHES, SWITCH_ERROR, HELLO_RECEIVED, HELLO_SENT,
        ECHO_REQUEST_SENT, ECHO_REQUEST_RECEIVED, ECHO_REPLY_SENT, ECHO_REPLY_RECEIVED,
        EXCEPTION_CAUGHT, MESSAGE_RECEIVED, FEATURES_REQUEST_SENT, FEATURES_REQUEST_RECEIVED,
        FEATURES_REPLY_SENT, FEATURES_REPLY_RECEIVED, CONFIG_REQUEST_SENT,
        CONFIG_REQUEST_RECEIVED, CONFIG_REPLY_SENT, CONFIG_REPLY_RECEIVED,
        BARRIER_REQUEST_SENT, BARRIER_REPLY_RECEIVED, ERROR_MSG_RECEIVED,
        PORT_STATUS_RECEIVED, PACKET_IN_RECEIVED, FLOW_MOD_SENT, STATS_REQUEST_SENT,
        STATS_RESPONSE_RECEIVED, UPDATE_PHYSICAL_PORT
    };

    /** Number of events between two rate samples of the same counter. */
    private static final long STATISTICS_RATE_INTERVAL = 25000;
    /** Delay between two consecutive statistics reports. */
    private static final int STATISTICS_PRINT_INTERVAL_IN_SECONDS = 180;
    /** Delay before the first statistics report after {@link #init()}. */
    private static final int TASK_SCHEDULE_INITIAL_DELAY_IN_SECONDS = 10;
    /** A packet-in duration is recorded once this many reports have been skipped. */
    private static final int PKT_IN_PROCESSING_DURATION_SAMPLING_COUNT = 100000;

    // The sample lists are appended to by the counting methods and read by the timer
    // task, so they are wrapped in synchronized views and copied before iteration.
    private static List<Long> packetInProcessingTimeList =
            Collections.synchronizedList(new ArrayList<Long>());
    // Not referenced anywhere in this class; the element type is a guess.
    private static List<Long> pendingTaskCountList = new ArrayList<Long>();

    private final HashedWheelTimer hashedWheelTimer;
    /** Handle of the pending (or currently running) report task; null before init(). */
    private volatile Timeout statsTaskHandle = null;
    /** Set by stopStatsHandler() so a running report does not reschedule itself. */
    private volatile boolean stopped = false;

    private Map<String, AtomicLong> currentCounterMap = new ConcurrentHashMap<String, AtomicLong>();
    private Map<String, AtomicLong> lastCounterMap = new ConcurrentHashMap<String, AtomicLong>();
    private Map<String, Long> lastMeasurementTStamp = new ConcurrentHashMap<String, Long>();
    /** Raw rate samples; entries are only ever added, never removed. */
    private List<String> rawRateMeasurementData =
            Collections.synchronizedList(new ArrayList<String>());
    private ConcurrentHashMap<Integer, AtomicLong> msgRcvEntityCounter =
            new ConcurrentHashMap<Integer, AtomicLong>();
    private ConcurrentHashMap<Integer, AtomicLong> msgSndEntityCounter =
            new ConcurrentHashMap<Integer, AtomicLong>();

    /** Plain (non-atomic) count of packet-in reports since the last recorded sample. */
    private int trackPktInProcessing = 0;

    /**
     * @param timer timer on which the periodic report task is scheduled
     */
    public TrafficStatisticsHandler(HashedWheelTimer timer) {
        this.hashedWheelTimer = timer;
    }

    /**
     * Registers every counter at zero and schedules the first statistics report
     * {@code TASK_SCHEDULE_INITIAL_DELAY_IN_SECONDS} seconds from now.
     */
    public void init() {
        for (String counterName : COUNTER_NAMES) {
            currentCounterMap.put(counterName, new AtomicLong(0));
            lastCounterMap.put(counterName, new AtomicLong(0));
            // A timestamp of 0 marks "no event seen yet"; see countForRateMeasurement.
            lastMeasurementTStamp.put(counterName, Long.valueOf(0L));
        }
        stopped = false;
        statsTaskHandle = this.hashedWheelTimer.newTimeout(new StatsOutTask(),
                TASK_SCHEDULE_INITIAL_DELAY_IN_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Cancels the periodic report task, if one has been scheduled, and prevents a
     * report that is currently running from scheduling its successor.
     */
    public void stopStatsHandler() {
        stopped = true;
        Timeout handle = statsTaskHandle;
        if (handle != null) {
            handle.cancel();
        }
    }

    /**
     * Records a sampled packet-in processing duration. Only the call that follows
     * {@code PKT_IN_PROCESSING_DURATION_SAMPLING_COUNT} skipped calls is stored.
     *
     * @param duration processing time; the report divides it by 1000 and labels the
     *                 result microseconds, so presumably nanoseconds (confirm with callers)
     */
    public void reportPacketInProcessingTime(long duration) {
        trackPktInProcessing++;
        if (trackPktInProcessing > PKT_IN_PROCESSING_DURATION_SAMPLING_COUNT) {
            packetInProcessingTimeList.add(Long.valueOf(duration));
            trackPktInProcessing = 0;
        }
    }

    /**
     * Registers a per-entity counter starting at zero.
     *
     * @param entityID    key of the entity (the report labels it "SwitchID")
     * @param counterType {@link #ENTITY_COUNTER_RCV_MSG} (case-insensitive) selects the
     *                    receive counters; any other value selects the send counters
     */
    public void addEntityForCounter(Integer entityID, String counterType) {
        if (counterType.equalsIgnoreCase(ENTITY_COUNTER_RCV_MSG)) {
            msgRcvEntityCounter.put(entityID, new AtomicLong(0));
        } else {
            msgSndEntityCounter.put(entityID, new AtomicLong(0));
        }
    }

    /**
     * Intended to increment a per-entity counter. The increments are currently
     * disabled, so this method has no effect beyond dereferencing {@code counterType}.
     *
     * @param entityID    key of the entity
     * @param counterType see {@link #addEntityForCounter(Integer, String)}
     */
    public void countForEntitySimpleMeasurement(Integer entityID, String counterType) {
        if (counterType.equalsIgnoreCase(ENTITY_COUNTER_RCV_MSG)) {
            // msgRcvEntityCounter.get(entityID).incrementAndGet();
        } else {
            // msgSndEntityCounter.get(entityID).incrementAndGet();
        }
    }

    /**
     * Increments the named counter by one.
     *
     * @param counterName one of the counter-name constants registered by {@link #init()}
     * @throws NullPointerException if the name has not been registered
     */
    public void countForSimpleMeasurement(String counterName) {
        currentCounterMap.get(counterName).incrementAndGet();
    }

    /**
     * Increments the named counter by one and, whenever it has advanced by exactly
     * {@code STATISTICS_RATE_INTERVAL} since the last sample, appends a raw rate
     * sample (events per second over that interval) to the report data.
     *
     * @param counterName one of the counter-name constants registered by {@link #init()}
     * @throws NullPointerException if the name has not been registered
     */
    public void countForRateMeasurement(String counterName) {
        long currentCount = currentCounterMap.get(counterName).incrementAndGet();
        if (lastMeasurementTStamp.get(counterName) == 0) {
            // First event for this counter: start the measurement window now.
            lastMeasurementTStamp.put(counterName, System.currentTimeMillis());
        }
        long lastCount = lastCounterMap.get(counterName).get();
        if ((currentCount - lastCount) == STATISTICS_RATE_INTERVAL) {
            long currentTime = System.currentTimeMillis();
            long lastTime = lastMeasurementTStamp.get(counterName);
            // Clamp to 1 ms so a window shorter than the clock resolution cannot
            // divide by zero; multiply before dividing to keep integer precision.
            long elapsedMillis = Math.max(1L, currentTime - lastTime);
            long ratePerSecond = (STATISTICS_RATE_INTERVAL * 1000) / elapsedMillis;
            rawRateMeasurementData.add("CN:" + counterName + ",CC:" + currentCount + ",LC:"
                    + lastCount + ",CT:" + currentTime + ",LT:" + lastTime + ",CV:"
                    + ratePerSecond);
            lastCounterMap.put(counterName, new AtomicLong(currentCount));
            lastMeasurementTStamp.put(counterName, currentTime);
        }
    }

    /**
     * Timer task that logs all collected statistics and then schedules itself again
     * {@code STATISTICS_PRINT_INTERVAL_IN_SECONDS} seconds later.
     */
    private class StatsOutTask implements TimerTask {
        @Override
        public void run(Timeout timeout) throws Exception {
            statsTaskHandle = timeout;
            try {
                logCounters();
                logRawRates();
                logPacketInProcessingTimes();
            } finally {
                // Reschedule even if reporting failed, and keep the new handle so that
                // stopStatsHandler() cancels the task that is actually pending.
                if (!stopped) {
                    Timeout next = hashedWheelTimer.newTimeout(this,
                            STATISTICS_PRINT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
                    statsTaskHandle = next;
                    if (stopped) {
                        // stopStatsHandler() raced with the reschedule above.
                        next.cancel();
                    }
                }
            }
        }

        /** Logs the global counters and the per-entity receive counters. */
        private void logCounters() {
            logger.warn(">>>>>>Raw Counter values at controller BEGIN<<<<<<<<");
            for (Entry<String, AtomicLong> entry : currentCounterMap.entrySet()) {
                logger.warn("{} {}", entry.getKey(), entry.getValue());
            }
            logger.warn(">>>>>>Counter values at controller END <<<<<<<<");
            logger.warn(">>>>>>Entity Counter values at controller BEGIN<<<<<<<<");
            for (Entry<Integer, AtomicLong> entry : msgRcvEntityCounter.entrySet()) {
                logger.warn("SwitchID {} : Rcv Msg Count {}", entry.getKey(), entry.getValue());
            }
            logger.warn(">>>>>>Entity Counter values at controller END <<<<<<<<");
        }

        /** Logs every raw rate sample recorded so far. */
        private void logRawRates() {
            List<String> samples;
            synchronized (rawRateMeasurementData) {
                samples = new ArrayList<String>(rawRateMeasurementData);
            }
            logger.warn(">>>>>>Raw data rate values at controller BEGIN<<<<<<<<");
            for (String sample : samples) {
                logger.warn("{}", sample);
            }
            logger.warn(">>>>>>Raw data rate values at controller END <<<<<<<<");
        }

        /** Logs max, min and average of the sampled packet-in processing times. */
        private void logPacketInProcessingTimes() {
            List<Long> samples;
            synchronized (packetInProcessingTimeList) {
                samples = new ArrayList<Long>(packetInProcessingTimeList);
            }
            if (samples.isEmpty()) {
                return;
            }
            logger.warn("================ MAX PACKET_IN PROC TIME in microseconds : {}",
                    Collections.max(samples) / 1000);
            logger.warn("================ MIN PACKET_IN PROC TIME in microseconds : {}",
                    Collections.min(samples) / 1000);
            long total = 0L;
            for (Long sample : samples) {
                total += sample.longValue();
            }
            // Convert to double before dividing so the average keeps its fraction.
            logger.warn("================ AVG PACKET_IN PROC TIME in microseconds : {}",
                    ((double) total / samples.size()) / 1000);
        }
    }
}