Merge "Refactor frontend JS"
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11
12 import java.nio.ByteBuffer;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.Deque;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Map.Entry;
21 import java.util.Set;
22 import java.util.Timer;
23 import java.util.TimerTask;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.LinkedBlockingDeque;
28 import java.util.concurrent.LinkedBlockingQueue;
29
30 import org.eclipse.osgi.framework.console.CommandInterpreter;
31 import org.eclipse.osgi.framework.console.CommandProvider;
32 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
33 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
34 import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
36 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
39 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
40 import org.opendaylight.controller.sal.core.Node;
41 import org.opendaylight.controller.sal.core.NodeConnector;
42 import org.opendaylight.controller.sal.core.Property;
43 import org.opendaylight.controller.sal.core.UpdateType;
44 import org.opendaylight.controller.sal.utils.HexEncode;
45 import org.openflow.protocol.OFError;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFPort;
48 import org.openflow.protocol.OFStatisticsRequest;
49 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
50 import org.openflow.protocol.statistics.OFDescriptionStatistics;
51 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
52 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
53 import org.openflow.protocol.statistics.OFPortStatisticsReply;
54 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
55 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
56 import org.openflow.protocol.statistics.OFStatistics;
57 import org.openflow.protocol.statistics.OFStatisticsType;
58 import org.openflow.protocol.statistics.OFVendorStatistics;
59 import org.openflow.util.HexString;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * It periodically polls the different OF statistics from the OF switches
67  * and caches them for quick retrieval for the above layers' modules
68  * It also provides an API to directly query the switch about the statistics
69  */
70 public class OFStatisticsManager implements IOFStatisticsManager,
71         IInventoryShimExternalListener, CommandProvider {
72     private static final Logger log = LoggerFactory
73             .getLogger(OFStatisticsManager.class);
74     private static final int initialSize = 64;
75     private static final long flowStatsPeriod = 10000;
76     private static final long descriptionStatsPeriod = 60000;
77     private static final long portStatsPeriod = 5000;
78     private static final long tickPeriod = 1000;
79     private static short statisticsTickNumber = (short) (flowStatsPeriod / tickPeriod);
80     private static short descriptionTickNumber = (short) (descriptionStatsPeriod / tickPeriod);
81     private static short portTickNumber = (short) (portStatsPeriod / tickPeriod);
82     private static short factoredSamples = (short) 2;
83     private static short counter = 1;
84     private IController controller = null;
85     private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
86     private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
87     private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
88     private List<OFStatistics> dummyList;
89     private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
90     protected BlockingQueue<StatsRequest> pendingStatsRequests;
91     protected BlockingQueue<Long> switchPortStatsUpdated;
92     private Thread statisticsCollector;
93     private Thread txRatesUpdater;
94     private Timer statisticsTimer;
95     private TimerTask statisticsTimerTask;
96     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
97     private Map<Long, Map<Short, TxRates>> txRates; // Per port sampled (every portStatsPeriod) transmit rate
98     private Set<IStatisticsListener> descriptionListeners;
99
100     /**
101      * The object containing the latest factoredSamples tx rate samples
102      * for a given switch port
103      */
104     protected class TxRates {
105         Deque<Long> sampledTxBytes; // contains the latest factoredSamples sampled transmitted bytes
106
107         public TxRates() {
108             sampledTxBytes = new LinkedBlockingDeque<Long>();
109         }
110
111         public void update(Long txBytes) {
112             /*
113              * Based on how many samples our average works on,
114              * we might have to remove the oldest sample
115              */
116             if (sampledTxBytes.size() == factoredSamples) {
117                 sampledTxBytes.removeLast();
118             }
119
120             // Add the latest sample to the top of the queue
121             sampledTxBytes.addFirst(txBytes);
122         }
123
124         /**
125          * Returns the average transmit rate in bps
126          * @return the average transmit rate [bps]
127          */
128         public long getAverageTxRate() {
129             long average = 0;
130             /*
131              * If we cannot provide the value for the time window length set
132              */
133             if (sampledTxBytes.size() < factoredSamples) {
134                 return average;
135             }
136             long increment = (long) (sampledTxBytes.getFirst() - sampledTxBytes
137                     .getLast());
138             long timePeriod = (long) (factoredSamples * portStatsPeriod)
139                     / (long) tickPeriod;
140             average = (8L * increment) / timePeriod;
141             return average;
142         }
143     }
144
145     public void setController(IController core) {
146         this.controller = core;
147     }
148
149     public void unsetController(IController core) {
150         if (this.controller == core) {
151             this.controller = null;
152         }
153     }
154
155     /**
156      * Function called by the dependency manager when all the required
157      * dependencies are satisfied
158      *
159      */
160     void init() {
161         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
162         descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
163         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
164         dummyList = new ArrayList<OFStatistics>(1);
165         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(
166                 initialSize);
167         pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(
168                 initialSize);
169         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(initialSize);
170         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(
171                 initialSize);
172         txRates = new HashMap<Long, Map<Short, TxRates>>(initialSize);
173         descriptionListeners = new HashSet<IStatisticsListener>();
174
175         // Initialize managed timers
176         statisticsTimer = new Timer();
177         statisticsTimerTask = new TimerTask() {
178             @Override
179             public void run() {
180                 decrementTicks();
181             }
182         };
183
184         // Initialize Statistics collector thread
185         statisticsCollector = new Thread(new Runnable() {
186             @Override
187             public void run() {
188                 while (true) {
189                     try {
190                         StatsRequest req = pendingStatsRequests.take();
191                         acquireStatistics(req.switchId, req.type);
192                     } catch (InterruptedException e) {
193                         log.warn("Flow Statistics Collector thread " +
194                                         "interrupted");
195                     }
196                 }
197             }
198         }, "Statistics Collector");
199
200         // Initialize Tx Rate Updater thread
201         txRatesUpdater = new Thread(new Runnable() {
202             @Override
203             public void run() {
204                 while (true) {
205                     try {
206                         long switchId = switchPortStatsUpdated.take();
207                         updatePortsTxRate(switchId);
208                     } catch (InterruptedException e) {
209                         log.warn("TX Rate Updater thread interrupted");
210                     }
211                 }
212             }
213         }, "TX Rate Updater");
214     }
215
216     /**
217      * Function called by the dependency manager when at least one
218      * dependency become unsatisfied or when the component is shutting
219      * down because for example bundle is being stopped.
220      *
221      */
222     void destroy() {
223     }
224
225     /**
226      * Function called by dependency manager after "init ()" is called
227      * and after the services provided by the class are registered in
228      * the service registry
229      *
230      */
231     void start() {
232         // Start managed timers
233         statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, tickPeriod);
234
235         // Start statistics collector thread
236         statisticsCollector.start();
237
238         // Start bandwidth utilization computer thread
239         txRatesUpdater.start();
240
241         // OSGI console
242         registerWithOSGIConsole();
243     }
244
245     /**
246      * Function called by the dependency manager before the services
247      * exported by the component are unregistered, this will be
248      * followed by a "destroy ()" calls
249      *
250      */
251     void stop() {
252         // Stop managed timers
253         statisticsTimer.cancel();
254     }
255
256     public void setStatisticsListener(IStatisticsListener s) {
257         this.descriptionListeners.add(s);
258     }
259     
260     public void unsetStatisticsListener(IStatisticsListener s) {
261         if (s != null) {
262                 this.descriptionListeners.remove(s);
263         }
264     }
265     
266     private void registerWithOSGIConsole() {
267         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
268                 .getBundleContext();
269         bundleContext.registerService(CommandProvider.class.getName(), this,
270                 null);
271     }
272
273     private static class StatsRequest {
274         protected Long switchId;
275         protected OFStatisticsType type;
276
277         public StatsRequest(Long d, OFStatisticsType t) {
278             switchId = d;
279             type = t;
280         }
281
282         public String toString() {
283             return "SReq = {switchId=" + switchId + ", type=" + type + "}";
284         }
285
286         @Override
287         public int hashCode() {
288             final int prime = 31;
289             int result = 1;
290             result = prime * result
291                     + ((switchId == null) ? 0 : switchId.hashCode());
292             result = prime * result + ((type == null) ? 0 : type.ordinal());
293             return result;
294         }
295
296         @Override
297         public boolean equals(Object obj) {
298             if (this == obj) {
299                 return true;
300             }
301             if (obj == null) {
302                 return false;
303             }
304             if (getClass() != obj.getClass()) {
305                 return false;
306             }
307             StatsRequest other = (StatsRequest) obj;
308             if (switchId == null) {
309                 if (other.switchId != null) {
310                     return false;
311                 }
312             } else if (!switchId.equals(other.switchId)) {
313                 return false;
314             }
315             if (type != other.type) {
316                 return false;
317             }
318             return true;
319         }
320     }
321
322     private void addStatisticsTicks(Long switchId) {
323         switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume switch supports Vendor extension stats
324         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
325         log.info("Added Switch {} to target pool", HexString
326                 .toHexString(switchId.longValue()));
327     }
328
329     protected static class StatisticsTicks {
330         private short flowStatisticsTicks;
331         private short descriptionTicks;
332         private short portStatisticsTicks;
333
334         public StatisticsTicks(boolean scattered) {
335             if (scattered) {
336                 // scatter bursts by statisticsTickPeriod
337                 if (++counter < 0) {
338                     counter = 0;
339                 } // being paranoid here
340                 flowStatisticsTicks = (short) (1 + counter
341                         % statisticsTickNumber);
342                 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
343                 portStatisticsTicks = (short) (1 + counter % portTickNumber);
344             } else {
345                 flowStatisticsTicks = statisticsTickNumber;
346                 descriptionTicks = descriptionTickNumber;
347                 portStatisticsTicks = portTickNumber;
348             }
349         }
350
351         public boolean decrementFlowTicksIsZero() {
352             // Please ensure no code is inserted between the if check and the flowStatisticsTicks reset
353             if (--flowStatisticsTicks == 0) {
354                 flowStatisticsTicks = statisticsTickNumber;
355                 return true;
356             }
357             return false;
358         }
359
360         public boolean decrementDescTicksIsZero() {
361             // Please ensure no code is inserted between the if check and the descriptionTicks reset
362             if (--descriptionTicks == 0) {
363                 descriptionTicks = descriptionTickNumber;
364                 return true;
365             }
366             return false;
367         }
368
369         public boolean decrementPortTicksIsZero() {
370             // Please ensure no code is inserted between the if check and the descriptionTicks reset
371             if (--portStatisticsTicks == 0) {
372                 portStatisticsTicks = portTickNumber;
373                 return true;
374             }
375             return false;
376         }
377
378         public String toString() {
379             return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
380                     + ",pT=" + portStatisticsTicks + "}";
381         }
382     }
383
384     private void printInfoMessage(String type, StatsRequest request) {
385         log
386                 .info(
387                         type
388                                 + " stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
389                         new Object[] { HexString.toHexString(request.switchId),
390                                 pendingStatsRequests.size(),
391                                 statisticsCollector.getState().toString() });
392     }
393
394     protected void decrementTicks() {
395         StatsRequest request = null;
396         for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
397                 .entrySet()) {
398             StatisticsTicks clock = entry.getValue();
399             Long switchId = entry.getKey();
400             if (clock.decrementFlowTicksIsZero() == true) {
401                 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ? new StatsRequest(
402                         switchId, OFStatisticsType.VENDOR)
403                         : new StatsRequest(switchId, OFStatisticsType.FLOW);
404                 // If a request for this switch is already in the queue, skip to add this new request
405                 if (!pendingStatsRequests.contains(request)
406                         && false == pendingStatsRequests.offer(request)) {
407                     printInfoMessage("Flow", request);
408                 }
409             }
410
411             if (clock.decrementDescTicksIsZero() == true) {
412                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
413                 // If a request for this switch is already in the queue, skip to add this new request
414                 if (!pendingStatsRequests.contains(request)
415                         && false == pendingStatsRequests.offer(request)) {
416                     printInfoMessage("Description", request);
417                 }
418             }
419
420             if (clock.decrementPortTicksIsZero() == true) {
421                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
422                 // If a request for this switch is already in the queue, skip to add this new request
423                 if (!pendingStatsRequests.contains(request)
424                         && false == pendingStatsRequests.offer(request)) {
425                     printInfoMessage("Port", request);
426                 }
427             }
428         }
429     }
430
431     private void removeStatsRequestTasks(Long switchId) {
432         log.info("Cleaning Statistics database for switch "
433                 + HexEncode.longToHexString(switchId));
434         // To be safe, let's attempt removal of both VENDOR and FLOW request. It does not hurt
435         pendingStatsRequests.remove(new StatsRequest(switchId,
436                 OFStatisticsType.VENDOR));
437         pendingStatsRequests.remove(new StatsRequest(switchId,
438                 OFStatisticsType.FLOW));
439         pendingStatsRequests.remove(new StatsRequest(switchId,
440                 OFStatisticsType.DESC));
441         pendingStatsRequests.remove(new StatsRequest(switchId,
442                 OFStatisticsType.PORT));
443         // Take care of the TX rate databases
444         switchPortStatsUpdated.remove(switchId);
445         txRates.remove(switchId);
446     }
447
448     private void clearFlowStatsAndTicks(Long switchId) {
449         statisticsTimerTicks.remove(switchId);
450         removeStatsRequestTasks(switchId);
451         flowStatistics.remove(switchId);
452         log.info("Statistics removed for switch "
453                 + HexString.toHexString(switchId));
454     }
455
456     private void acquireStatistics(Long switchId, OFStatisticsType statType) {
457
458         // Query the switch on all matches
459         List<OFStatistics> values = this.acquireStatistics(switchId, statType,
460                 null);
461
462         // Update local caching database if got a valid response
463         if (values != null && !values.isEmpty()) {
464             if ((statType == OFStatisticsType.FLOW)
465                     || (statType == OFStatisticsType.VENDOR)) {
466                 flowStatistics.put(switchId, values);
467             } else if (statType == OFStatisticsType.DESC) {
468                 // Notify who may be interested in a description change
469                         notifyDescriptionListeners(switchId, values);
470                 
471                 // Overwrite cache
472                 descStatistics.put(switchId, values);
473             } else if (statType == OFStatisticsType.PORT) {
474                 // Overwrite cache with new port statistics for this switch
475                 portStatistics.put(switchId, values);
476
477                 // Wake up the thread which maintains the TX byte counters for each port
478                 switchPortStatsUpdated.offer(switchId);
479             }
480         }
481     }
482
483     private void notifyDescriptionListeners(Long switchId,
484                                                                 List<OFStatistics> values) {
485                 for (IStatisticsListener l : this.descriptionListeners) {
486                         l.descriptionRefreshed(switchId, 
487                                         ((OFDescriptionStatistics)values.get(0)));
488                 }
489     }
490     
491     /*
492      * Generic function to get the statistics form a OF switch
493      */
494     @SuppressWarnings("unchecked")
495     private List<OFStatistics> acquireStatistics(Long switchId,
496             OFStatisticsType statsType, Object target) {
497         List<OFStatistics> values = null;
498         String type = null;
499         ISwitch sw = controller.getSwitch(switchId);
500
501         if (sw != null) {
502             OFStatisticsRequest req = new OFStatisticsRequest();
503             req.setStatisticType(statsType);
504             int requestLength = req.getLengthU();
505
506             if (statsType == OFStatisticsType.FLOW) {
507                 OFMatch match = null;
508                 if (target == null) {
509                     // All flows request
510                     match = new OFMatch();
511                     match.setWildcards(0xffffffff);
512                 } else if (!(target instanceof OFMatch)) {
513                     // Malformed request
514                     log.warn("Invalid target type for Flow stats request: "
515                             + target.getClass());
516                     return null;
517                 } else {
518                     // Specific flow request
519                     match = (OFMatch) target;
520                 }
521                 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
522                 specificReq.setMatch(match);
523                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
524                 specificReq.setTableId((byte) 0xff);
525                 req.setStatistics(Collections
526                         .singletonList((OFStatistics) specificReq));
527                 requestLength += specificReq.getLength();
528                 type = "FLOW";
529             } else if (statsType == OFStatisticsType.VENDOR) {
530                 V6StatsRequest specificReq = new V6StatsRequest();
531                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
532                 specificReq.setTableId((byte) 0xff);
533                 req.setStatistics(Collections
534                         .singletonList((OFStatistics) specificReq));
535                 requestLength += specificReq.getLength();
536                 type = "VENDOR";
537             } else if (statsType == OFStatisticsType.AGGREGATE) {
538                 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
539                 OFMatch match = new OFMatch();
540                 match.setWildcards(0xffffffff);
541                 specificReq.setMatch(match);
542                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
543                 specificReq.setTableId((byte) 0xff);
544                 req.setStatistics(Collections
545                         .singletonList((OFStatistics) specificReq));
546                 requestLength += specificReq.getLength();
547                 type = "AGGREGATE";
548             } else if (statsType == OFStatisticsType.PORT) {
549                 short targetPort;
550                 if (target == null) {
551                     // All ports request
552                     targetPort = (short) OFPort.OFPP_NONE.getValue();
553                 } else if (!(target instanceof Short)) {
554                     // Malformed request
555                     log.warn("Invalid target type for Port stats request: "
556                             + target.getClass());
557                     return null;
558                 } else {
559                     // Specific port request
560                     targetPort = (Short) target;
561                 }
562                 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
563                 specificReq.setPortNumber(targetPort);
564                 req.setStatistics(Collections
565                         .singletonList((OFStatistics) specificReq));
566                 requestLength += specificReq.getLength();
567                 type = "PORT";
568             } else if (statsType == OFStatisticsType.QUEUE) {
569                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
570                 specificReq.setPortNumber((short) OFPort.OFPP_ALL.getValue());
571                 specificReq.setQueueId(0xffffffff);
572                 req.setStatistics(Collections
573                         .singletonList((OFStatistics) specificReq));
574                 requestLength += specificReq.getLength();
575                 type = "QUEUE";
576             } else if (statsType == OFStatisticsType.DESC) {
577                 type = "DESC";
578             } else if (statsType == OFStatisticsType.TABLE) {
579                 type = "TABLE";
580             }
581             req.setLengthU(requestLength);
582             Object result = sw.getStatistics(req);
583
584             if (result == null) {
585                 log.warn("Request Timed Out for ({}) from switch {}", type,
586                         HexString.toHexString(switchId));
587             } else if (result instanceof OFError) {
588                 log.warn("Switch {} failed to handle ({}) stats request: "
589                         + Utils.getOFErrorString((OFError) result), HexString
590                         .toHexString(switchId), type);
591                 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
592                     log
593                             .warn(
594                                     "Switching back to regular Flow stats requests for switch {}",
595                                     HexString.toHexString(switchId));
596                     this.switchSupportsVendorExtStats.put(switchId,
597                             Boolean.FALSE);
598                 }
599             } else {
600                 values = (List<OFStatistics>) result;
601             }
602         }
603         return values;
604     }
605
606     @Override
607     public List<OFStatistics> getOFFlowStatistics(Long switchId) {
608         List<OFStatistics> list = flowStatistics.get(switchId);
609
610         /*
611          *  Check on emptiness as interference between add and get is still
612          *  possible on the inner list (the concurrentMap entry's value)
613          */
614         return (list == null || list.isEmpty()) ? this.dummyList
615                 : (list.get(0) instanceof OFVendorStatistics) ? this
616                         .v6StatsListToOFStatsList(list) : list;
617     }
618
619     @Override
620     public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch) {
621         List<OFStatistics> statsList = flowStatistics.get(switchId);
622
623         /*
624          *  Check on emptiness as interference between add and get is still
625          *  possible on the inner list (the concurrentMap entry's value)
626          */
627         if (statsList == null || statsList.isEmpty()) {
628             return this.dummyList;
629         }
630
631         if (statsList.get(0) instanceof OFVendorStatistics) {
632             /*
633              * Caller could provide regular OF match when we
634              * instead pull the vendor statistics from this node
635              * Caller is not supposed to know whether this switch supports
636              * vendor extensions statistics requests
637              */
638             V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
639                     : new V6Match(ofMatch);
640
641             List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
642             for (OFStatistics stats : targetList) {
643                 V6StatsReply v6Stats = (V6StatsReply) stats;
644                 V6Match v6Match = v6Stats.getMatch();
645                 if (v6Match.equals(targetMatch)) {
646                     List<OFStatistics> list = new ArrayList<OFStatistics>();
647                     list.add(stats);
648                     return list;
649                 }
650             }
651         } else {
652             for (OFStatistics stats : statsList) {
653                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
654                 if (flowStats.getMatch().equals(ofMatch)) {
655                     List<OFStatistics> list = new ArrayList<OFStatistics>();
656                     list.add(stats);
657                     return list;
658                 }
659             }
660         }
661         return this.dummyList;
662     }
663
664     /*
665      * Converts the v6 vendor statistics to the OFStatistics
666      */
667     private List<OFStatistics> v6StatsListToOFStatsList(
668             List<OFStatistics> statistics) {
669         List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
670         if (statistics != null && !statistics.isEmpty()) {
671             for (OFStatistics stats : statistics) {
672                 if (stats instanceof OFVendorStatistics) {
673                     List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
674                     if (r != null) {
675                         v6statistics.addAll(r);
676                     }
677                 }
678             }
679         }
680         return v6statistics;
681     }
682
683     private static List<OFStatistics> getV6ReplyStatistics(
684             OFVendorStatistics stat) {
685         int length = stat.getLength();
686         List<OFStatistics> results = new ArrayList<OFStatistics>();
687         if (length < 12)
688             return null; // Nicira Hdr is 12 bytes. We need atleast that much
689         ByteBuffer data = ByteBuffer.allocate(length);
690         stat.writeTo(data);
691         data.rewind();
692         log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}", HexString
693                 .toHexString(data.array()));
694
695         int vendor = data.getInt(); //first 4 bytes is vendor id.
696         if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
697             log
698                     .debug("Unexpected vendor id: 0x{}", Integer
699                             .toHexString(vendor));
700             return null;
701         } else {
702             //go ahead by 8 bytes which is 8 bytes of 0
703             data.getLong(); //should be all 0's
704             length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have been consumed
705         }
706
707         V6StatsReply v6statsreply;
708         int min_len;
709         while (length > 0) {
710             v6statsreply = new V6StatsReply();
711             min_len = v6statsreply.getLength();
712             if (length < v6statsreply.getLength())
713                 break;
714             v6statsreply.setActionFactory(stat.getActionFactory());
715             v6statsreply.readFrom(data);
716             if (v6statsreply.getLength() < min_len)
717                 break;
718             v6statsreply.setVendorId(vendor);
719             log.trace("V6StatsReply: {}", v6statsreply);
720             length -= v6statsreply.getLength();
721             results.add(v6statsreply);
722         }
723         return results;
724     }
725
726     @Override
727     public List<OFStatistics> queryStatistics(Long switchId,
728             OFStatisticsType statType, Object target) {
729         /*
730          * Caller does not know and it is not supposed to know whether
731          * this switch supports vendor extension. We adjust the target for him
732          */
733         if (statType == OFStatisticsType.FLOW) {
734             if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
735                 statType = OFStatisticsType.VENDOR;
736             }
737         }
738
739         List<OFStatistics> list = this.acquireStatistics(switchId, statType,
740                 target);
741
742         return (list == null) ? null
743                 : (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list)
744                         : list;
745     }
746
747     @Override
748     public List<OFStatistics> getOFDescStatistics(Long switchId) {
749         if (!descStatistics.containsKey(switchId))
750             return this.dummyList;
751
752         return descStatistics.get(switchId);
753     }
754
755     @Override
756     public List<OFStatistics> getOFPortStatistics(Long switchId) {
757         if (!portStatistics.containsKey(switchId)) {
758             return this.dummyList;
759         }
760
761         return portStatistics.get(switchId);
762     }
763
764     @Override
765     public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
766         if (!portStatistics.containsKey(switchId)) {
767             return this.dummyList;
768         }
769         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
770         for (OFStatistics stats : portStatistics.get(switchId)) {
771             if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
772                 list.add(stats);
773                 break;
774             }
775         }
776         return list;
777     }
778
779     @Override
780     public int getFlowsNumber(long switchId) {
781         return this.flowStatistics.get(switchId).size();
782     }
783
784     /*
785      * InventoryShim replay for us all the switch addition which happened before we were brought up
786      */
787     @Override
788     public void updateNode(Node node, UpdateType type, Set<Property> props) {
789         Long switchId = (Long) node.getID();
790         switch (type) {
791         case ADDED:
792             addStatisticsTicks(switchId);
793             break;
794         case REMOVED:
795             clearFlowStatsAndTicks(switchId);
796         default:
797         }
798     }
799
800     @Override
801     public void updateNodeConnector(NodeConnector nodeConnector,
802             UpdateType type, Set<Property> props) {
803         // No action
804     }
805
806     /**
807      * Update the cached port rates for this switch with the latest
808      * retrieved port transmit byte count
809      * @param switchId
810      */
811     private synchronized void updatePortsTxRate(long switchId) {
812         List<OFStatistics> newPortStatistics = this.portStatistics
813                 .get(switchId);
814         if (newPortStatistics == null) {
815             return;
816         }
817         Map<Short, TxRates> rates = this.txRates.get(switchId);
818         if (rates == null) {
819             // First time rates for this switch are added
820             rates = new HashMap<Short, TxRates>();
821             txRates.put(switchId, rates);
822         }
823         for (OFStatistics stats : newPortStatistics) {
824             OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
825             short port = newPortStat.getPortNumber();
826             TxRates portRatesHolder = rates.get(port);
827             if (portRatesHolder == null) {
828                 // First time rates for this port are added
829                 portRatesHolder = new TxRates();
830                 rates.put(port, portRatesHolder);
831             }
832             // Get and store the number of transmitted bytes for this port
833             // And handle the case where agent does not support the counter
834             long transmitBytes = newPortStat.getTransmitBytes();
835             long value = (transmitBytes < 0) ? 0 : transmitBytes;
836             portRatesHolder.update(value);
837         }
838     }
839
840     @Override
841     public synchronized long getTransmitRate(Long switchId, Short port) {
842         long average = 0;
843         if (switchId == null || port == null) {
844             return average;
845         }
846         Map<Short, TxRates> perSwitch = txRates.get(switchId);
847         if (perSwitch == null) {
848             return average;
849         }
850         TxRates portRates = perSwitch.get(port);
851         if (portRates == null) {
852             return average;
853         }
854         return portRates.getAverageTxRate();
855     }
856
857     /*
858      * Manual switch name configuration code
859      */
860     @Override
861     public String getHelp() {
862         StringBuffer help = new StringBuffer();
863         help.append("---OF Statistics Manager utilities---\n");
864         help.append("\t ofdumpstatsmgr         - " + 
865                                 "Print Internal Stats Mgr db\n");
866         return help.toString();
867     }
868
869     private boolean isValidSwitchId(String switchId) {
870         String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
871         String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
872
873         return (switchId != null && (switchId.matches(regexDatapathID) || switchId
874                 .matches(regexDatapathIDLong)));
875     }
876
877     public long getSwitchIDLong(String switchId) {
878         int radix = 16;
879         String switchString = "0";
880
881         if (isValidSwitchId(switchId)) {
882             if (switchId.contains(":")) {
883                 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
884                 switchString = switchId.replace(":", "");
885             } else if (switchId.contains("-")) {
886                 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
887                 switchString = switchId.replace("-", "");
888             } else {
889                 // Handle the 0123456789ABCDEF notation
890                 switchString = switchId;
891             }
892         }
893         return Long.parseLong(switchString, radix);
894     }
895
896     /*
897      * Internal information dump code
898      */
899     private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
900         StringBuffer buffer = new StringBuffer();
901         buffer.append("{");
902         for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
903             buffer.append(HexString.toHexString(entry.getKey()) + "="
904                     + entry.getValue().toString() + " ");
905         }
906         buffer.append("}");
907         return buffer.toString();
908     }
909
910     public void _ofdumpstatsmgr(CommandInterpreter ci) {
911         ci.println("Global Counter: " + counter);
912         ci
913                 .println("Timer Ticks: "
914                         + prettyPrintSwitchMap(statisticsTimerTicks));
915         ci.println("PendingStatsQueue: " + pendingStatsRequests);
916         ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
917         ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
918         ci.println("Stats Collector State: "
919                 + statisticsCollector.getState().toString());
920         ci.println("StatsTimer: " + statisticsTimer.toString());
921         ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
922         ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
923         ci.println("Port Stats Period: " + portTickNumber + " s");
924     }
925
926     public void _resetSwitchCapability(CommandInterpreter ci) {
927         String sidString = ci.nextArgument();
928         Long sid = null;
929         if (sidString == null) {
930             ci.println("Insert the switch id (numeric value)");
931             return;
932         }
933         try {
934             sid = Long.valueOf(sidString);
935             this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
936             ci.println("Vendor capability for switch " + sid + " set to "
937                     + this.switchSupportsVendorExtStats.get(sid));
938         } catch (NumberFormatException e) {
939             ci.println("Invalid switch id. Has to be numeric.");
940         }
941
942     }
943
944     public void _ofbw(CommandInterpreter ci) {
945         String sidString = ci.nextArgument();
946         Long sid = null;
947         if (sidString == null) {
948             ci.println("Insert the switch id (numeric value)");
949             return;
950         }
951         try {
952             sid = Long.valueOf(sidString);
953         } catch (NumberFormatException e) {
954             ci.println("Invalid switch id. Has to be numeric.");
955         }
956         if (sid != null) {
957             Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
958             ci.println("Bandwidth utilization (" + factoredSamples
959                     * portTickNumber + " sec average) for switch "
960                     + HexEncode.longToHexString(sid) + ":");
961             if (thisSwitchRates == null) {
962                 ci.println("Not available");
963             } else {
964                 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
965                     ci.println("Port: " + entry.getKey() + ": "
966                             + entry.getValue().getAverageTxRate() + " bps");
967                 }
968             }
969         }
970     }
971
972     public void _txratewindow(CommandInterpreter ci) {
973         String averageWindow = ci.nextArgument();
974         short seconds = 0;
975         if (averageWindow == null) {
976             ci.println("Insert the length in seconds of the median " + 
977                         "window for tx rate");
978             ci.println("Current: " + factoredSamples * portTickNumber
979                             + " secs");
980             return;
981         }
982         try {
983             seconds = Short.valueOf(averageWindow);
984         } catch (NumberFormatException e) {
985             ci.println("Invalid period.");
986         }
987         OFStatisticsManager.factoredSamples = (short) (seconds/portTickNumber);
988         ci.println("New: " + factoredSamples * portTickNumber + " secs");
989     }
990
991     public void _ofstatsmgrintervals(CommandInterpreter ci) {
992         String flowStatsInterv = ci.nextArgument();
993         String portStatsInterv = ci.nextArgument();
994         
995         if (flowStatsInterv == null || portStatsInterv == null) {
996
997             ci.println("Usage: ostatsmgrintervals <fP> <pP> (in seconds)");
998             ci.println("Current Values: fP=" + statisticsTickNumber +
999                         "s pP=" + portTickNumber + "s");
1000             return;
1001         }
1002         Short fP, pP;
1003         try {
1004                 fP = Short.parseShort(flowStatsInterv);
1005                 pP = Short.parseShort(portStatsInterv);
1006         } catch (Exception e) {
1007                 ci.println("Invalid format values: " + e.getMessage());
1008                 return;
1009         }
1010
1011         if (pP <= 1 || fP <=1) {
1012                 ci.println("Invalid values. fP and pP have to be greater than 1.");
1013                 return;
1014         }
1015         
1016         statisticsTickNumber = fP;
1017         portTickNumber = pP;
1018         
1019         ci.println("New Values: fP=" + statisticsTickNumber +
1020                         "s pP=" + portTickNumber + "s");
1021     }
1022
1023 }