Merge "Enhance debug capabilities"
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.Set;
20 import java.util.Timer;
21 import java.util.TimerTask;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.CopyOnWriteArraySet;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
28
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
33 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
40 import org.opendaylight.controller.sal.core.Node;
41 import org.opendaylight.controller.sal.core.NodeConnector;
42 import org.opendaylight.controller.sal.core.Property;
43 import org.opendaylight.controller.sal.core.UpdateType;
44 import org.opendaylight.controller.sal.utils.HexEncode;
45 import org.openflow.protocol.OFError;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFPort;
48 import org.openflow.protocol.OFStatisticsRequest;
49 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
50 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
51 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
52 import org.openflow.protocol.statistics.OFPortStatisticsReply;
53 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
54 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
55 import org.openflow.protocol.statistics.OFStatistics;
56 import org.openflow.protocol.statistics.OFStatisticsType;
57 import org.openflow.protocol.statistics.OFTableStatistics;
58 import org.openflow.protocol.statistics.OFVendorStatistics;
59 import org.openflow.util.HexString;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * Periodically polls the different OF statistics from the OF switches, caches
67  * them, and publishes results towards SAL. It also provides an API to directly
68  * query the switch for any specific statistics.
69  */
70 public class OFStatisticsManager implements IOFStatisticsManager, IInventoryShimExternalListener, CommandProvider {
71     private static final Logger log = LoggerFactory.getLogger(OFStatisticsManager.class);
72     private static final int INITIAL_SIZE = 64;
73     private static final long FLOW_STATS_PERIOD = 10000;
74     private static final long DESC_STATS_PERIOD = 60000;
75     private static final long PORT_STATS_PERIOD = 5000;
76     private static final long TABLE_STATS_PERIOD = 10000;
77     private static final long TICK = 1000;
78     private static short statisticsTickNumber = (short) (FLOW_STATS_PERIOD / TICK);
79     private static short descriptionTickNumber = (short) (DESC_STATS_PERIOD / TICK);
80     private static short portTickNumber = (short) (PORT_STATS_PERIOD / TICK);
81     private static short tableTickNumber = (short) (TABLE_STATS_PERIOD / TICK);
82     private static short factoredSamples = (short) 2;
83     private static short counter = 1;
84     private IController controller = null;
85     private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
86     private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
87     private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
88     private ConcurrentMap<Long, List<OFStatistics>> tableStatistics;
89     private List<OFStatistics> dummyList;
90     private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
91     protected BlockingQueue<StatsRequest> pendingStatsRequests;
92     protected BlockingQueue<Long> switchPortStatsUpdated;
93     private Thread statisticsCollector;
94     private Thread txRatesUpdater;
95     private Timer statisticsTimer;
96     private TimerTask statisticsTimerTask;
97     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
98     // Per port sampled (every portStatsPeriod) transmit rate
99     private Map<Long, Map<Short, TxRates>> txRates;
100     private Set<IOFStatisticsListener> statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
101
102     /**
103      * The object containing the latest factoredSamples tx rate samples for a
104      * given switch port
105      */
106     protected class TxRates {
107         // contains the latest factoredSamples sampled transmitted bytes
108         Deque<Long> sampledTxBytes;
109
110         public TxRates() {
111             sampledTxBytes = new LinkedBlockingDeque<Long>();
112         }
113
114         public void update(Long txBytes) {
115             /*
116              * Based on how many samples our average works on, we might have to
117              * remove the oldest sample
118              */
119             if (sampledTxBytes.size() == factoredSamples) {
120                 sampledTxBytes.removeLast();
121             }
122
123             // Add the latest sample to the top of the queue
124             sampledTxBytes.addFirst(txBytes);
125         }
126
127         /**
128          * Returns the average transmit rate in bps
129          *
130          * @return the average transmit rate [bps]
131          */
132         public long getAverageTxRate() {
133             long average = 0;
134             /*
135              * If we cannot provide the value for the time window length set
136              */
137             if (sampledTxBytes.size() < factoredSamples) {
138                 return average;
139             }
140             long increment = sampledTxBytes.getFirst() - sampledTxBytes
141                     .getLast();
142             long timePeriod = factoredSamples * PORT_STATS_PERIOD / TICK;
143             average = (8L * increment) / timePeriod;
144             return average;
145         }
146     }
147
148     public void setController(IController core) {
149         this.controller = core;
150     }
151
152     public void unsetController(IController core) {
153         if (this.controller == core) {
154             this.controller = null;
155         }
156     }
157
158     private short getStatsQueueSize() {
159         String statsQueueSizeStr = System.getProperty("of.statsQueueSize");
160         short statsQueueSize = INITIAL_SIZE;
161         if (statsQueueSizeStr != null) {
162             try {
163                 statsQueueSize = Short.parseShort(statsQueueSizeStr);
164                 if (statsQueueSize <= 0) {
165                     statsQueueSize = INITIAL_SIZE;
166                 }
167             } catch (Exception e) {
168             }
169         }
170         return statsQueueSize;
171     }
172
173     IPluginOutConnectionService connectionPluginOutService;
174     void setIPluginOutConnectionService(IPluginOutConnectionService s) {
175         connectionPluginOutService = s;
176     }
177
178     void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
179         if (connectionPluginOutService == s) {
180             connectionPluginOutService = null;
181         }
182     }
183
184     /**
185      * Function called by the dependency manager when all the required
186      * dependencies are satisfied
187      *
188      */
189     void init() {
190         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
191         descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
192         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
193         tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
194         dummyList = new ArrayList<OFStatistics>(1);
195         pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(getStatsQueueSize());
196         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(INITIAL_SIZE);
197         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(INITIAL_SIZE);
198         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(INITIAL_SIZE);
199         txRates = new HashMap<Long, Map<Short, TxRates>>(INITIAL_SIZE);
200
201         configStatsPollIntervals();
202
203         // Initialize managed timers
204         statisticsTimer = new Timer("Statistics Timer Ticks");
205         statisticsTimerTask = new TimerTask() {
206             @Override
207             public void run() {
208                 decrementTicks();
209             }
210         };
211
212         // Initialize Statistics collector thread
213         statisticsCollector = new Thread(new Runnable() {
214             @Override
215             public void run() {
216                 while (true) {
217                     try {
218                         StatsRequest req = pendingStatsRequests.take();
219                         queryStatisticsInternal(req.switchId, req.type);
220                     } catch (InterruptedException e) {
221                         log.warn("Flow Statistics Collector thread "
222                                 + "interrupted", e);
223                         return;
224                     }
225                 }
226             }
227         }, "Statistics Collector");
228
229         // Initialize Tx Rate Updater thread
230         txRatesUpdater = new Thread(new Runnable() {
231             @Override
232             public void run() {
233                 while (true) {
234                     try {
235                         long switchId = switchPortStatsUpdated.take();
236                         updatePortsTxRate(switchId);
237                     } catch (InterruptedException e) {
238                         log.warn("TX Rate Updater thread interrupted", e);
239                         return;
240                     }
241                 }
242             }
243         }, "TX Rate Updater");
244     }
245
246     /**
247      * Function called by the dependency manager when at least one dependency
248      * become unsatisfied or when the component is shutting down because for
249      * example bundle is being stopped.
250      *
251      */
252     void destroy() {
253         statisticsListeners.clear();
254     }
255
256     /**
257      * Function called by dependency manager after "init ()" is called and after
258      * the services provided by the class are registered in the service registry
259      *
260      */
261     void start() {
262         // Start managed timers
263         statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, TICK);
264
265         // Start statistics collector thread
266         statisticsCollector.start();
267
268         // Start bandwidth utilization computer thread
269         txRatesUpdater.start();
270
271         // OSGI console
272         registerWithOSGIConsole();
273     }
274
275     /**
276      * Function called by the dependency manager before the services exported by
277      * the component are unregistered, this will be followed by a "destroy ()"
278      * calls
279      *
280      */
281     void stop() {
282         // Stop managed timers
283         statisticsTimer.cancel();
284     }
285
286     public void setStatisticsListener(IOFStatisticsListener s) {
287         this.statisticsListeners.add(s);
288     }
289
290     public void unsetStatisticsListener(IOFStatisticsListener s) {
291         if (s != null) {
292             this.statisticsListeners.remove(s);
293         }
294     }
295
296     private void registerWithOSGIConsole() {
297         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
298         bundleContext.registerService(CommandProvider.class.getName(), this, null);
299     }
300
301     private static class StatsRequest {
302         protected Long switchId;
303         protected OFStatisticsType type;
304
305         public StatsRequest(Long d, OFStatisticsType t) {
306             switchId = d;
307             type = t;
308         }
309
310         @Override
311         public String toString() {
312             return "SReq = {switchId=" + switchId + ", type=" + type + "}";
313         }
314
315         @Override
316         public int hashCode() {
317             final int prime = 31;
318             int result = 1;
319             result = prime * result
320                     + ((switchId == null) ? 0 : switchId.hashCode());
321             result = prime * result + ((type == null) ? 0 : type.ordinal());
322             return result;
323         }
324
325         @Override
326         public boolean equals(Object obj) {
327             if (this == obj) {
328                 return true;
329             }
330             if (obj == null) {
331                 return false;
332             }
333             if (getClass() != obj.getClass()) {
334                 return false;
335             }
336             StatsRequest other = (StatsRequest) obj;
337             if (switchId == null) {
338                 if (other.switchId != null) {
339                     return false;
340                 }
341             } else if (!switchId.equals(other.switchId)) {
342                 return false;
343             }
344             if (type != other.type) {
345                 return false;
346             }
347             return true;
348         }
349     }
350
351     private void addStatisticsTicks(Long switchId) {
352         switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
353                                                                   // switch
354                                                                   // supports
355                                                                   // Vendor
356                                                                   // extension
357                                                                   // stats
358         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
359         log.debug("Added Switch {} to target pool",
360                 HexString.toHexString(switchId.longValue()));
361     }
362
363     protected static class StatisticsTicks {
364         private short flowStatisticsTicks;
365         private short descriptionTicks;
366         private short portStatisticsTicks;
367         private short tableStatisticsTicks;
368
369         public StatisticsTicks(boolean scattered) {
370             if (scattered) {
371                 // scatter bursts by statisticsTickPeriod
372                 if (++counter < 0) {
373                     counter = 0;
374                 } // being paranoid here
375                 flowStatisticsTicks = (short) (1 + counter
376                         % statisticsTickNumber);
377                 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
378                 portStatisticsTicks = (short) (1 + counter % portTickNumber);
379                 tableStatisticsTicks = (short) (1 + counter % tableTickNumber);
380             } else {
381                 flowStatisticsTicks = statisticsTickNumber;
382                 descriptionTicks = descriptionTickNumber;
383                 portStatisticsTicks = portTickNumber;
384                 tableStatisticsTicks = tableTickNumber;
385             }
386         }
387
388         public boolean decrementFlowTicksIsZero() {
389             // Please ensure no code is inserted between the if check and the
390             // flowStatisticsTicks reset
391             if (--flowStatisticsTicks == 0) {
392                 flowStatisticsTicks = statisticsTickNumber;
393                 return true;
394             }
395             return false;
396         }
397
398         public boolean decrementDescTicksIsZero() {
399             // Please ensure no code is inserted between the if check and the
400             // descriptionTicks reset
401             if (--descriptionTicks == 0) {
402                 descriptionTicks = descriptionTickNumber;
403                 return true;
404             }
405             return false;
406         }
407
408         public boolean decrementPortTicksIsZero() {
409             // Please ensure no code is inserted between the if check and the
410             // descriptionTicks reset
411             if (--portStatisticsTicks == 0) {
412                 portStatisticsTicks = portTickNumber;
413                 return true;
414             }
415             return false;
416         }
417
418         public boolean decrementTableTicksIsZero() {
419             // Please ensure no code is inserted between the if check and the
420             // descriptionTicks reset
421             if(--tableStatisticsTicks == 0) {
422                 tableStatisticsTicks = tableTickNumber;
423                 return true;
424             }
425             return false;
426         }
427
428         @Override
429         public String toString() {
430             return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
431                     + ",pT=" + portStatisticsTicks + ",tT=" + tableStatisticsTicks + "}";
432         }
433     }
434
435     private void printInfoMessage(String type, StatsRequest request) {
436         log.info("{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
437                 new Object[] {type, HexString.toHexString(request.switchId), pendingStatsRequests.size(),
438                 statisticsCollector.getState().toString() });
439     }
440
441     protected void decrementTicks() {
442         StatsRequest request = null;
443         for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
444                 .entrySet()) {
445             StatisticsTicks clock = entry.getValue();
446             Long switchId = entry.getKey();
447             if (clock.decrementFlowTicksIsZero()) {
448                 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ?
449                         new StatsRequest(switchId, OFStatisticsType.VENDOR) :
450                         new StatsRequest(switchId, OFStatisticsType.FLOW);
451                 // If a request for this switch is already in the queue, skip to
452                 // add this new request
453                 if (!pendingStatsRequests.contains(request)
454                         && false == pendingStatsRequests.offer(request)) {
455                     printInfoMessage("Flow", request);
456                 }
457             }
458
459             if (clock.decrementDescTicksIsZero()) {
460                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
461                 // If a request for this switch is already in the queue, skip to
462                 // add this new request
463                 if (!pendingStatsRequests.contains(request)
464                         && false == pendingStatsRequests.offer(request)) {
465                     printInfoMessage("Description", request);
466                 }
467             }
468
469             if (clock.decrementPortTicksIsZero()) {
470                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
471                 // If a request for this switch is already in the queue, skip to
472                 // add this new request
473                 if (!pendingStatsRequests.contains(request)
474                         && false == pendingStatsRequests.offer(request)) {
475                     printInfoMessage("Port", request);
476                 }
477             }
478
479             if(clock.decrementTableTicksIsZero()) {
480                 request = new StatsRequest(switchId, OFStatisticsType.TABLE);
481                 // If a request for this switch is already in the queue, skip to
482                 // add this new request
483                 if (!pendingStatsRequests.contains(request)
484                         && false == pendingStatsRequests.offer(request)) {
485                     printInfoMessage("Table", request);
486                 }
487             }
488         }
489     }
490
491     private void removeStatsRequestTasks(Long switchId) {
492         log.debug("Cleaning Statistics database for switch {}",
493                 HexEncode.longToHexString(switchId));
494         // To be safe, let's attempt removal of both VENDOR and FLOW request. It
495         // does not hurt
496         pendingStatsRequests.remove(new StatsRequest(switchId,
497                 OFStatisticsType.VENDOR));
498         pendingStatsRequests.remove(new StatsRequest(switchId,
499                 OFStatisticsType.FLOW));
500         pendingStatsRequests.remove(new StatsRequest(switchId,
501                 OFStatisticsType.DESC));
502         pendingStatsRequests.remove(new StatsRequest(switchId,
503                 OFStatisticsType.PORT));
504         pendingStatsRequests.remove(new StatsRequest(switchId,
505                 OFStatisticsType.TABLE));
506         // Take care of the TX rate databases
507         switchPortStatsUpdated.remove(switchId);
508         txRates.remove(switchId);
509     }
510
511     private void clearFlowStatsAndTicks(Long switchId) {
512         statisticsTimerTicks.remove(switchId);
513         removeStatsRequestTasks(switchId);
514         flowStatistics.remove(switchId);
515         log.debug("Statistics removed for switch {}",
516                 HexString.toHexString(switchId));
517     }
518
519     private void queryStatisticsInternal(Long switchId, OFStatisticsType statType) {
520
521         // Query the switch on all matches
522         List<OFStatistics> values = this.fetchStatisticsFromSwitch(switchId, statType, null);
523
524         // If got a valid response update local cache and notify listeners
525         if (values != null && !values.isEmpty()) {
526             switch (statType) {
527                 case FLOW:
528                 case VENDOR:
529                     flowStatistics.put(switchId, values);
530                     notifyFlowUpdate(switchId, values);
531                     break;
532                 case DESC:
533                     // Overwrite cache
534                     descStatistics.put(switchId, values);
535                     // Notify who may be interested in a description change
536                     notifyDescriptionUpdate(switchId, values);
537                     break;
538                 case PORT:
539                     // Overwrite cache with new port statistics for this switch
540                     portStatistics.put(switchId, values);
541
542                     // Wake up the thread which maintains the TX byte counters for
543                     // each port
544                     switchPortStatsUpdated.offer(switchId);
545                     notifyPortUpdate(switchId, values);
546                     break;
547                 case TABLE:
548                     // Overwrite cache
549                     tableStatistics.put(switchId, values);
550                     notifyTableUpdate(switchId, values);
551                     break;
552                 default:
553             }
554         }
555     }
556
557     private void notifyDescriptionUpdate(Long switchId, List<OFStatistics> values) {
558         for (IOFStatisticsListener l : this.statisticsListeners) {
559             l.descriptionStatisticsRefreshed(switchId, values);
560         }
561     }
562
563     private void notifyFlowUpdate(Long switchId, List<OFStatistics> values) {
564         if (values.get(0) instanceof OFVendorStatistics) {
565             values = this.v6StatsListToOFStatsList(values);
566         }
567
568         for (IOFStatisticsListener l : this.statisticsListeners) {
569             l.flowStatisticsRefreshed(switchId, values);
570         }
571
572     }
573
574     private void notifyPortUpdate(Long switchId, List<OFStatistics> values) {
575         for (IOFStatisticsListener l : this.statisticsListeners) {
576             l.portStatisticsRefreshed(switchId, values);
577         }
578     }
579
580     private void notifyTableUpdate(Long switchId, List<OFStatistics> values) {
581         for (IOFStatisticsListener l : this.statisticsListeners) {
582             l.tableStatisticsRefreshed(switchId, values);
583         }
584     }
585
586     /*
587      * Generic function to get the statistics form an OF switch
588      */
589     @SuppressWarnings("unchecked")
590     private List<OFStatistics> fetchStatisticsFromSwitch(Long switchId,
591             OFStatisticsType statsType, Object target) {
592         List<OFStatistics> values = null;
593         String type = null;
594         ISwitch sw = controller.getSwitch(switchId);
595
596         if (sw != null) {
597             OFStatisticsRequest req = new OFStatisticsRequest();
598             req.setStatisticType(statsType);
599             int requestLength = req.getLengthU();
600
601             if (statsType == OFStatisticsType.FLOW) {
602                 OFMatch match = null;
603                 if (target == null) {
604                     // All flows request
605                     match = new OFMatch();
606                     match.setWildcards(0xffffffff);
607                 } else if (!(target instanceof OFMatch)) {
608                     // Malformed request
609                     log.warn("Invalid target type for Flow stats request: {}",
610                             target.getClass());
611                     return null;
612                 } else {
613                     // Specific flow request
614                     match = (OFMatch) target;
615                 }
616                 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
617                 specificReq.setMatch(match);
618                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
619                 specificReq.setTableId((byte) 0xff);
620                 req.setStatistics(Collections
621                         .singletonList((OFStatistics) specificReq));
622                 requestLength += specificReq.getLength();
623                 type = "FLOW";
624             } else if (statsType == OFStatisticsType.VENDOR) {
625                 V6StatsRequest specificReq = new V6StatsRequest();
626                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
627                 specificReq.setTableId((byte) 0xff);
628                 req.setStatistics(Collections
629                         .singletonList((OFStatistics) specificReq));
630                 requestLength += specificReq.getLength();
631                 type = "VENDOR";
632             } else if (statsType == OFStatisticsType.AGGREGATE) {
633                 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
634                 OFMatch match = new OFMatch();
635                 match.setWildcards(0xffffffff);
636                 specificReq.setMatch(match);
637                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
638                 specificReq.setTableId((byte) 0xff);
639                 req.setStatistics(Collections
640                         .singletonList((OFStatistics) specificReq));
641                 requestLength += specificReq.getLength();
642                 type = "AGGREGATE";
643             } else if (statsType == OFStatisticsType.PORT) {
644                 short targetPort;
645                 if (target == null) {
646                     // All ports request
647                     targetPort = OFPort.OFPP_NONE.getValue();
648                 } else if (!(target instanceof Short)) {
649                     // Malformed request
650                     log.warn("Invalid target type for Port stats request: {}",
651                             target.getClass());
652                     return null;
653                 } else {
654                     // Specific port request
655                     targetPort = (Short) target;
656                 }
657                 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
658                 specificReq.setPortNumber(targetPort);
659                 req.setStatistics(Collections
660                         .singletonList((OFStatistics) specificReq));
661                 requestLength += specificReq.getLength();
662                 type = "PORT";
663             } else if (statsType == OFStatisticsType.QUEUE) {
664                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
665                 specificReq.setPortNumber(OFPort.OFPP_ALL.getValue());
666                 specificReq.setQueueId(0xffffffff);
667                 req.setStatistics(Collections
668                         .singletonList((OFStatistics) specificReq));
669                 requestLength += specificReq.getLength();
670                 type = "QUEUE";
671             } else if (statsType == OFStatisticsType.DESC) {
672                 type = "DESC";
673             } else if (statsType == OFStatisticsType.TABLE) {
674                 if(target != null){
675                     if (!(target instanceof Byte)) {
676                         // Malformed request
677                         log.warn("Invalid table id for table stats request: {}",
678                                 target.getClass());
679                         return null;
680                     }
681                     byte targetTable = (Byte) target;
682                     OFTableStatistics specificReq = new OFTableStatistics();
683                     specificReq.setTableId(targetTable);
684                     req.setStatistics(Collections
685                             .singletonList((OFStatistics) specificReq));
686                     requestLength += specificReq.getLength();
687                 }
688                 type = "TABLE";
689             }
690             req.setLengthU(requestLength);
691             Object result = sw.getStatistics(req);
692
693             if (result == null) {
694                 log.warn("Request Timed Out for ({}) from switch {}", type,
695                         HexString.toHexString(switchId));
696             } else if (result instanceof OFError) {
697                 log.warn("Switch {} failed to handle ({}) stats request: {}",
698                         new Object[] { HexString.toHexString(switchId), type,
699                         Utils.getOFErrorString((OFError) result) });
700                 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
701                     log.warn(
702                             "Switching back to regular Flow stats requests for switch {}",
703                             HexString.toHexString(switchId));
704                     this.switchSupportsVendorExtStats.put(switchId,
705                             Boolean.FALSE);
706                 }
707             } else {
708                 values = (List<OFStatistics>) result;
709             }
710         }
711         return values;
712     }
713
714     @Override
715     public List<OFStatistics> getOFFlowStatistics(Long switchId) {
716         List<OFStatistics> list = flowStatistics.get(switchId);
717
718         /*
719          * Check on emptiness as interference between add and get is still
720          * possible on the inner list (the concurrentMap entry's value)
721          */
722         return (list == null || list.isEmpty()) ? this.dummyList
723                 : (list.get(0) instanceof OFVendorStatistics) ? this
724                         .v6StatsListToOFStatsList(list) : list;
725     }
726
727     @Override
728     public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch, short priority) {
729         List<OFStatistics> statsList = flowStatistics.get(switchId);
730
731         /*
732          * Check on emptiness as interference between add and get is still
733          * possible on the inner list (the concurrentMap entry's value)
734          */
735         if (statsList == null || statsList.isEmpty()) {
736             return this.dummyList;
737         }
738
739         if (statsList.get(0) instanceof OFVendorStatistics) {
740             /*
741              * Caller could provide regular OF match when we instead pull the
742              * vendor statistics from this node Caller is not supposed to know
743              * whether this switch supports vendor extensions statistics
744              * requests
745              */
746             V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
747                     : new V6Match(ofMatch);
748
749             List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
750             for (OFStatistics stats : targetList) {
751                 V6StatsReply v6Stats = (V6StatsReply) stats;
752                 V6Match v6Match = v6Stats.getMatch();
753                 if (v6Stats.getPriority() == priority && v6Match.equals(targetMatch)) {
754                     List<OFStatistics> list = new ArrayList<OFStatistics>();
755                     list.add(stats);
756                     return list;
757                 }
758             }
759         } else {
760             for (OFStatistics stats : statsList) {
761                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
762                 if (flowStats.getPriority() == priority && flowStats.getMatch().equals(ofMatch)) {
763                     List<OFStatistics> list = new ArrayList<OFStatistics>();
764                     list.add(stats);
765                     return list;
766                 }
767             }
768         }
769         return this.dummyList;
770     }
771
772     /*
773      * Converts the v6 vendor statistics to the OFStatistics
774      */
775     private List<OFStatistics> v6StatsListToOFStatsList(
776             List<OFStatistics> statistics) {
777         List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
778         if (statistics != null && !statistics.isEmpty()) {
779             for (OFStatistics stats : statistics) {
780                 if (stats instanceof OFVendorStatistics) {
781                     List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
782                     if (r != null) {
783                         v6statistics.addAll(r);
784                     }
785                 }
786             }
787         }
788         return v6statistics;
789     }
790
791     private static List<OFStatistics> getV6ReplyStatistics(
792             OFVendorStatistics stat) {
793         int length = stat.getLength();
794         List<OFStatistics> results = new ArrayList<OFStatistics>();
795         if (length < 12)
796             return null; // Nicira Hdr is 12 bytes. We need atleast that much
797         ByteBuffer data = ByteBuffer.allocate(length);
798         stat.writeTo(data);
799         data.rewind();
800         if (log.isTraceEnabled()) {
801             log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
802                     HexString.toHexString(data.array()));
803         }
804
805         int vendor = data.getInt(); // first 4 bytes is vendor id.
806         if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
807             log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
808             return null;
809         } else {
810             // go ahead by 8 bytes which is 8 bytes of 0
811             data.getLong(); // should be all 0's
812             length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
813                           // been consumed
814         }
815
816         V6StatsReply v6statsreply;
817         int min_len;
818         while (length > 0) {
819             v6statsreply = new V6StatsReply();
820             min_len = v6statsreply.getLength();
821             if (length < v6statsreply.getLength())
822                 break;
823             v6statsreply.setActionFactory(stat.getActionFactory());
824             v6statsreply.readFrom(data);
825             if (v6statsreply.getLength() < min_len)
826                 break;
827             v6statsreply.setVendorId(vendor);
828             log.trace("V6StatsReply: {}", v6statsreply);
829             length -= v6statsreply.getLength();
830             results.add(v6statsreply);
831         }
832         return results;
833     }
834
835     @Override
836     public List<OFStatistics> queryStatistics(Long switchId,
837             OFStatisticsType statType, Object target) {
838         /*
839          * Caller does not know and it is not supposed to know whether this
840          * switch supports vendor extension. We adjust the target for him
841          */
842         if (statType == OFStatisticsType.FLOW) {
843             if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
844                 statType = OFStatisticsType.VENDOR;
845             }
846         }
847
848         List<OFStatistics> list = this.fetchStatisticsFromSwitch(switchId, statType,
849                 target);
850
851         return (list == null) ? null :
852             (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list) : list;
853     }
854
855     @Override
856     public List<OFStatistics> getOFDescStatistics(Long switchId) {
857         if (!descStatistics.containsKey(switchId))
858             return this.dummyList;
859
860         return descStatistics.get(switchId);
861     }
862
863     @Override
864     public List<OFStatistics> getOFPortStatistics(Long switchId) {
865         if (!portStatistics.containsKey(switchId)) {
866             return this.dummyList;
867         }
868
869         return portStatistics.get(switchId);
870     }
871
872     @Override
873     public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
874         if (!portStatistics.containsKey(switchId)) {
875             return this.dummyList;
876         }
877         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
878         for (OFStatistics stats : portStatistics.get(switchId)) {
879             if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
880                 list.add(stats);
881                 break;
882             }
883         }
884         return list;
885     }
886
887     @Override
888     public List<OFStatistics> getOFTableStatistics(Long switchId) {
889         if (!tableStatistics.containsKey(switchId)) {
890             return this.dummyList;
891         }
892
893         return tableStatistics.get(switchId);
894     }
895
896     @Override
897     public List<OFStatistics> getOFTableStatistics(Long switchId, Byte tableId) {
898         if (!tableStatistics.containsKey(switchId)) {
899             return this.dummyList;
900         }
901
902         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
903         for (OFStatistics stats : tableStatistics.get(switchId)) {
904             if (((OFTableStatistics) stats).getTableId() == tableId) {
905                 list.add(stats);
906                 break;
907             }
908         }
909         return list;
910     }
911
912     @Override
913     public int getFlowsNumber(long switchId) {
914         return this.flowStatistics.get(switchId).size();
915     }
916
917     /*
918      * InventoryShim replay for us all the switch addition which happened before
919      * we were brought up
920      */
921     @Override
922     public void updateNode(Node node, UpdateType type, Set<Property> props) {
923         Long switchId = (Long) node.getID();
924         switch (type) {
925         case ADDED:
926             addStatisticsTicks(switchId);
927             break;
928         case REMOVED:
929             clearFlowStatsAndTicks(switchId);
930         default:
931         }
932     }
933
934     @Override
935     public void updateNodeConnector(NodeConnector nodeConnector,
936             UpdateType type, Set<Property> props) {
937         // No action
938     }
939
940     /**
941      * Update the cached port rates for this switch with the latest retrieved
942      * port transmit byte count
943      *
944      * @param switchId
945      */
946     private synchronized void updatePortsTxRate(long switchId) {
947         List<OFStatistics> newPortStatistics = this.portStatistics.get(switchId);
948         if (newPortStatistics == null) {
949             return;
950         }
951         Map<Short, TxRates> rates = this.txRates.get(switchId);
952         if (rates == null) {
953             // First time rates for this switch are added
954             rates = new HashMap<Short, TxRates>();
955             txRates.put(switchId, rates);
956         }
957         for (OFStatistics stats : newPortStatistics) {
958             OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
959             short port = newPortStat.getPortNumber();
960             TxRates portRatesHolder = rates.get(port);
961             if (portRatesHolder == null) {
962                 // First time rates for this port are added
963                 portRatesHolder = new TxRates();
964                 rates.put(port, portRatesHolder);
965             }
966             // Get and store the number of transmitted bytes for this port
967             // And handle the case where agent does not support the counter
968             long transmitBytes = newPortStat.getTransmitBytes();
969             long value = (transmitBytes < 0) ? 0 : transmitBytes;
970             portRatesHolder.update(value);
971         }
972     }
973
974     @Override
975     public synchronized long getTransmitRate(Long switchId, Short port) {
976         long average = 0;
977         if (switchId == null || port == null) {
978             return average;
979         }
980         Map<Short, TxRates> perSwitch = txRates.get(switchId);
981         if (perSwitch == null) {
982             return average;
983         }
984         TxRates portRates = perSwitch.get(port);
985         if (portRates == null) {
986             return average;
987         }
988         return portRates.getAverageTxRate();
989     }
990
991     /*
992      * Manual switch name configuration code
993      */
994     @Override
995     public String getHelp() {
996         StringBuffer help = new StringBuffer();
997         help.append("---OF Statistics Manager utilities---\n");
998         help.append("\t ofdumpstatsmgr         - "
999                 + "Print Internal Stats Mgr db\n");
1000         help.append("\t ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds) - "
1001                 + "Set/Show flow/port/dedscription stats poll intervals\n");
1002         return help.toString();
1003     }
1004
1005     private boolean isValidSwitchId(String switchId) {
1006         String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
1007         String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
1008
1009         return (switchId != null && (switchId.matches(regexDatapathID) || switchId
1010                 .matches(regexDatapathIDLong)));
1011     }
1012
1013     public long getSwitchIDLong(String switchId) {
1014         int radix = 16;
1015         String switchString = "0";
1016
1017         if (isValidSwitchId(switchId)) {
1018             if (switchId.contains(":")) {
1019                 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
1020                 switchString = switchId.replace(":", "");
1021             } else if (switchId.contains("-")) {
1022                 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
1023                 switchString = switchId.replace("-", "");
1024             } else {
1025                 // Handle the 0123456789ABCDEF notation
1026                 switchString = switchId;
1027             }
1028         }
1029         return Long.parseLong(switchString, radix);
1030     }
1031
1032     /*
1033      * Internal information dump code
1034      */
1035     private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
1036         StringBuffer buffer = new StringBuffer();
1037         buffer.append("{");
1038         for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
1039             buffer.append(HexString.toHexString(entry.getKey()) + "="
1040                     + entry.getValue().toString() + " ");
1041         }
1042         buffer.append("}");
1043         return buffer.toString();
1044     }
1045
1046     public void _ofdumpstatsmgr(CommandInterpreter ci) {
1047         ci.println("Global Counter: " + counter);
1048         ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
1049         ci.println("PendingStatsQueue: " + pendingStatsRequests);
1050         ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
1051         ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
1052         ci.println("Stats Collector State: "
1053                 + statisticsCollector.getState().toString());
1054         ci.println("StatsTimer: " + statisticsTimer.toString());
1055         ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
1056         ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
1057         ci.println("Port Stats Period: " + portTickNumber + " s");
1058         ci.println("Table Stats Period: " + tableTickNumber + " s");
1059     }
1060
1061     public void _resetSwitchCapability(CommandInterpreter ci) {
1062         String sidString = ci.nextArgument();
1063         Long sid = null;
1064         if (sidString == null) {
1065             ci.println("Insert the switch id (numeric value)");
1066             return;
1067         }
1068         try {
1069             sid = Long.valueOf(sidString);
1070             this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
1071             ci.println("Vendor capability for switch " + sid + " set to "
1072                     + this.switchSupportsVendorExtStats.get(sid));
1073         } catch (NumberFormatException e) {
1074             ci.println("Invalid switch id. Has to be numeric.");
1075         }
1076
1077     }
1078
1079     public void _ofbw(CommandInterpreter ci) {
1080         String sidString = ci.nextArgument();
1081         Long sid = null;
1082         if (sidString == null) {
1083             ci.println("Insert the switch id (numeric value)");
1084             return;
1085         }
1086         try {
1087             sid = Long.valueOf(sidString);
1088         } catch (NumberFormatException e) {
1089             ci.println("Invalid switch id. Has to be numeric.");
1090         }
1091         if (sid != null) {
1092             Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
1093             ci.println("Bandwidth utilization (" + factoredSamples
1094                     * portTickNumber + " sec average) for switch "
1095                     + HexEncode.longToHexString(sid) + ":");
1096             if (thisSwitchRates == null) {
1097                 ci.println("Not available");
1098             } else {
1099                 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
1100                     ci.println("Port: " + entry.getKey() + ": "
1101                             + entry.getValue().getAverageTxRate() + " bps");
1102                 }
1103             }
1104         }
1105     }
1106
1107     public void _txratewindow(CommandInterpreter ci) {
1108         String averageWindow = ci.nextArgument();
1109         short seconds = 0;
1110         if (averageWindow == null) {
1111             ci.println("Insert the length in seconds of the median "
1112                     + "window for tx rate");
1113             ci.println("Current: " + factoredSamples * portTickNumber + " secs");
1114             return;
1115         }
1116         try {
1117             seconds = Short.valueOf(averageWindow);
1118         } catch (NumberFormatException e) {
1119             ci.println("Invalid period.");
1120         }
1121         OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1122         ci.println("New: " + factoredSamples * portTickNumber + " secs");
1123     }
1124
1125     public void _ofstatsmgrintervals(CommandInterpreter ci) {
1126         String flowStatsInterv = ci.nextArgument();
1127         String portStatsInterv = ci.nextArgument();
1128         String descStatsInterv = ci.nextArgument();
1129         String tableStatsInterv = ci.nextArgument();
1130
1131         if (flowStatsInterv == null || portStatsInterv == null
1132                 || descStatsInterv == null) {
1133             ci.println("Usage: ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds)");
1134             ci.println("Current Values: fP=" + statisticsTickNumber + "sec pP="
1135                     + portTickNumber + "sec dP=" + descriptionTickNumber + "sec tP=" + tableTickNumber + " sec");
1136             return;
1137         }
1138         Short fP, pP, dP, tP;
1139         try {
1140             fP = Short.parseShort(flowStatsInterv);
1141             pP = Short.parseShort(portStatsInterv);
1142             dP = Short.parseShort(descStatsInterv);
1143             tP = Short.parseShort(tableStatsInterv);
1144         } catch (Exception e) {
1145             ci.println("Invalid format values: " + e.getMessage());
1146             return;
1147         }
1148
1149         if (pP <= 1 || fP <= 1 || dP <= 1 || tP <= 1) {
1150             ci.println("Invalid values. fP, pP, dP, tP have to be greater than 1.");
1151             return;
1152         }
1153
1154         statisticsTickNumber = fP;
1155         portTickNumber = pP;
1156         descriptionTickNumber = dP;
1157         tableTickNumber = tP;
1158
1159         ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1160                 + portTickNumber + "s dP=" + descriptionTickNumber + "s tP="
1161                 + tableTickNumber + "s");
1162     }
1163
1164     /**
1165      * This method retrieves user configurations from config.ini and updates
1166      * statisticsTickNumber/portTickNumber/descriptionTickNumber accordingly.
1167      */
1168     private void configStatsPollIntervals() {
1169         String fsStr = System.getProperty("of.flowStatsPollInterval");
1170         String psStr = System.getProperty("of.portStatsPollInterval");
1171         String dsStr = System.getProperty("of.descStatsPollInterval");
1172         String tsStr = System.getProperty("of.tableStatsPollInterval");
1173         Short fs, ps, ds, ts;
1174
1175         if (fsStr != null) {
1176             try {
1177                 fs = Short.parseShort(fsStr);
1178                 if (fs > 0) {
1179                     statisticsTickNumber = fs;
1180                 }
1181             } catch (Exception e) {
1182             }
1183         }
1184
1185         if (psStr != null) {
1186             try {
1187                 ps = Short.parseShort(psStr);
1188                 if (ps > 0) {
1189                     portTickNumber = ps;
1190                 }
1191             } catch (Exception e) {
1192             }
1193         }
1194
1195         if (dsStr != null) {
1196             try {
1197                 ds = Short.parseShort(dsStr);
1198                 if (ds > 0) {
1199                     descriptionTickNumber = ds;
1200                 }
1201             } catch (Exception e) {
1202             }
1203         }
1204
1205         if (tsStr != null) {
1206             try{
1207                 ts = Short.parseShort(tsStr);
1208                 if (ts > 0) {
1209                     tableTickNumber = ts;
1210                 }
1211             } catch (Exception e) {
1212             }
1213         }
1214     }
1215 }