Make statistic queue size configurable via config.ini
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.Set;
20 import java.util.Timer;
21 import java.util.TimerTask;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.CopyOnWriteArraySet;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
28
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
33 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.core.Node;
40 import org.opendaylight.controller.sal.core.NodeConnector;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.utils.HexEncode;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFPort;
47 import org.openflow.protocol.OFStatisticsRequest;
48 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
49 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
50 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
51 import org.openflow.protocol.statistics.OFPortStatisticsReply;
52 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
53 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
54 import org.openflow.protocol.statistics.OFStatistics;
55 import org.openflow.protocol.statistics.OFStatisticsType;
56 import org.openflow.protocol.statistics.OFTableStatistics;
57 import org.openflow.protocol.statistics.OFVendorStatistics;
58 import org.openflow.util.HexString;
59 import org.osgi.framework.BundleContext;
60 import org.osgi.framework.FrameworkUtil;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 /**
65  * It periodically polls the different OF statistics from the OF switches and
66  * caches them for quick retrieval for the above layers' modules It also
67  * provides an API to directly query the switch about the statistics
68  */
69 public class OFStatisticsManager implements IOFStatisticsManager,
70 IInventoryShimExternalListener, CommandProvider {
71     private static final Logger log = LoggerFactory.getLogger(OFStatisticsManager.class);
72     private static final int INITIAL_SIZE = 64;
73     private static final long FLOW_STATS_PERIOD = 10000;
74     private static final long DESC_STATS_PERIOD = 60000;
75     private static final long PORT_STATS_PERIOD = 5000;
76     private static final long TABLE_STATS_PERIOD = 10000;
77     private static final long TICK = 1000;
78     private static short statisticsTickNumber = (short) (FLOW_STATS_PERIOD / TICK);
79     private static short descriptionTickNumber = (short) (DESC_STATS_PERIOD / TICK);
80     private static short portTickNumber = (short) (PORT_STATS_PERIOD / TICK);
81     private static short tableTickNumber = (short) (TABLE_STATS_PERIOD / TICK);
82     private static short factoredSamples = (short) 2;
83     private static short counter = 1;
84     private IController controller = null;
85     private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
86     private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
87     private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
88     private ConcurrentMap<Long, List<OFStatistics>> tableStatistics;
89     private List<OFStatistics> dummyList;
90     private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
91     protected BlockingQueue<StatsRequest> pendingStatsRequests;
92     protected BlockingQueue<Long> switchPortStatsUpdated;
93     private Thread statisticsCollector;
94     private Thread txRatesUpdater;
95     private Timer statisticsTimer;
96     private TimerTask statisticsTimerTask;
97     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
98     // Per port sampled (every portStatsPeriod) transmit rate
99     private Map<Long, Map<Short, TxRates>> txRates;
100     private Set<IOFStatisticsListener> statisticsListeners;
101
102     /**
103      * The object containing the latest factoredSamples tx rate samples for a
104      * given switch port
105      */
106     protected class TxRates {
107         // contains the latest factoredSamples sampled transmitted bytes
108         Deque<Long> sampledTxBytes;
109
110         public TxRates() {
111             sampledTxBytes = new LinkedBlockingDeque<Long>();
112         }
113
114         public void update(Long txBytes) {
115             /*
116              * Based on how many samples our average works on, we might have to
117              * remove the oldest sample
118              */
119             if (sampledTxBytes.size() == factoredSamples) {
120                 sampledTxBytes.removeLast();
121             }
122
123             // Add the latest sample to the top of the queue
124             sampledTxBytes.addFirst(txBytes);
125         }
126
127         /**
128          * Returns the average transmit rate in bps
129          *
130          * @return the average transmit rate [bps]
131          */
132         public long getAverageTxRate() {
133             long average = 0;
134             /*
135              * If we cannot provide the value for the time window length set
136              */
137             if (sampledTxBytes.size() < factoredSamples) {
138                 return average;
139             }
140             long increment = sampledTxBytes.getFirst() - sampledTxBytes
141                     .getLast();
142             long timePeriod = factoredSamples * PORT_STATS_PERIOD / TICK;
143             average = (8L * increment) / timePeriod;
144             return average;
145         }
146     }
147
148     public void setController(IController core) {
149         this.controller = core;
150     }
151
152     public void unsetController(IController core) {
153         if (this.controller == core) {
154             this.controller = null;
155         }
156     }
157
158     private short getStatsQueueSize() {
159         String statsQueueSizeStr = System.getProperty("of.statsQueueSize");
160         short statsQueueSize = INITIAL_SIZE;
161         if (statsQueueSizeStr != null) {
162             try {
163                 statsQueueSize = Short.parseShort(statsQueueSizeStr);
164                 if (statsQueueSize <= 0) {
165                     statsQueueSize = INITIAL_SIZE;
166                 }
167             } catch (Exception e) {
168             }
169         }
170         return statsQueueSize;
171     }
172     /**
173      * Function called by the dependency manager when all the required
174      * dependencies are satisfied
175      *
176      */
177     void init() {
178         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
179         descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
180         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
181         tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
182         dummyList = new ArrayList<OFStatistics>(1);
183         pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(getStatsQueueSize());
184         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(INITIAL_SIZE);
185         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(INITIAL_SIZE);
186         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(INITIAL_SIZE);
187         txRates = new HashMap<Long, Map<Short, TxRates>>(INITIAL_SIZE);
188         statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
189
190         configStatsPollIntervals();
191
192         // Initialize managed timers
193         statisticsTimer = new Timer();
194         statisticsTimerTask = new TimerTask() {
195             @Override
196             public void run() {
197                 decrementTicks();
198             }
199         };
200
201         // Initialize Statistics collector thread
202         statisticsCollector = new Thread(new Runnable() {
203             @Override
204             public void run() {
205                 while (true) {
206                     try {
207                         StatsRequest req = pendingStatsRequests.take();
208                         queryStatisticsInternal(req.switchId, req.type);
209                     } catch (InterruptedException e) {
210                         log.warn("Flow Statistics Collector thread "
211                                 + "interrupted", e);
212                     }
213                 }
214             }
215         }, "Statistics Collector");
216
217         // Initialize Tx Rate Updater thread
218         txRatesUpdater = new Thread(new Runnable() {
219             @Override
220             public void run() {
221                 while (true) {
222                     try {
223                         long switchId = switchPortStatsUpdated.take();
224                         updatePortsTxRate(switchId);
225                     } catch (InterruptedException e) {
226                         log.warn("TX Rate Updater thread interrupted", e);
227                     }
228                 }
229             }
230         }, "TX Rate Updater");
231     }
232
233     /**
234      * Function called by the dependency manager when at least one dependency
235      * become unsatisfied or when the component is shutting down because for
236      * example bundle is being stopped.
237      *
238      */
239     void destroy() {
240     }
241
242     /**
243      * Function called by dependency manager after "init ()" is called and after
244      * the services provided by the class are registered in the service registry
245      *
246      */
247     void start() {
248         // Start managed timers
249         statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, TICK);
250
251         // Start statistics collector thread
252         statisticsCollector.start();
253
254         // Start bandwidth utilization computer thread
255         txRatesUpdater.start();
256
257         // OSGI console
258         registerWithOSGIConsole();
259     }
260
261     /**
262      * Function called by the dependency manager before the services exported by
263      * the component are unregistered, this will be followed by a "destroy ()"
264      * calls
265      *
266      */
267     void stop() {
268         // Stop managed timers
269         statisticsTimer.cancel();
270     }
271
272     public void setStatisticsListener(IOFStatisticsListener s) {
273         this.statisticsListeners.add(s);
274     }
275
276     public void unsetStatisticsListener(IOFStatisticsListener s) {
277         if (s != null) {
278             this.statisticsListeners.remove(s);
279         }
280     }
281
282     private void registerWithOSGIConsole() {
283         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
284         bundleContext.registerService(CommandProvider.class.getName(), this, null);
285     }
286
287     private static class StatsRequest {
288         protected Long switchId;
289         protected OFStatisticsType type;
290
291         public StatsRequest(Long d, OFStatisticsType t) {
292             switchId = d;
293             type = t;
294         }
295
296         @Override
297         public String toString() {
298             return "SReq = {switchId=" + switchId + ", type=" + type + "}";
299         }
300
301         @Override
302         public int hashCode() {
303             final int prime = 31;
304             int result = 1;
305             result = prime * result
306                     + ((switchId == null) ? 0 : switchId.hashCode());
307             result = prime * result + ((type == null) ? 0 : type.ordinal());
308             return result;
309         }
310
311         @Override
312         public boolean equals(Object obj) {
313             if (this == obj) {
314                 return true;
315             }
316             if (obj == null) {
317                 return false;
318             }
319             if (getClass() != obj.getClass()) {
320                 return false;
321             }
322             StatsRequest other = (StatsRequest) obj;
323             if (switchId == null) {
324                 if (other.switchId != null) {
325                     return false;
326                 }
327             } else if (!switchId.equals(other.switchId)) {
328                 return false;
329             }
330             if (type != other.type) {
331                 return false;
332             }
333             return true;
334         }
335     }
336
337     private void addStatisticsTicks(Long switchId) {
338         switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
339                                                                   // switch
340                                                                   // supports
341                                                                   // Vendor
342                                                                   // extension
343                                                                   // stats
344         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
345         log.debug("Added Switch {} to target pool",
346                 HexString.toHexString(switchId.longValue()));
347     }
348
349     protected static class StatisticsTicks {
350         private short flowStatisticsTicks;
351         private short descriptionTicks;
352         private short portStatisticsTicks;
353         private short tableStatisticsTicks;
354
355         public StatisticsTicks(boolean scattered) {
356             if (scattered) {
357                 // scatter bursts by statisticsTickPeriod
358                 if (++counter < 0) {
359                     counter = 0;
360                 } // being paranoid here
361                 flowStatisticsTicks = (short) (1 + counter
362                         % statisticsTickNumber);
363                 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
364                 portStatisticsTicks = (short) (1 + counter % portTickNumber);
365                 tableStatisticsTicks = (short) (1 + counter % tableTickNumber);
366             } else {
367                 flowStatisticsTicks = statisticsTickNumber;
368                 descriptionTicks = descriptionTickNumber;
369                 portStatisticsTicks = portTickNumber;
370                 tableStatisticsTicks = tableTickNumber;
371             }
372         }
373
374         public boolean decrementFlowTicksIsZero() {
375             // Please ensure no code is inserted between the if check and the
376             // flowStatisticsTicks reset
377             if (--flowStatisticsTicks == 0) {
378                 flowStatisticsTicks = statisticsTickNumber;
379                 return true;
380             }
381             return false;
382         }
383
384         public boolean decrementDescTicksIsZero() {
385             // Please ensure no code is inserted between the if check and the
386             // descriptionTicks reset
387             if (--descriptionTicks == 0) {
388                 descriptionTicks = descriptionTickNumber;
389                 return true;
390             }
391             return false;
392         }
393
394         public boolean decrementPortTicksIsZero() {
395             // Please ensure no code is inserted between the if check and the
396             // descriptionTicks reset
397             if (--portStatisticsTicks == 0) {
398                 portStatisticsTicks = portTickNumber;
399                 return true;
400             }
401             return false;
402         }
403
404         public boolean decrementTableTicksIsZero() {
405             // Please ensure no code is inserted between the if check and the
406             // descriptionTicks reset
407             if(--tableStatisticsTicks == 0) {
408                 tableStatisticsTicks = tableTickNumber;
409                 return true;
410             }
411             return false;
412         }
413
414         @Override
415         public String toString() {
416             return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
417                     + ",pT=" + portStatisticsTicks + ",tT=" + tableStatisticsTicks + "}";
418         }
419     }
420
421     private void printInfoMessage(String type, StatsRequest request) {
422         log.info("{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
423                 new Object[] {type, HexString.toHexString(request.switchId), pendingStatsRequests.size(),
424                 statisticsCollector.getState().toString() });
425     }
426
427     protected void decrementTicks() {
428         StatsRequest request = null;
429         for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
430                 .entrySet()) {
431             StatisticsTicks clock = entry.getValue();
432             Long switchId = entry.getKey();
433             if (clock.decrementFlowTicksIsZero()) {
434                 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ?
435                         new StatsRequest(switchId, OFStatisticsType.VENDOR) :
436                         new StatsRequest(switchId, OFStatisticsType.FLOW);
437                 // If a request for this switch is already in the queue, skip to
438                 // add this new request
439                 if (!pendingStatsRequests.contains(request)
440                         && false == pendingStatsRequests.offer(request)) {
441                     printInfoMessage("Flow", request);
442                 }
443             }
444
445             if (clock.decrementDescTicksIsZero()) {
446                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
447                 // If a request for this switch is already in the queue, skip to
448                 // add this new request
449                 if (!pendingStatsRequests.contains(request)
450                         && false == pendingStatsRequests.offer(request)) {
451                     printInfoMessage("Description", request);
452                 }
453             }
454
455             if (clock.decrementPortTicksIsZero()) {
456                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
457                 // If a request for this switch is already in the queue, skip to
458                 // add this new request
459                 if (!pendingStatsRequests.contains(request)
460                         && false == pendingStatsRequests.offer(request)) {
461                     printInfoMessage("Port", request);
462                 }
463             }
464
465             if(clock.decrementTableTicksIsZero()) {
466                 request = new StatsRequest(switchId, OFStatisticsType.TABLE);
467                 // If a request for this switch is already in the queue, skip to
468                 // add this new request
469                 if (!pendingStatsRequests.contains(request)
470                         && false == pendingStatsRequests.offer(request)) {
471                     printInfoMessage("Table", request);
472                 }
473             }
474         }
475     }
476
477     private void removeStatsRequestTasks(Long switchId) {
478         log.debug("Cleaning Statistics database for switch {}",
479                 HexEncode.longToHexString(switchId));
480         // To be safe, let's attempt removal of both VENDOR and FLOW request. It
481         // does not hurt
482         pendingStatsRequests.remove(new StatsRequest(switchId,
483                 OFStatisticsType.VENDOR));
484         pendingStatsRequests.remove(new StatsRequest(switchId,
485                 OFStatisticsType.FLOW));
486         pendingStatsRequests.remove(new StatsRequest(switchId,
487                 OFStatisticsType.DESC));
488         pendingStatsRequests.remove(new StatsRequest(switchId,
489                 OFStatisticsType.PORT));
490         pendingStatsRequests.remove(new StatsRequest(switchId,
491                 OFStatisticsType.TABLE));
492         // Take care of the TX rate databases
493         switchPortStatsUpdated.remove(switchId);
494         txRates.remove(switchId);
495     }
496
497     private void clearFlowStatsAndTicks(Long switchId) {
498         statisticsTimerTicks.remove(switchId);
499         removeStatsRequestTasks(switchId);
500         flowStatistics.remove(switchId);
501         log.debug("Statistics removed for switch {}",
502                 HexString.toHexString(switchId));
503     }
504
505     private void queryStatisticsInternal(Long switchId, OFStatisticsType statType) {
506
507         // Query the switch on all matches
508         List<OFStatistics> values = this.fetchStatisticsFromSwitch(switchId, statType, null);
509
510         // If got a valid response update local cache and notify listeners
511         if (values != null && !values.isEmpty()) {
512             switch (statType) {
513                 case FLOW:
514                 case VENDOR:
515                     flowStatistics.put(switchId, values);
516                     notifyFlowUpdate(switchId, values);
517                     break;
518                 case DESC:
519                     // Overwrite cache
520                     descStatistics.put(switchId, values);
521                     // Notify who may be interested in a description change
522                     notifyDescriptionUpdate(switchId, values);
523                     break;
524                 case PORT:
525                     // Overwrite cache with new port statistics for this switch
526                     portStatistics.put(switchId, values);
527
528                     // Wake up the thread which maintains the TX byte counters for
529                     // each port
530                     switchPortStatsUpdated.offer(switchId);
531                     notifyPortUpdate(switchId, values);
532                     break;
533                 case TABLE:
534                     // Overwrite cache
535                     tableStatistics.put(switchId, values);
536                     notifyTableUpdate(switchId, values);
537                     break;
538                 default:
539             }
540         }
541     }
542
543     private void notifyDescriptionUpdate(Long switchId, List<OFStatistics> values) {
544         for (IOFStatisticsListener l : this.statisticsListeners) {
545             l.descriptionStatisticsRefreshed(switchId, values);
546         }
547     }
548
549     private void notifyFlowUpdate(Long switchId, List<OFStatistics> values) {
550         if (values.get(0) instanceof OFVendorStatistics) {
551             values = this.v6StatsListToOFStatsList(values);
552         }
553
554         for (IOFStatisticsListener l : this.statisticsListeners) {
555             l.flowStatisticsRefreshed(switchId, values);
556         }
557
558     }
559
560     private void notifyPortUpdate(Long switchId, List<OFStatistics> values) {
561         for (IOFStatisticsListener l : this.statisticsListeners) {
562             l.portStatisticsRefreshed(switchId, values);
563         }
564     }
565
566     private void notifyTableUpdate(Long switchId, List<OFStatistics> values) {
567         for (IOFStatisticsListener l : this.statisticsListeners) {
568             l.tableStatisticsRefreshed(switchId, values);
569         }
570     }
571
572     /*
573      * Generic function to get the statistics form an OF switch
574      */
575     @SuppressWarnings("unchecked")
576     private List<OFStatistics> fetchStatisticsFromSwitch(Long switchId,
577             OFStatisticsType statsType, Object target) {
578         List<OFStatistics> values = null;
579         String type = null;
580         ISwitch sw = controller.getSwitch(switchId);
581
582         if (sw != null) {
583             OFStatisticsRequest req = new OFStatisticsRequest();
584             req.setStatisticType(statsType);
585             int requestLength = req.getLengthU();
586
587             if (statsType == OFStatisticsType.FLOW) {
588                 OFMatch match = null;
589                 if (target == null) {
590                     // All flows request
591                     match = new OFMatch();
592                     match.setWildcards(0xffffffff);
593                 } else if (!(target instanceof OFMatch)) {
594                     // Malformed request
595                     log.warn("Invalid target type for Flow stats request: {}",
596                             target.getClass());
597                     return null;
598                 } else {
599                     // Specific flow request
600                     match = (OFMatch) target;
601                 }
602                 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
603                 specificReq.setMatch(match);
604                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
605                 specificReq.setTableId((byte) 0xff);
606                 req.setStatistics(Collections
607                         .singletonList((OFStatistics) specificReq));
608                 requestLength += specificReq.getLength();
609                 type = "FLOW";
610             } else if (statsType == OFStatisticsType.VENDOR) {
611                 V6StatsRequest specificReq = new V6StatsRequest();
612                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
613                 specificReq.setTableId((byte) 0xff);
614                 req.setStatistics(Collections
615                         .singletonList((OFStatistics) specificReq));
616                 requestLength += specificReq.getLength();
617                 type = "VENDOR";
618             } else if (statsType == OFStatisticsType.AGGREGATE) {
619                 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
620                 OFMatch match = new OFMatch();
621                 match.setWildcards(0xffffffff);
622                 specificReq.setMatch(match);
623                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
624                 specificReq.setTableId((byte) 0xff);
625                 req.setStatistics(Collections
626                         .singletonList((OFStatistics) specificReq));
627                 requestLength += specificReq.getLength();
628                 type = "AGGREGATE";
629             } else if (statsType == OFStatisticsType.PORT) {
630                 short targetPort;
631                 if (target == null) {
632                     // All ports request
633                     targetPort = OFPort.OFPP_NONE.getValue();
634                 } else if (!(target instanceof Short)) {
635                     // Malformed request
636                     log.warn("Invalid target type for Port stats request: {}",
637                             target.getClass());
638                     return null;
639                 } else {
640                     // Specific port request
641                     targetPort = (Short) target;
642                 }
643                 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
644                 specificReq.setPortNumber(targetPort);
645                 req.setStatistics(Collections
646                         .singletonList((OFStatistics) specificReq));
647                 requestLength += specificReq.getLength();
648                 type = "PORT";
649             } else if (statsType == OFStatisticsType.QUEUE) {
650                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
651                 specificReq.setPortNumber(OFPort.OFPP_ALL.getValue());
652                 specificReq.setQueueId(0xffffffff);
653                 req.setStatistics(Collections
654                         .singletonList((OFStatistics) specificReq));
655                 requestLength += specificReq.getLength();
656                 type = "QUEUE";
657             } else if (statsType == OFStatisticsType.DESC) {
658                 type = "DESC";
659             } else if (statsType == OFStatisticsType.TABLE) {
660                 if(target != null){
661                     if (!(target instanceof Byte)) {
662                         // Malformed request
663                         log.warn("Invalid table id for table stats request: {}",
664                                 target.getClass());
665                         return null;
666                     }
667                     byte targetTable = (Byte) target;
668                     OFTableStatistics specificReq = new OFTableStatistics();
669                     specificReq.setTableId(targetTable);
670                     req.setStatistics(Collections
671                             .singletonList((OFStatistics) specificReq));
672                     requestLength += specificReq.getLength();
673                 }
674                 type = "TABLE";
675             }
676             req.setLengthU(requestLength);
677             Object result = sw.getStatistics(req);
678
679             if (result == null) {
680                 log.warn("Request Timed Out for ({}) from switch {}", type,
681                         HexString.toHexString(switchId));
682             } else if (result instanceof OFError) {
683                 log.warn("Switch {} failed to handle ({}) stats request: {}",
684                         new Object[] { HexString.toHexString(switchId), type,
685                         Utils.getOFErrorString((OFError) result) });
686                 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
687                     log.warn(
688                             "Switching back to regular Flow stats requests for switch {}",
689                             HexString.toHexString(switchId));
690                     this.switchSupportsVendorExtStats.put(switchId,
691                             Boolean.FALSE);
692                 }
693             } else {
694                 values = (List<OFStatistics>) result;
695             }
696         }
697         return values;
698     }
699
700     @Override
701     public List<OFStatistics> getOFFlowStatistics(Long switchId) {
702         List<OFStatistics> list = flowStatistics.get(switchId);
703
704         /*
705          * Check on emptiness as interference between add and get is still
706          * possible on the inner list (the concurrentMap entry's value)
707          */
708         return (list == null || list.isEmpty()) ? this.dummyList
709                 : (list.get(0) instanceof OFVendorStatistics) ? this
710                         .v6StatsListToOFStatsList(list) : list;
711     }
712
713     @Override
714     public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch, short priority) {
715         List<OFStatistics> statsList = flowStatistics.get(switchId);
716
717         /*
718          * Check on emptiness as interference between add and get is still
719          * possible on the inner list (the concurrentMap entry's value)
720          */
721         if (statsList == null || statsList.isEmpty()) {
722             return this.dummyList;
723         }
724
725         if (statsList.get(0) instanceof OFVendorStatistics) {
726             /*
727              * Caller could provide regular OF match when we instead pull the
728              * vendor statistics from this node Caller is not supposed to know
729              * whether this switch supports vendor extensions statistics
730              * requests
731              */
732             V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
733                     : new V6Match(ofMatch);
734
735             List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
736             for (OFStatistics stats : targetList) {
737                 V6StatsReply v6Stats = (V6StatsReply) stats;
738                 V6Match v6Match = v6Stats.getMatch();
739                 if (v6Stats.getPriority() == priority && v6Match.equals(targetMatch)) {
740                     List<OFStatistics> list = new ArrayList<OFStatistics>();
741                     list.add(stats);
742                     return list;
743                 }
744             }
745         } else {
746             for (OFStatistics stats : statsList) {
747                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
748                 if (flowStats.getPriority() == priority && flowStats.getMatch().equals(ofMatch)) {
749                     List<OFStatistics> list = new ArrayList<OFStatistics>();
750                     list.add(stats);
751                     return list;
752                 }
753             }
754         }
755         return this.dummyList;
756     }
757
758     /*
759      * Converts the v6 vendor statistics to the OFStatistics
760      */
761     private List<OFStatistics> v6StatsListToOFStatsList(
762             List<OFStatistics> statistics) {
763         List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
764         if (statistics != null && !statistics.isEmpty()) {
765             for (OFStatistics stats : statistics) {
766                 if (stats instanceof OFVendorStatistics) {
767                     List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
768                     if (r != null) {
769                         v6statistics.addAll(r);
770                     }
771                 }
772             }
773         }
774         return v6statistics;
775     }
776
777     private static List<OFStatistics> getV6ReplyStatistics(
778             OFVendorStatistics stat) {
779         int length = stat.getLength();
780         List<OFStatistics> results = new ArrayList<OFStatistics>();
781         if (length < 12)
782             return null; // Nicira Hdr is 12 bytes. We need atleast that much
783         ByteBuffer data = ByteBuffer.allocate(length);
784         stat.writeTo(data);
785         data.rewind();
786         if (log.isTraceEnabled()) {
787             log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
788                     HexString.toHexString(data.array()));
789         }
790
791         int vendor = data.getInt(); // first 4 bytes is vendor id.
792         if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
793             log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
794             return null;
795         } else {
796             // go ahead by 8 bytes which is 8 bytes of 0
797             data.getLong(); // should be all 0's
798             length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
799                           // been consumed
800         }
801
802         V6StatsReply v6statsreply;
803         int min_len;
804         while (length > 0) {
805             v6statsreply = new V6StatsReply();
806             min_len = v6statsreply.getLength();
807             if (length < v6statsreply.getLength())
808                 break;
809             v6statsreply.setActionFactory(stat.getActionFactory());
810             v6statsreply.readFrom(data);
811             if (v6statsreply.getLength() < min_len)
812                 break;
813             v6statsreply.setVendorId(vendor);
814             log.trace("V6StatsReply: {}", v6statsreply);
815             length -= v6statsreply.getLength();
816             results.add(v6statsreply);
817         }
818         return results;
819     }
820
821     @Override
822     public List<OFStatistics> queryStatistics(Long switchId,
823             OFStatisticsType statType, Object target) {
824         /*
825          * Caller does not know and it is not supposed to know whether this
826          * switch supports vendor extension. We adjust the target for him
827          */
828         if (statType == OFStatisticsType.FLOW) {
829             if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
830                 statType = OFStatisticsType.VENDOR;
831             }
832         }
833
834         List<OFStatistics> list = this.fetchStatisticsFromSwitch(switchId, statType,
835                 target);
836
837         return (list == null) ? null :
838             (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list) : list;
839     }
840
841     @Override
842     public List<OFStatistics> getOFDescStatistics(Long switchId) {
843         if (!descStatistics.containsKey(switchId))
844             return this.dummyList;
845
846         return descStatistics.get(switchId);
847     }
848
849     @Override
850     public List<OFStatistics> getOFPortStatistics(Long switchId) {
851         if (!portStatistics.containsKey(switchId)) {
852             return this.dummyList;
853         }
854
855         return portStatistics.get(switchId);
856     }
857
858     @Override
859     public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
860         if (!portStatistics.containsKey(switchId)) {
861             return this.dummyList;
862         }
863         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
864         for (OFStatistics stats : portStatistics.get(switchId)) {
865             if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
866                 list.add(stats);
867                 break;
868             }
869         }
870         return list;
871     }
872
873     @Override
874     public List<OFStatistics> getOFTableStatistics(Long switchId) {
875         if (!tableStatistics.containsKey(switchId)) {
876             return this.dummyList;
877         }
878
879         return tableStatistics.get(switchId);
880     }
881
882     @Override
883     public List<OFStatistics> getOFTableStatistics(Long switchId, Byte tableId) {
884         if (!tableStatistics.containsKey(switchId)) {
885             return this.dummyList;
886         }
887
888         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
889         for (OFStatistics stats : tableStatistics.get(switchId)) {
890             if (((OFTableStatistics) stats).getTableId() == tableId) {
891                 list.add(stats);
892                 break;
893             }
894         }
895         return list;
896     }
897
898     @Override
899     public int getFlowsNumber(long switchId) {
900         return this.flowStatistics.get(switchId).size();
901     }
902
903     /*
904      * InventoryShim replay for us all the switch addition which happened before
905      * we were brought up
906      */
907     @Override
908     public void updateNode(Node node, UpdateType type, Set<Property> props) {
909         Long switchId = (Long) node.getID();
910         switch (type) {
911         case ADDED:
912             addStatisticsTicks(switchId);
913             break;
914         case REMOVED:
915             clearFlowStatsAndTicks(switchId);
916         default:
917         }
918     }
919
920     @Override
921     public void updateNodeConnector(NodeConnector nodeConnector,
922             UpdateType type, Set<Property> props) {
923         // No action
924     }
925
926     /**
927      * Update the cached port rates for this switch with the latest retrieved
928      * port transmit byte count
929      *
930      * @param switchId
931      */
932     private synchronized void updatePortsTxRate(long switchId) {
933         List<OFStatistics> newPortStatistics = this.portStatistics.get(switchId);
934         if (newPortStatistics == null) {
935             return;
936         }
937         Map<Short, TxRates> rates = this.txRates.get(switchId);
938         if (rates == null) {
939             // First time rates for this switch are added
940             rates = new HashMap<Short, TxRates>();
941             txRates.put(switchId, rates);
942         }
943         for (OFStatistics stats : newPortStatistics) {
944             OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
945             short port = newPortStat.getPortNumber();
946             TxRates portRatesHolder = rates.get(port);
947             if (portRatesHolder == null) {
948                 // First time rates for this port are added
949                 portRatesHolder = new TxRates();
950                 rates.put(port, portRatesHolder);
951             }
952             // Get and store the number of transmitted bytes for this port
953             // And handle the case where agent does not support the counter
954             long transmitBytes = newPortStat.getTransmitBytes();
955             long value = (transmitBytes < 0) ? 0 : transmitBytes;
956             portRatesHolder.update(value);
957         }
958     }
959
960     @Override
961     public synchronized long getTransmitRate(Long switchId, Short port) {
962         long average = 0;
963         if (switchId == null || port == null) {
964             return average;
965         }
966         Map<Short, TxRates> perSwitch = txRates.get(switchId);
967         if (perSwitch == null) {
968             return average;
969         }
970         TxRates portRates = perSwitch.get(port);
971         if (portRates == null) {
972             return average;
973         }
974         return portRates.getAverageTxRate();
975     }
976
977     /*
978      * Manual switch name configuration code
979      */
980     @Override
981     public String getHelp() {
982         StringBuffer help = new StringBuffer();
983         help.append("---OF Statistics Manager utilities---\n");
984         help.append("\t ofdumpstatsmgr         - "
985                 + "Print Internal Stats Mgr db\n");
986         help.append("\t ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds) - "
987                 + "Set/Show flow/port/dedscription stats poll intervals\n");
988         return help.toString();
989     }
990
991     private boolean isValidSwitchId(String switchId) {
992         String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
993         String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
994
995         return (switchId != null && (switchId.matches(regexDatapathID) || switchId
996                 .matches(regexDatapathIDLong)));
997     }
998
999     public long getSwitchIDLong(String switchId) {
1000         int radix = 16;
1001         String switchString = "0";
1002
1003         if (isValidSwitchId(switchId)) {
1004             if (switchId.contains(":")) {
1005                 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
1006                 switchString = switchId.replace(":", "");
1007             } else if (switchId.contains("-")) {
1008                 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
1009                 switchString = switchId.replace("-", "");
1010             } else {
1011                 // Handle the 0123456789ABCDEF notation
1012                 switchString = switchId;
1013             }
1014         }
1015         return Long.parseLong(switchString, radix);
1016     }
1017
1018     /*
1019      * Internal information dump code
1020      */
1021     private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
1022         StringBuffer buffer = new StringBuffer();
1023         buffer.append("{");
1024         for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
1025             buffer.append(HexString.toHexString(entry.getKey()) + "="
1026                     + entry.getValue().toString() + " ");
1027         }
1028         buffer.append("}");
1029         return buffer.toString();
1030     }
1031
1032     public void _ofdumpstatsmgr(CommandInterpreter ci) {
1033         ci.println("Global Counter: " + counter);
1034         ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
1035         ci.println("PendingStatsQueue: " + pendingStatsRequests);
1036         ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
1037         ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
1038         ci.println("Stats Collector State: "
1039                 + statisticsCollector.getState().toString());
1040         ci.println("StatsTimer: " + statisticsTimer.toString());
1041         ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
1042         ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
1043         ci.println("Port Stats Period: " + portTickNumber + " s");
1044         ci.println("Table Stats Period: " + tableTickNumber + " s");
1045     }
1046
1047     public void _resetSwitchCapability(CommandInterpreter ci) {
1048         String sidString = ci.nextArgument();
1049         Long sid = null;
1050         if (sidString == null) {
1051             ci.println("Insert the switch id (numeric value)");
1052             return;
1053         }
1054         try {
1055             sid = Long.valueOf(sidString);
1056             this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
1057             ci.println("Vendor capability for switch " + sid + " set to "
1058                     + this.switchSupportsVendorExtStats.get(sid));
1059         } catch (NumberFormatException e) {
1060             ci.println("Invalid switch id. Has to be numeric.");
1061         }
1062
1063     }
1064
1065     public void _ofbw(CommandInterpreter ci) {
1066         String sidString = ci.nextArgument();
1067         Long sid = null;
1068         if (sidString == null) {
1069             ci.println("Insert the switch id (numeric value)");
1070             return;
1071         }
1072         try {
1073             sid = Long.valueOf(sidString);
1074         } catch (NumberFormatException e) {
1075             ci.println("Invalid switch id. Has to be numeric.");
1076         }
1077         if (sid != null) {
1078             Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
1079             ci.println("Bandwidth utilization (" + factoredSamples
1080                     * portTickNumber + " sec average) for switch "
1081                     + HexEncode.longToHexString(sid) + ":");
1082             if (thisSwitchRates == null) {
1083                 ci.println("Not available");
1084             } else {
1085                 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
1086                     ci.println("Port: " + entry.getKey() + ": "
1087                             + entry.getValue().getAverageTxRate() + " bps");
1088                 }
1089             }
1090         }
1091     }
1092
1093     public void _txratewindow(CommandInterpreter ci) {
1094         String averageWindow = ci.nextArgument();
1095         short seconds = 0;
1096         if (averageWindow == null) {
1097             ci.println("Insert the length in seconds of the median "
1098                     + "window for tx rate");
1099             ci.println("Current: " + factoredSamples * portTickNumber + " secs");
1100             return;
1101         }
1102         try {
1103             seconds = Short.valueOf(averageWindow);
1104         } catch (NumberFormatException e) {
1105             ci.println("Invalid period.");
1106         }
1107         OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1108         ci.println("New: " + factoredSamples * portTickNumber + " secs");
1109     }
1110
1111     public void _ofstatsmgrintervals(CommandInterpreter ci) {
1112         String flowStatsInterv = ci.nextArgument();
1113         String portStatsInterv = ci.nextArgument();
1114         String descStatsInterv = ci.nextArgument();
1115         String tableStatsInterv = ci.nextArgument();
1116
1117         if (flowStatsInterv == null || portStatsInterv == null
1118                 || descStatsInterv == null) {
1119             ci.println("Usage: ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds)");
1120             ci.println("Current Values: fP=" + statisticsTickNumber + "sec pP="
1121                     + portTickNumber + "sec dP=" + descriptionTickNumber + "sec tP=" + tableTickNumber + " sec");
1122             return;
1123         }
1124         Short fP, pP, dP, tP;
1125         try {
1126             fP = Short.parseShort(flowStatsInterv);
1127             pP = Short.parseShort(portStatsInterv);
1128             dP = Short.parseShort(descStatsInterv);
1129             tP = Short.parseShort(tableStatsInterv);
1130         } catch (Exception e) {
1131             ci.println("Invalid format values: " + e.getMessage());
1132             return;
1133         }
1134
1135         if (pP <= 1 || fP <= 1 || dP <= 1 || tP <= 1) {
1136             ci.println("Invalid values. fP, pP, dP, tP have to be greater than 1.");
1137             return;
1138         }
1139
1140         statisticsTickNumber = fP;
1141         portTickNumber = pP;
1142         descriptionTickNumber = dP;
1143         tableTickNumber = tP;
1144
1145         ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1146                 + portTickNumber + "s dP=" + descriptionTickNumber + "s tP="
1147                 + tableTickNumber + "s");
1148     }
1149
1150     /**
1151      * This method retrieves user configurations from config.ini and updates
1152      * statisticsTickNumber/portTickNumber/descriptionTickNumber accordingly.
1153      */
1154     private void configStatsPollIntervals() {
1155         String fsStr = System.getProperty("of.flowStatsPollInterval");
1156         String psStr = System.getProperty("of.portStatsPollInterval");
1157         String dsStr = System.getProperty("of.descStatsPollInterval");
1158         String tsStr = System.getProperty("of.tableStatsPollInterval");
1159         Short fs, ps, ds, ts;
1160
1161         if (fsStr != null) {
1162             try {
1163                 fs = Short.parseShort(fsStr);
1164                 if (fs > 0) {
1165                     statisticsTickNumber = fs;
1166                 }
1167             } catch (Exception e) {
1168             }
1169         }
1170
1171         if (psStr != null) {
1172             try {
1173                 ps = Short.parseShort(psStr);
1174                 if (ps > 0) {
1175                     portTickNumber = ps;
1176                 }
1177             } catch (Exception e) {
1178             }
1179         }
1180
1181         if (dsStr != null) {
1182             try {
1183                 ds = Short.parseShort(dsStr);
1184                 if (ds > 0) {
1185                     descriptionTickNumber = ds;
1186                 }
1187             } catch (Exception e) {
1188             }
1189         }
1190
1191         if (tsStr != null) {
1192             try{
1193                 ts = Short.parseShort(tsStr);
1194                 if (ts > 0) {
1195                     tableTickNumber = ts;
1196                 }
1197             } catch (Exception e) {
1198             }
1199         }
1200     }
1201 }