OpenDaylight Controller functional modules.
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11
12 import java.nio.ByteBuffer;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.Deque;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Map.Entry;
21 import java.util.Set;
22 import java.util.Timer;
23 import java.util.TimerTask;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.LinkedBlockingDeque;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.eclipse.osgi.framework.console.CommandInterpreter;
31 import org.eclipse.osgi.framework.console.CommandProvider;
32 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
33 import org.opendaylight.controller.protocol_plugin.openflow.IOFInventoryService;
34 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
36 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
39 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
40 import org.openflow.protocol.OFError;
41 import org.openflow.protocol.OFMatch;
42 import org.openflow.protocol.OFPort;
43 import org.openflow.protocol.OFStatisticsRequest;
44 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
45 import org.openflow.protocol.statistics.OFDescriptionStatistics;
46 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
47 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
48 import org.openflow.protocol.statistics.OFPortStatisticsReply;
49 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
50 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
51 import org.openflow.protocol.statistics.OFStatistics;
52 import org.openflow.protocol.statistics.OFStatisticsType;
53 import org.openflow.protocol.statistics.OFVendorStatistics;
54 import org.openflow.util.HexString;
55 import org.osgi.framework.BundleContext;
56 import org.osgi.framework.FrameworkUtil;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 import org.opendaylight.controller.sal.core.Name;
61 import org.opendaylight.controller.sal.core.Node;
62 import org.opendaylight.controller.sal.core.NodeConnector;
63 import org.opendaylight.controller.sal.core.Property;
64 import org.opendaylight.controller.sal.core.UpdateType;
65 import org.opendaylight.controller.sal.utils.GlobalConstants;
66 import org.opendaylight.controller.sal.utils.HexEncode;
67 import org.opendaylight.controller.sal.utils.ServiceHelper;
68
69 /**
70  * It periodically polls the different OF statistics from the OF switches
71  * and caches them for quick retrieval for the above layers' modules
72  * It also provides an API to directly query the switch about the statistics
73  *
74  *
75  *
76  */
77 public class OFStatisticsManager implements IOFStatisticsManager,
78         IInventoryShimExternalListener, CommandProvider {
79     private static final Logger log = LoggerFactory
80             .getLogger(OFStatisticsManager.class);
81     private static final int initialSize = 64;
82     private static final long flowStatsPeriod = 10000;
83     private static final long descriptionStatsPeriod = 60000;
84     private static final long portStatsPeriod = 5000;
85     private long statisticsTimeout = 4000;
86     private static final long tickPeriod = 1000;
87     private static short statisticsTickNumber = (short) (flowStatsPeriod / tickPeriod);
88     private static short descriptionTickNumber = (short) (descriptionStatsPeriod / tickPeriod);
89     private static short portTickNumber = (short) (portStatsPeriod / tickPeriod);
90     private static short factoredSamples = (short) 2;
91     private static short counter = 1;
92     private IController controller = null;
93     private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
94     private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
95     private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
96     private List<OFStatistics> dummyList;
97     private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
98     protected BlockingQueue<StatsRequest> pendingStatsRequests;
99     protected BlockingQueue<Long> switchPortStatsUpdated;
100     private Thread statisticsCollector;
101     private Thread txRatesUpdater;
102     private Timer statisticsTimer;
103     private TimerTask statisticsTimerTask;
104     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
105     private Map<Long, String> switchNamesDB;
106     private Map<Long, Map<Short, TxRates>> txRates; // Per port sampled (every portStatsPeriod) transmit rate
107
108     /**
109      * The object containing the latest factoredSamples tx rate samples
110      * for a given switch port
111      */
112     protected class TxRates {
113         Deque<Long> sampledTxBytes; // contains the latest factoredSamples sampled transmitted bytes
114
115         public TxRates() {
116             sampledTxBytes = new LinkedBlockingDeque<Long>();
117         }
118
119         public void update(Long txBytes) {
120             /*
121              * Based on how many samples our average works on,
122              * we might have to remove the oldest sample
123              */
124             if (sampledTxBytes.size() == factoredSamples) {
125                 sampledTxBytes.removeLast();
126             }
127
128             // Add the latest sample to the top of the queue
129             sampledTxBytes.addFirst(txBytes);
130         }
131
132         /**
133          * Returns the average transmit rate in bps
134          * @return the average transmit rate [bps]
135          */
136         public long getAverageTxRate() {
137             long average = 0;
138             /*
139              * If we cannot provide the value for the time window length set
140              */
141             if (sampledTxBytes.size() < factoredSamples) {
142                 return average;
143             }
144             long increment = (long) (sampledTxBytes.getFirst() - sampledTxBytes
145                     .getLast());
146             long timePeriod = (long) (factoredSamples * portStatsPeriod)
147                     / (long) tickPeriod;
148             average = (8 * increment) / timePeriod;
149             return average;
150         }
151     }
152
153     public void setController(IController core) {
154         this.controller = core;
155     }
156
157     public void unsetController(IController core) {
158         if (this.controller == core) {
159             this.controller = null;
160         }
161     }
162
163     /**
164      * Function called by the dependency manager when all the required
165      * dependencies are satisfied
166      *
167      */
168     void init() {
169     }
170
171     /**
172      * Function called by the dependency manager when at least one
173      * dependency become unsatisfied or when the component is shutting
174      * down because for example bundle is being stopped.
175      *
176      */
177     void destroy() {
178     }
179
180     /**
181      * Function called by dependency manager after "init ()" is called
182      * and after the services provided by the class are registered in
183      * the service registry
184      *
185      */
186     void start() {
187         // Start managed timers
188         statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, tickPeriod);
189
190         // Start statistics collector thread
191         statisticsCollector.start();
192
193         // Start bandwidth utilization computer thread
194         txRatesUpdater.start();
195
196         // OSGI console
197         registerWithOSGIConsole();
198     }
199
200     /**
201      * Function called by the dependency manager before the services
202      * exported by the component are unregistered, this will be
203      * followed by a "destroy ()" calls
204      *
205      */
206     void stop() {
207         // Stop managed timers
208         statisticsTimer.cancel();
209     }
210
211     public OFStatisticsManager() {
212         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
213         descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
214         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
215         dummyList = new ArrayList<OFStatistics>(1);
216         switchNamesDB = new HashMap<Long, String>();
217         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(
218                 initialSize);
219         pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(
220                 initialSize);
221         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(initialSize);
222         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(
223                 initialSize);
224         txRates = new HashMap<Long, Map<Short, TxRates>>(initialSize);
225
226         // Initialize managed timers
227         statisticsTimer = new Timer();
228         statisticsTimerTask = new TimerTask() {
229             @Override
230             public void run() {
231                 decrementTicks();
232             }
233         };
234
235         // Initialize Statistics collector thread
236         statisticsCollector = new Thread(new Runnable() {
237             @Override
238             public void run() {
239                 while (true) {
240                     try {
241                         StatsRequest req = pendingStatsRequests.take();
242                         acquireStatistics(req.switchId, req.type,
243                                 statisticsTimeout);
244                     } catch (InterruptedException e) {
245                         log
246                                 .warn("Flow Statistics Collector thread interrupted");
247                     }
248                 }
249             }
250         }, "Statistics Collector");
251
252         // Initialize Tx Rate Updater thread
253         txRatesUpdater = new Thread(new Runnable() {
254             @Override
255             public void run() {
256                 while (true) {
257                     try {
258                         long switchId = switchPortStatsUpdated.take();
259                         updatePortsTxRate(switchId);
260                     } catch (InterruptedException e) {
261                         log.warn("TX Rate Updater thread interrupted");
262                     }
263                 }
264             }
265         }, "TX Rate Updater");
266
267     }
268
269     private void registerWithOSGIConsole() {
270         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
271                 .getBundleContext();
272         bundleContext.registerService(CommandProvider.class.getName(), this,
273                 null);
274     }
275
276     private static class StatsRequest {
277         protected Long switchId;
278         protected OFStatisticsType type;
279
280         public StatsRequest(Long d, OFStatisticsType t) {
281             switchId = d;
282             type = t;
283         }
284
285         public String toString() {
286             return "SReq = {switchId=" + switchId + ", type=" + type + "}";
287         }
288
289         @Override
290         public int hashCode() {
291             final int prime = 31;
292             int result = 1;
293             result = prime * result
294                     + ((switchId == null) ? 0 : switchId.hashCode());
295             result = prime * result + ((type == null) ? 0 : type.ordinal());
296             return result;
297         }
298
299         @Override
300         public boolean equals(Object obj) {
301             if (this == obj) {
302                 return true;
303             }
304             if (obj == null) {
305                 return false;
306             }
307             if (getClass() != obj.getClass()) {
308                 return false;
309             }
310             StatsRequest other = (StatsRequest) obj;
311             if (switchId == null) {
312                 if (other.switchId != null) {
313                     return false;
314                 }
315             } else if (!switchId.equals(other.switchId)) {
316                 return false;
317             }
318             if (type != other.type) {
319                 return false;
320             }
321             return true;
322         }
323     }
324
325     private void addStatisticsTicks(Long switchId) {
326         switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume switch supports Vendor extension stats
327         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
328         log.info("Added Switch {} to target pool", HexString
329                 .toHexString(switchId.longValue()));
330     }
331
332     protected static class StatisticsTicks {
333         private short flowStatisticsTicks;
334         private short descriptionTicks;
335         private short portStatisticsTicks;
336
337         public StatisticsTicks(boolean scattered) {
338             if (scattered) {
339                 // scatter bursts by statisticsTickPeriod
340                 if (++counter < 0) {
341                     counter = 0;
342                 } // being paranoid here
343                 flowStatisticsTicks = (short) (1 + counter
344                         % statisticsTickNumber);
345                 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
346                 portStatisticsTicks = (short) (1 + counter % portTickNumber);
347             } else {
348                 flowStatisticsTicks = statisticsTickNumber;
349                 descriptionTicks = descriptionTickNumber;
350                 portStatisticsTicks = portTickNumber;
351             }
352         }
353
354         public boolean decrementFlowTicksIsZero() {
355             // Please ensure no code is inserted between the if check and the flowStatisticsTicks reset
356             if (--flowStatisticsTicks == 0) {
357                 flowStatisticsTicks = statisticsTickNumber;
358                 return true;
359             }
360             return false;
361         }
362
363         public boolean decrementDescTicksIsZero() {
364             // Please ensure no code is inserted between the if check and the descriptionTicks reset
365             if (--descriptionTicks == 0) {
366                 descriptionTicks = descriptionTickNumber;
367                 return true;
368             }
369             return false;
370         }
371
372         public boolean decrementPortTicksIsZero() {
373             // Please ensure no code is inserted between the if check and the descriptionTicks reset
374             if (--portStatisticsTicks == 0) {
375                 portStatisticsTicks = portTickNumber;
376                 return true;
377             }
378             return false;
379         }
380
381         public String toString() {
382             return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
383                     + ",pT=" + portStatisticsTicks + "}";
384         }
385     }
386
387     private void printInfoMessage(String type, StatsRequest request) {
388         log
389                 .info(
390                         type
391                                 + " stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
392                         new Object[] { HexString.toHexString(request.switchId),
393                                 pendingStatsRequests.size(),
394                                 statisticsCollector.getState().toString() });
395     }
396
397     protected void decrementTicks() {
398         StatsRequest request = null;
399         for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
400                 .entrySet()) {
401             StatisticsTicks clock = entry.getValue();
402             Long switchId = entry.getKey();
403             if (clock.decrementFlowTicksIsZero() == true) {
404                 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ? new StatsRequest(
405                         switchId, OFStatisticsType.VENDOR)
406                         : new StatsRequest(switchId, OFStatisticsType.FLOW);
407                 // If a request for this switch is already in the queue, skip to add this new request
408                 if (!pendingStatsRequests.contains(request)
409                         && false == pendingStatsRequests.offer(request)) {
410                     printInfoMessage("Flow", request);
411                 }
412             }
413
414             if (clock.decrementDescTicksIsZero() == true) {
415                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
416                 // If a request for this switch is already in the queue, skip to add this new request
417                 if (!pendingStatsRequests.contains(request)
418                         && false == pendingStatsRequests.offer(request)) {
419                     printInfoMessage("Description", request);
420                 }
421             }
422
423             if (clock.decrementPortTicksIsZero() == true) {
424                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
425                 // If a request for this switch is already in the queue, skip to add this new request
426                 if (!pendingStatsRequests.contains(request)
427                         && false == pendingStatsRequests.offer(request)) {
428                     printInfoMessage("Port", request);
429                 }
430             }
431         }
432     }
433
434     private void removeStatsRequestTasks(Long switchId) {
435         log.info("Cleaning Statistics database for switch "
436                 + HexEncode.longToHexString(switchId));
437         // To be safe, let's attempt removal of both VENDOR and FLOW request. It does not hurt
438         pendingStatsRequests.remove(new StatsRequest(switchId,
439                 OFStatisticsType.VENDOR));
440         pendingStatsRequests.remove(new StatsRequest(switchId,
441                 OFStatisticsType.FLOW));
442         pendingStatsRequests.remove(new StatsRequest(switchId,
443                 OFStatisticsType.DESC));
444         pendingStatsRequests.remove(new StatsRequest(switchId,
445                 OFStatisticsType.PORT));
446         // Take care of the TX rate databases
447         switchPortStatsUpdated.remove(switchId);
448         txRates.remove(switchId);
449     }
450
451     private void clearFlowStatsAndTicks(Long switchId) {
452         statisticsTimerTicks.remove(switchId);
453         removeStatsRequestTasks(switchId);
454         flowStatistics.remove(switchId);
455         log.info("Statistics removed for switch "
456                 + HexString.toHexString(switchId));
457     }
458
459     private void acquireStatistics(Long switchId, OFStatisticsType statType,
460             long timeout) {
461
462         // Query the switch on all matches
463         List<OFStatistics> values = this.acquireStatistics(switchId, statType,
464                 null, timeout);
465
466         // Update local caching database if got a valid response
467         if (values != null && !values.isEmpty()) {
468             if ((statType == OFStatisticsType.FLOW)
469                     || (statType == OFStatisticsType.VENDOR)) {
470                 flowStatistics.put(switchId, values);
471             } else if (statType == OFStatisticsType.DESC) {
472                 if ((switchNamesDB != null)
473                         && switchNamesDB.containsKey(switchId)) {
474                     // Check if user manually configured a name for the switch
475                     for (OFStatistics entry : values) {
476                         OFDescriptionStatistics reply = (OFDescriptionStatistics) entry;
477                         reply.setSerialNumber(switchNamesDB.get(switchId));
478                     }
479                 }
480                 // check if notification is needed
481                 if (descStatistics.get(switchId) == null
482                         || !(descStatistics.get(switchId).get(0).equals(values
483                                 .get(0)))) {
484                     IOFInventoryService inventory = (IOFInventoryService) ServiceHelper
485                             .getInstance(IOFInventoryService.class,
486                                     GlobalConstants.DEFAULT.toString(), this);
487                     if (inventory != null) {
488                         // Notify Inventory Service about the name update
489                         Set<Property> propSet = new HashSet<Property>(1);
490                         Name name = new Name(((OFDescriptionStatistics) values
491                                 .get(0)).getSerialNumber());
492                         propSet.add(name);
493                         inventory.updateSwitchProperty(switchId, propSet);
494                     }
495                 }
496                 // overwrite cache
497                 descStatistics.put(switchId, values);
498             } else if (statType == OFStatisticsType.PORT) {
499                 // Overwrite cache with new port statistics for this switch
500                 portStatistics.put(switchId, values);
501
502                 // Wake up the thread which maintains the TX byte counters for each port
503                 switchPortStatsUpdated.offer(switchId);
504             }
505         }
506     }
507
508     /*
509      * Generic function to get the statistics form a OF switch
510      */
511     @SuppressWarnings("unchecked")
512     private List<OFStatistics> acquireStatistics(Long switchId,
513             OFStatisticsType statsType, Object target, long timeout) {
514         List<OFStatistics> values = null;
515         String type = null;
516         ISwitch sw = controller.getSwitch(switchId);
517
518         if (sw != null) {
519             OFStatisticsRequest req = new OFStatisticsRequest();
520             req.setStatisticType(statsType);
521             int requestLength = req.getLengthU();
522
523             if (statsType == OFStatisticsType.FLOW) {
524                 OFMatch match = null;
525                 if (target == null) {
526                     // All flows request
527                     match = new OFMatch();
528                     match.setWildcards(0xffffffff);
529                 } else if (!(target instanceof OFMatch)) {
530                     // Malformed request
531                     log.warn("Invalid target type for Flow stats request: "
532                             + target.getClass());
533                     return null;
534                 } else {
535                     // Specific flow request
536                     match = (OFMatch) target;
537                 }
538                 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
539                 specificReq.setMatch(match);
540                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
541                 specificReq.setTableId((byte) 0xff);
542                 req.setStatistics(Collections
543                         .singletonList((OFStatistics) specificReq));
544                 requestLength += specificReq.getLength();
545                 type = "FLOW";
546             } else if (statsType == OFStatisticsType.VENDOR) {
547                 V6StatsRequest specificReq = new V6StatsRequest();
548                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
549                 specificReq.setTableId((byte) 0xff);
550                 req.setStatistics(Collections
551                         .singletonList((OFStatistics) specificReq));
552                 requestLength += specificReq.getLength();
553                 type = "VENDOR";
554             } else if (statsType == OFStatisticsType.AGGREGATE) {
555                 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
556                 OFMatch match = new OFMatch();
557                 match.setWildcards(0xffffffff);
558                 specificReq.setMatch(match);
559                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
560                 specificReq.setTableId((byte) 0xff);
561                 req.setStatistics(Collections
562                         .singletonList((OFStatistics) specificReq));
563                 requestLength += specificReq.getLength();
564                 type = "AGGREGATE";
565             } else if (statsType == OFStatisticsType.PORT) {
566                 short targetPort;
567                 if (target == null) {
568                     // All ports request
569                     targetPort = (short) OFPort.OFPP_NONE.getValue();
570                 } else if (!(target instanceof Short)) {
571                     // Malformed request
572                     log.warn("Invalid target type for Port stats request: "
573                             + target.getClass());
574                     return null;
575                 } else {
576                     // Specific port request
577                     targetPort = (Short) target;
578                 }
579                 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
580                 specificReq.setPortNumber(targetPort);
581                 req.setStatistics(Collections
582                         .singletonList((OFStatistics) specificReq));
583                 requestLength += specificReq.getLength();
584                 type = "PORT";
585             } else if (statsType == OFStatisticsType.QUEUE) {
586                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
587                 specificReq.setPortNumber((short) OFPort.OFPP_ALL.getValue());
588                 specificReq.setQueueId(0xffffffff);
589                 req.setStatistics(Collections
590                         .singletonList((OFStatistics) specificReq));
591                 requestLength += specificReq.getLength();
592                 type = "QUEUE";
593             } else if (statsType == OFStatisticsType.DESC) {
594                 type = "DESC";
595             } else if (statsType == OFStatisticsType.TABLE) {
596                 type = "TABLE";
597             }
598             req.setLengthU(requestLength);
599             Object result = sw.getStatistics(req);
600
601             if (result == null) {
602                 log.warn("Request Timed Out for ({}) from switch {}", type,
603                         HexString.toHexString(switchId));
604             } else if (result instanceof OFError) {
605                 log.warn("Switch {} failed to handle ({}) stats request: "
606                         + Utils.getOFErrorString((OFError) result), HexString
607                         .toHexString(switchId), type);
608                 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
609                     log
610                             .warn(
611                                     "Switching back to regular Flow stats requests for switch {}",
612                                     HexString.toHexString(switchId));
613                     this.switchSupportsVendorExtStats.put(switchId,
614                             Boolean.FALSE);
615                 }
616             } else {
617                 values = (List<OFStatistics>) result;
618             }
619         }
620         return values;
621     }
622
623     @Override
624     public List<OFStatistics> getOFFlowStatistics(Long switchId) {
625         List<OFStatistics> list = flowStatistics.get(switchId);
626
627         /*
628          *  Check on emptiness as interference between add and get is still
629          *  possible on the inner list (the concurrentMap entry's value)
630          */
631         return (list == null || list.isEmpty()) ? this.dummyList
632                 : (list.get(0) instanceof OFVendorStatistics) ? this
633                         .v6StatsListToOFStatsList(list) : list;
634     }
635
636     @Override
637     public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch) {
638         List<OFStatistics> statsList = flowStatistics.get(switchId);
639
640         /*
641          *  Check on emptiness as interference between add and get is still
642          *  possible on the inner list (the concurrentMap entry's value)
643          */
644         if (statsList == null || statsList.isEmpty()) {
645             return this.dummyList;
646         }
647
648         if (statsList.get(0) instanceof OFVendorStatistics) {
649             /*
650              * Caller could provide regular OF match when we
651              * instead pull the vendor statistics from this node
652              * Caller is not supposed to know whether this switch supports
653              * vendor extensions statistics requests
654              */
655             V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
656                     : new V6Match(ofMatch);
657
658             List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
659             for (OFStatistics stats : targetList) {
660                 V6StatsReply v6Stats = (V6StatsReply) stats;
661                 V6Match v6Match = v6Stats.getMatch();
662                 if (v6Match.equals(targetMatch)) {
663                     List<OFStatistics> list = new ArrayList<OFStatistics>();
664                     list.add(stats);
665                     return list;
666                 }
667             }
668         } else {
669             for (OFStatistics stats : statsList) {
670                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
671                 if (flowStats.getMatch().equals(ofMatch)) {
672                     List<OFStatistics> list = new ArrayList<OFStatistics>();
673                     list.add(stats);
674                     return list;
675                 }
676             }
677         }
678         return this.dummyList;
679     }
680
681     /*
682      * Converts the v6 vendor statistics to the OFStatistics
683      */
684     private List<OFStatistics> v6StatsListToOFStatsList(
685             List<OFStatistics> statistics) {
686         List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
687         if (statistics != null && !statistics.isEmpty()) {
688             for (OFStatistics stats : statistics) {
689                 if (stats instanceof OFVendorStatistics) {
690                     List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
691                     if (r != null) {
692                         v6statistics.addAll(r);
693                     }
694                 }
695             }
696         }
697         return v6statistics;
698     }
699
700     private static List<OFStatistics> getV6ReplyStatistics(
701             OFVendorStatistics stat) {
702         int length = stat.getLength();
703         List<OFStatistics> results = new ArrayList<OFStatistics>();
704         if (length < 12)
705             return null; // Nicira Hdr is 12 bytes. We need atleast that much
706         ByteBuffer data = ByteBuffer.allocate(length);
707         stat.writeTo(data);
708         data.rewind();
709         log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}", HexString
710                 .toHexString(data.array()));
711
712         int vendor = data.getInt(); //first 4 bytes is vendor id.
713         if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
714             log
715                     .debug("Unexpected vendor id: 0x{}", Integer
716                             .toHexString(vendor));
717             return null;
718         } else {
719             //go ahead by 8 bytes which is 8 bytes of 0
720             data.getLong(); //should be all 0's
721             length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have been consumed
722         }
723
724         V6StatsReply v6statsreply;
725         int min_len;
726         while (length > 0) {
727             v6statsreply = new V6StatsReply();
728             min_len = v6statsreply.getLength();
729             if (length < v6statsreply.getLength())
730                 break;
731             v6statsreply.setActionFactory(stat.getActionFactory());
732             v6statsreply.readFrom(data);
733             if (v6statsreply.getLength() < min_len)
734                 break;
735             v6statsreply.setVendorId(vendor);
736             log.trace("V6StatsReply: {}", v6statsreply);
737             length -= v6statsreply.getLength();
738             results.add(v6statsreply);
739         }
740         return results;
741     }
742
743     @Override
744     public List<OFStatistics> queryStatistics(Long switchId,
745             OFStatisticsType statType, Object target, long timeout) {
746         /*
747          * Caller does not know and it is not supposed to know whether
748          * this switch supports vendor extension. We adjust the target for him
749          */
750         if (statType == OFStatisticsType.FLOW) {
751             if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
752                 statType = OFStatisticsType.VENDOR;
753             }
754         }
755
756         List<OFStatistics> list = this.acquireStatistics(switchId, statType,
757                 target, timeout);
758
759         return (list == null) ? null
760                 : (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list)
761                         : list;
762     }
763
764     @Override
765     public List<OFStatistics> getOFDescStatistics(Long switchId) {
766         if (!descStatistics.containsKey(switchId))
767             return this.dummyList;
768
769         return descStatistics.get(switchId);
770     }
771
772     @Override
773     public List<OFStatistics> getOFPortStatistics(Long switchId) {
774         if (!portStatistics.containsKey(switchId)) {
775             return this.dummyList;
776         }
777
778         return portStatistics.get(switchId);
779     }
780
781     @Override
782     public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
783         if (!portStatistics.containsKey(switchId)) {
784             return this.dummyList;
785         }
786         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
787         for (OFStatistics stats : portStatistics.get(switchId)) {
788             if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
789                 list.add(stats);
790                 break;
791             }
792         }
793         return list;
794     }
795
796     @Override
797     public int getFlowsNumber(long switchId) {
798         return this.flowStatistics.get(switchId).size();
799     }
800
801     /*
802      * InventoryShim replay for us all the switch addition which happened before we were brought up
803      */
804     @Override
805     public void updateNode(Node node, UpdateType type, Set<Property> props) {
806         Long switchId = (Long) node.getID();
807         switch (type) {
808         case ADDED:
809             addStatisticsTicks(switchId);
810             break;
811         case REMOVED:
812             clearFlowStatsAndTicks(switchId);
813         default:
814         }
815     }
816
817     @Override
818     public void updateNodeConnector(NodeConnector nodeConnector,
819             UpdateType type, Set<Property> props) {
820         // No action
821     }
822
823     /**
824      * Update the cached port rates for this switch with the latest
825      * retrieved port transmit byte count
826      * @param switchId
827      */
828     private synchronized void updatePortsTxRate(long switchId) {
829         List<OFStatistics> newPortStatistics = this.portStatistics
830                 .get(switchId);
831         if (newPortStatistics == null) {
832             return;
833         }
834         Map<Short, TxRates> rates = this.txRates.get(switchId);
835         if (rates == null) {
836             // First time rates for this switch are added
837             rates = new HashMap<Short, TxRates>();
838             txRates.put(switchId, rates);
839         }
840         for (OFStatistics stats : newPortStatistics) {
841             OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
842             short port = newPortStat.getPortNumber();
843             TxRates portRatesHolder = rates.get(port);
844             if (portRatesHolder == null) {
845                 // First time rates for this port are added
846                 portRatesHolder = new TxRates();
847                 rates.put(port, portRatesHolder);
848             }
849             // Get and store the number of transmitted bytes for this port
850             // And handle the case where agent does not support the counter
851             long transmitBytes = newPortStat.getTransmitBytes();
852             long value = (transmitBytes < 0) ? 0 : transmitBytes;
853             portRatesHolder.update(value);
854         }
855     }
856
857     @Override
858     public synchronized long getTransmitRate(Long switchId, Short port) {
859         long average = 0;
860         if (switchId == null || port == null) {
861             return average;
862         }
863         Map<Short, TxRates> perSwitch = txRates.get(switchId);
864         if (perSwitch == null) {
865             return average;
866         }
867         TxRates portRates = perSwitch.get(port);
868         if (portRates == null) {
869             return average;
870         }
871         return portRates.getAverageTxRate();
872     }
873
874     /*
875      * Manual switch name configuration code
876      */
877     @Override
878     public String getHelp() {
879         StringBuffer help = new StringBuffer();
880         help.append("---OF Switch ID to Name Mapping---\n");
881         help
882                 .append("\t ofaddname              - Add a switchId to name mapping entry\n");
883         help
884                 .append("\t ofdeletename           - Delete a switchId from the mapping db\n");
885         help.append("\t ofprintnames           - Print the mapping db\n");
886         help.append("---OF Statistics Manager utilities---\n");
887         help
888                 .append("\t ofdumpstatsmgr         - Print Internal Stats Mgr db\n");
889         help
890                 .append("\t ofstatstimeout         - Change Statistics request's timeout value\n");
891         return help.toString();
892     }
893
894     private boolean isValidSwitchId(String switchId) {
895         String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
896         String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
897
898         return (switchId != null && (switchId.matches(regexDatapathID) || switchId
899                 .matches(regexDatapathIDLong)));
900     }
901
902     public long getSwitchIDLong(String switchId) {
903         int radix = 16;
904         String switchString = "0";
905
906         if (isValidSwitchId(switchId)) {
907             if (switchId.contains(":")) {
908                 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
909                 switchString = switchId.replace(":", "");
910             } else if (switchId.contains("-")) {
911                 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
912                 switchString = switchId.replace("-", "");
913             } else {
914                 // Handle the 0123456789ABCDEF notation
915                 switchString = switchId;
916             }
917         }
918         return Long.parseLong(switchString, radix);
919     }
920
921     public void _ofaddname(CommandInterpreter ci) {
922         if (switchNamesDB == null)
923             switchNamesDB = new HashMap<Long, String>();
924         String switchId = ci.nextArgument();
925         if (!isValidSwitchId(switchId)) {
926             ci.println("Please provide a valid SwithcId");
927             return;
928         }
929         Long sid = getSwitchIDLong(switchId);
930         String switchName = ci.nextArgument();
931         if (switchName == null) {
932             ci.println("Please provide a valid Switch name");
933             return;
934         }
935         switchNamesDB.put(sid, switchName);
936     }
937
938     public void _ofdeletename(CommandInterpreter ci) {
939         if (switchNamesDB == null)
940             return;
941         String switchId = ci.nextArgument();
942         if (!isValidSwitchId(switchId)) {
943             ci.println("Please provide a valid SwitchId");
944             return;
945         }
946         Long sid = getSwitchIDLong(switchId);
947         switchNamesDB.remove(sid);
948     }
949
950     public void _ofprintnames(CommandInterpreter ci) {
951         if (switchNamesDB == null)
952             return;
953         for (Long key : switchNamesDB.keySet()) {
954             ci.println(key + " -> " + switchNamesDB.get(key) + "\n");
955         }
956     }
957
958     /*
959      * Internal information dump code
960      */
961     private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
962         StringBuffer buffer = new StringBuffer();
963         buffer.append("{");
964         for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
965             buffer.append(HexString.toHexString(entry.getKey()) + "="
966                     + entry.getValue().toString() + " ");
967         }
968         buffer.append("}");
969         return buffer.toString();
970     }
971
972     public void _ofdumpstatsmgr(CommandInterpreter ci) {
973         ci.println("Global Counter: " + counter);
974         ci
975                 .println("Timer Ticks: "
976                         + prettyPrintSwitchMap(statisticsTimerTicks));
977         ci.println("PendingStatsQueue: " + pendingStatsRequests);
978         ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
979         ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
980         ci.println("Stats Collector State: "
981                 + statisticsCollector.getState().toString());
982         ci.println("StatsTimer: " + statisticsTimer.toString());
983         ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
984         ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
985         ci.println("Port Stats Period: " + portTickNumber + " s");
986         ci.println("Stats request timeout: " + Float.valueOf(statisticsTimeout)
987                 / 1000.0 + " s");
988     }
989
990     public void _ofstatstimeout(CommandInterpreter ci) {
991         String timeoutString = ci.nextArgument();
992         if (timeoutString == null || !timeoutString.matches("^[0-9]+$")) {
993             ci.println("Invalid value. Has to be numeric.");
994             return;
995         }
996
997         long newTimeout = Long.valueOf(timeoutString);
998         if (newTimeout < 50 || newTimeout > 60000) {
999             ci.println("Invalid value. Valid range is [50-60000]ms");
1000             return;
1001         }
1002         this.statisticsTimeout = newTimeout;
1003         ci.println("New value: " + statisticsTimeout + " ms");
1004     }
1005
1006     public void _resetSwitchCapability(CommandInterpreter ci) {
1007         String sidString = ci.nextArgument();
1008         Long sid = null;
1009         if (sidString == null) {
1010             ci.println("Insert the switch id (numeric value)");
1011             return;
1012         }
1013         try {
1014             sid = Long.valueOf(sidString);
1015             this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
1016             ci.println("Vendor capability for switch " + sid + " set to "
1017                     + this.switchSupportsVendorExtStats.get(sid));
1018         } catch (NumberFormatException e) {
1019             ci.println("Invalid switch id. Has to be numeric.");
1020         }
1021
1022     }
1023
1024     public void _ofbw(CommandInterpreter ci) {
1025         String sidString = ci.nextArgument();
1026         Long sid = null;
1027         if (sidString == null) {
1028             ci.println("Insert the switch id (numeric value)");
1029             return;
1030         }
1031         try {
1032             sid = Long.valueOf(sidString);
1033         } catch (NumberFormatException e) {
1034             ci.println("Invalid switch id. Has to be numeric.");
1035         }
1036         if (sid != null) {
1037             Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
1038             ci.println("Bandwidth utilization (" + factoredSamples
1039                     * portTickNumber + " sec average) for switch "
1040                     + HexEncode.longToHexString(sid) + ":");
1041             if (thisSwitchRates == null) {
1042                 ci.println("Not available");
1043             } else {
1044                 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
1045                     ci.println("Port: " + entry.getKey() + ": "
1046                             + entry.getValue().getAverageTxRate() + " bps");
1047                 }
1048             }
1049         }
1050     }
1051
1052     public void _txratewindow(CommandInterpreter ci) {
1053         String averageWindow = ci.nextArgument();
1054         short seconds = 0;
1055         if (averageWindow == null) {
1056             ci
1057                     .println("Insert the length in seconds of the average window for tx rate");
1058             ci
1059                     .println("Current: " + factoredSamples * portTickNumber
1060                             + " secs");
1061             return;
1062         }
1063         try {
1064             seconds = Short.valueOf(averageWindow);
1065         } catch (NumberFormatException e) {
1066             ci.println("Invalid period.");
1067         }
1068         OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1069         ci.println("New: " + factoredSamples * portTickNumber + " secs");
1070     }
1071
1072 }