b63517b8add5d90a4df4aecd386c63767fd47e50
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.List;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.Set;
20 import java.util.Timer;
21 import java.util.TimerTask;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.CopyOnWriteArraySet;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
28
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
33 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.core.Node;
40 import org.opendaylight.controller.sal.core.NodeConnector;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.utils.HexEncode;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFPort;
47 import org.openflow.protocol.OFStatisticsRequest;
48 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
49 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
50 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
51 import org.openflow.protocol.statistics.OFPortStatisticsReply;
52 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
53 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
54 import org.openflow.protocol.statistics.OFStatistics;
55 import org.openflow.protocol.statistics.OFStatisticsType;
56 import org.openflow.protocol.statistics.OFTableStatistics;
57 import org.openflow.protocol.statistics.OFVendorStatistics;
58 import org.openflow.util.HexString;
59 import org.osgi.framework.BundleContext;
60 import org.osgi.framework.FrameworkUtil;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 /**
65  * It periodically polls the different OF statistics from the OF switches and
66  * caches them for quick retrieval for the above layers' modules It also
67  * provides an API to directly query the switch about the statistics
68  */
69 public class OFStatisticsManager implements IOFStatisticsManager,
70 IInventoryShimExternalListener, CommandProvider {
71     private static final Logger log = LoggerFactory.getLogger(OFStatisticsManager.class);
72     private static final int INITIAL_SIZE = 64;
73     private static final long FLOW_STATS_PERIOD = 10000;
74     private static final long DESC_STATS_PERIOD = 60000;
75     private static final long PORT_STATS_PERIOD = 5000;
76     private static final long TABLE_STATS_PERIOD = 10000;
77     private static final long TICK = 1000;
78     private static short statisticsTickNumber = (short) (FLOW_STATS_PERIOD / TICK);
79     private static short descriptionTickNumber = (short) (DESC_STATS_PERIOD / TICK);
80     private static short portTickNumber = (short) (PORT_STATS_PERIOD / TICK);
81     private static short tableTickNumber = (short) (TABLE_STATS_PERIOD / TICK);
82     private static short factoredSamples = (short) 2;
83     private static short counter = 1;
84     private IController controller = null;
85     private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
86     private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
87     private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
88     private ConcurrentMap<Long, List<OFStatistics>> tableStatistics;
89     private List<OFStatistics> dummyList;
90     private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
91     protected BlockingQueue<StatsRequest> pendingStatsRequests;
92     protected BlockingQueue<Long> switchPortStatsUpdated;
93     private Thread statisticsCollector;
94     private Thread txRatesUpdater;
95     private Timer statisticsTimer;
96     private TimerTask statisticsTimerTask;
97     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
98     // Per port sampled (every portStatsPeriod) transmit rate
99     private Map<Long, Map<Short, TxRates>> txRates;
100     private Set<IOFStatisticsListener> statisticsListeners;
101
102     /**
103      * The object containing the latest factoredSamples tx rate samples for a
104      * given switch port
105      */
106     protected class TxRates {
107         // contains the latest factoredSamples sampled transmitted bytes
108         Deque<Long> sampledTxBytes;
109
110         public TxRates() {
111             sampledTxBytes = new LinkedBlockingDeque<Long>();
112         }
113
114         public void update(Long txBytes) {
115             /*
116              * Based on how many samples our average works on, we might have to
117              * remove the oldest sample
118              */
119             if (sampledTxBytes.size() == factoredSamples) {
120                 sampledTxBytes.removeLast();
121             }
122
123             // Add the latest sample to the top of the queue
124             sampledTxBytes.addFirst(txBytes);
125         }
126
127         /**
128          * Returns the average transmit rate in bps
129          *
130          * @return the average transmit rate [bps]
131          */
132         public long getAverageTxRate() {
133             long average = 0;
134             /*
135              * If we cannot provide the value for the time window length set
136              */
137             if (sampledTxBytes.size() < factoredSamples) {
138                 return average;
139             }
140             long increment = sampledTxBytes.getFirst() - sampledTxBytes
141                     .getLast();
142             long timePeriod = factoredSamples * PORT_STATS_PERIOD / TICK;
143             average = (8L * increment) / timePeriod;
144             return average;
145         }
146     }
147
148     public void setController(IController core) {
149         this.controller = core;
150     }
151
152     public void unsetController(IController core) {
153         if (this.controller == core) {
154             this.controller = null;
155         }
156     }
157
158     /**
159      * Function called by the dependency manager when all the required
160      * dependencies are satisfied
161      *
162      */
163     void init() {
164         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
165         descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
166         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
167         tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
168         dummyList = new ArrayList<OFStatistics>(1);
169         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(INITIAL_SIZE);
170         pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(INITIAL_SIZE);
171         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(INITIAL_SIZE);
172         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(INITIAL_SIZE);
173         txRates = new HashMap<Long, Map<Short, TxRates>>(INITIAL_SIZE);
174         statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
175
176         configStatsPollIntervals();
177
178         // Initialize managed timers
179         statisticsTimer = new Timer();
180         statisticsTimerTask = new TimerTask() {
181             @Override
182             public void run() {
183                 decrementTicks();
184             }
185         };
186
187         // Initialize Statistics collector thread
188         statisticsCollector = new Thread(new Runnable() {
189             @Override
190             public void run() {
191                 while (true) {
192                     try {
193                         StatsRequest req = pendingStatsRequests.take();
194                         queryStatisticsInternal(req.switchId, req.type);
195                     } catch (InterruptedException e) {
196                         log.warn("Flow Statistics Collector thread "
197                                 + "interrupted", e);
198                     }
199                 }
200             }
201         }, "Statistics Collector");
202
203         // Initialize Tx Rate Updater thread
204         txRatesUpdater = new Thread(new Runnable() {
205             @Override
206             public void run() {
207                 while (true) {
208                     try {
209                         long switchId = switchPortStatsUpdated.take();
210                         updatePortsTxRate(switchId);
211                     } catch (InterruptedException e) {
212                         log.warn("TX Rate Updater thread interrupted", e);
213                     }
214                 }
215             }
216         }, "TX Rate Updater");
217     }
218
219     /**
220      * Function called by the dependency manager when at least one dependency
221      * become unsatisfied or when the component is shutting down because for
222      * example bundle is being stopped.
223      *
224      */
225     void destroy() {
226     }
227
228     /**
229      * Function called by dependency manager after "init ()" is called and after
230      * the services provided by the class are registered in the service registry
231      *
232      */
233     void start() {
234         // Start managed timers
235         statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, TICK);
236
237         // Start statistics collector thread
238         statisticsCollector.start();
239
240         // Start bandwidth utilization computer thread
241         txRatesUpdater.start();
242
243         // OSGI console
244         registerWithOSGIConsole();
245     }
246
247     /**
248      * Function called by the dependency manager before the services exported by
249      * the component are unregistered, this will be followed by a "destroy ()"
250      * calls
251      *
252      */
253     void stop() {
254         // Stop managed timers
255         statisticsTimer.cancel();
256     }
257
258     public void setStatisticsListener(IOFStatisticsListener s) {
259         this.statisticsListeners.add(s);
260     }
261
262     public void unsetStatisticsListener(IOFStatisticsListener s) {
263         if (s != null) {
264             this.statisticsListeners.remove(s);
265         }
266     }
267
268     private void registerWithOSGIConsole() {
269         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
270         bundleContext.registerService(CommandProvider.class.getName(), this, null);
271     }
272
273     private static class StatsRequest {
274         protected Long switchId;
275         protected OFStatisticsType type;
276
277         public StatsRequest(Long d, OFStatisticsType t) {
278             switchId = d;
279             type = t;
280         }
281
282         @Override
283         public String toString() {
284             return "SReq = {switchId=" + switchId + ", type=" + type + "}";
285         }
286
287         @Override
288         public int hashCode() {
289             final int prime = 31;
290             int result = 1;
291             result = prime * result
292                     + ((switchId == null) ? 0 : switchId.hashCode());
293             result = prime * result + ((type == null) ? 0 : type.ordinal());
294             return result;
295         }
296
297         @Override
298         public boolean equals(Object obj) {
299             if (this == obj) {
300                 return true;
301             }
302             if (obj == null) {
303                 return false;
304             }
305             if (getClass() != obj.getClass()) {
306                 return false;
307             }
308             StatsRequest other = (StatsRequest) obj;
309             if (switchId == null) {
310                 if (other.switchId != null) {
311                     return false;
312                 }
313             } else if (!switchId.equals(other.switchId)) {
314                 return false;
315             }
316             if (type != other.type) {
317                 return false;
318             }
319             return true;
320         }
321     }
322
323     private void addStatisticsTicks(Long switchId) {
324         switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
325                                                                   // switch
326                                                                   // supports
327                                                                   // Vendor
328                                                                   // extension
329                                                                   // stats
330         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
331         log.debug("Added Switch {} to target pool",
332                 HexString.toHexString(switchId.longValue()));
333     }
334
335     protected static class StatisticsTicks {
336         private short flowStatisticsTicks;
337         private short descriptionTicks;
338         private short portStatisticsTicks;
339         private short tableStatisticsTicks;
340
341         public StatisticsTicks(boolean scattered) {
342             if (scattered) {
343                 // scatter bursts by statisticsTickPeriod
344                 if (++counter < 0) {
345                     counter = 0;
346                 } // being paranoid here
347                 flowStatisticsTicks = (short) (1 + counter
348                         % statisticsTickNumber);
349                 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
350                 portStatisticsTicks = (short) (1 + counter % portTickNumber);
351                 tableStatisticsTicks = (short) (1 + counter % tableTickNumber);
352             } else {
353                 flowStatisticsTicks = statisticsTickNumber;
354                 descriptionTicks = descriptionTickNumber;
355                 portStatisticsTicks = portTickNumber;
356                 tableStatisticsTicks = tableTickNumber;
357             }
358         }
359
360         public boolean decrementFlowTicksIsZero() {
361             // Please ensure no code is inserted between the if check and the
362             // flowStatisticsTicks reset
363             if (--flowStatisticsTicks == 0) {
364                 flowStatisticsTicks = statisticsTickNumber;
365                 return true;
366             }
367             return false;
368         }
369
370         public boolean decrementDescTicksIsZero() {
371             // Please ensure no code is inserted between the if check and the
372             // descriptionTicks reset
373             if (--descriptionTicks == 0) {
374                 descriptionTicks = descriptionTickNumber;
375                 return true;
376             }
377             return false;
378         }
379
380         public boolean decrementPortTicksIsZero() {
381             // Please ensure no code is inserted between the if check and the
382             // descriptionTicks reset
383             if (--portStatisticsTicks == 0) {
384                 portStatisticsTicks = portTickNumber;
385                 return true;
386             }
387             return false;
388         }
389
390         public boolean decrementTableTicksIsZero() {
391             // Please ensure no code is inserted between the if check and the
392             // descriptionTicks reset
393             if(--tableStatisticsTicks == 0) {
394                 tableStatisticsTicks = tableTickNumber;
395                 return true;
396             }
397             return false;
398         }
399
400         @Override
401         public String toString() {
402             return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
403                     + ",pT=" + portStatisticsTicks + ",tT=" + tableStatisticsTicks + "}";
404         }
405     }
406
407     private void printInfoMessage(String type, StatsRequest request) {
408         log.info("{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
409                 new Object[] {type, HexString.toHexString(request.switchId), pendingStatsRequests.size(),
410                 statisticsCollector.getState().toString() });
411     }
412
413     protected void decrementTicks() {
414         StatsRequest request = null;
415         for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
416                 .entrySet()) {
417             StatisticsTicks clock = entry.getValue();
418             Long switchId = entry.getKey();
419             if (clock.decrementFlowTicksIsZero()) {
420                 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ?
421                         new StatsRequest(switchId, OFStatisticsType.VENDOR) :
422                         new StatsRequest(switchId, OFStatisticsType.FLOW);
423                 // If a request for this switch is already in the queue, skip to
424                 // add this new request
425                 if (!pendingStatsRequests.contains(request)
426                         && false == pendingStatsRequests.offer(request)) {
427                     printInfoMessage("Flow", request);
428                 }
429             }
430
431             if (clock.decrementDescTicksIsZero()) {
432                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
433                 // If a request for this switch is already in the queue, skip to
434                 // add this new request
435                 if (!pendingStatsRequests.contains(request)
436                         && false == pendingStatsRequests.offer(request)) {
437                     printInfoMessage("Description", request);
438                 }
439             }
440
441             if (clock.decrementPortTicksIsZero()) {
442                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
443                 // If a request for this switch is already in the queue, skip to
444                 // add this new request
445                 if (!pendingStatsRequests.contains(request)
446                         && false == pendingStatsRequests.offer(request)) {
447                     printInfoMessage("Port", request);
448                 }
449             }
450
451             if(clock.decrementTableTicksIsZero()) {
452                 request = new StatsRequest(switchId, OFStatisticsType.TABLE);
453                 // If a request for this switch is already in the queue, skip to
454                 // add this new request
455                 if (!pendingStatsRequests.contains(request)
456                         && false == pendingStatsRequests.offer(request)) {
457                     printInfoMessage("Table", request);
458                 }
459             }
460         }
461     }
462
463     private void removeStatsRequestTasks(Long switchId) {
464         log.debug("Cleaning Statistics database for switch {}",
465                 HexEncode.longToHexString(switchId));
466         // To be safe, let's attempt removal of both VENDOR and FLOW request. It
467         // does not hurt
468         pendingStatsRequests.remove(new StatsRequest(switchId,
469                 OFStatisticsType.VENDOR));
470         pendingStatsRequests.remove(new StatsRequest(switchId,
471                 OFStatisticsType.FLOW));
472         pendingStatsRequests.remove(new StatsRequest(switchId,
473                 OFStatisticsType.DESC));
474         pendingStatsRequests.remove(new StatsRequest(switchId,
475                 OFStatisticsType.PORT));
476         pendingStatsRequests.remove(new StatsRequest(switchId,
477                 OFStatisticsType.TABLE));
478         // Take care of the TX rate databases
479         switchPortStatsUpdated.remove(switchId);
480         txRates.remove(switchId);
481     }
482
483     private void clearFlowStatsAndTicks(Long switchId) {
484         statisticsTimerTicks.remove(switchId);
485         removeStatsRequestTasks(switchId);
486         flowStatistics.remove(switchId);
487         log.debug("Statistics removed for switch {}",
488                 HexString.toHexString(switchId));
489     }
490
491     private void queryStatisticsInternal(Long switchId, OFStatisticsType statType) {
492
493         // Query the switch on all matches
494         List<OFStatistics> values = this.fetchStatisticsFromSwitch(switchId, statType, null);
495
496         // If got a valid response update local cache and notify listeners
497         if (values != null && !values.isEmpty()) {
498             switch (statType) {
499                 case FLOW:
500                 case VENDOR:
501                     flowStatistics.put(switchId, values);
502                     notifyFlowUpdate(switchId, values);
503                     break;
504                 case DESC:
505                     // Overwrite cache
506                     descStatistics.put(switchId, values);
507                     // Notify who may be interested in a description change
508                     notifyDescriptionUpdate(switchId, values);
509                     break;
510                 case PORT:
511                     // Overwrite cache with new port statistics for this switch
512                     portStatistics.put(switchId, values);
513
514                     // Wake up the thread which maintains the TX byte counters for
515                     // each port
516                     switchPortStatsUpdated.offer(switchId);
517                     notifyPortUpdate(switchId, values);
518                     break;
519                 case TABLE:
520                     // Overwrite cache
521                     tableStatistics.put(switchId, values);
522                     notifyTableUpdate(switchId, values);
523                     break;
524                 default:
525             }
526         }
527     }
528
529     private void notifyDescriptionUpdate(Long switchId, List<OFStatistics> values) {
530         for (IOFStatisticsListener l : this.statisticsListeners) {
531             l.descriptionStatisticsRefreshed(switchId, values);
532         }
533     }
534
535     private void notifyFlowUpdate(Long switchId, List<OFStatistics> values) {
536         if (values.get(0) instanceof OFVendorStatistics) {
537             values = this.v6StatsListToOFStatsList(values);
538         }
539
540         if (!values.isEmpty()) { //possiblly filtered out by v6StatsListToOFStatsList()
541             for (IOFStatisticsListener l : this.statisticsListeners) {
542                 l.flowStatisticsRefreshed(switchId, values);
543             }
544         }
545     }
546
547     private void notifyPortUpdate(Long switchId, List<OFStatistics> values) {
548         for (IOFStatisticsListener l : this.statisticsListeners) {
549             l.portStatisticsRefreshed(switchId, values);
550         }
551     }
552
553     private void notifyTableUpdate(Long switchId, List<OFStatistics> values) {
554         for (IOFStatisticsListener l : this.statisticsListeners) {
555             l.tableStatisticsRefreshed(switchId, values);
556         }
557     }
558
559     /*
560      * Generic function to get the statistics form an OF switch
561      */
562     @SuppressWarnings("unchecked")
563     private List<OFStatistics> fetchStatisticsFromSwitch(Long switchId,
564             OFStatisticsType statsType, Object target) {
565         List<OFStatistics> values = null;
566         String type = null;
567         ISwitch sw = controller.getSwitch(switchId);
568
569         if (sw != null) {
570             OFStatisticsRequest req = new OFStatisticsRequest();
571             req.setStatisticType(statsType);
572             int requestLength = req.getLengthU();
573
574             if (statsType == OFStatisticsType.FLOW) {
575                 OFMatch match = null;
576                 if (target == null) {
577                     // All flows request
578                     match = new OFMatch();
579                     match.setWildcards(0xffffffff);
580                 } else if (!(target instanceof OFMatch)) {
581                     // Malformed request
582                     log.warn("Invalid target type for Flow stats request: {}",
583                             target.getClass());
584                     return null;
585                 } else {
586                     // Specific flow request
587                     match = (OFMatch) target;
588                 }
589                 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
590                 specificReq.setMatch(match);
591                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
592                 specificReq.setTableId((byte) 0xff);
593                 req.setStatistics(Collections
594                         .singletonList((OFStatistics) specificReq));
595                 requestLength += specificReq.getLength();
596                 type = "FLOW";
597             } else if (statsType == OFStatisticsType.VENDOR) {
598                 V6StatsRequest specificReq = new V6StatsRequest();
599                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
600                 specificReq.setTableId((byte) 0xff);
601                 req.setStatistics(Collections
602                         .singletonList((OFStatistics) specificReq));
603                 requestLength += specificReq.getLength();
604                 type = "VENDOR";
605             } else if (statsType == OFStatisticsType.AGGREGATE) {
606                 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
607                 OFMatch match = new OFMatch();
608                 match.setWildcards(0xffffffff);
609                 specificReq.setMatch(match);
610                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
611                 specificReq.setTableId((byte) 0xff);
612                 req.setStatistics(Collections
613                         .singletonList((OFStatistics) specificReq));
614                 requestLength += specificReq.getLength();
615                 type = "AGGREGATE";
616             } else if (statsType == OFStatisticsType.PORT) {
617                 short targetPort;
618                 if (target == null) {
619                     // All ports request
620                     targetPort = OFPort.OFPP_NONE.getValue();
621                 } else if (!(target instanceof Short)) {
622                     // Malformed request
623                     log.warn("Invalid target type for Port stats request: {}",
624                             target.getClass());
625                     return null;
626                 } else {
627                     // Specific port request
628                     targetPort = (Short) target;
629                 }
630                 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
631                 specificReq.setPortNumber(targetPort);
632                 req.setStatistics(Collections
633                         .singletonList((OFStatistics) specificReq));
634                 requestLength += specificReq.getLength();
635                 type = "PORT";
636             } else if (statsType == OFStatisticsType.QUEUE) {
637                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
638                 specificReq.setPortNumber(OFPort.OFPP_ALL.getValue());
639                 specificReq.setQueueId(0xffffffff);
640                 req.setStatistics(Collections
641                         .singletonList((OFStatistics) specificReq));
642                 requestLength += specificReq.getLength();
643                 type = "QUEUE";
644             } else if (statsType == OFStatisticsType.DESC) {
645                 type = "DESC";
646             } else if (statsType == OFStatisticsType.TABLE) {
647                 if(target != null){
648                     if (!(target instanceof Byte)) {
649                         // Malformed request
650                         log.warn("Invalid table id for table stats request: {}",
651                                 target.getClass());
652                         return null;
653                     }
654                     byte targetTable = (Byte) target;
655                     OFTableStatistics specificReq = new OFTableStatistics();
656                     specificReq.setTableId(targetTable);
657                     req.setStatistics(Collections
658                             .singletonList((OFStatistics) specificReq));
659                     requestLength += specificReq.getLength();
660                 }
661                 type = "TABLE";
662             }
663             req.setLengthU(requestLength);
664             Object result = sw.getStatistics(req);
665
666             if (result == null) {
667                 log.warn("Request Timed Out for ({}) from switch {}", type,
668                         HexString.toHexString(switchId));
669             } else if (result instanceof OFError) {
670                 log.warn("Switch {} failed to handle ({}) stats request: {}",
671                         new Object[] { HexString.toHexString(switchId), type,
672                         Utils.getOFErrorString((OFError) result) });
673                 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
674                     log.warn(
675                             "Switching back to regular Flow stats requests for switch {}",
676                             HexString.toHexString(switchId));
677                     this.switchSupportsVendorExtStats.put(switchId,
678                             Boolean.FALSE);
679                 }
680             } else {
681                 values = (List<OFStatistics>) result;
682             }
683         }
684         return values;
685     }
686
687     @Override
688     public List<OFStatistics> getOFFlowStatistics(Long switchId) {
689         List<OFStatistics> list = flowStatistics.get(switchId);
690
691         /*
692          * Check on emptiness as interference between add and get is still
693          * possible on the inner list (the concurrentMap entry's value)
694          */
695         return (list == null || list.isEmpty()) ? this.dummyList
696                 : (list.get(0) instanceof OFVendorStatistics) ? this
697                         .v6StatsListToOFStatsList(list) : list;
698     }
699
700     @Override
701     public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch, short priority) {
702         List<OFStatistics> statsList = flowStatistics.get(switchId);
703
704         /*
705          * Check on emptiness as interference between add and get is still
706          * possible on the inner list (the concurrentMap entry's value)
707          */
708         if (statsList == null || statsList.isEmpty()) {
709             return this.dummyList;
710         }
711
712         if (statsList.get(0) instanceof OFVendorStatistics) {
713             /*
714              * Caller could provide regular OF match when we instead pull the
715              * vendor statistics from this node Caller is not supposed to know
716              * whether this switch supports vendor extensions statistics
717              * requests
718              */
719             V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
720                     : new V6Match(ofMatch);
721
722             List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
723             for (OFStatistics stats : targetList) {
724                 V6StatsReply v6Stats = (V6StatsReply) stats;
725                 V6Match v6Match = v6Stats.getMatch();
726                 if (v6Stats.getPriority() == priority && v6Match.equals(targetMatch)) {
727                     List<OFStatistics> list = new ArrayList<OFStatistics>();
728                     list.add(stats);
729                     return list;
730                 }
731             }
732         } else {
733             for (OFStatistics stats : statsList) {
734                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
735                 if (flowStats.getPriority() == priority && flowStats.getMatch().equals(ofMatch)) {
736                     List<OFStatistics> list = new ArrayList<OFStatistics>();
737                     list.add(stats);
738                     return list;
739                 }
740             }
741         }
742         return this.dummyList;
743     }
744
745     /*
746      * Converts the v6 vendor statistics to the OFStatistics
747      */
748     private List<OFStatistics> v6StatsListToOFStatsList(
749             List<OFStatistics> statistics) {
750         List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
751         if (statistics != null && !statistics.isEmpty()) {
752             for (OFStatistics stats : statistics) {
753                 if (stats instanceof OFVendorStatistics) {
754                     List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
755                     if (r != null) {
756                         v6statistics.addAll(r);
757                     }
758                 }
759             }
760         }
761         return v6statistics;
762     }
763
764     private static List<OFStatistics> getV6ReplyStatistics(
765             OFVendorStatistics stat) {
766         int length = stat.getLength();
767         List<OFStatistics> results = new ArrayList<OFStatistics>();
768         if (length < 12)
769             return null; // Nicira Hdr is 12 bytes. We need atleast that much
770         ByteBuffer data = ByteBuffer.allocate(length);
771         stat.writeTo(data);
772         data.rewind();
773         if (log.isTraceEnabled()) {
774             log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
775                     HexString.toHexString(data.array()));
776         }
777
778         int vendor = data.getInt(); // first 4 bytes is vendor id.
779         if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
780             log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
781             return null;
782         } else {
783             // go ahead by 8 bytes which is 8 bytes of 0
784             data.getLong(); // should be all 0's
785             length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
786                           // been consumed
787         }
788
789         V6StatsReply v6statsreply;
790         int min_len;
791         while (length > 0) {
792             v6statsreply = new V6StatsReply();
793             min_len = v6statsreply.getLength();
794             if (length < v6statsreply.getLength())
795                 break;
796             v6statsreply.setActionFactory(stat.getActionFactory());
797             v6statsreply.readFrom(data);
798             if (v6statsreply.getLength() < min_len)
799                 break;
800             v6statsreply.setVendorId(vendor);
801             log.trace("V6StatsReply: {}", v6statsreply);
802             length -= v6statsreply.getLength();
803             results.add(v6statsreply);
804         }
805         return results;
806     }
807
808     @Override
809     public List<OFStatistics> queryStatistics(Long switchId,
810             OFStatisticsType statType, Object target) {
811         /*
812          * Caller does not know and it is not supposed to know whether this
813          * switch supports vendor extension. We adjust the target for him
814          */
815         if (statType == OFStatisticsType.FLOW) {
816             if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
817                 statType = OFStatisticsType.VENDOR;
818             }
819         }
820
821         List<OFStatistics> list = this.fetchStatisticsFromSwitch(switchId, statType,
822                 target);
823
824         return (list == null) ? null :
825             (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list) : list;
826     }
827
828     @Override
829     public List<OFStatistics> getOFDescStatistics(Long switchId) {
830         if (!descStatistics.containsKey(switchId))
831             return this.dummyList;
832
833         return descStatistics.get(switchId);
834     }
835
836     @Override
837     public List<OFStatistics> getOFPortStatistics(Long switchId) {
838         if (!portStatistics.containsKey(switchId)) {
839             return this.dummyList;
840         }
841
842         return portStatistics.get(switchId);
843     }
844
845     @Override
846     public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
847         if (!portStatistics.containsKey(switchId)) {
848             return this.dummyList;
849         }
850         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
851         for (OFStatistics stats : portStatistics.get(switchId)) {
852             if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
853                 list.add(stats);
854                 break;
855             }
856         }
857         return list;
858     }
859
860     @Override
861     public List<OFStatistics> getOFTableStatistics(Long switchId) {
862         if (!tableStatistics.containsKey(switchId)) {
863             return this.dummyList;
864         }
865
866         return tableStatistics.get(switchId);
867     }
868
869     @Override
870     public List<OFStatistics> getOFTableStatistics(Long switchId, Byte tableId) {
871         if (!tableStatistics.containsKey(switchId)) {
872             return this.dummyList;
873         }
874
875         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
876         for (OFStatistics stats : tableStatistics.get(switchId)) {
877             if (((OFTableStatistics) stats).getTableId() == tableId) {
878                 list.add(stats);
879                 break;
880             }
881         }
882         return list;
883     }
884
885     @Override
886     public int getFlowsNumber(long switchId) {
887         return this.flowStatistics.get(switchId).size();
888     }
889
890     /*
891      * InventoryShim replay for us all the switch addition which happened before
892      * we were brought up
893      */
894     @Override
895     public void updateNode(Node node, UpdateType type, Set<Property> props) {
896         Long switchId = (Long) node.getID();
897         switch (type) {
898         case ADDED:
899             addStatisticsTicks(switchId);
900             break;
901         case REMOVED:
902             clearFlowStatsAndTicks(switchId);
903         default:
904         }
905     }
906
907     @Override
908     public void updateNodeConnector(NodeConnector nodeConnector,
909             UpdateType type, Set<Property> props) {
910         // No action
911     }
912
913     /**
914      * Update the cached port rates for this switch with the latest retrieved
915      * port transmit byte count
916      *
917      * @param switchId
918      */
919     private synchronized void updatePortsTxRate(long switchId) {
920         List<OFStatistics> newPortStatistics = this.portStatistics.get(switchId);
921         if (newPortStatistics == null) {
922             return;
923         }
924         Map<Short, TxRates> rates = this.txRates.get(switchId);
925         if (rates == null) {
926             // First time rates for this switch are added
927             rates = new HashMap<Short, TxRates>();
928             txRates.put(switchId, rates);
929         }
930         for (OFStatistics stats : newPortStatistics) {
931             OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
932             short port = newPortStat.getPortNumber();
933             TxRates portRatesHolder = rates.get(port);
934             if (portRatesHolder == null) {
935                 // First time rates for this port are added
936                 portRatesHolder = new TxRates();
937                 rates.put(port, portRatesHolder);
938             }
939             // Get and store the number of transmitted bytes for this port
940             // And handle the case where agent does not support the counter
941             long transmitBytes = newPortStat.getTransmitBytes();
942             long value = (transmitBytes < 0) ? 0 : transmitBytes;
943             portRatesHolder.update(value);
944         }
945     }
946
947     @Override
948     public synchronized long getTransmitRate(Long switchId, Short port) {
949         long average = 0;
950         if (switchId == null || port == null) {
951             return average;
952         }
953         Map<Short, TxRates> perSwitch = txRates.get(switchId);
954         if (perSwitch == null) {
955             return average;
956         }
957         TxRates portRates = perSwitch.get(port);
958         if (portRates == null) {
959             return average;
960         }
961         return portRates.getAverageTxRate();
962     }
963
964     /*
965      * Manual switch name configuration code
966      */
967     @Override
968     public String getHelp() {
969         StringBuffer help = new StringBuffer();
970         help.append("---OF Statistics Manager utilities---\n");
971         help.append("\t ofdumpstatsmgr         - "
972                 + "Print Internal Stats Mgr db\n");
973         help.append("\t ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds) - "
974                 + "Set/Show flow/port/dedscription stats poll intervals\n");
975         return help.toString();
976     }
977
978     private boolean isValidSwitchId(String switchId) {
979         String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
980         String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
981
982         return (switchId != null && (switchId.matches(regexDatapathID) || switchId
983                 .matches(regexDatapathIDLong)));
984     }
985
986     public long getSwitchIDLong(String switchId) {
987         int radix = 16;
988         String switchString = "0";
989
990         if (isValidSwitchId(switchId)) {
991             if (switchId.contains(":")) {
992                 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
993                 switchString = switchId.replace(":", "");
994             } else if (switchId.contains("-")) {
995                 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
996                 switchString = switchId.replace("-", "");
997             } else {
998                 // Handle the 0123456789ABCDEF notation
999                 switchString = switchId;
1000             }
1001         }
1002         return Long.parseLong(switchString, radix);
1003     }
1004
1005     /*
1006      * Internal information dump code
1007      */
1008     private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
1009         StringBuffer buffer = new StringBuffer();
1010         buffer.append("{");
1011         for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
1012             buffer.append(HexString.toHexString(entry.getKey()) + "="
1013                     + entry.getValue().toString() + " ");
1014         }
1015         buffer.append("}");
1016         return buffer.toString();
1017     }
1018
1019     public void _ofdumpstatsmgr(CommandInterpreter ci) {
1020         ci.println("Global Counter: " + counter);
1021         ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
1022         ci.println("PendingStatsQueue: " + pendingStatsRequests);
1023         ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
1024         ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
1025         ci.println("Stats Collector State: "
1026                 + statisticsCollector.getState().toString());
1027         ci.println("StatsTimer: " + statisticsTimer.toString());
1028         ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
1029         ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
1030         ci.println("Port Stats Period: " + portTickNumber + " s");
1031         ci.println("Table Stats Period: " + tableTickNumber + " s");
1032     }
1033
1034     public void _resetSwitchCapability(CommandInterpreter ci) {
1035         String sidString = ci.nextArgument();
1036         Long sid = null;
1037         if (sidString == null) {
1038             ci.println("Insert the switch id (numeric value)");
1039             return;
1040         }
1041         try {
1042             sid = Long.valueOf(sidString);
1043             this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
1044             ci.println("Vendor capability for switch " + sid + " set to "
1045                     + this.switchSupportsVendorExtStats.get(sid));
1046         } catch (NumberFormatException e) {
1047             ci.println("Invalid switch id. Has to be numeric.");
1048         }
1049
1050     }
1051
1052     public void _ofbw(CommandInterpreter ci) {
1053         String sidString = ci.nextArgument();
1054         Long sid = null;
1055         if (sidString == null) {
1056             ci.println("Insert the switch id (numeric value)");
1057             return;
1058         }
1059         try {
1060             sid = Long.valueOf(sidString);
1061         } catch (NumberFormatException e) {
1062             ci.println("Invalid switch id. Has to be numeric.");
1063         }
1064         if (sid != null) {
1065             Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
1066             ci.println("Bandwidth utilization (" + factoredSamples
1067                     * portTickNumber + " sec average) for switch "
1068                     + HexEncode.longToHexString(sid) + ":");
1069             if (thisSwitchRates == null) {
1070                 ci.println("Not available");
1071             } else {
1072                 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
1073                     ci.println("Port: " + entry.getKey() + ": "
1074                             + entry.getValue().getAverageTxRate() + " bps");
1075                 }
1076             }
1077         }
1078     }
1079
1080     public void _txratewindow(CommandInterpreter ci) {
1081         String averageWindow = ci.nextArgument();
1082         short seconds = 0;
1083         if (averageWindow == null) {
1084             ci.println("Insert the length in seconds of the median "
1085                     + "window for tx rate");
1086             ci.println("Current: " + factoredSamples * portTickNumber + " secs");
1087             return;
1088         }
1089         try {
1090             seconds = Short.valueOf(averageWindow);
1091         } catch (NumberFormatException e) {
1092             ci.println("Invalid period.");
1093         }
1094         OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1095         ci.println("New: " + factoredSamples * portTickNumber + " secs");
1096     }
1097
1098     public void _ofstatsmgrintervals(CommandInterpreter ci) {
1099         String flowStatsInterv = ci.nextArgument();
1100         String portStatsInterv = ci.nextArgument();
1101         String descStatsInterv = ci.nextArgument();
1102         String tableStatsInterv = ci.nextArgument();
1103
1104         if (flowStatsInterv == null || portStatsInterv == null
1105                 || descStatsInterv == null) {
1106             ci.println("Usage: ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds)");
1107             ci.println("Current Values: fP=" + statisticsTickNumber + "sec pP="
1108                     + portTickNumber + "sec dP=" + descriptionTickNumber + "sec tP=" + tableTickNumber + " sec");
1109             return;
1110         }
1111         Short fP, pP, dP, tP;
1112         try {
1113             fP = Short.parseShort(flowStatsInterv);
1114             pP = Short.parseShort(portStatsInterv);
1115             dP = Short.parseShort(descStatsInterv);
1116             tP = Short.parseShort(tableStatsInterv);
1117         } catch (Exception e) {
1118             ci.println("Invalid format values: " + e.getMessage());
1119             return;
1120         }
1121
1122         if (pP <= 1 || fP <= 1 || dP <= 1 || tP <= 1) {
1123             ci.println("Invalid values. fP, pP, dP, tP have to be greater than 1.");
1124             return;
1125         }
1126
1127         statisticsTickNumber = fP;
1128         portTickNumber = pP;
1129         descriptionTickNumber = dP;
1130         tableTickNumber = tP;
1131
1132         ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1133                 + portTickNumber + "s dP=" + descriptionTickNumber + "s tP="
1134                 + tableTickNumber + "s");
1135     }
1136
1137     /**
1138      * This method retrieves user configurations from config.ini and updates
1139      * statisticsTickNumber/portTickNumber/descriptionTickNumber accordingly.
1140      */
1141     private void configStatsPollIntervals() {
1142         String fsStr = System.getProperty("of.flowStatsPollInterval");
1143         String psStr = System.getProperty("of.portStatsPollInterval");
1144         String dsStr = System.getProperty("of.descStatsPollInterval");
1145         String tsStr = System.getProperty("of.tableStatsPollInterval");
1146         Short fs, ps, ds, ts;
1147
1148         if (fsStr != null) {
1149             try {
1150                 fs = Short.parseShort(fsStr);
1151                 if (fs > 0) {
1152                     statisticsTickNumber = fs;
1153                 }
1154             } catch (Exception e) {
1155             }
1156         }
1157
1158         if (psStr != null) {
1159             try {
1160                 ps = Short.parseShort(psStr);
1161                 if (ps > 0) {
1162                     portTickNumber = ps;
1163                 }
1164             } catch (Exception e) {
1165             }
1166         }
1167
1168         if (dsStr != null) {
1169             try {
1170                 ds = Short.parseShort(dsStr);
1171                 if (ds > 0) {
1172                     descriptionTickNumber = ds;
1173                 }
1174             } catch (Exception e) {
1175             }
1176         }
1177
1178         if (tsStr != null) {
1179             try{
1180                 ts = Short.parseShort(tsStr);
1181                 if (ts > 0) {
1182                     tableTickNumber = ts;
1183                 }
1184             } catch (Exception e) {
1185             }
1186         }
1187     }
1188 }