Merge "Fix flow installation state in UI"
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.Set;
20 import java.util.Timer;
21 import java.util.TimerTask;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.CopyOnWriteArraySet;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
28
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
33 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
40 import org.opendaylight.controller.sal.core.Node;
41 import org.opendaylight.controller.sal.core.NodeConnector;
42 import org.opendaylight.controller.sal.core.Property;
43 import org.opendaylight.controller.sal.core.UpdateType;
44 import org.opendaylight.controller.sal.utils.HexEncode;
45 import org.openflow.protocol.OFError;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFPort;
48 import org.openflow.protocol.OFStatisticsRequest;
49 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
50 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
51 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
52 import org.openflow.protocol.statistics.OFPortStatisticsReply;
53 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
54 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
55 import org.openflow.protocol.statistics.OFStatistics;
56 import org.openflow.protocol.statistics.OFStatisticsType;
57 import org.openflow.protocol.statistics.OFTableStatistics;
58 import org.openflow.protocol.statistics.OFVendorStatistics;
59 import org.openflow.util.HexString;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * Periodically polls the different OF statistics from the OF switches, caches
67  * them, and publishes results towards SAL. It also provides an API to directly
68  * query the switch for any specific statistics.
69  */
70 public class OFStatisticsManager implements IOFStatisticsManager, IInventoryShimExternalListener, CommandProvider {
71     private static final Logger log = LoggerFactory.getLogger(OFStatisticsManager.class);
72     private static final int INITIAL_SIZE = 64;
73     private static final long FLOW_STATS_PERIOD = 10000;
74     private static final long DESC_STATS_PERIOD = 60000;
75     private static final long PORT_STATS_PERIOD = 5000;
76     private static final long TABLE_STATS_PERIOD = 10000;
77     private static final long TICK = 1000;
78     private static short statisticsTickNumber = (short) (FLOW_STATS_PERIOD / TICK);
79     private static short descriptionTickNumber = (short) (DESC_STATS_PERIOD / TICK);
80     private static short portTickNumber = (short) (PORT_STATS_PERIOD / TICK);
81     private static short tableTickNumber = (short) (TABLE_STATS_PERIOD / TICK);
82     private static short factoredSamples = (short) 2;
83     private static short counter = 1;
84     private IController controller = null;
85     private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
86     private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
87     private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
88     private ConcurrentMap<Long, List<OFStatistics>> tableStatistics;
89     private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
90     protected BlockingQueue<StatsRequest> pendingStatsRequests;
91     protected BlockingQueue<Long> switchPortStatsUpdated;
92     private Thread statisticsCollector;
93     private Thread txRatesUpdater;
94     private Timer statisticsTimer;
95     private TimerTask statisticsTimerTask;
96     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
97     // Per port sampled (every portStatsPeriod) transmit rate
98     private Map<Long, Map<Short, TxRates>> txRates;
99     private Set<IOFStatisticsListener> statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
100
101     /**
102      * The object containing the latest factoredSamples tx rate samples for a
103      * given switch port
104      */
105     protected class TxRates {
106         // contains the latest factoredSamples sampled transmitted bytes
107         Deque<Long> sampledTxBytes;
108
109         public TxRates() {
110             sampledTxBytes = new LinkedBlockingDeque<Long>();
111         }
112
113         public void update(Long txBytes) {
114             /*
115              * Based on how many samples our average works on, we might have to
116              * remove the oldest sample
117              */
118             if (sampledTxBytes.size() == factoredSamples) {
119                 sampledTxBytes.removeLast();
120             }
121
122             // Add the latest sample to the top of the queue
123             sampledTxBytes.addFirst(txBytes);
124         }
125
126         /**
127          * Returns the average transmit rate in bps
128          *
129          * @return the average transmit rate [bps]
130          */
131         public long getAverageTxRate() {
132             long average = 0;
133             /*
134              * If we cannot provide the value for the time window length set
135              */
136             if (sampledTxBytes.size() < factoredSamples) {
137                 return average;
138             }
139             long increment = sampledTxBytes.getFirst() - sampledTxBytes
140                     .getLast();
141             long timePeriod = factoredSamples * PORT_STATS_PERIOD / TICK;
142             average = (8L * increment) / timePeriod;
143             return average;
144         }
145     }
146
147     public void setController(IController core) {
148         this.controller = core;
149     }
150
151     public void unsetController(IController core) {
152         if (this.controller == core) {
153             this.controller = null;
154         }
155     }
156
157     private short getStatsQueueSize() {
158         String statsQueueSizeStr = System.getProperty("of.statsQueueSize");
159         short statsQueueSize = INITIAL_SIZE;
160         if (statsQueueSizeStr != null) {
161             try {
162                 statsQueueSize = Short.parseShort(statsQueueSizeStr);
163                 if (statsQueueSize <= 0) {
164                     statsQueueSize = INITIAL_SIZE;
165                 }
166             } catch (Exception e) {
167             }
168         }
169         return statsQueueSize;
170     }
171
172     IPluginOutConnectionService connectionPluginOutService;
173     void setIPluginOutConnectionService(IPluginOutConnectionService s) {
174         connectionPluginOutService = s;
175     }
176
177     void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
178         if (connectionPluginOutService == s) {
179             connectionPluginOutService = null;
180         }
181     }
182
183     /**
184      * Function called by the dependency manager when all the required
185      * dependencies are satisfied
186      *
187      */
188     void init() {
189         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
190         descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
191         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
192         tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
193         pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(getStatsQueueSize());
194         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(INITIAL_SIZE);
195         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(INITIAL_SIZE);
196         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(INITIAL_SIZE);
197         txRates = new HashMap<Long, Map<Short, TxRates>>(INITIAL_SIZE);
198
199         configStatsPollIntervals();
200
201         // Initialize managed timers
202         statisticsTimer = new Timer("Statistics Timer Ticks");
203         statisticsTimerTask = new TimerTask() {
204             @Override
205             public void run() {
206                 decrementTicks();
207             }
208         };
209
210         // Initialize Statistics collector thread
211         statisticsCollector = new Thread(new Runnable() {
212             @Override
213             public void run() {
214                 while (true) {
215                     try {
216                         StatsRequest req = pendingStatsRequests.take();
217                         queryStatisticsInternal(req.switchId, req.type);
218                     } catch (InterruptedException e) {
219                         log.warn("Flow Statistics Collector thread "
220                                 + "interrupted", e);
221                         return;
222                     }
223                 }
224             }
225         }, "Statistics Collector");
226
227         // Initialize Tx Rate Updater thread
228         txRatesUpdater = new Thread(new Runnable() {
229             @Override
230             public void run() {
231                 while (true) {
232                     try {
233                         long switchId = switchPortStatsUpdated.take();
234                         updatePortsTxRate(switchId);
235                     } catch (InterruptedException e) {
236                         log.warn("TX Rate Updater thread interrupted", e);
237                         return;
238                     }
239                 }
240             }
241         }, "TX Rate Updater");
242     }
243
244     /**
245      * Function called by the dependency manager when at least one dependency
246      * become unsatisfied or when the component is shutting down because for
247      * example bundle is being stopped.
248      *
249      */
250     void destroy() {
251         statisticsListeners.clear();
252     }
253
254     /**
255      * Function called by dependency manager after "init ()" is called and after
256      * the services provided by the class are registered in the service registry
257      *
258      */
259     void start() {
260         // Start managed timers
261         statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, TICK);
262
263         // Start statistics collector thread
264         statisticsCollector.start();
265
266         // Start bandwidth utilization computer thread
267         txRatesUpdater.start();
268
269         // OSGI console
270         registerWithOSGIConsole();
271     }
272
273     /**
274      * Function called by the dependency manager before the services exported by
275      * the component are unregistered, this will be followed by a "destroy ()"
276      * calls
277      *
278      */
279     void stop() {
280         // Stop managed timers
281         statisticsTimer.cancel();
282     }
283
284     public void setStatisticsListener(IOFStatisticsListener s) {
285         this.statisticsListeners.add(s);
286     }
287
288     public void unsetStatisticsListener(IOFStatisticsListener s) {
289         if (s != null) {
290             this.statisticsListeners.remove(s);
291         }
292     }
293
294     private void registerWithOSGIConsole() {
295         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
296         bundleContext.registerService(CommandProvider.class.getName(), this, null);
297     }
298
299     private static class StatsRequest {
300         protected Long switchId;
301         protected OFStatisticsType type;
302
303         public StatsRequest(Long d, OFStatisticsType t) {
304             switchId = d;
305             type = t;
306         }
307
308         @Override
309         public String toString() {
310             return "SReq = {switchId=" + switchId + ", type=" + type + "}";
311         }
312
313         @Override
314         public int hashCode() {
315             final int prime = 31;
316             int result = 1;
317             result = prime * result
318                     + ((switchId == null) ? 0 : switchId.hashCode());
319             result = prime * result + ((type == null) ? 0 : type.ordinal());
320             return result;
321         }
322
323         @Override
324         public boolean equals(Object obj) {
325             if (this == obj) {
326                 return true;
327             }
328             if (obj == null) {
329                 return false;
330             }
331             if (getClass() != obj.getClass()) {
332                 return false;
333             }
334             StatsRequest other = (StatsRequest) obj;
335             if (switchId == null) {
336                 if (other.switchId != null) {
337                     return false;
338                 }
339             } else if (!switchId.equals(other.switchId)) {
340                 return false;
341             }
342             if (type != other.type) {
343                 return false;
344             }
345             return true;
346         }
347     }
348
349     private void addStatisticsTicks(Long switchId) {
350         switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
351                                                                   // switch
352                                                                   // supports
353                                                                   // Vendor
354                                                                   // extension
355                                                                   // stats
356         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
357         log.debug("Added Switch {} to target pool",
358                 HexString.toHexString(switchId.longValue()));
359     }
360
361     protected static class StatisticsTicks {
362         private short flowStatisticsTicks;
363         private short descriptionTicks;
364         private short portStatisticsTicks;
365         private short tableStatisticsTicks;
366
367         public StatisticsTicks(boolean scattered) {
368             if (scattered) {
369                 // scatter bursts by statisticsTickPeriod
370                 if (++counter < 0) {
371                     counter = 0;
372                 } // being paranoid here
373                 flowStatisticsTicks = (short) (1 + counter
374                         % statisticsTickNumber);
375                 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
376                 portStatisticsTicks = (short) (1 + counter % portTickNumber);
377                 tableStatisticsTicks = (short) (1 + counter % tableTickNumber);
378             } else {
379                 flowStatisticsTicks = statisticsTickNumber;
380                 descriptionTicks = descriptionTickNumber;
381                 portStatisticsTicks = portTickNumber;
382                 tableStatisticsTicks = tableTickNumber;
383             }
384         }
385
386         public boolean decrementFlowTicksIsZero() {
387             // Please ensure no code is inserted between the if check and the
388             // flowStatisticsTicks reset
389             if (--flowStatisticsTicks == 0) {
390                 flowStatisticsTicks = statisticsTickNumber;
391                 return true;
392             }
393             return false;
394         }
395
396         public boolean decrementDescTicksIsZero() {
397             // Please ensure no code is inserted between the if check and the
398             // descriptionTicks reset
399             if (--descriptionTicks == 0) {
400                 descriptionTicks = descriptionTickNumber;
401                 return true;
402             }
403             return false;
404         }
405
406         public boolean decrementPortTicksIsZero() {
407             // Please ensure no code is inserted between the if check and the
408             // descriptionTicks reset
409             if (--portStatisticsTicks == 0) {
410                 portStatisticsTicks = portTickNumber;
411                 return true;
412             }
413             return false;
414         }
415
416         public boolean decrementTableTicksIsZero() {
417             // Please ensure no code is inserted between the if check and the
418             // descriptionTicks reset
419             if(--tableStatisticsTicks == 0) {
420                 tableStatisticsTicks = tableTickNumber;
421                 return true;
422             }
423             return false;
424         }
425
426         @Override
427         public String toString() {
428             return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
429                     + ",pT=" + portStatisticsTicks + ",tT=" + tableStatisticsTicks + "}";
430         }
431     }
432
433     private void printInfoMessage(String type, StatsRequest request) {
434         log.info("{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
435                 new Object[] {type, HexString.toHexString(request.switchId), pendingStatsRequests.size(),
436                 statisticsCollector.getState().toString() });
437     }
438
439     protected void decrementTicks() {
440         StatsRequest request = null;
441         for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
442                 .entrySet()) {
443             StatisticsTicks clock = entry.getValue();
444             Long switchId = entry.getKey();
445             if (clock.decrementFlowTicksIsZero()) {
446                 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ?
447                         new StatsRequest(switchId, OFStatisticsType.VENDOR) :
448                         new StatsRequest(switchId, OFStatisticsType.FLOW);
449                 // If a request for this switch is already in the queue, skip to
450                 // add this new request
451                 if (!pendingStatsRequests.contains(request)
452                         && false == pendingStatsRequests.offer(request)) {
453                     printInfoMessage("Flow", request);
454                 }
455             }
456
457             if (clock.decrementDescTicksIsZero()) {
458                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
459                 // If a request for this switch is already in the queue, skip to
460                 // add this new request
461                 if (!pendingStatsRequests.contains(request)
462                         && false == pendingStatsRequests.offer(request)) {
463                     printInfoMessage("Description", request);
464                 }
465             }
466
467             if (clock.decrementPortTicksIsZero()) {
468                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
469                 // If a request for this switch is already in the queue, skip to
470                 // add this new request
471                 if (!pendingStatsRequests.contains(request)
472                         && false == pendingStatsRequests.offer(request)) {
473                     printInfoMessage("Port", request);
474                 }
475             }
476
477             if(clock.decrementTableTicksIsZero()) {
478                 request = new StatsRequest(switchId, OFStatisticsType.TABLE);
479                 // If a request for this switch is already in the queue, skip to
480                 // add this new request
481                 if (!pendingStatsRequests.contains(request)
482                         && false == pendingStatsRequests.offer(request)) {
483                     printInfoMessage("Table", request);
484                 }
485             }
486         }
487     }
488
489     private void removeStatsRequestTasks(Long switchId) {
490         log.debug("Cleaning Statistics database for switch {}",
491                 HexEncode.longToHexString(switchId));
492         // To be safe, let's attempt removal of both VENDOR and FLOW request. It
493         // does not hurt
494         pendingStatsRequests.remove(new StatsRequest(switchId,
495                 OFStatisticsType.VENDOR));
496         pendingStatsRequests.remove(new StatsRequest(switchId,
497                 OFStatisticsType.FLOW));
498         pendingStatsRequests.remove(new StatsRequest(switchId,
499                 OFStatisticsType.DESC));
500         pendingStatsRequests.remove(new StatsRequest(switchId,
501                 OFStatisticsType.PORT));
502         pendingStatsRequests.remove(new StatsRequest(switchId,
503                 OFStatisticsType.TABLE));
504         // Take care of the TX rate databases
505         switchPortStatsUpdated.remove(switchId);
506         txRates.remove(switchId);
507     }
508
509     private void clearFlowStatsAndTicks(Long switchId) {
510         statisticsTimerTicks.remove(switchId);
511         removeStatsRequestTasks(switchId);
512         flowStatistics.remove(switchId);
513         log.debug("Statistics removed for switch {}",
514                 HexString.toHexString(switchId));
515     }
516
517     private void queryStatisticsInternal(Long switchId, OFStatisticsType statType) {
518
519         // Query the switch on all matches
520         List<OFStatistics> values = this.fetchStatisticsFromSwitch(switchId, statType, null);
521
522         // If got a valid response update local cache and notify listeners
523         if (!values.isEmpty()) {
524             switch (statType) {
525                 case FLOW:
526                 case VENDOR:
527                     flowStatistics.put(switchId, values);
528                     notifyFlowUpdate(switchId, values);
529                     break;
530                 case DESC:
531                     // Overwrite cache
532                     descStatistics.put(switchId, values);
533                     // Notify who may be interested in a description change
534                     notifyDescriptionUpdate(switchId, values);
535                     break;
536                 case PORT:
537                     // Overwrite cache with new port statistics for this switch
538                     portStatistics.put(switchId, values);
539
540                     // Wake up the thread which maintains the TX byte counters for
541                     // each port
542                     switchPortStatsUpdated.offer(switchId);
543                     notifyPortUpdate(switchId, values);
544                     break;
545                 case TABLE:
546                     // Overwrite cache
547                     tableStatistics.put(switchId, values);
548                     notifyTableUpdate(switchId, values);
549                     break;
550                 default:
551             }
552         }
553     }
554
555     private void notifyDescriptionUpdate(Long switchId, List<OFStatistics> values) {
556         for (IOFStatisticsListener l : this.statisticsListeners) {
557             l.descriptionStatisticsRefreshed(switchId, values);
558         }
559     }
560
561     private void notifyFlowUpdate(Long switchId, List<OFStatistics> values) {
562         if (values.get(0) instanceof OFVendorStatistics) {
563             values = this.v6StatsListToOFStatsList(values);
564         }
565
566         for (IOFStatisticsListener l : this.statisticsListeners) {
567             l.flowStatisticsRefreshed(switchId, values);
568         }
569
570     }
571
572     private void notifyPortUpdate(Long switchId, List<OFStatistics> values) {
573         for (IOFStatisticsListener l : this.statisticsListeners) {
574             l.portStatisticsRefreshed(switchId, values);
575         }
576     }
577
578     private void notifyTableUpdate(Long switchId, List<OFStatistics> values) {
579         for (IOFStatisticsListener l : this.statisticsListeners) {
580             l.tableStatisticsRefreshed(switchId, values);
581         }
582     }
583
584     /*
585      * Generic function to get the statistics form an OF switch
586      */
587     @SuppressWarnings("unchecked")
588     private List<OFStatistics> fetchStatisticsFromSwitch(Long switchId,
589             OFStatisticsType statsType, Object target) {
590         List<OFStatistics> values = Collections.emptyList();
591         String type = null;
592         ISwitch sw = controller.getSwitch(switchId);
593
594         if (sw != null) {
595             OFStatisticsRequest req = new OFStatisticsRequest();
596             req.setStatisticType(statsType);
597             int requestLength = req.getLengthU();
598
599             if (statsType == OFStatisticsType.FLOW) {
600                 OFMatch match = null;
601                 if (target == null) {
602                     // All flows request
603                     match = new OFMatch();
604                     match.setWildcards(0xffffffff);
605                 } else if (!(target instanceof OFMatch)) {
606                     // Malformed request
607                     log.warn("Invalid target type for Flow stats request: {}",
608                             target.getClass());
609                     return Collections.emptyList();
610                 } else {
611                     // Specific flow request
612                     match = (OFMatch) target;
613                 }
614                 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
615                 specificReq.setMatch(match);
616                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
617                 specificReq.setTableId((byte) 0xff);
618                 req.setStatistics(Collections
619                         .singletonList((OFStatistics) specificReq));
620                 requestLength += specificReq.getLength();
621                 type = "FLOW";
622             } else if (statsType == OFStatisticsType.VENDOR) {
623                 V6StatsRequest specificReq = new V6StatsRequest();
624                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
625                 specificReq.setTableId((byte) 0xff);
626                 req.setStatistics(Collections
627                         .singletonList((OFStatistics) specificReq));
628                 requestLength += specificReq.getLength();
629                 type = "VENDOR";
630             } else if (statsType == OFStatisticsType.AGGREGATE) {
631                 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
632                 OFMatch match = new OFMatch();
633                 match.setWildcards(0xffffffff);
634                 specificReq.setMatch(match);
635                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
636                 specificReq.setTableId((byte) 0xff);
637                 req.setStatistics(Collections
638                         .singletonList((OFStatistics) specificReq));
639                 requestLength += specificReq.getLength();
640                 type = "AGGREGATE";
641             } else if (statsType == OFStatisticsType.PORT) {
642                 short targetPort;
643                 if (target == null) {
644                     // All ports request
645                     targetPort = OFPort.OFPP_NONE.getValue();
646                 } else if (!(target instanceof Short)) {
647                     // Malformed request
648                     log.warn("Invalid target type for Port stats request: {}",
649                             target.getClass());
650                     return Collections.emptyList();
651                 } else {
652                     // Specific port request
653                     targetPort = (Short) target;
654                 }
655                 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
656                 specificReq.setPortNumber(targetPort);
657                 req.setStatistics(Collections
658                         .singletonList((OFStatistics) specificReq));
659                 requestLength += specificReq.getLength();
660                 type = "PORT";
661             } else if (statsType == OFStatisticsType.QUEUE) {
662                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
663                 specificReq.setPortNumber(OFPort.OFPP_ALL.getValue());
664                 specificReq.setQueueId(0xffffffff);
665                 req.setStatistics(Collections
666                         .singletonList((OFStatistics) specificReq));
667                 requestLength += specificReq.getLength();
668                 type = "QUEUE";
669             } else if (statsType == OFStatisticsType.DESC) {
670                 type = "DESC";
671             } else if (statsType == OFStatisticsType.TABLE) {
672                 if(target != null){
673                     if (!(target instanceof Byte)) {
674                         // Malformed request
675                         log.warn("Invalid table id for table stats request: {}",
676                                 target.getClass());
677                         return Collections.emptyList();
678                     }
679                     byte targetTable = (Byte) target;
680                     OFTableStatistics specificReq = new OFTableStatistics();
681                     specificReq.setTableId(targetTable);
682                     req.setStatistics(Collections
683                             .singletonList((OFStatistics) specificReq));
684                     requestLength += specificReq.getLength();
685                 }
686                 type = "TABLE";
687             }
688             req.setLengthU(requestLength);
689             Object result = sw.getStatistics(req);
690
691             if (result == null) {
692                 log.warn("Request Timed Out for ({}) from switch {}", type,
693                         HexString.toHexString(switchId));
694             } else if (result instanceof OFError) {
695                 log.warn("Switch {} failed to handle ({}) stats request: {}",
696                         new Object[] { HexString.toHexString(switchId), type,
697                         Utils.getOFErrorString((OFError) result) });
698                 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
699                     log.warn(
700                             "Switching back to regular Flow stats requests for switch {}",
701                             HexString.toHexString(switchId));
702                     this.switchSupportsVendorExtStats.put(switchId,
703                             Boolean.FALSE);
704                 }
705             } else {
706                 values = (List<OFStatistics>) result;
707             }
708         }
709         return values;
710     }
711
712     @Override
713     public List<OFStatistics> getOFFlowStatistics(Long switchId) {
714         List<OFStatistics> list = flowStatistics.get(switchId);
715
716         /*
717          * Check on emptiness as interference between add and get is still
718          * possible on the inner list (the concurrentMap entry's value)
719          */
720         return (list == null || list.isEmpty()) ? Collections.<OFStatistics>emptyList()
721                 : (list.get(0) instanceof OFVendorStatistics) ? this
722                         .v6StatsListToOFStatsList(list) : list;
723     }
724
725     @Override
726     public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch, short priority) {
727         List<OFStatistics> statsList = flowStatistics.get(switchId);
728
729         /*
730          * Check on emptiness as interference between add and get is still
731          * possible on the inner list (the concurrentMap entry's value)
732          */
733         if (statsList == null || statsList.isEmpty()) {
734             return Collections.emptyList();
735         }
736
737         if (statsList.get(0) instanceof OFVendorStatistics) {
738             /*
739              * Caller could provide regular OF match when we instead pull the
740              * vendor statistics from this node Caller is not supposed to know
741              * whether this switch supports vendor extensions statistics
742              * requests
743              */
744             V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
745                     : new V6Match(ofMatch);
746
747             List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
748             for (OFStatistics stats : targetList) {
749                 V6StatsReply v6Stats = (V6StatsReply) stats;
750                 V6Match v6Match = v6Stats.getMatch();
751                 if (v6Stats.getPriority() == priority && targetMatch.equals(v6Match)) {
752                     List<OFStatistics> list = new ArrayList<OFStatistics>();
753                     list.add(stats);
754                     return list;
755                 }
756             }
757         } else {
758             for (OFStatistics stats : statsList) {
759                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
760                 if (flowStats.getPriority() == priority && ofMatch.equals(flowStats.getMatch())) {
761                     List<OFStatistics> list = new ArrayList<OFStatistics>();
762                     list.add(stats);
763                     return list;
764                 }
765             }
766         }
767         return Collections.emptyList();
768     }
769
770     /*
771      * Converts the v6 vendor statistics to the OFStatistics
772      */
773     private List<OFStatistics> v6StatsListToOFStatsList(List<OFStatistics> statistics) {
774         if (statistics == null || statistics.isEmpty()) {
775             return Collections.emptyList();
776         }
777         List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
778         for (OFStatistics stats : statistics) {
779             if (stats instanceof OFVendorStatistics) {
780                 List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
781                 if (r != null) {
782                     v6statistics.addAll(r);
783                 }
784             }
785         }
786         return v6statistics;
787     }
788
789     private static List<OFStatistics> getV6ReplyStatistics(
790             OFVendorStatistics stat) {
791         int length = stat.getLength();
792         List<OFStatistics> results = new ArrayList<OFStatistics>();
793         if (length < 12) {
794             // Nicira Hdr is 12 bytes. We need at least that much
795             return Collections.emptyList();
796         }
797         ByteBuffer data = ByteBuffer.allocate(length);
798         stat.writeTo(data);
799         data.rewind();
800         if (log.isTraceEnabled()) {
801             log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
802                     HexString.toHexString(data.array()));
803         }
804
805         int vendor = data.getInt(); // first 4 bytes is vendor id.
806         if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
807             log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
808             return Collections.emptyList();
809         } else {
810             // go ahead by 8 bytes which is 8 bytes of 0
811             data.getLong(); // should be all 0's
812             length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
813                           // been consumed
814         }
815
816         V6StatsReply v6statsreply;
817         int min_len;
818         while (length > 0) {
819             v6statsreply = new V6StatsReply();
820             min_len = v6statsreply.getLength();
821             if (length < v6statsreply.getLength()) {
822                 break;
823             }
824             v6statsreply.setActionFactory(stat.getActionFactory());
825             v6statsreply.readFrom(data);
826             if (v6statsreply.getLength() < min_len) {
827                 break;
828             }
829             v6statsreply.setVendorId(vendor);
830             log.trace("V6StatsReply: {}", v6statsreply);
831             length -= v6statsreply.getLength();
832             results.add(v6statsreply);
833         }
834         return results;
835     }
836
837     @Override
838     public List<OFStatistics> queryStatistics(Long switchId,
839             OFStatisticsType statType, Object target) {
840         /*
841          * Caller does not know and it is not supposed to know whether this
842          * switch supports vendor extension. We adjust the target for him
843          */
844         if (statType == OFStatisticsType.FLOW) {
845             if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
846                 statType = OFStatisticsType.VENDOR;
847             }
848         }
849
850         List<OFStatistics> list = this.fetchStatisticsFromSwitch(switchId, statType, target);
851
852         return (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list) : list;
853     }
854
855     @Override
856     public List<OFStatistics> getOFDescStatistics(Long switchId) {
857         if (!descStatistics.containsKey(switchId)) {
858             return Collections.emptyList();
859         }
860
861         return descStatistics.get(switchId);
862     }
863
864     @Override
865     public List<OFStatistics> getOFPortStatistics(Long switchId) {
866         if (!portStatistics.containsKey(switchId)) {
867             return Collections.emptyList();
868         }
869
870         return portStatistics.get(switchId);
871     }
872
873     @Override
874     public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
875         if (!portStatistics.containsKey(switchId)) {
876             return Collections.emptyList();
877         }
878         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
879         for (OFStatistics stats : portStatistics.get(switchId)) {
880             if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
881                 list.add(stats);
882                 break;
883             }
884         }
885         return list;
886     }
887
888     @Override
889     public List<OFStatistics> getOFTableStatistics(Long switchId) {
890         if (!tableStatistics.containsKey(switchId)) {
891             return Collections.emptyList();
892         }
893
894         return tableStatistics.get(switchId);
895     }
896
897     @Override
898     public List<OFStatistics> getOFTableStatistics(Long switchId, Byte tableId) {
899         if (!tableStatistics.containsKey(switchId)) {
900             return Collections.emptyList();
901         }
902
903         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
904         for (OFStatistics stats : tableStatistics.get(switchId)) {
905             if (((OFTableStatistics) stats).getTableId() == tableId) {
906                 list.add(stats);
907                 break;
908             }
909         }
910         return list;
911     }
912
913     @Override
914     public int getFlowsNumber(long switchId) {
915         return this.flowStatistics.get(switchId).size();
916     }
917
918     /*
919      * InventoryShim replay for us all the switch addition which happened before
920      * we were brought up
921      */
922     @Override
923     public void updateNode(Node node, UpdateType type, Set<Property> props) {
924         Long switchId = (Long) node.getID();
925         switch (type) {
926         case ADDED:
927             addStatisticsTicks(switchId);
928             break;
929         case REMOVED:
930             clearFlowStatsAndTicks(switchId);
931         default:
932         }
933     }
934
935     @Override
936     public void updateNodeConnector(NodeConnector nodeConnector,
937             UpdateType type, Set<Property> props) {
938         // No action
939     }
940
941     /**
942      * Update the cached port rates for this switch with the latest retrieved
943      * port transmit byte count
944      *
945      * @param switchId
946      */
947     private synchronized void updatePortsTxRate(long switchId) {
948         List<OFStatistics> newPortStatistics = this.portStatistics.get(switchId);
949         if (newPortStatistics == null) {
950             return;
951         }
952         Map<Short, TxRates> rates = this.txRates.get(switchId);
953         if (rates == null) {
954             // First time rates for this switch are added
955             rates = new HashMap<Short, TxRates>();
956             txRates.put(switchId, rates);
957         }
958         for (OFStatistics stats : newPortStatistics) {
959             OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
960             short port = newPortStat.getPortNumber();
961             TxRates portRatesHolder = rates.get(port);
962             if (portRatesHolder == null) {
963                 // First time rates for this port are added
964                 portRatesHolder = new TxRates();
965                 rates.put(port, portRatesHolder);
966             }
967             // Get and store the number of transmitted bytes for this port
968             // And handle the case where agent does not support the counter
969             long transmitBytes = newPortStat.getTransmitBytes();
970             long value = (transmitBytes < 0) ? 0 : transmitBytes;
971             portRatesHolder.update(value);
972         }
973     }
974
975     @Override
976     public synchronized long getTransmitRate(Long switchId, Short port) {
977         long average = 0;
978         if (switchId == null || port == null) {
979             return average;
980         }
981         Map<Short, TxRates> perSwitch = txRates.get(switchId);
982         if (perSwitch == null) {
983             return average;
984         }
985         TxRates portRates = perSwitch.get(port);
986         if (portRates == null) {
987             return average;
988         }
989         return portRates.getAverageTxRate();
990     }
991
992     /*
993      * Manual switch name configuration code
994      */
995     @Override
996     public String getHelp() {
997         StringBuffer help = new StringBuffer();
998         help.append("---OF Statistics Manager utilities---\n");
999         help.append("\t ofdumpstatsmgr         - "
1000                 + "Print Internal Stats Mgr db\n");
1001         help.append("\t ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds) - "
1002                 + "Set/Show flow/port/dedscription stats poll intervals\n");
1003         return help.toString();
1004     }
1005
1006     private boolean isValidSwitchId(String switchId) {
1007         String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
1008         String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
1009
1010         return (switchId != null && (switchId.matches(regexDatapathID) || switchId
1011                 .matches(regexDatapathIDLong)));
1012     }
1013
1014     public long getSwitchIDLong(String switchId) {
1015         int radix = 16;
1016         String switchString = "0";
1017
1018         if (isValidSwitchId(switchId)) {
1019             if (switchId.contains(":")) {
1020                 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
1021                 switchString = switchId.replace(":", "");
1022             } else if (switchId.contains("-")) {
1023                 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
1024                 switchString = switchId.replace("-", "");
1025             } else {
1026                 // Handle the 0123456789ABCDEF notation
1027                 switchString = switchId;
1028             }
1029         }
1030         return Long.parseLong(switchString, radix);
1031     }
1032
1033     /*
1034      * Internal information dump code
1035      */
1036     private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
1037         StringBuffer buffer = new StringBuffer();
1038         buffer.append("{");
1039         for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
1040             buffer.append(HexString.toHexString(entry.getKey()) + "="
1041                     + entry.getValue().toString() + " ");
1042         }
1043         buffer.append("}");
1044         return buffer.toString();
1045     }
1046
1047     public void _ofdumpstatsmgr(CommandInterpreter ci) {
1048         ci.println("Global Counter: " + counter);
1049         ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
1050         ci.println("PendingStatsQueue: " + pendingStatsRequests);
1051         ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
1052         ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
1053         ci.println("Stats Collector State: "
1054                 + statisticsCollector.getState().toString());
1055         ci.println("StatsTimer: " + statisticsTimer.toString());
1056         ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
1057         ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
1058         ci.println("Port Stats Period: " + portTickNumber + " s");
1059         ci.println("Table Stats Period: " + tableTickNumber + " s");
1060     }
1061
1062     public void _resetSwitchCapability(CommandInterpreter ci) {
1063         String sidString = ci.nextArgument();
1064         Long sid = null;
1065         if (sidString == null) {
1066             ci.println("Insert the switch id (numeric value)");
1067             return;
1068         }
1069         try {
1070             sid = Long.valueOf(sidString);
1071             this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
1072             ci.println("Vendor capability for switch " + sid + " set to "
1073                     + this.switchSupportsVendorExtStats.get(sid));
1074         } catch (NumberFormatException e) {
1075             ci.println("Invalid switch id. Has to be numeric.");
1076         }
1077
1078     }
1079
1080     public void _ofbw(CommandInterpreter ci) {
1081         String sidString = ci.nextArgument();
1082         Long sid = null;
1083         if (sidString == null) {
1084             ci.println("Insert the switch id (numeric value)");
1085             return;
1086         }
1087         try {
1088             sid = Long.valueOf(sidString);
1089         } catch (NumberFormatException e) {
1090             ci.println("Invalid switch id. Has to be numeric.");
1091         }
1092         if (sid != null) {
1093             Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
1094             ci.println("Bandwidth utilization (" + factoredSamples
1095                     * portTickNumber + " sec average) for switch "
1096                     + HexEncode.longToHexString(sid) + ":");
1097             if (thisSwitchRates == null) {
1098                 ci.println("Not available");
1099             } else {
1100                 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
1101                     ci.println("Port: " + entry.getKey() + ": "
1102                             + entry.getValue().getAverageTxRate() + " bps");
1103                 }
1104             }
1105         }
1106     }
1107
1108     public void _txratewindow(CommandInterpreter ci) {
1109         String averageWindow = ci.nextArgument();
1110         short seconds = 0;
1111         if (averageWindow == null) {
1112             ci.println("Insert the length in seconds of the median "
1113                     + "window for tx rate");
1114             ci.println("Current: " + factoredSamples * portTickNumber + " secs");
1115             return;
1116         }
1117         try {
1118             seconds = Short.valueOf(averageWindow);
1119         } catch (NumberFormatException e) {
1120             ci.println("Invalid period.");
1121         }
1122         OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1123         ci.println("New: " + factoredSamples * portTickNumber + " secs");
1124     }
1125
1126     public void _ofstatsmgrintervals(CommandInterpreter ci) {
1127         String flowStatsInterv = ci.nextArgument();
1128         String portStatsInterv = ci.nextArgument();
1129         String descStatsInterv = ci.nextArgument();
1130         String tableStatsInterv = ci.nextArgument();
1131
1132         if (flowStatsInterv == null || portStatsInterv == null
1133                 || descStatsInterv == null) {
1134             ci.println("Usage: ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds)");
1135             ci.println("Current Values: fP=" + statisticsTickNumber + "sec pP="
1136                     + portTickNumber + "sec dP=" + descriptionTickNumber + "sec tP=" + tableTickNumber + " sec");
1137             return;
1138         }
1139         Short fP, pP, dP, tP;
1140         try {
1141             fP = Short.parseShort(flowStatsInterv);
1142             pP = Short.parseShort(portStatsInterv);
1143             dP = Short.parseShort(descStatsInterv);
1144             tP = Short.parseShort(tableStatsInterv);
1145         } catch (Exception e) {
1146             ci.println("Invalid format values: " + e.getMessage());
1147             return;
1148         }
1149
1150         if (pP <= 1 || fP <= 1 || dP <= 1 || tP <= 1) {
1151             ci.println("Invalid values. fP, pP, dP, tP have to be greater than 1.");
1152             return;
1153         }
1154
1155         statisticsTickNumber = fP;
1156         portTickNumber = pP;
1157         descriptionTickNumber = dP;
1158         tableTickNumber = tP;
1159
1160         ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1161                 + portTickNumber + "s dP=" + descriptionTickNumber + "s tP="
1162                 + tableTickNumber + "s");
1163     }
1164
1165     /**
1166      * This method retrieves user configurations from config.ini and updates
1167      * statisticsTickNumber/portTickNumber/descriptionTickNumber accordingly.
1168      */
1169     private void configStatsPollIntervals() {
1170         String fsStr = System.getProperty("of.flowStatsPollInterval");
1171         String psStr = System.getProperty("of.portStatsPollInterval");
1172         String dsStr = System.getProperty("of.descStatsPollInterval");
1173         String tsStr = System.getProperty("of.tableStatsPollInterval");
1174         Short fs, ps, ds, ts;
1175
1176         if (fsStr != null) {
1177             try {
1178                 fs = Short.parseShort(fsStr);
1179                 if (fs > 0) {
1180                     statisticsTickNumber = fs;
1181                 }
1182             } catch (Exception e) {
1183             }
1184         }
1185
1186         if (psStr != null) {
1187             try {
1188                 ps = Short.parseShort(psStr);
1189                 if (ps > 0) {
1190                     portTickNumber = ps;
1191                 }
1192             } catch (Exception e) {
1193             }
1194         }
1195
1196         if (dsStr != null) {
1197             try {
1198                 ds = Short.parseShort(dsStr);
1199                 if (ds > 0) {
1200                     descriptionTickNumber = ds;
1201                 }
1202             } catch (Exception e) {
1203             }
1204         }
1205
1206         if (tsStr != null) {
1207             try{
1208                 ts = Short.parseShort(tsStr);
1209                 if (ts > 0) {
1210                     tableTickNumber = ts;
1211                 }
1212             } catch (Exception e) {
1213             }
1214         }
1215     }
1216 }