OpenFlow Protocol_plugin changes to make use of the Connection Manager infrastructure...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.Set;
20 import java.util.Timer;
21 import java.util.TimerTask;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.CopyOnWriteArraySet;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
28
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
33 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
40 import org.opendaylight.controller.sal.core.Node;
41 import org.opendaylight.controller.sal.core.NodeConnector;
42 import org.opendaylight.controller.sal.core.Property;
43 import org.opendaylight.controller.sal.core.UpdateType;
44 import org.opendaylight.controller.sal.utils.HexEncode;
45 import org.openflow.protocol.OFError;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFPort;
48 import org.openflow.protocol.OFStatisticsRequest;
49 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
50 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
51 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
52 import org.openflow.protocol.statistics.OFPortStatisticsReply;
53 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
54 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
55 import org.openflow.protocol.statistics.OFStatistics;
56 import org.openflow.protocol.statistics.OFStatisticsType;
57 import org.openflow.protocol.statistics.OFTableStatistics;
58 import org.openflow.protocol.statistics.OFVendorStatistics;
59 import org.openflow.util.HexString;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * It periodically polls the different OF statistics from the OF switches and
67  * caches them for quick retrieval for the above layers' modules It also
68  * provides an API to directly query the switch about the statistics
69  */
70 public class OFStatisticsManager implements IOFStatisticsManager,
71 IInventoryShimExternalListener, CommandProvider {
72     private static final Logger log = LoggerFactory.getLogger(OFStatisticsManager.class);
73     private static final int INITIAL_SIZE = 64;
74     private static final long FLOW_STATS_PERIOD = 10000;
75     private static final long DESC_STATS_PERIOD = 60000;
76     private static final long PORT_STATS_PERIOD = 5000;
77     private static final long TABLE_STATS_PERIOD = 10000;
78     private static final long TICK = 1000;
79     private static short statisticsTickNumber = (short) (FLOW_STATS_PERIOD / TICK);
80     private static short descriptionTickNumber = (short) (DESC_STATS_PERIOD / TICK);
81     private static short portTickNumber = (short) (PORT_STATS_PERIOD / TICK);
82     private static short tableTickNumber = (short) (TABLE_STATS_PERIOD / TICK);
83     private static short factoredSamples = (short) 2;
84     private static short counter = 1;
85     private IController controller = null;
86     private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
87     private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
88     private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
89     private ConcurrentMap<Long, List<OFStatistics>> tableStatistics;
90     private List<OFStatistics> dummyList;
91     private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
92     protected BlockingQueue<StatsRequest> pendingStatsRequests;
93     protected BlockingQueue<Long> switchPortStatsUpdated;
94     private Thread statisticsCollector;
95     private Thread txRatesUpdater;
96     private Timer statisticsTimer;
97     private TimerTask statisticsTimerTask;
98     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
99     // Per port sampled (every portStatsPeriod) transmit rate
100     private Map<Long, Map<Short, TxRates>> txRates;
101     private Set<IOFStatisticsListener> statisticsListeners;
102
103     /**
104      * The object containing the latest factoredSamples tx rate samples for a
105      * given switch port
106      */
107     protected class TxRates {
108         // contains the latest factoredSamples sampled transmitted bytes
109         Deque<Long> sampledTxBytes;
110
111         public TxRates() {
112             sampledTxBytes = new LinkedBlockingDeque<Long>();
113         }
114
115         public void update(Long txBytes) {
116             /*
117              * Based on how many samples our average works on, we might have to
118              * remove the oldest sample
119              */
120             if (sampledTxBytes.size() == factoredSamples) {
121                 sampledTxBytes.removeLast();
122             }
123
124             // Add the latest sample to the top of the queue
125             sampledTxBytes.addFirst(txBytes);
126         }
127
128         /**
129          * Returns the average transmit rate in bps
130          *
131          * @return the average transmit rate [bps]
132          */
133         public long getAverageTxRate() {
134             long average = 0;
135             /*
136              * If we cannot provide the value for the time window length set
137              */
138             if (sampledTxBytes.size() < factoredSamples) {
139                 return average;
140             }
141             long increment = sampledTxBytes.getFirst() - sampledTxBytes
142                     .getLast();
143             long timePeriod = factoredSamples * PORT_STATS_PERIOD / TICK;
144             average = (8L * increment) / timePeriod;
145             return average;
146         }
147     }
148
149     public void setController(IController core) {
150         this.controller = core;
151     }
152
153     public void unsetController(IController core) {
154         if (this.controller == core) {
155             this.controller = null;
156         }
157     }
158
159     private short getStatsQueueSize() {
160         String statsQueueSizeStr = System.getProperty("of.statsQueueSize");
161         short statsQueueSize = INITIAL_SIZE;
162         if (statsQueueSizeStr != null) {
163             try {
164                 statsQueueSize = Short.parseShort(statsQueueSizeStr);
165                 if (statsQueueSize <= 0) {
166                     statsQueueSize = INITIAL_SIZE;
167                 }
168             } catch (Exception e) {
169             }
170         }
171         return statsQueueSize;
172     }
173
174     IPluginOutConnectionService connectionPluginOutService;
175     void setIPluginOutConnectionService(IPluginOutConnectionService s) {
176         connectionPluginOutService = s;
177     }
178
179     void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
180         if (connectionPluginOutService == s) {
181             connectionPluginOutService = null;
182         }
183     }
184
185     /**
186      * Function called by the dependency manager when all the required
187      * dependencies are satisfied
188      *
189      */
190     void init() {
191         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
192         descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
193         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
194         tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
195         dummyList = new ArrayList<OFStatistics>(1);
196         pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(getStatsQueueSize());
197         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(INITIAL_SIZE);
198         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(INITIAL_SIZE);
199         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(INITIAL_SIZE);
200         txRates = new HashMap<Long, Map<Short, TxRates>>(INITIAL_SIZE);
201         statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
202
203         configStatsPollIntervals();
204
205         // Initialize managed timers
206         statisticsTimer = new Timer();
207         statisticsTimerTask = new TimerTask() {
208             @Override
209             public void run() {
210                 decrementTicks();
211             }
212         };
213
214         // Initialize Statistics collector thread
215         statisticsCollector = new Thread(new Runnable() {
216             @Override
217             public void run() {
218                 while (true) {
219                     try {
220                         StatsRequest req = pendingStatsRequests.take();
221                         queryStatisticsInternal(req.switchId, req.type);
222                     } catch (InterruptedException e) {
223                         log.warn("Flow Statistics Collector thread "
224                                 + "interrupted", e);
225                     }
226                 }
227             }
228         }, "Statistics Collector");
229
230         // Initialize Tx Rate Updater thread
231         txRatesUpdater = new Thread(new Runnable() {
232             @Override
233             public void run() {
234                 while (true) {
235                     try {
236                         long switchId = switchPortStatsUpdated.take();
237                         updatePortsTxRate(switchId);
238                     } catch (InterruptedException e) {
239                         log.warn("TX Rate Updater thread interrupted", e);
240                     }
241                 }
242             }
243         }, "TX Rate Updater");
244     }
245
246     /**
247      * Function called by the dependency manager when at least one dependency
248      * become unsatisfied or when the component is shutting down because for
249      * example bundle is being stopped.
250      *
251      */
252     void destroy() {
253     }
254
255     /**
256      * Function called by dependency manager after "init ()" is called and after
257      * the services provided by the class are registered in the service registry
258      *
259      */
260     void start() {
261         // Start managed timers
262         statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, TICK);
263
264         // Start statistics collector thread
265         statisticsCollector.start();
266
267         // Start bandwidth utilization computer thread
268         txRatesUpdater.start();
269
270         // OSGI console
271         registerWithOSGIConsole();
272     }
273
274     /**
275      * Function called by the dependency manager before the services exported by
276      * the component are unregistered, this will be followed by a "destroy ()"
277      * calls
278      *
279      */
280     void stop() {
281         // Stop managed timers
282         statisticsTimer.cancel();
283     }
284
285     public void setStatisticsListener(IOFStatisticsListener s) {
286         this.statisticsListeners.add(s);
287     }
288
289     public void unsetStatisticsListener(IOFStatisticsListener s) {
290         if (s != null) {
291             this.statisticsListeners.remove(s);
292         }
293     }
294
295     private void registerWithOSGIConsole() {
296         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
297         bundleContext.registerService(CommandProvider.class.getName(), this, null);
298     }
299
300     private static class StatsRequest {
301         protected Long switchId;
302         protected OFStatisticsType type;
303
304         public StatsRequest(Long d, OFStatisticsType t) {
305             switchId = d;
306             type = t;
307         }
308
309         @Override
310         public String toString() {
311             return "SReq = {switchId=" + switchId + ", type=" + type + "}";
312         }
313
314         @Override
315         public int hashCode() {
316             final int prime = 31;
317             int result = 1;
318             result = prime * result
319                     + ((switchId == null) ? 0 : switchId.hashCode());
320             result = prime * result + ((type == null) ? 0 : type.ordinal());
321             return result;
322         }
323
324         @Override
325         public boolean equals(Object obj) {
326             if (this == obj) {
327                 return true;
328             }
329             if (obj == null) {
330                 return false;
331             }
332             if (getClass() != obj.getClass()) {
333                 return false;
334             }
335             StatsRequest other = (StatsRequest) obj;
336             if (switchId == null) {
337                 if (other.switchId != null) {
338                     return false;
339                 }
340             } else if (!switchId.equals(other.switchId)) {
341                 return false;
342             }
343             if (type != other.type) {
344                 return false;
345             }
346             return true;
347         }
348     }
349
350     private void addStatisticsTicks(Long switchId) {
351         switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
352                                                                   // switch
353                                                                   // supports
354                                                                   // Vendor
355                                                                   // extension
356                                                                   // stats
357         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
358         log.debug("Added Switch {} to target pool",
359                 HexString.toHexString(switchId.longValue()));
360     }
361
362     protected static class StatisticsTicks {
363         private short flowStatisticsTicks;
364         private short descriptionTicks;
365         private short portStatisticsTicks;
366         private short tableStatisticsTicks;
367
368         public StatisticsTicks(boolean scattered) {
369             if (scattered) {
370                 // scatter bursts by statisticsTickPeriod
371                 if (++counter < 0) {
372                     counter = 0;
373                 } // being paranoid here
374                 flowStatisticsTicks = (short) (1 + counter
375                         % statisticsTickNumber);
376                 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
377                 portStatisticsTicks = (short) (1 + counter % portTickNumber);
378                 tableStatisticsTicks = (short) (1 + counter % tableTickNumber);
379             } else {
380                 flowStatisticsTicks = statisticsTickNumber;
381                 descriptionTicks = descriptionTickNumber;
382                 portStatisticsTicks = portTickNumber;
383                 tableStatisticsTicks = tableTickNumber;
384             }
385         }
386
387         public boolean decrementFlowTicksIsZero() {
388             // Please ensure no code is inserted between the if check and the
389             // flowStatisticsTicks reset
390             if (--flowStatisticsTicks == 0) {
391                 flowStatisticsTicks = statisticsTickNumber;
392                 return true;
393             }
394             return false;
395         }
396
397         public boolean decrementDescTicksIsZero() {
398             // Please ensure no code is inserted between the if check and the
399             // descriptionTicks reset
400             if (--descriptionTicks == 0) {
401                 descriptionTicks = descriptionTickNumber;
402                 return true;
403             }
404             return false;
405         }
406
407         public boolean decrementPortTicksIsZero() {
408             // Please ensure no code is inserted between the if check and the
409             // descriptionTicks reset
410             if (--portStatisticsTicks == 0) {
411                 portStatisticsTicks = portTickNumber;
412                 return true;
413             }
414             return false;
415         }
416
417         public boolean decrementTableTicksIsZero() {
418             // Please ensure no code is inserted between the if check and the
419             // descriptionTicks reset
420             if(--tableStatisticsTicks == 0) {
421                 tableStatisticsTicks = tableTickNumber;
422                 return true;
423             }
424             return false;
425         }
426
427         @Override
428         public String toString() {
429             return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
430                     + ",pT=" + portStatisticsTicks + ",tT=" + tableStatisticsTicks + "}";
431         }
432     }
433
434     private void printInfoMessage(String type, StatsRequest request) {
435         log.info("{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
436                 new Object[] {type, HexString.toHexString(request.switchId), pendingStatsRequests.size(),
437                 statisticsCollector.getState().toString() });
438     }
439
440     protected void decrementTicks() {
441         StatsRequest request = null;
442         for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
443                 .entrySet()) {
444             StatisticsTicks clock = entry.getValue();
445             Long switchId = entry.getKey();
446             if (clock.decrementFlowTicksIsZero()) {
447                 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ?
448                         new StatsRequest(switchId, OFStatisticsType.VENDOR) :
449                         new StatsRequest(switchId, OFStatisticsType.FLOW);
450                 // If a request for this switch is already in the queue, skip to
451                 // add this new request
452                 if (!pendingStatsRequests.contains(request)
453                         && false == pendingStatsRequests.offer(request)) {
454                     printInfoMessage("Flow", request);
455                 }
456             }
457
458             if (clock.decrementDescTicksIsZero()) {
459                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
460                 // If a request for this switch is already in the queue, skip to
461                 // add this new request
462                 if (!pendingStatsRequests.contains(request)
463                         && false == pendingStatsRequests.offer(request)) {
464                     printInfoMessage("Description", request);
465                 }
466             }
467
468             if (clock.decrementPortTicksIsZero()) {
469                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
470                 // If a request for this switch is already in the queue, skip to
471                 // add this new request
472                 if (!pendingStatsRequests.contains(request)
473                         && false == pendingStatsRequests.offer(request)) {
474                     printInfoMessage("Port", request);
475                 }
476             }
477
478             if(clock.decrementTableTicksIsZero()) {
479                 request = new StatsRequest(switchId, OFStatisticsType.TABLE);
480                 // If a request for this switch is already in the queue, skip to
481                 // add this new request
482                 if (!pendingStatsRequests.contains(request)
483                         && false == pendingStatsRequests.offer(request)) {
484                     printInfoMessage("Table", request);
485                 }
486             }
487         }
488     }
489
490     private void removeStatsRequestTasks(Long switchId) {
491         log.debug("Cleaning Statistics database for switch {}",
492                 HexEncode.longToHexString(switchId));
493         // To be safe, let's attempt removal of both VENDOR and FLOW request. It
494         // does not hurt
495         pendingStatsRequests.remove(new StatsRequest(switchId,
496                 OFStatisticsType.VENDOR));
497         pendingStatsRequests.remove(new StatsRequest(switchId,
498                 OFStatisticsType.FLOW));
499         pendingStatsRequests.remove(new StatsRequest(switchId,
500                 OFStatisticsType.DESC));
501         pendingStatsRequests.remove(new StatsRequest(switchId,
502                 OFStatisticsType.PORT));
503         pendingStatsRequests.remove(new StatsRequest(switchId,
504                 OFStatisticsType.TABLE));
505         // Take care of the TX rate databases
506         switchPortStatsUpdated.remove(switchId);
507         txRates.remove(switchId);
508     }
509
510     private void clearFlowStatsAndTicks(Long switchId) {
511         statisticsTimerTicks.remove(switchId);
512         removeStatsRequestTasks(switchId);
513         flowStatistics.remove(switchId);
514         log.debug("Statistics removed for switch {}",
515                 HexString.toHexString(switchId));
516     }
517
518     private void queryStatisticsInternal(Long switchId, OFStatisticsType statType) {
519
520         // Query the switch on all matches
521         List<OFStatistics> values = this.fetchStatisticsFromSwitch(switchId, statType, null);
522
523         // If got a valid response update local cache and notify listeners
524         if (values != null && !values.isEmpty()) {
525             switch (statType) {
526                 case FLOW:
527                 case VENDOR:
528                     flowStatistics.put(switchId, values);
529                     notifyFlowUpdate(switchId, values);
530                     break;
531                 case DESC:
532                     // Overwrite cache
533                     descStatistics.put(switchId, values);
534                     // Notify who may be interested in a description change
535                     notifyDescriptionUpdate(switchId, values);
536                     break;
537                 case PORT:
538                     // Overwrite cache with new port statistics for this switch
539                     portStatistics.put(switchId, values);
540
541                     // Wake up the thread which maintains the TX byte counters for
542                     // each port
543                     switchPortStatsUpdated.offer(switchId);
544                     notifyPortUpdate(switchId, values);
545                     break;
546                 case TABLE:
547                     // Overwrite cache
548                     tableStatistics.put(switchId, values);
549                     notifyTableUpdate(switchId, values);
550                     break;
551                 default:
552             }
553         }
554     }
555
556     private void notifyDescriptionUpdate(Long switchId, List<OFStatistics> values) {
557         for (IOFStatisticsListener l : this.statisticsListeners) {
558             l.descriptionStatisticsRefreshed(switchId, values);
559         }
560     }
561
562     private void notifyFlowUpdate(Long switchId, List<OFStatistics> values) {
563         if (values.get(0) instanceof OFVendorStatistics) {
564             values = this.v6StatsListToOFStatsList(values);
565         }
566
567         for (IOFStatisticsListener l : this.statisticsListeners) {
568             l.flowStatisticsRefreshed(switchId, values);
569         }
570
571     }
572
573     private void notifyPortUpdate(Long switchId, List<OFStatistics> values) {
574         for (IOFStatisticsListener l : this.statisticsListeners) {
575             l.portStatisticsRefreshed(switchId, values);
576         }
577     }
578
579     private void notifyTableUpdate(Long switchId, List<OFStatistics> values) {
580         for (IOFStatisticsListener l : this.statisticsListeners) {
581             l.tableStatisticsRefreshed(switchId, values);
582         }
583     }
584
585     /*
586      * Generic function to get the statistics form an OF switch
587      */
588     @SuppressWarnings("unchecked")
589     private List<OFStatistics> fetchStatisticsFromSwitch(Long switchId,
590             OFStatisticsType statsType, Object target) {
591         List<OFStatistics> values = null;
592         String type = null;
593         ISwitch sw = controller.getSwitch(switchId);
594
595         if (sw != null) {
596             OFStatisticsRequest req = new OFStatisticsRequest();
597             req.setStatisticType(statsType);
598             int requestLength = req.getLengthU();
599
600             if (statsType == OFStatisticsType.FLOW) {
601                 OFMatch match = null;
602                 if (target == null) {
603                     // All flows request
604                     match = new OFMatch();
605                     match.setWildcards(0xffffffff);
606                 } else if (!(target instanceof OFMatch)) {
607                     // Malformed request
608                     log.warn("Invalid target type for Flow stats request: {}",
609                             target.getClass());
610                     return null;
611                 } else {
612                     // Specific flow request
613                     match = (OFMatch) target;
614                 }
615                 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
616                 specificReq.setMatch(match);
617                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
618                 specificReq.setTableId((byte) 0xff);
619                 req.setStatistics(Collections
620                         .singletonList((OFStatistics) specificReq));
621                 requestLength += specificReq.getLength();
622                 type = "FLOW";
623             } else if (statsType == OFStatisticsType.VENDOR) {
624                 V6StatsRequest specificReq = new V6StatsRequest();
625                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
626                 specificReq.setTableId((byte) 0xff);
627                 req.setStatistics(Collections
628                         .singletonList((OFStatistics) specificReq));
629                 requestLength += specificReq.getLength();
630                 type = "VENDOR";
631             } else if (statsType == OFStatisticsType.AGGREGATE) {
632                 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
633                 OFMatch match = new OFMatch();
634                 match.setWildcards(0xffffffff);
635                 specificReq.setMatch(match);
636                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
637                 specificReq.setTableId((byte) 0xff);
638                 req.setStatistics(Collections
639                         .singletonList((OFStatistics) specificReq));
640                 requestLength += specificReq.getLength();
641                 type = "AGGREGATE";
642             } else if (statsType == OFStatisticsType.PORT) {
643                 short targetPort;
644                 if (target == null) {
645                     // All ports request
646                     targetPort = OFPort.OFPP_NONE.getValue();
647                 } else if (!(target instanceof Short)) {
648                     // Malformed request
649                     log.warn("Invalid target type for Port stats request: {}",
650                             target.getClass());
651                     return null;
652                 } else {
653                     // Specific port request
654                     targetPort = (Short) target;
655                 }
656                 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
657                 specificReq.setPortNumber(targetPort);
658                 req.setStatistics(Collections
659                         .singletonList((OFStatistics) specificReq));
660                 requestLength += specificReq.getLength();
661                 type = "PORT";
662             } else if (statsType == OFStatisticsType.QUEUE) {
663                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
664                 specificReq.setPortNumber(OFPort.OFPP_ALL.getValue());
665                 specificReq.setQueueId(0xffffffff);
666                 req.setStatistics(Collections
667                         .singletonList((OFStatistics) specificReq));
668                 requestLength += specificReq.getLength();
669                 type = "QUEUE";
670             } else if (statsType == OFStatisticsType.DESC) {
671                 type = "DESC";
672             } else if (statsType == OFStatisticsType.TABLE) {
673                 if(target != null){
674                     if (!(target instanceof Byte)) {
675                         // Malformed request
676                         log.warn("Invalid table id for table stats request: {}",
677                                 target.getClass());
678                         return null;
679                     }
680                     byte targetTable = (Byte) target;
681                     OFTableStatistics specificReq = new OFTableStatistics();
682                     specificReq.setTableId(targetTable);
683                     req.setStatistics(Collections
684                             .singletonList((OFStatistics) specificReq));
685                     requestLength += specificReq.getLength();
686                 }
687                 type = "TABLE";
688             }
689             req.setLengthU(requestLength);
690             Object result = sw.getStatistics(req);
691
692             if (result == null) {
693                 log.warn("Request Timed Out for ({}) from switch {}", type,
694                         HexString.toHexString(switchId));
695             } else if (result instanceof OFError) {
696                 log.warn("Switch {} failed to handle ({}) stats request: {}",
697                         new Object[] { HexString.toHexString(switchId), type,
698                         Utils.getOFErrorString((OFError) result) });
699                 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
700                     log.warn(
701                             "Switching back to regular Flow stats requests for switch {}",
702                             HexString.toHexString(switchId));
703                     this.switchSupportsVendorExtStats.put(switchId,
704                             Boolean.FALSE);
705                 }
706             } else {
707                 values = (List<OFStatistics>) result;
708             }
709         }
710         return values;
711     }
712
713     @Override
714     public List<OFStatistics> getOFFlowStatistics(Long switchId) {
715         List<OFStatistics> list = flowStatistics.get(switchId);
716
717         /*
718          * Check on emptiness as interference between add and get is still
719          * possible on the inner list (the concurrentMap entry's value)
720          */
721         return (list == null || list.isEmpty()) ? this.dummyList
722                 : (list.get(0) instanceof OFVendorStatistics) ? this
723                         .v6StatsListToOFStatsList(list) : list;
724     }
725
726     @Override
727     public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch, short priority) {
728         List<OFStatistics> statsList = flowStatistics.get(switchId);
729
730         /*
731          * Check on emptiness as interference between add and get is still
732          * possible on the inner list (the concurrentMap entry's value)
733          */
734         if (statsList == null || statsList.isEmpty()) {
735             return this.dummyList;
736         }
737
738         if (statsList.get(0) instanceof OFVendorStatistics) {
739             /*
740              * Caller could provide regular OF match when we instead pull the
741              * vendor statistics from this node Caller is not supposed to know
742              * whether this switch supports vendor extensions statistics
743              * requests
744              */
745             V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
746                     : new V6Match(ofMatch);
747
748             List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
749             for (OFStatistics stats : targetList) {
750                 V6StatsReply v6Stats = (V6StatsReply) stats;
751                 V6Match v6Match = v6Stats.getMatch();
752                 if (v6Stats.getPriority() == priority && v6Match.equals(targetMatch)) {
753                     List<OFStatistics> list = new ArrayList<OFStatistics>();
754                     list.add(stats);
755                     return list;
756                 }
757             }
758         } else {
759             for (OFStatistics stats : statsList) {
760                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
761                 if (flowStats.getPriority() == priority && flowStats.getMatch().equals(ofMatch)) {
762                     List<OFStatistics> list = new ArrayList<OFStatistics>();
763                     list.add(stats);
764                     return list;
765                 }
766             }
767         }
768         return this.dummyList;
769     }
770
771     /*
772      * Converts the v6 vendor statistics to the OFStatistics
773      */
774     private List<OFStatistics> v6StatsListToOFStatsList(
775             List<OFStatistics> statistics) {
776         List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
777         if (statistics != null && !statistics.isEmpty()) {
778             for (OFStatistics stats : statistics) {
779                 if (stats instanceof OFVendorStatistics) {
780                     List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
781                     if (r != null) {
782                         v6statistics.addAll(r);
783                     }
784                 }
785             }
786         }
787         return v6statistics;
788     }
789
790     private static List<OFStatistics> getV6ReplyStatistics(
791             OFVendorStatistics stat) {
792         int length = stat.getLength();
793         List<OFStatistics> results = new ArrayList<OFStatistics>();
794         if (length < 12)
795             return null; // Nicira Hdr is 12 bytes. We need atleast that much
796         ByteBuffer data = ByteBuffer.allocate(length);
797         stat.writeTo(data);
798         data.rewind();
799         if (log.isTraceEnabled()) {
800             log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
801                     HexString.toHexString(data.array()));
802         }
803
804         int vendor = data.getInt(); // first 4 bytes is vendor id.
805         if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
806             log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
807             return null;
808         } else {
809             // go ahead by 8 bytes which is 8 bytes of 0
810             data.getLong(); // should be all 0's
811             length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
812                           // been consumed
813         }
814
815         V6StatsReply v6statsreply;
816         int min_len;
817         while (length > 0) {
818             v6statsreply = new V6StatsReply();
819             min_len = v6statsreply.getLength();
820             if (length < v6statsreply.getLength())
821                 break;
822             v6statsreply.setActionFactory(stat.getActionFactory());
823             v6statsreply.readFrom(data);
824             if (v6statsreply.getLength() < min_len)
825                 break;
826             v6statsreply.setVendorId(vendor);
827             log.trace("V6StatsReply: {}", v6statsreply);
828             length -= v6statsreply.getLength();
829             results.add(v6statsreply);
830         }
831         return results;
832     }
833
834     @Override
835     public List<OFStatistics> queryStatistics(Long switchId,
836             OFStatisticsType statType, Object target) {
837         /*
838          * Caller does not know and it is not supposed to know whether this
839          * switch supports vendor extension. We adjust the target for him
840          */
841         if (statType == OFStatisticsType.FLOW) {
842             if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
843                 statType = OFStatisticsType.VENDOR;
844             }
845         }
846
847         List<OFStatistics> list = this.fetchStatisticsFromSwitch(switchId, statType,
848                 target);
849
850         return (list == null) ? null :
851             (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list) : list;
852     }
853
854     @Override
855     public List<OFStatistics> getOFDescStatistics(Long switchId) {
856         if (!descStatistics.containsKey(switchId))
857             return this.dummyList;
858
859         return descStatistics.get(switchId);
860     }
861
862     @Override
863     public List<OFStatistics> getOFPortStatistics(Long switchId) {
864         if (!portStatistics.containsKey(switchId)) {
865             return this.dummyList;
866         }
867
868         return portStatistics.get(switchId);
869     }
870
871     @Override
872     public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
873         if (!portStatistics.containsKey(switchId)) {
874             return this.dummyList;
875         }
876         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
877         for (OFStatistics stats : portStatistics.get(switchId)) {
878             if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
879                 list.add(stats);
880                 break;
881             }
882         }
883         return list;
884     }
885
886     @Override
887     public List<OFStatistics> getOFTableStatistics(Long switchId) {
888         if (!tableStatistics.containsKey(switchId)) {
889             return this.dummyList;
890         }
891
892         return tableStatistics.get(switchId);
893     }
894
895     @Override
896     public List<OFStatistics> getOFTableStatistics(Long switchId, Byte tableId) {
897         if (!tableStatistics.containsKey(switchId)) {
898             return this.dummyList;
899         }
900
901         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
902         for (OFStatistics stats : tableStatistics.get(switchId)) {
903             if (((OFTableStatistics) stats).getTableId() == tableId) {
904                 list.add(stats);
905                 break;
906             }
907         }
908         return list;
909     }
910
911     @Override
912     public int getFlowsNumber(long switchId) {
913         return this.flowStatistics.get(switchId).size();
914     }
915
916     /*
917      * InventoryShim replay for us all the switch addition which happened before
918      * we were brought up
919      */
920     @Override
921     public void updateNode(Node node, UpdateType type, Set<Property> props) {
922         Long switchId = (Long) node.getID();
923         switch (type) {
924         case ADDED:
925             addStatisticsTicks(switchId);
926             break;
927         case REMOVED:
928             clearFlowStatsAndTicks(switchId);
929         default:
930         }
931     }
932
933     @Override
934     public void updateNodeConnector(NodeConnector nodeConnector,
935             UpdateType type, Set<Property> props) {
936         // No action
937     }
938
939     /**
940      * Update the cached port rates for this switch with the latest retrieved
941      * port transmit byte count
942      *
943      * @param switchId
944      */
945     private synchronized void updatePortsTxRate(long switchId) {
946         List<OFStatistics> newPortStatistics = this.portStatistics.get(switchId);
947         if (newPortStatistics == null) {
948             return;
949         }
950         Map<Short, TxRates> rates = this.txRates.get(switchId);
951         if (rates == null) {
952             // First time rates for this switch are added
953             rates = new HashMap<Short, TxRates>();
954             txRates.put(switchId, rates);
955         }
956         for (OFStatistics stats : newPortStatistics) {
957             OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
958             short port = newPortStat.getPortNumber();
959             TxRates portRatesHolder = rates.get(port);
960             if (portRatesHolder == null) {
961                 // First time rates for this port are added
962                 portRatesHolder = new TxRates();
963                 rates.put(port, portRatesHolder);
964             }
965             // Get and store the number of transmitted bytes for this port
966             // And handle the case where agent does not support the counter
967             long transmitBytes = newPortStat.getTransmitBytes();
968             long value = (transmitBytes < 0) ? 0 : transmitBytes;
969             portRatesHolder.update(value);
970         }
971     }
972
973     @Override
974     public synchronized long getTransmitRate(Long switchId, Short port) {
975         long average = 0;
976         if (switchId == null || port == null) {
977             return average;
978         }
979         Map<Short, TxRates> perSwitch = txRates.get(switchId);
980         if (perSwitch == null) {
981             return average;
982         }
983         TxRates portRates = perSwitch.get(port);
984         if (portRates == null) {
985             return average;
986         }
987         return portRates.getAverageTxRate();
988     }
989
990     /*
991      * Manual switch name configuration code
992      */
993     @Override
994     public String getHelp() {
995         StringBuffer help = new StringBuffer();
996         help.append("---OF Statistics Manager utilities---\n");
997         help.append("\t ofdumpstatsmgr         - "
998                 + "Print Internal Stats Mgr db\n");
999         help.append("\t ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds) - "
1000                 + "Set/Show flow/port/dedscription stats poll intervals\n");
1001         return help.toString();
1002     }
1003
1004     private boolean isValidSwitchId(String switchId) {
1005         String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
1006         String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
1007
1008         return (switchId != null && (switchId.matches(regexDatapathID) || switchId
1009                 .matches(regexDatapathIDLong)));
1010     }
1011
1012     public long getSwitchIDLong(String switchId) {
1013         int radix = 16;
1014         String switchString = "0";
1015
1016         if (isValidSwitchId(switchId)) {
1017             if (switchId.contains(":")) {
1018                 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
1019                 switchString = switchId.replace(":", "");
1020             } else if (switchId.contains("-")) {
1021                 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
1022                 switchString = switchId.replace("-", "");
1023             } else {
1024                 // Handle the 0123456789ABCDEF notation
1025                 switchString = switchId;
1026             }
1027         }
1028         return Long.parseLong(switchString, radix);
1029     }
1030
1031     /*
1032      * Internal information dump code
1033      */
1034     private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
1035         StringBuffer buffer = new StringBuffer();
1036         buffer.append("{");
1037         for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
1038             buffer.append(HexString.toHexString(entry.getKey()) + "="
1039                     + entry.getValue().toString() + " ");
1040         }
1041         buffer.append("}");
1042         return buffer.toString();
1043     }
1044
1045     public void _ofdumpstatsmgr(CommandInterpreter ci) {
1046         ci.println("Global Counter: " + counter);
1047         ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
1048         ci.println("PendingStatsQueue: " + pendingStatsRequests);
1049         ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
1050         ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
1051         ci.println("Stats Collector State: "
1052                 + statisticsCollector.getState().toString());
1053         ci.println("StatsTimer: " + statisticsTimer.toString());
1054         ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
1055         ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
1056         ci.println("Port Stats Period: " + portTickNumber + " s");
1057         ci.println("Table Stats Period: " + tableTickNumber + " s");
1058     }
1059
1060     public void _resetSwitchCapability(CommandInterpreter ci) {
1061         String sidString = ci.nextArgument();
1062         Long sid = null;
1063         if (sidString == null) {
1064             ci.println("Insert the switch id (numeric value)");
1065             return;
1066         }
1067         try {
1068             sid = Long.valueOf(sidString);
1069             this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
1070             ci.println("Vendor capability for switch " + sid + " set to "
1071                     + this.switchSupportsVendorExtStats.get(sid));
1072         } catch (NumberFormatException e) {
1073             ci.println("Invalid switch id. Has to be numeric.");
1074         }
1075
1076     }
1077
1078     public void _ofbw(CommandInterpreter ci) {
1079         String sidString = ci.nextArgument();
1080         Long sid = null;
1081         if (sidString == null) {
1082             ci.println("Insert the switch id (numeric value)");
1083             return;
1084         }
1085         try {
1086             sid = Long.valueOf(sidString);
1087         } catch (NumberFormatException e) {
1088             ci.println("Invalid switch id. Has to be numeric.");
1089         }
1090         if (sid != null) {
1091             Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
1092             ci.println("Bandwidth utilization (" + factoredSamples
1093                     * portTickNumber + " sec average) for switch "
1094                     + HexEncode.longToHexString(sid) + ":");
1095             if (thisSwitchRates == null) {
1096                 ci.println("Not available");
1097             } else {
1098                 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
1099                     ci.println("Port: " + entry.getKey() + ": "
1100                             + entry.getValue().getAverageTxRate() + " bps");
1101                 }
1102             }
1103         }
1104     }
1105
1106     public void _txratewindow(CommandInterpreter ci) {
1107         String averageWindow = ci.nextArgument();
1108         short seconds = 0;
1109         if (averageWindow == null) {
1110             ci.println("Insert the length in seconds of the median "
1111                     + "window for tx rate");
1112             ci.println("Current: " + factoredSamples * portTickNumber + " secs");
1113             return;
1114         }
1115         try {
1116             seconds = Short.valueOf(averageWindow);
1117         } catch (NumberFormatException e) {
1118             ci.println("Invalid period.");
1119         }
1120         OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1121         ci.println("New: " + factoredSamples * portTickNumber + " secs");
1122     }
1123
1124     public void _ofstatsmgrintervals(CommandInterpreter ci) {
1125         String flowStatsInterv = ci.nextArgument();
1126         String portStatsInterv = ci.nextArgument();
1127         String descStatsInterv = ci.nextArgument();
1128         String tableStatsInterv = ci.nextArgument();
1129
1130         if (flowStatsInterv == null || portStatsInterv == null
1131                 || descStatsInterv == null) {
1132             ci.println("Usage: ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds)");
1133             ci.println("Current Values: fP=" + statisticsTickNumber + "sec pP="
1134                     + portTickNumber + "sec dP=" + descriptionTickNumber + "sec tP=" + tableTickNumber + " sec");
1135             return;
1136         }
1137         Short fP, pP, dP, tP;
1138         try {
1139             fP = Short.parseShort(flowStatsInterv);
1140             pP = Short.parseShort(portStatsInterv);
1141             dP = Short.parseShort(descStatsInterv);
1142             tP = Short.parseShort(tableStatsInterv);
1143         } catch (Exception e) {
1144             ci.println("Invalid format values: " + e.getMessage());
1145             return;
1146         }
1147
1148         if (pP <= 1 || fP <= 1 || dP <= 1 || tP <= 1) {
1149             ci.println("Invalid values. fP, pP, dP, tP have to be greater than 1.");
1150             return;
1151         }
1152
1153         statisticsTickNumber = fP;
1154         portTickNumber = pP;
1155         descriptionTickNumber = dP;
1156         tableTickNumber = tP;
1157
1158         ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1159                 + portTickNumber + "s dP=" + descriptionTickNumber + "s tP="
1160                 + tableTickNumber + "s");
1161     }
1162
1163     /**
1164      * This method retrieves user configurations from config.ini and updates
1165      * statisticsTickNumber/portTickNumber/descriptionTickNumber accordingly.
1166      */
1167     private void configStatsPollIntervals() {
1168         String fsStr = System.getProperty("of.flowStatsPollInterval");
1169         String psStr = System.getProperty("of.portStatsPollInterval");
1170         String dsStr = System.getProperty("of.descStatsPollInterval");
1171         String tsStr = System.getProperty("of.tableStatsPollInterval");
1172         Short fs, ps, ds, ts;
1173
1174         if (fsStr != null) {
1175             try {
1176                 fs = Short.parseShort(fsStr);
1177                 if (fs > 0) {
1178                     statisticsTickNumber = fs;
1179                 }
1180             } catch (Exception e) {
1181             }
1182         }
1183
1184         if (psStr != null) {
1185             try {
1186                 ps = Short.parseShort(psStr);
1187                 if (ps > 0) {
1188                     portTickNumber = ps;
1189                 }
1190             } catch (Exception e) {
1191             }
1192         }
1193
1194         if (dsStr != null) {
1195             try {
1196                 ds = Short.parseShort(dsStr);
1197                 if (ds > 0) {
1198                     descriptionTickNumber = ds;
1199                 }
1200             } catch (Exception e) {
1201             }
1202         }
1203
1204         if (tsStr != null) {
1205             try{
1206                 ts = Short.parseShort(tsStr);
1207                 if (ts > 0) {
1208                     tableTickNumber = ts;
1209                 }
1210             } catch (Exception e) {
1211             }
1212         }
1213     }
1214 }