1 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
3 import java.util.ArrayList;
4 import java.util.Collections;
7 import java.util.Map.Entry;
8 import java.util.concurrent.ConcurrentHashMap;
9 import java.util.concurrent.TimeUnit;
10 import java.util.concurrent.atomic.AtomicLong;
12 import org.jboss.netty.util.HashedWheelTimer;
13 import org.jboss.netty.util.Timeout;
14 import org.jboss.netty.util.TimerTask;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
20 public class TrafficStatisticsHandler {
23 private static final Logger logger = LoggerFactory
24 .getLogger(EnhancedController.class);
26 private Timeout statsTaskHandle = null;
28 private Map<String, AtomicLong> currentCounterMap = new ConcurrentHashMap<String, AtomicLong>();
29 private Map<String, AtomicLong> lastCounterMap = new ConcurrentHashMap<String, AtomicLong>();
30 private Map<String, Long> lastMeasurementTStamp = new ConcurrentHashMap<String, Long>();
31 private List<String> rawRateMeasurementData = new ArrayList<String>();
33 private ConcurrentHashMap<Integer, AtomicLong> msgRcvEntityCounter =
34 new ConcurrentHashMap<Integer, AtomicLong>();
35 private ConcurrentHashMap<Integer, AtomicLong> msgSndEntityCounter =
36 new ConcurrentHashMap<Integer, AtomicLong>();
40 private static final long STATISTICS_RATE_INTERVAL = 25000;
41 private static final int STATISTICS_PRINT_INTREVAL = 180;
42 private static List<Long> packetInProcessingTimeList = new ArrayList<Long>();
43 private static List<Integer> pendingTaskCountList = new ArrayList<Integer>();
46 public static final String ENTITY_COUNTER_RCV_MSG = "SWITCHWISE_RCV_MSG_COUNT";
47 public static final String ENTITY_COUNTER_SND_MSG = "SWITCHWISE_SND_MSG_COUNT";
49 private HashedWheelTimer hashedWheelTimer = null;
51 public static final String ADDED_SWITCHES = "ADDED_SWITCHES";
52 public static final String CONNECTED_SWITCHES = "CONNECTED_SWITCHES";
53 public static final String DELETED_SWITCHES = "DELETED_SWITCHES";
54 public static final String DISCONNECTED_SWITCHES = "DISCONNECTED_SWITCHES";
55 public static final String SWITCH_ERROR = "SWITCH_ERROR";
59 public static final String EXCEPTION_CAUGHT = "EXCEPTION_CAUGHT";
60 public static final String MESSAGE_RECEIVED = "MESSAGE_RECEIVED"; // DO RATE-MEASUREMENTS
62 public static final String MSG_LISTENER_INVOCATION = "MSG_LISTENER_INVOCATION";
63 public static final String HELLO_RECEIVED = "HELLO_RECEIVED";
64 public static final String HELLO_SENT = "HELLO_SENT";
65 public static final String ECHO_REQUEST_SENT = "ECHO_REQUEST_SENT";
66 public static final String ECHO_REQUEST_RECEIVED = "ECHO_REQUEST_RECEIVED";
67 public static final String ECHO_REPLY_SENT = "ECHO_REPLY_SENT";
68 public static final String ECHO_REPLY_RECEIVED = "ECHO_REPLY_RECEIVED";
69 public static final String FEATURES_REQUEST_SENT = "FEATURES_REQUEST_SENT";
70 public static final String FEATURES_REQUEST_RECEIVED = "FEATURES_REQUEST_RECEIVED";
71 public static final String FEATURES_REPLY_SENT = "FEATURES_REPLY_SENT";
72 public static final String FEATURES_REPLY_RECEIVED = "FEATURES_REPLY_RECEIVED";
73 public static final String CONFIG_REQUEST_SENT = "CONFIG_REQUEST_SENT";
74 public static final String CONFIG_REQUEST_RECEIVED = "CONFIG_REQUEST_RECEIVED";
75 public static final String CONFIG_REPLY_SENT = "CONFIG_REPLY_SENT";
76 public static final String CONFIG_REPLY_RECEIVED = "CONFIG_REPLY_RECEIVED";
77 public static final String BARRIER_REQUEST_SENT = "BARRIER_REQUEST_SENT";
78 public static final String BARRIER_REPLY_RECEIVED = "BARRIER_REPLY_RECEIVED";
79 public static final String ERROR_MSG_RECEIVED = "ERROR_MSG_RECEIVED";
80 public static final String PORT_STATUS_RECEIVED = "PORT_STATUS";
81 public static final String PACKET_IN_RECEIVED = "PACKET_IN"; // DO RATE-MEASUREMENTS
82 public static final String FLOW_MOD_SENT = "FLOW_MOD_SENT"; // DO RATE-MEASUREMENTS ==> To be determined as to where to collect this data from
83 public static final String STATS_REQUEST_SENT = "STATS_REQUEST_SENT"; // DO RATE-MEASUREMENTS ==> To be determined as to where to collect this data from
84 public static final String STATS_RESPONSE_RECEIVED = "STATS_RESPONSE_RECEIVED";
86 public static final String UPDATE_PHYSICAL_PORT = "UPDATE_PHYSICAL_PORT";
88 private static final int TASK_SCHEDULE_INITIAL_DELAY_IN_SECONDS = 10;
90 private int trackPktInProcessing = 0;
91 private static final int PKT_IN_PROCESSING_DURATION_SAMPLING_COUNT = 100000;
94 public TrafficStatisticsHandler(HashedWheelTimer timer){
95 this.hashedWheelTimer = timer;
101 currentCounterMap.put(MSG_LISTENER_INVOCATION, new AtomicLong(0));
102 currentCounterMap.put(ADDED_SWITCHES, new AtomicLong(0));
103 currentCounterMap.put(DELETED_SWITCHES, new AtomicLong(0));
104 currentCounterMap.put(CONNECTED_SWITCHES, new AtomicLong(0));
105 currentCounterMap.put(DISCONNECTED_SWITCHES, new AtomicLong(0));
106 currentCounterMap.put(SWITCH_ERROR, new AtomicLong(0));
107 currentCounterMap.put(HELLO_RECEIVED, new AtomicLong(0));
108 currentCounterMap.put(HELLO_SENT, new AtomicLong(0));
109 currentCounterMap.put(ECHO_REQUEST_SENT, new AtomicLong(0));
110 currentCounterMap.put(ECHO_REQUEST_RECEIVED, new AtomicLong(0));
111 currentCounterMap.put(ECHO_REPLY_SENT, new AtomicLong(0));
112 currentCounterMap.put(ECHO_REPLY_RECEIVED, new AtomicLong(0));
113 currentCounterMap.put(EXCEPTION_CAUGHT, new AtomicLong(0));
114 currentCounterMap.put(MESSAGE_RECEIVED, new AtomicLong(0));
115 currentCounterMap.put(FEATURES_REQUEST_SENT, new AtomicLong(0));
116 currentCounterMap.put(FEATURES_REQUEST_RECEIVED, new AtomicLong(0));
117 currentCounterMap.put(FEATURES_REPLY_SENT, new AtomicLong(0));
118 currentCounterMap.put(FEATURES_REPLY_RECEIVED, new AtomicLong(0));
119 currentCounterMap.put(CONFIG_REQUEST_SENT, new AtomicLong(0));
120 currentCounterMap.put(CONFIG_REQUEST_RECEIVED, new AtomicLong(0));
121 currentCounterMap.put(CONFIG_REPLY_SENT, new AtomicLong(0));
122 currentCounterMap.put(CONFIG_REPLY_RECEIVED, new AtomicLong(0));
123 currentCounterMap.put(BARRIER_REQUEST_SENT, new AtomicLong(0));
124 currentCounterMap.put(BARRIER_REPLY_RECEIVED, new AtomicLong(0));
125 currentCounterMap.put(ERROR_MSG_RECEIVED, new AtomicLong(0));
126 currentCounterMap.put(PORT_STATUS_RECEIVED, new AtomicLong(0));
127 currentCounterMap.put(PACKET_IN_RECEIVED, new AtomicLong(0));
128 currentCounterMap.put(FLOW_MOD_SENT, new AtomicLong(0));
129 currentCounterMap.put(STATS_REQUEST_SENT, new AtomicLong(0));
130 currentCounterMap.put(STATS_RESPONSE_RECEIVED, new AtomicLong(0));
131 currentCounterMap.put(UPDATE_PHYSICAL_PORT, new AtomicLong(0));
133 lastCounterMap.put(MSG_LISTENER_INVOCATION, new AtomicLong(0));
134 lastCounterMap.put(ADDED_SWITCHES, new AtomicLong(0));
135 lastCounterMap.put(DELETED_SWITCHES, new AtomicLong(0));
136 lastCounterMap.put(CONNECTED_SWITCHES, new AtomicLong(0));
137 lastCounterMap.put(DISCONNECTED_SWITCHES, new AtomicLong(0));
138 lastCounterMap.put(SWITCH_ERROR, new AtomicLong(0));
139 lastCounterMap.put(HELLO_RECEIVED, new AtomicLong(0));
140 lastCounterMap.put(HELLO_SENT, new AtomicLong(0));
141 lastCounterMap.put(FEATURES_REQUEST_SENT, new AtomicLong(0));
142 lastCounterMap.put(FEATURES_REQUEST_RECEIVED, new AtomicLong(0));
143 lastCounterMap.put(ECHO_REQUEST_SENT, new AtomicLong(0));
144 lastCounterMap.put(ECHO_REQUEST_RECEIVED, new AtomicLong(0));
145 lastCounterMap.put(ECHO_REPLY_SENT, new AtomicLong(0));
146 lastCounterMap.put(ECHO_REPLY_RECEIVED, new AtomicLong(0));
147 lastCounterMap.put(EXCEPTION_CAUGHT, new AtomicLong(0));
148 lastCounterMap.put(MESSAGE_RECEIVED, new AtomicLong(0));
149 lastCounterMap.put(FEATURES_REPLY_SENT, new AtomicLong(0));
150 lastCounterMap.put(FEATURES_REPLY_RECEIVED, new AtomicLong(0));
151 lastCounterMap.put(CONFIG_REQUEST_SENT, new AtomicLong(0));
152 lastCounterMap.put(CONFIG_REQUEST_RECEIVED, new AtomicLong(0));
153 lastCounterMap.put(CONFIG_REPLY_SENT, new AtomicLong(0));
154 lastCounterMap.put(CONFIG_REPLY_RECEIVED, new AtomicLong(0));
155 lastCounterMap.put(BARRIER_REQUEST_SENT, new AtomicLong(0));
156 lastCounterMap.put(BARRIER_REPLY_RECEIVED, new AtomicLong(0));
157 lastCounterMap.put(ERROR_MSG_RECEIVED, new AtomicLong(0));
158 lastCounterMap.put(PORT_STATUS_RECEIVED, new AtomicLong(0));
159 lastCounterMap.put(PACKET_IN_RECEIVED, new AtomicLong(0));
160 lastCounterMap.put(FLOW_MOD_SENT, new AtomicLong(0));
161 lastCounterMap.put(STATS_REQUEST_SENT, new AtomicLong(0));
162 lastCounterMap.put(STATS_RESPONSE_RECEIVED, new AtomicLong(0));
163 lastCounterMap.put(UPDATE_PHYSICAL_PORT, new AtomicLong(0));
165 lastMeasurementTStamp.put(MSG_LISTENER_INVOCATION, new Long(0));
166 lastMeasurementTStamp.put(ADDED_SWITCHES, new Long(0));
167 lastMeasurementTStamp.put(DELETED_SWITCHES, new Long(0));
168 lastMeasurementTStamp.put(CONNECTED_SWITCHES, new Long(0));
169 lastMeasurementTStamp.put(DISCONNECTED_SWITCHES, new Long(0));
170 lastMeasurementTStamp.put(SWITCH_ERROR, new Long(0));
171 lastMeasurementTStamp.put(HELLO_RECEIVED, new Long(0));
172 lastMeasurementTStamp.put(HELLO_SENT, new Long(0));
173 lastMeasurementTStamp.put(ECHO_REQUEST_SENT, new Long(0));
174 lastMeasurementTStamp.put(ECHO_REQUEST_RECEIVED, new Long(0));
175 lastMeasurementTStamp.put(ECHO_REPLY_SENT, new Long(0));
176 lastMeasurementTStamp.put(ECHO_REPLY_RECEIVED, new Long(0));
177 lastMeasurementTStamp.put(EXCEPTION_CAUGHT, new Long(0));
178 lastMeasurementTStamp.put(MESSAGE_RECEIVED, new Long(0));
179 lastMeasurementTStamp.put(FEATURES_REQUEST_SENT, new Long(0));
180 lastMeasurementTStamp.put(FEATURES_REQUEST_RECEIVED, new Long(0));
181 lastMeasurementTStamp.put(FEATURES_REPLY_SENT, new Long(0));
182 lastMeasurementTStamp.put(FEATURES_REPLY_RECEIVED, new Long(0));
183 lastMeasurementTStamp.put(CONFIG_REQUEST_SENT, new Long(0));
184 lastMeasurementTStamp.put(CONFIG_REQUEST_RECEIVED, new Long(0));
185 lastMeasurementTStamp.put(CONFIG_REPLY_SENT, new Long(0));
186 lastMeasurementTStamp.put(CONFIG_REPLY_RECEIVED, new Long(0));
187 lastMeasurementTStamp.put(BARRIER_REQUEST_SENT, new Long(0));
188 lastMeasurementTStamp.put(BARRIER_REPLY_RECEIVED, new Long(0));
189 lastMeasurementTStamp.put(ERROR_MSG_RECEIVED, new Long(0));
190 lastMeasurementTStamp.put(PORT_STATUS_RECEIVED, new Long(0));
191 lastMeasurementTStamp.put(PACKET_IN_RECEIVED, new Long(0));
192 lastMeasurementTStamp.put(FLOW_MOD_SENT, new Long(0));
193 lastMeasurementTStamp.put(STATS_REQUEST_SENT, new Long(0));
194 lastMeasurementTStamp.put(STATS_RESPONSE_RECEIVED, new Long(0));
195 lastMeasurementTStamp.put(UPDATE_PHYSICAL_PORT, new Long(0));
198 rateMap.put(HELLO_SENT, new Double(0.00000000));
199 rateMap.put(FEATURES_REQUEST, new Double(0.00000000));
200 rateMap.put(FEATURES_REPLY, new Double(0.00000000));
201 rateMap.put(CONFIG_REQUEST, new Double(0.00000000));
202 rateMap.put(CONFIG_REPLY, new Double(0.00000000));
203 rateMap.put(PORT_STATUS, new Double(0.00000000));
204 rateMap.put(PACKET_IN, new Double(0.00000000));
205 rateMap.put(FLOW_MOD_SENT, new Double(0.00000000));
208 history.put(HELLO_SENT, new ArrayList());
209 history.put(FEATURES_REQUEST, new ArrayList());
210 history.put(FEATURES_REPLY, new ArrayList());
211 history.put(CONFIG_REQUEST, new ArrayList());
212 history.put(CONFIG_REPLY, new ArrayList());
213 history.put(PORT_STATUS, new ArrayList());
214 history.put(PACKET_IN, new ArrayList());
215 history.put(FLOW_MOD_SENT, new ArrayList());
218 statsTaskHandle = this.hashedWheelTimer.newTimeout(new StatsOutTask(),
219 TASK_SCHEDULE_INITIAL_DELAY_IN_SECONDS, TimeUnit.SECONDS);
223 public void stopStatsHandler(){
224 if (statsTaskHandle != null){
225 statsTaskHandle.cancel();
229 public void reportPacketInProcessingTime(long duration){
230 trackPktInProcessing++;
231 if (trackPktInProcessing > PKT_IN_PROCESSING_DURATION_SAMPLING_COUNT){
232 packetInProcessingTimeList.add(new Long(duration));
233 trackPktInProcessing = 0;
237 public void addEntityForCounter(Integer entityID, String counterType){
238 if (counterType.equalsIgnoreCase(ENTITY_COUNTER_RCV_MSG)){
239 msgRcvEntityCounter.put(entityID, new AtomicLong(0));
242 msgSndEntityCounter.put(entityID, new AtomicLong(0));
246 public void countForEntitySimpleMeasurement(Integer entityID, String counterType){
247 if (counterType.equalsIgnoreCase(ENTITY_COUNTER_RCV_MSG)){
248 //msgRcvEntityCounter.get(entityID).incrementAndGet();
251 //msgSndEntityCounter.get(entityID).incrementAndGet();
256 public void countForSimpleMeasurement(String counterName){
257 currentCounterMap.get(counterName).incrementAndGet();
260 public void countForRateMeasurement(String counterName){
262 long currCntr = currentCounterMap.get(counterName).incrementAndGet();
263 if (lastMeasurementTStamp.get(counterName) == 0){
264 lastMeasurementTStamp.put(counterName, System.currentTimeMillis());
267 Long currentCount = new Long(currCntr);
268 Long lastCount = lastCounterMap.get(counterName).get();
270 //Double rate = 0.00000000000;
271 if ((currentCount - lastCount) == STATISTICS_RATE_INTERVAL){
272 Long currentTime = System.currentTimeMillis();
273 Long lastTime = lastMeasurementTStamp.get(counterName);
274 //rate = new Double((STATISTICS_RATE_INTERVAL/(currentTime-lastTime))*1000); //convert to count/sec
275 rawRateMeasurementData.add("CN:" + counterName +
276 ",CC:" + currentCount +
278 ",CT:" + currentTime +
280 ",CV:" + ((STATISTICS_RATE_INTERVAL/(currentTime-lastTime))*1000));
281 lastCounterMap.put(counterName, new AtomicLong(currentCount));
282 lastMeasurementTStamp.put(counterName, currentTime);
283 //history.get(counterName).add(String.valueOf(rate.doubleValue()));
284 //rateMap.put(counterName, rate);
290 private class StatsOutTask implements TimerTask {
293 public void run(Timeout timeout) throws Exception {
295 statsTaskHandle = timeout;
296 logger.warn(">>>>>>Raw Counter values at controller BEGIN<<<<<<<<");
298 for (Entry<String, AtomicLong> entry : currentCounterMap.entrySet()){
299 logger.warn("{} {}", entry.getKey(), entry.getValue());
301 logger.warn(">>>>>>Counter values at controller END <<<<<<<<");
303 logger.warn(">>>>>>Entity Counter values at controller BEGIN<<<<<<<<");
305 for (Entry<Integer, AtomicLong> entry : msgRcvEntityCounter.entrySet()){
306 logger.warn("SwitchID {} : Rcv Msg Count {}", entry.getKey(), entry.getValue());
308 logger.warn(">>>>>>Entity Counter values at controller END <<<<<<<<");
310 logger.warn(">>>>>>Raw data rate values at controller BEGIN<<<<<<<<");
312 for (String str : rawRateMeasurementData ){
313 logger.warn("{}", str);
315 logger.warn(">>>>>>Raw data rate values at controller END <<<<<<<<");
318 if (packetInProcessingTimeList.size() > 0){
319 logger.warn("================ MAX PACKET_IN PROC TIME in microseconds : {}",
320 Collections.max(packetInProcessingTimeList)/1000);
321 logger.warn("================ MIN PACKET_IN PROC TIME in microseconds : {}",
322 Collections.min(packetInProcessingTimeList)/1000);
325 for (Long val : packetInProcessingTimeList){
326 v = v + val.longValue();
329 logger.warn("================ AVG PACKET_IN PROC TIME in microseconds : {}",
330 ((double)(v/track))/1000);
332 hashedWheelTimer.newTimeout(this, STATISTICS_PRINT_INTREVAL, TimeUnit.SECONDS);