This is a fix for the latch to countdown in case of exceptions and
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.Set;
20 import java.util.Timer;
21 import java.util.TimerTask;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.CopyOnWriteArraySet;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
28
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
33 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
40 import org.opendaylight.controller.sal.core.Node;
41 import org.opendaylight.controller.sal.core.NodeConnector;
42 import org.opendaylight.controller.sal.core.Property;
43 import org.opendaylight.controller.sal.core.UpdateType;
44 import org.opendaylight.controller.sal.utils.HexEncode;
45 import org.openflow.protocol.OFError;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFPort;
48 import org.openflow.protocol.OFStatisticsRequest;
49 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
50 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
51 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
52 import org.openflow.protocol.statistics.OFPortStatisticsReply;
53 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
54 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
55 import org.openflow.protocol.statistics.OFStatistics;
56 import org.openflow.protocol.statistics.OFStatisticsType;
57 import org.openflow.protocol.statistics.OFTableStatistics;
58 import org.openflow.protocol.statistics.OFVendorStatistics;
59 import org.openflow.util.HexString;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * It periodically polls the different OF statistics from the OF switches and
67  * caches them for quick retrieval for the above layers' modules It also
68  * provides an API to directly query the switch about the statistics
69  */
70 public class OFStatisticsManager implements IOFStatisticsManager,
71 IInventoryShimExternalListener, CommandProvider {
72     private static final Logger log = LoggerFactory.getLogger(OFStatisticsManager.class);
73     private static final int INITIAL_SIZE = 64;
74     private static final long FLOW_STATS_PERIOD = 10000;
75     private static final long DESC_STATS_PERIOD = 60000;
76     private static final long PORT_STATS_PERIOD = 5000;
77     private static final long TABLE_STATS_PERIOD = 10000;
78     private static final long TICK = 1000;
79     private static short statisticsTickNumber = (short) (FLOW_STATS_PERIOD / TICK);
80     private static short descriptionTickNumber = (short) (DESC_STATS_PERIOD / TICK);
81     private static short portTickNumber = (short) (PORT_STATS_PERIOD / TICK);
82     private static short tableTickNumber = (short) (TABLE_STATS_PERIOD / TICK);
83     private static short factoredSamples = (short) 2;
84     private static short counter = 1;
85     private IController controller = null;
86     private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
87     private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
88     private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
89     private ConcurrentMap<Long, List<OFStatistics>> tableStatistics;
90     private List<OFStatistics> dummyList;
91     private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
92     protected BlockingQueue<StatsRequest> pendingStatsRequests;
93     protected BlockingQueue<Long> switchPortStatsUpdated;
94     private Thread statisticsCollector;
95     private Thread txRatesUpdater;
96     private Timer statisticsTimer;
97     private TimerTask statisticsTimerTask;
98     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
99     // Per port sampled (every portStatsPeriod) transmit rate
100     private Map<Long, Map<Short, TxRates>> txRates;
101     private Set<IOFStatisticsListener> statisticsListeners;
102
103     /**
104      * The object containing the latest factoredSamples tx rate samples for a
105      * given switch port
106      */
107     protected class TxRates {
108         // contains the latest factoredSamples sampled transmitted bytes
109         Deque<Long> sampledTxBytes;
110
111         public TxRates() {
112             sampledTxBytes = new LinkedBlockingDeque<Long>();
113         }
114
115         public void update(Long txBytes) {
116             /*
117              * Based on how many samples our average works on, we might have to
118              * remove the oldest sample
119              */
120             if (sampledTxBytes.size() == factoredSamples) {
121                 sampledTxBytes.removeLast();
122             }
123
124             // Add the latest sample to the top of the queue
125             sampledTxBytes.addFirst(txBytes);
126         }
127
128         /**
129          * Returns the average transmit rate in bps
130          *
131          * @return the average transmit rate [bps]
132          */
133         public long getAverageTxRate() {
134             long average = 0;
135             /*
136              * If we cannot provide the value for the time window length set
137              */
138             if (sampledTxBytes.size() < factoredSamples) {
139                 return average;
140             }
141             long increment = sampledTxBytes.getFirst() - sampledTxBytes
142                     .getLast();
143             long timePeriod = factoredSamples * PORT_STATS_PERIOD / TICK;
144             average = (8L * increment) / timePeriod;
145             return average;
146         }
147     }
148
149     public void setController(IController core) {
150         this.controller = core;
151     }
152
153     public void unsetController(IController core) {
154         if (this.controller == core) {
155             this.controller = null;
156         }
157     }
158
159     private short getStatsQueueSize() {
160         String statsQueueSizeStr = System.getProperty("of.statsQueueSize");
161         short statsQueueSize = INITIAL_SIZE;
162         if (statsQueueSizeStr != null) {
163             try {
164                 statsQueueSize = Short.parseShort(statsQueueSizeStr);
165                 if (statsQueueSize <= 0) {
166                     statsQueueSize = INITIAL_SIZE;
167                 }
168             } catch (Exception e) {
169             }
170         }
171         return statsQueueSize;
172     }
173
174     IPluginOutConnectionService connectionPluginOutService;
175     void setIPluginOutConnectionService(IPluginOutConnectionService s) {
176         connectionPluginOutService = s;
177     }
178
179     void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
180         if (connectionPluginOutService == s) {
181             connectionPluginOutService = null;
182         }
183     }
184
185     /**
186      * Function called by the dependency manager when all the required
187      * dependencies are satisfied
188      *
189      */
190     void init() {
191         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
192         descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
193         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
194         tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
195         dummyList = new ArrayList<OFStatistics>(1);
196         pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(getStatsQueueSize());
197         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(INITIAL_SIZE);
198         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(INITIAL_SIZE);
199         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(INITIAL_SIZE);
200         txRates = new HashMap<Long, Map<Short, TxRates>>(INITIAL_SIZE);
201         statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
202
203         configStatsPollIntervals();
204
205         // Initialize managed timers
206         statisticsTimer = new Timer("Statistics Timer Ticks");
207         statisticsTimerTask = new TimerTask() {
208             @Override
209             public void run() {
210                 decrementTicks();
211             }
212         };
213
214         // Initialize Statistics collector thread
215         statisticsCollector = new Thread(new Runnable() {
216             @Override
217             public void run() {
218                 while (true) {
219                     try {
220                         StatsRequest req = pendingStatsRequests.take();
221                         queryStatisticsInternal(req.switchId, req.type);
222                     } catch (InterruptedException e) {
223                         log.warn("Flow Statistics Collector thread "
224                                 + "interrupted", e);
225                         return;
226                     }
227                 }
228             }
229         }, "Statistics Collector");
230
231         // Initialize Tx Rate Updater thread
232         txRatesUpdater = new Thread(new Runnable() {
233             @Override
234             public void run() {
235                 while (true) {
236                     try {
237                         long switchId = switchPortStatsUpdated.take();
238                         updatePortsTxRate(switchId);
239                     } catch (InterruptedException e) {
240                         log.warn("TX Rate Updater thread interrupted", e);
241                         return;
242                     }
243                 }
244             }
245         }, "TX Rate Updater");
246     }
247
248     /**
249      * Function called by the dependency manager when at least one dependency
250      * become unsatisfied or when the component is shutting down because for
251      * example bundle is being stopped.
252      *
253      */
254     void destroy() {
255     }
256
257     /**
258      * Function called by dependency manager after "init ()" is called and after
259      * the services provided by the class are registered in the service registry
260      *
261      */
262     void start() {
263         // Start managed timers
264         statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, TICK);
265
266         // Start statistics collector thread
267         statisticsCollector.start();
268
269         // Start bandwidth utilization computer thread
270         txRatesUpdater.start();
271
272         // OSGI console
273         registerWithOSGIConsole();
274     }
275
276     /**
277      * Function called by the dependency manager before the services exported by
278      * the component are unregistered, this will be followed by a "destroy ()"
279      * calls
280      *
281      */
282     void stop() {
283         // Stop managed timers
284         statisticsTimer.cancel();
285     }
286
287     public void setStatisticsListener(IOFStatisticsListener s) {
288         this.statisticsListeners.add(s);
289     }
290
291     public void unsetStatisticsListener(IOFStatisticsListener s) {
292         if (s != null) {
293             this.statisticsListeners.remove(s);
294         }
295     }
296
297     private void registerWithOSGIConsole() {
298         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
299         bundleContext.registerService(CommandProvider.class.getName(), this, null);
300     }
301
302     private static class StatsRequest {
303         protected Long switchId;
304         protected OFStatisticsType type;
305
306         public StatsRequest(Long d, OFStatisticsType t) {
307             switchId = d;
308             type = t;
309         }
310
311         @Override
312         public String toString() {
313             return "SReq = {switchId=" + switchId + ", type=" + type + "}";
314         }
315
316         @Override
317         public int hashCode() {
318             final int prime = 31;
319             int result = 1;
320             result = prime * result
321                     + ((switchId == null) ? 0 : switchId.hashCode());
322             result = prime * result + ((type == null) ? 0 : type.ordinal());
323             return result;
324         }
325
326         @Override
327         public boolean equals(Object obj) {
328             if (this == obj) {
329                 return true;
330             }
331             if (obj == null) {
332                 return false;
333             }
334             if (getClass() != obj.getClass()) {
335                 return false;
336             }
337             StatsRequest other = (StatsRequest) obj;
338             if (switchId == null) {
339                 if (other.switchId != null) {
340                     return false;
341                 }
342             } else if (!switchId.equals(other.switchId)) {
343                 return false;
344             }
345             if (type != other.type) {
346                 return false;
347             }
348             return true;
349         }
350     }
351
352     private void addStatisticsTicks(Long switchId) {
353         switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
354                                                                   // switch
355                                                                   // supports
356                                                                   // Vendor
357                                                                   // extension
358                                                                   // stats
359         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
360         log.debug("Added Switch {} to target pool",
361                 HexString.toHexString(switchId.longValue()));
362     }
363
364     protected static class StatisticsTicks {
365         private short flowStatisticsTicks;
366         private short descriptionTicks;
367         private short portStatisticsTicks;
368         private short tableStatisticsTicks;
369
370         public StatisticsTicks(boolean scattered) {
371             if (scattered) {
372                 // scatter bursts by statisticsTickPeriod
373                 if (++counter < 0) {
374                     counter = 0;
375                 } // being paranoid here
376                 flowStatisticsTicks = (short) (1 + counter
377                         % statisticsTickNumber);
378                 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
379                 portStatisticsTicks = (short) (1 + counter % portTickNumber);
380                 tableStatisticsTicks = (short) (1 + counter % tableTickNumber);
381             } else {
382                 flowStatisticsTicks = statisticsTickNumber;
383                 descriptionTicks = descriptionTickNumber;
384                 portStatisticsTicks = portTickNumber;
385                 tableStatisticsTicks = tableTickNumber;
386             }
387         }
388
389         public boolean decrementFlowTicksIsZero() {
390             // Please ensure no code is inserted between the if check and the
391             // flowStatisticsTicks reset
392             if (--flowStatisticsTicks == 0) {
393                 flowStatisticsTicks = statisticsTickNumber;
394                 return true;
395             }
396             return false;
397         }
398
399         public boolean decrementDescTicksIsZero() {
400             // Please ensure no code is inserted between the if check and the
401             // descriptionTicks reset
402             if (--descriptionTicks == 0) {
403                 descriptionTicks = descriptionTickNumber;
404                 return true;
405             }
406             return false;
407         }
408
409         public boolean decrementPortTicksIsZero() {
410             // Please ensure no code is inserted between the if check and the
411             // descriptionTicks reset
412             if (--portStatisticsTicks == 0) {
413                 portStatisticsTicks = portTickNumber;
414                 return true;
415             }
416             return false;
417         }
418
419         public boolean decrementTableTicksIsZero() {
420             // Please ensure no code is inserted between the if check and the
421             // descriptionTicks reset
422             if(--tableStatisticsTicks == 0) {
423                 tableStatisticsTicks = tableTickNumber;
424                 return true;
425             }
426             return false;
427         }
428
429         @Override
430         public String toString() {
431             return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
432                     + ",pT=" + portStatisticsTicks + ",tT=" + tableStatisticsTicks + "}";
433         }
434     }
435
436     private void printInfoMessage(String type, StatsRequest request) {
437         log.info("{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
438                 new Object[] {type, HexString.toHexString(request.switchId), pendingStatsRequests.size(),
439                 statisticsCollector.getState().toString() });
440     }
441
442     protected void decrementTicks() {
443         StatsRequest request = null;
444         for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
445                 .entrySet()) {
446             StatisticsTicks clock = entry.getValue();
447             Long switchId = entry.getKey();
448             if (clock.decrementFlowTicksIsZero()) {
449                 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ?
450                         new StatsRequest(switchId, OFStatisticsType.VENDOR) :
451                         new StatsRequest(switchId, OFStatisticsType.FLOW);
452                 // If a request for this switch is already in the queue, skip to
453                 // add this new request
454                 if (!pendingStatsRequests.contains(request)
455                         && false == pendingStatsRequests.offer(request)) {
456                     printInfoMessage("Flow", request);
457                 }
458             }
459
460             if (clock.decrementDescTicksIsZero()) {
461                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
462                 // If a request for this switch is already in the queue, skip to
463                 // add this new request
464                 if (!pendingStatsRequests.contains(request)
465                         && false == pendingStatsRequests.offer(request)) {
466                     printInfoMessage("Description", request);
467                 }
468             }
469
470             if (clock.decrementPortTicksIsZero()) {
471                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
472                 // If a request for this switch is already in the queue, skip to
473                 // add this new request
474                 if (!pendingStatsRequests.contains(request)
475                         && false == pendingStatsRequests.offer(request)) {
476                     printInfoMessage("Port", request);
477                 }
478             }
479
480             if(clock.decrementTableTicksIsZero()) {
481                 request = new StatsRequest(switchId, OFStatisticsType.TABLE);
482                 // If a request for this switch is already in the queue, skip to
483                 // add this new request
484                 if (!pendingStatsRequests.contains(request)
485                         && false == pendingStatsRequests.offer(request)) {
486                     printInfoMessage("Table", request);
487                 }
488             }
489         }
490     }
491
492     private void removeStatsRequestTasks(Long switchId) {
493         log.debug("Cleaning Statistics database for switch {}",
494                 HexEncode.longToHexString(switchId));
495         // To be safe, let's attempt removal of both VENDOR and FLOW request. It
496         // does not hurt
497         pendingStatsRequests.remove(new StatsRequest(switchId,
498                 OFStatisticsType.VENDOR));
499         pendingStatsRequests.remove(new StatsRequest(switchId,
500                 OFStatisticsType.FLOW));
501         pendingStatsRequests.remove(new StatsRequest(switchId,
502                 OFStatisticsType.DESC));
503         pendingStatsRequests.remove(new StatsRequest(switchId,
504                 OFStatisticsType.PORT));
505         pendingStatsRequests.remove(new StatsRequest(switchId,
506                 OFStatisticsType.TABLE));
507         // Take care of the TX rate databases
508         switchPortStatsUpdated.remove(switchId);
509         txRates.remove(switchId);
510     }
511
512     private void clearFlowStatsAndTicks(Long switchId) {
513         statisticsTimerTicks.remove(switchId);
514         removeStatsRequestTasks(switchId);
515         flowStatistics.remove(switchId);
516         log.debug("Statistics removed for switch {}",
517                 HexString.toHexString(switchId));
518     }
519
520     private void queryStatisticsInternal(Long switchId, OFStatisticsType statType) {
521
522         // Query the switch on all matches
523         List<OFStatistics> values = this.fetchStatisticsFromSwitch(switchId, statType, null);
524
525         // If got a valid response update local cache and notify listeners
526         if (values != null && !values.isEmpty()) {
527             switch (statType) {
528                 case FLOW:
529                 case VENDOR:
530                     flowStatistics.put(switchId, values);
531                     notifyFlowUpdate(switchId, values);
532                     break;
533                 case DESC:
534                     // Overwrite cache
535                     descStatistics.put(switchId, values);
536                     // Notify who may be interested in a description change
537                     notifyDescriptionUpdate(switchId, values);
538                     break;
539                 case PORT:
540                     // Overwrite cache with new port statistics for this switch
541                     portStatistics.put(switchId, values);
542
543                     // Wake up the thread which maintains the TX byte counters for
544                     // each port
545                     switchPortStatsUpdated.offer(switchId);
546                     notifyPortUpdate(switchId, values);
547                     break;
548                 case TABLE:
549                     // Overwrite cache
550                     tableStatistics.put(switchId, values);
551                     notifyTableUpdate(switchId, values);
552                     break;
553                 default:
554             }
555         }
556     }
557
558     private void notifyDescriptionUpdate(Long switchId, List<OFStatistics> values) {
559         for (IOFStatisticsListener l : this.statisticsListeners) {
560             l.descriptionStatisticsRefreshed(switchId, values);
561         }
562     }
563
564     private void notifyFlowUpdate(Long switchId, List<OFStatistics> values) {
565         if (values.get(0) instanceof OFVendorStatistics) {
566             values = this.v6StatsListToOFStatsList(values);
567         }
568
569         for (IOFStatisticsListener l : this.statisticsListeners) {
570             l.flowStatisticsRefreshed(switchId, values);
571         }
572
573     }
574
575     private void notifyPortUpdate(Long switchId, List<OFStatistics> values) {
576         for (IOFStatisticsListener l : this.statisticsListeners) {
577             l.portStatisticsRefreshed(switchId, values);
578         }
579     }
580
581     private void notifyTableUpdate(Long switchId, List<OFStatistics> values) {
582         for (IOFStatisticsListener l : this.statisticsListeners) {
583             l.tableStatisticsRefreshed(switchId, values);
584         }
585     }
586
587     /*
588      * Generic function to get the statistics form an OF switch
589      */
590     @SuppressWarnings("unchecked")
591     private List<OFStatistics> fetchStatisticsFromSwitch(Long switchId,
592             OFStatisticsType statsType, Object target) {
593         List<OFStatistics> values = null;
594         String type = null;
595         ISwitch sw = controller.getSwitch(switchId);
596
597         if (sw != null) {
598             OFStatisticsRequest req = new OFStatisticsRequest();
599             req.setStatisticType(statsType);
600             int requestLength = req.getLengthU();
601
602             if (statsType == OFStatisticsType.FLOW) {
603                 OFMatch match = null;
604                 if (target == null) {
605                     // All flows request
606                     match = new OFMatch();
607                     match.setWildcards(0xffffffff);
608                 } else if (!(target instanceof OFMatch)) {
609                     // Malformed request
610                     log.warn("Invalid target type for Flow stats request: {}",
611                             target.getClass());
612                     return null;
613                 } else {
614                     // Specific flow request
615                     match = (OFMatch) target;
616                 }
617                 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
618                 specificReq.setMatch(match);
619                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
620                 specificReq.setTableId((byte) 0xff);
621                 req.setStatistics(Collections
622                         .singletonList((OFStatistics) specificReq));
623                 requestLength += specificReq.getLength();
624                 type = "FLOW";
625             } else if (statsType == OFStatisticsType.VENDOR) {
626                 V6StatsRequest specificReq = new V6StatsRequest();
627                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
628                 specificReq.setTableId((byte) 0xff);
629                 req.setStatistics(Collections
630                         .singletonList((OFStatistics) specificReq));
631                 requestLength += specificReq.getLength();
632                 type = "VENDOR";
633             } else if (statsType == OFStatisticsType.AGGREGATE) {
634                 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
635                 OFMatch match = new OFMatch();
636                 match.setWildcards(0xffffffff);
637                 specificReq.setMatch(match);
638                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
639                 specificReq.setTableId((byte) 0xff);
640                 req.setStatistics(Collections
641                         .singletonList((OFStatistics) specificReq));
642                 requestLength += specificReq.getLength();
643                 type = "AGGREGATE";
644             } else if (statsType == OFStatisticsType.PORT) {
645                 short targetPort;
646                 if (target == null) {
647                     // All ports request
648                     targetPort = OFPort.OFPP_NONE.getValue();
649                 } else if (!(target instanceof Short)) {
650                     // Malformed request
651                     log.warn("Invalid target type for Port stats request: {}",
652                             target.getClass());
653                     return null;
654                 } else {
655                     // Specific port request
656                     targetPort = (Short) target;
657                 }
658                 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
659                 specificReq.setPortNumber(targetPort);
660                 req.setStatistics(Collections
661                         .singletonList((OFStatistics) specificReq));
662                 requestLength += specificReq.getLength();
663                 type = "PORT";
664             } else if (statsType == OFStatisticsType.QUEUE) {
665                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
666                 specificReq.setPortNumber(OFPort.OFPP_ALL.getValue());
667                 specificReq.setQueueId(0xffffffff);
668                 req.setStatistics(Collections
669                         .singletonList((OFStatistics) specificReq));
670                 requestLength += specificReq.getLength();
671                 type = "QUEUE";
672             } else if (statsType == OFStatisticsType.DESC) {
673                 type = "DESC";
674             } else if (statsType == OFStatisticsType.TABLE) {
675                 if(target != null){
676                     if (!(target instanceof Byte)) {
677                         // Malformed request
678                         log.warn("Invalid table id for table stats request: {}",
679                                 target.getClass());
680                         return null;
681                     }
682                     byte targetTable = (Byte) target;
683                     OFTableStatistics specificReq = new OFTableStatistics();
684                     specificReq.setTableId(targetTable);
685                     req.setStatistics(Collections
686                             .singletonList((OFStatistics) specificReq));
687                     requestLength += specificReq.getLength();
688                 }
689                 type = "TABLE";
690             }
691             req.setLengthU(requestLength);
692             Object result = sw.getStatistics(req);
693
694             if (result == null) {
695                 log.warn("Request Timed Out for ({}) from switch {}", type,
696                         HexString.toHexString(switchId));
697             } else if (result instanceof OFError) {
698                 log.warn("Switch {} failed to handle ({}) stats request: {}",
699                         new Object[] { HexString.toHexString(switchId), type,
700                         Utils.getOFErrorString((OFError) result) });
701                 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
702                     log.warn(
703                             "Switching back to regular Flow stats requests for switch {}",
704                             HexString.toHexString(switchId));
705                     this.switchSupportsVendorExtStats.put(switchId,
706                             Boolean.FALSE);
707                 }
708             } else {
709                 values = (List<OFStatistics>) result;
710             }
711         }
712         return values;
713     }
714
715     @Override
716     public List<OFStatistics> getOFFlowStatistics(Long switchId) {
717         List<OFStatistics> list = flowStatistics.get(switchId);
718
719         /*
720          * Check on emptiness as interference between add and get is still
721          * possible on the inner list (the concurrentMap entry's value)
722          */
723         return (list == null || list.isEmpty()) ? this.dummyList
724                 : (list.get(0) instanceof OFVendorStatistics) ? this
725                         .v6StatsListToOFStatsList(list) : list;
726     }
727
728     @Override
729     public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch, short priority) {
730         List<OFStatistics> statsList = flowStatistics.get(switchId);
731
732         /*
733          * Check on emptiness as interference between add and get is still
734          * possible on the inner list (the concurrentMap entry's value)
735          */
736         if (statsList == null || statsList.isEmpty()) {
737             return this.dummyList;
738         }
739
740         if (statsList.get(0) instanceof OFVendorStatistics) {
741             /*
742              * Caller could provide regular OF match when we instead pull the
743              * vendor statistics from this node Caller is not supposed to know
744              * whether this switch supports vendor extensions statistics
745              * requests
746              */
747             V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
748                     : new V6Match(ofMatch);
749
750             List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
751             for (OFStatistics stats : targetList) {
752                 V6StatsReply v6Stats = (V6StatsReply) stats;
753                 V6Match v6Match = v6Stats.getMatch();
754                 if (v6Stats.getPriority() == priority && v6Match.equals(targetMatch)) {
755                     List<OFStatistics> list = new ArrayList<OFStatistics>();
756                     list.add(stats);
757                     return list;
758                 }
759             }
760         } else {
761             for (OFStatistics stats : statsList) {
762                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
763                 if (flowStats.getPriority() == priority && flowStats.getMatch().equals(ofMatch)) {
764                     List<OFStatistics> list = new ArrayList<OFStatistics>();
765                     list.add(stats);
766                     return list;
767                 }
768             }
769         }
770         return this.dummyList;
771     }
772
773     /*
774      * Converts the v6 vendor statistics to the OFStatistics
775      */
776     private List<OFStatistics> v6StatsListToOFStatsList(
777             List<OFStatistics> statistics) {
778         List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
779         if (statistics != null && !statistics.isEmpty()) {
780             for (OFStatistics stats : statistics) {
781                 if (stats instanceof OFVendorStatistics) {
782                     List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
783                     if (r != null) {
784                         v6statistics.addAll(r);
785                     }
786                 }
787             }
788         }
789         return v6statistics;
790     }
791
792     private static List<OFStatistics> getV6ReplyStatistics(
793             OFVendorStatistics stat) {
794         int length = stat.getLength();
795         List<OFStatistics> results = new ArrayList<OFStatistics>();
796         if (length < 12)
797             return null; // Nicira Hdr is 12 bytes. We need atleast that much
798         ByteBuffer data = ByteBuffer.allocate(length);
799         stat.writeTo(data);
800         data.rewind();
801         if (log.isTraceEnabled()) {
802             log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
803                     HexString.toHexString(data.array()));
804         }
805
806         int vendor = data.getInt(); // first 4 bytes is vendor id.
807         if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
808             log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
809             return null;
810         } else {
811             // go ahead by 8 bytes which is 8 bytes of 0
812             data.getLong(); // should be all 0's
813             length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
814                           // been consumed
815         }
816
817         V6StatsReply v6statsreply;
818         int min_len;
819         while (length > 0) {
820             v6statsreply = new V6StatsReply();
821             min_len = v6statsreply.getLength();
822             if (length < v6statsreply.getLength())
823                 break;
824             v6statsreply.setActionFactory(stat.getActionFactory());
825             v6statsreply.readFrom(data);
826             if (v6statsreply.getLength() < min_len)
827                 break;
828             v6statsreply.setVendorId(vendor);
829             log.trace("V6StatsReply: {}", v6statsreply);
830             length -= v6statsreply.getLength();
831             results.add(v6statsreply);
832         }
833         return results;
834     }
835
836     @Override
837     public List<OFStatistics> queryStatistics(Long switchId,
838             OFStatisticsType statType, Object target) {
839         /*
840          * Caller does not know and it is not supposed to know whether this
841          * switch supports vendor extension. We adjust the target for him
842          */
843         if (statType == OFStatisticsType.FLOW) {
844             if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
845                 statType = OFStatisticsType.VENDOR;
846             }
847         }
848
849         List<OFStatistics> list = this.fetchStatisticsFromSwitch(switchId, statType,
850                 target);
851
852         return (list == null) ? null :
853             (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list) : list;
854     }
855
856     @Override
857     public List<OFStatistics> getOFDescStatistics(Long switchId) {
858         if (!descStatistics.containsKey(switchId))
859             return this.dummyList;
860
861         return descStatistics.get(switchId);
862     }
863
864     @Override
865     public List<OFStatistics> getOFPortStatistics(Long switchId) {
866         if (!portStatistics.containsKey(switchId)) {
867             return this.dummyList;
868         }
869
870         return portStatistics.get(switchId);
871     }
872
873     @Override
874     public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
875         if (!portStatistics.containsKey(switchId)) {
876             return this.dummyList;
877         }
878         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
879         for (OFStatistics stats : portStatistics.get(switchId)) {
880             if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
881                 list.add(stats);
882                 break;
883             }
884         }
885         return list;
886     }
887
888     @Override
889     public List<OFStatistics> getOFTableStatistics(Long switchId) {
890         if (!tableStatistics.containsKey(switchId)) {
891             return this.dummyList;
892         }
893
894         return tableStatistics.get(switchId);
895     }
896
897     @Override
898     public List<OFStatistics> getOFTableStatistics(Long switchId, Byte tableId) {
899         if (!tableStatistics.containsKey(switchId)) {
900             return this.dummyList;
901         }
902
903         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
904         for (OFStatistics stats : tableStatistics.get(switchId)) {
905             if (((OFTableStatistics) stats).getTableId() == tableId) {
906                 list.add(stats);
907                 break;
908             }
909         }
910         return list;
911     }
912
913     @Override
914     public int getFlowsNumber(long switchId) {
915         return this.flowStatistics.get(switchId).size();
916     }
917
918     /*
919      * InventoryShim replay for us all the switch addition which happened before
920      * we were brought up
921      */
922     @Override
923     public void updateNode(Node node, UpdateType type, Set<Property> props) {
924         Long switchId = (Long) node.getID();
925         switch (type) {
926         case ADDED:
927             addStatisticsTicks(switchId);
928             break;
929         case REMOVED:
930             clearFlowStatsAndTicks(switchId);
931         default:
932         }
933     }
934
935     @Override
936     public void updateNodeConnector(NodeConnector nodeConnector,
937             UpdateType type, Set<Property> props) {
938         // No action
939     }
940
941     /**
942      * Update the cached port rates for this switch with the latest retrieved
943      * port transmit byte count
944      *
945      * @param switchId
946      */
947     private synchronized void updatePortsTxRate(long switchId) {
948         List<OFStatistics> newPortStatistics = this.portStatistics.get(switchId);
949         if (newPortStatistics == null) {
950             return;
951         }
952         Map<Short, TxRates> rates = this.txRates.get(switchId);
953         if (rates == null) {
954             // First time rates for this switch are added
955             rates = new HashMap<Short, TxRates>();
956             txRates.put(switchId, rates);
957         }
958         for (OFStatistics stats : newPortStatistics) {
959             OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
960             short port = newPortStat.getPortNumber();
961             TxRates portRatesHolder = rates.get(port);
962             if (portRatesHolder == null) {
963                 // First time rates for this port are added
964                 portRatesHolder = new TxRates();
965                 rates.put(port, portRatesHolder);
966             }
967             // Get and store the number of transmitted bytes for this port
968             // And handle the case where agent does not support the counter
969             long transmitBytes = newPortStat.getTransmitBytes();
970             long value = (transmitBytes < 0) ? 0 : transmitBytes;
971             portRatesHolder.update(value);
972         }
973     }
974
975     @Override
976     public synchronized long getTransmitRate(Long switchId, Short port) {
977         long average = 0;
978         if (switchId == null || port == null) {
979             return average;
980         }
981         Map<Short, TxRates> perSwitch = txRates.get(switchId);
982         if (perSwitch == null) {
983             return average;
984         }
985         TxRates portRates = perSwitch.get(port);
986         if (portRates == null) {
987             return average;
988         }
989         return portRates.getAverageTxRate();
990     }
991
992     /*
993      * Manual switch name configuration code
994      */
995     @Override
996     public String getHelp() {
997         StringBuffer help = new StringBuffer();
998         help.append("---OF Statistics Manager utilities---\n");
999         help.append("\t ofdumpstatsmgr         - "
1000                 + "Print Internal Stats Mgr db\n");
1001         help.append("\t ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds) - "
1002                 + "Set/Show flow/port/dedscription stats poll intervals\n");
1003         return help.toString();
1004     }
1005
1006     private boolean isValidSwitchId(String switchId) {
1007         String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
1008         String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
1009
1010         return (switchId != null && (switchId.matches(regexDatapathID) || switchId
1011                 .matches(regexDatapathIDLong)));
1012     }
1013
1014     public long getSwitchIDLong(String switchId) {
1015         int radix = 16;
1016         String switchString = "0";
1017
1018         if (isValidSwitchId(switchId)) {
1019             if (switchId.contains(":")) {
1020                 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
1021                 switchString = switchId.replace(":", "");
1022             } else if (switchId.contains("-")) {
1023                 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
1024                 switchString = switchId.replace("-", "");
1025             } else {
1026                 // Handle the 0123456789ABCDEF notation
1027                 switchString = switchId;
1028             }
1029         }
1030         return Long.parseLong(switchString, radix);
1031     }
1032
1033     /*
1034      * Internal information dump code
1035      */
1036     private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
1037         StringBuffer buffer = new StringBuffer();
1038         buffer.append("{");
1039         for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
1040             buffer.append(HexString.toHexString(entry.getKey()) + "="
1041                     + entry.getValue().toString() + " ");
1042         }
1043         buffer.append("}");
1044         return buffer.toString();
1045     }
1046
1047     public void _ofdumpstatsmgr(CommandInterpreter ci) {
1048         ci.println("Global Counter: " + counter);
1049         ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
1050         ci.println("PendingStatsQueue: " + pendingStatsRequests);
1051         ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
1052         ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
1053         ci.println("Stats Collector State: "
1054                 + statisticsCollector.getState().toString());
1055         ci.println("StatsTimer: " + statisticsTimer.toString());
1056         ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
1057         ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
1058         ci.println("Port Stats Period: " + portTickNumber + " s");
1059         ci.println("Table Stats Period: " + tableTickNumber + " s");
1060     }
1061
1062     public void _resetSwitchCapability(CommandInterpreter ci) {
1063         String sidString = ci.nextArgument();
1064         Long sid = null;
1065         if (sidString == null) {
1066             ci.println("Insert the switch id (numeric value)");
1067             return;
1068         }
1069         try {
1070             sid = Long.valueOf(sidString);
1071             this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
1072             ci.println("Vendor capability for switch " + sid + " set to "
1073                     + this.switchSupportsVendorExtStats.get(sid));
1074         } catch (NumberFormatException e) {
1075             ci.println("Invalid switch id. Has to be numeric.");
1076         }
1077
1078     }
1079
1080     public void _ofbw(CommandInterpreter ci) {
1081         String sidString = ci.nextArgument();
1082         Long sid = null;
1083         if (sidString == null) {
1084             ci.println("Insert the switch id (numeric value)");
1085             return;
1086         }
1087         try {
1088             sid = Long.valueOf(sidString);
1089         } catch (NumberFormatException e) {
1090             ci.println("Invalid switch id. Has to be numeric.");
1091         }
1092         if (sid != null) {
1093             Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
1094             ci.println("Bandwidth utilization (" + factoredSamples
1095                     * portTickNumber + " sec average) for switch "
1096                     + HexEncode.longToHexString(sid) + ":");
1097             if (thisSwitchRates == null) {
1098                 ci.println("Not available");
1099             } else {
1100                 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
1101                     ci.println("Port: " + entry.getKey() + ": "
1102                             + entry.getValue().getAverageTxRate() + " bps");
1103                 }
1104             }
1105         }
1106     }
1107
1108     public void _txratewindow(CommandInterpreter ci) {
1109         String averageWindow = ci.nextArgument();
1110         short seconds = 0;
1111         if (averageWindow == null) {
1112             ci.println("Insert the length in seconds of the median "
1113                     + "window for tx rate");
1114             ci.println("Current: " + factoredSamples * portTickNumber + " secs");
1115             return;
1116         }
1117         try {
1118             seconds = Short.valueOf(averageWindow);
1119         } catch (NumberFormatException e) {
1120             ci.println("Invalid period.");
1121         }
1122         OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1123         ci.println("New: " + factoredSamples * portTickNumber + " secs");
1124     }
1125
1126     public void _ofstatsmgrintervals(CommandInterpreter ci) {
1127         String flowStatsInterv = ci.nextArgument();
1128         String portStatsInterv = ci.nextArgument();
1129         String descStatsInterv = ci.nextArgument();
1130         String tableStatsInterv = ci.nextArgument();
1131
1132         if (flowStatsInterv == null || portStatsInterv == null
1133                 || descStatsInterv == null) {
1134             ci.println("Usage: ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds)");
1135             ci.println("Current Values: fP=" + statisticsTickNumber + "sec pP="
1136                     + portTickNumber + "sec dP=" + descriptionTickNumber + "sec tP=" + tableTickNumber + " sec");
1137             return;
1138         }
1139         Short fP, pP, dP, tP;
1140         try {
1141             fP = Short.parseShort(flowStatsInterv);
1142             pP = Short.parseShort(portStatsInterv);
1143             dP = Short.parseShort(descStatsInterv);
1144             tP = Short.parseShort(tableStatsInterv);
1145         } catch (Exception e) {
1146             ci.println("Invalid format values: " + e.getMessage());
1147             return;
1148         }
1149
1150         if (pP <= 1 || fP <= 1 || dP <= 1 || tP <= 1) {
1151             ci.println("Invalid values. fP, pP, dP, tP have to be greater than 1.");
1152             return;
1153         }
1154
1155         statisticsTickNumber = fP;
1156         portTickNumber = pP;
1157         descriptionTickNumber = dP;
1158         tableTickNumber = tP;
1159
1160         ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1161                 + portTickNumber + "s dP=" + descriptionTickNumber + "s tP="
1162                 + tableTickNumber + "s");
1163     }
1164
1165     /**
1166      * This method retrieves user configurations from config.ini and updates
1167      * statisticsTickNumber/portTickNumber/descriptionTickNumber accordingly.
1168      */
1169     private void configStatsPollIntervals() {
1170         String fsStr = System.getProperty("of.flowStatsPollInterval");
1171         String psStr = System.getProperty("of.portStatsPollInterval");
1172         String dsStr = System.getProperty("of.descStatsPollInterval");
1173         String tsStr = System.getProperty("of.tableStatsPollInterval");
1174         Short fs, ps, ds, ts;
1175
1176         if (fsStr != null) {
1177             try {
1178                 fs = Short.parseShort(fsStr);
1179                 if (fs > 0) {
1180                     statisticsTickNumber = fs;
1181                 }
1182             } catch (Exception e) {
1183             }
1184         }
1185
1186         if (psStr != null) {
1187             try {
1188                 ps = Short.parseShort(psStr);
1189                 if (ps > 0) {
1190                     portTickNumber = ps;
1191                 }
1192             } catch (Exception e) {
1193             }
1194         }
1195
1196         if (dsStr != null) {
1197             try {
1198                 ds = Short.parseShort(dsStr);
1199                 if (ds > 0) {
1200                     descriptionTickNumber = ds;
1201                 }
1202             } catch (Exception e) {
1203             }
1204         }
1205
1206         if (tsStr != null) {
1207             try{
1208                 ts = Short.parseShort(tsStr);
1209                 if (ts > 0) {
1210                     tableTickNumber = ts;
1211                 }
1212             } catch (Exception e) {
1213             }
1214         }
1215     }
1216 }