Merge "Adding tests for clustering services. Removed first slash for infinispan-confi...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Map.Entry;
20 import java.util.Set;
21 import java.util.Timer;
22 import java.util.TimerTask;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
28
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
33 import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.core.Node;
40 import org.opendaylight.controller.sal.core.NodeConnector;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.utils.HexEncode;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFPort;
47 import org.openflow.protocol.OFStatisticsRequest;
48 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
49 import org.openflow.protocol.statistics.OFDescriptionStatistics;
50 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
51 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
52 import org.openflow.protocol.statistics.OFPortStatisticsReply;
53 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
54 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
55 import org.openflow.protocol.statistics.OFStatistics;
56 import org.openflow.protocol.statistics.OFStatisticsType;
57 import org.openflow.protocol.statistics.OFVendorStatistics;
58 import org.openflow.util.HexString;
59 import org.osgi.framework.BundleContext;
60 import org.osgi.framework.FrameworkUtil;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 /**
65  * It periodically polls the different OF statistics from the OF switches and
66  * caches them for quick retrieval for the above layers' modules It also
67  * provides an API to directly query the switch about the statistics
68  */
69 public class OFStatisticsManager implements IOFStatisticsManager,
70         IInventoryShimExternalListener, CommandProvider {
71     private static final Logger log = LoggerFactory
72             .getLogger(OFStatisticsManager.class);
73     private static final int initialSize = 64;
74     private static final long flowStatsPeriod = 10000;
75     private static final long descriptionStatsPeriod = 60000;
76     private static final long portStatsPeriod = 5000;
77     private static final long tickPeriod = 1000;
78     private static short statisticsTickNumber = (short) (flowStatsPeriod / tickPeriod);
79     private static short descriptionTickNumber = (short) (descriptionStatsPeriod / tickPeriod);
80     private static short portTickNumber = (short) (portStatsPeriod / tickPeriod);
81     private static short factoredSamples = (short) 2;
82     private static short counter = 1;
83     private IController controller = null;
84     private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
85     private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
86     private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
87     private List<OFStatistics> dummyList;
88     private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
89     protected BlockingQueue<StatsRequest> pendingStatsRequests;
90     protected BlockingQueue<Long> switchPortStatsUpdated;
91     private Thread statisticsCollector;
92     private Thread txRatesUpdater;
93     private Timer statisticsTimer;
94     private TimerTask statisticsTimerTask;
95     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
96     private Map<Long, Map<Short, TxRates>> txRates; // Per port sampled (every
97                                                     // portStatsPeriod) transmit
98                                                     // rate
99     private Set<IStatisticsListener> descriptionListeners;
100
101     /**
102      * The object containing the latest factoredSamples tx rate samples for a
103      * given switch port
104      */
105     protected class TxRates {
106         Deque<Long> sampledTxBytes; // contains the latest factoredSamples
107                                     // sampled transmitted bytes
108
109         public TxRates() {
110             sampledTxBytes = new LinkedBlockingDeque<Long>();
111         }
112
113         public void update(Long txBytes) {
114             /*
115              * Based on how many samples our average works on, we might have to
116              * remove the oldest sample
117              */
118             if (sampledTxBytes.size() == factoredSamples) {
119                 sampledTxBytes.removeLast();
120             }
121
122             // Add the latest sample to the top of the queue
123             sampledTxBytes.addFirst(txBytes);
124         }
125
126         /**
127          * Returns the average transmit rate in bps
128          * 
129          * @return the average transmit rate [bps]
130          */
131         public long getAverageTxRate() {
132             long average = 0;
133             /*
134              * If we cannot provide the value for the time window length set
135              */
136             if (sampledTxBytes.size() < factoredSamples) {
137                 return average;
138             }
139             long increment = (long) (sampledTxBytes.getFirst() - sampledTxBytes
140                     .getLast());
141             long timePeriod = (long) (factoredSamples * portStatsPeriod)
142                     / (long) tickPeriod;
143             average = (8L * increment) / timePeriod;
144             return average;
145         }
146     }
147
148     public void setController(IController core) {
149         this.controller = core;
150     }
151
152     public void unsetController(IController core) {
153         if (this.controller == core) {
154             this.controller = null;
155         }
156     }
157
158     /**
159      * Function called by the dependency manager when all the required
160      * dependencies are satisfied
161      * 
162      */
163     void init() {
164         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
165         descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
166         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
167         dummyList = new ArrayList<OFStatistics>(1);
168         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(
169                 initialSize);
170         pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(
171                 initialSize);
172         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(initialSize);
173         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(
174                 initialSize);
175         txRates = new HashMap<Long, Map<Short, TxRates>>(initialSize);
176         descriptionListeners = new HashSet<IStatisticsListener>();
177
178         // Initialize managed timers
179         statisticsTimer = new Timer();
180         statisticsTimerTask = new TimerTask() {
181             @Override
182             public void run() {
183                 decrementTicks();
184             }
185         };
186
187         // Initialize Statistics collector thread
188         statisticsCollector = new Thread(new Runnable() {
189             @Override
190             public void run() {
191                 while (true) {
192                     try {
193                         StatsRequest req = pendingStatsRequests.take();
194                         acquireStatistics(req.switchId, req.type);
195                     } catch (InterruptedException e) {
196                         log.warn("Flow Statistics Collector thread "
197                                 + "interrupted", e);
198                     }
199                 }
200             }
201         }, "Statistics Collector");
202
203         // Initialize Tx Rate Updater thread
204         txRatesUpdater = new Thread(new Runnable() {
205             @Override
206             public void run() {
207                 while (true) {
208                     try {
209                         long switchId = switchPortStatsUpdated.take();
210                         updatePortsTxRate(switchId);
211                     } catch (InterruptedException e) {
212                         log.warn("TX Rate Updater thread interrupted", e);
213                     }
214                 }
215             }
216         }, "TX Rate Updater");
217     }
218
219     /**
220      * Function called by the dependency manager when at least one dependency
221      * become unsatisfied or when the component is shutting down because for
222      * example bundle is being stopped.
223      * 
224      */
225     void destroy() {
226     }
227
228     /**
229      * Function called by dependency manager after "init ()" is called and after
230      * the services provided by the class are registered in the service registry
231      * 
232      */
233     void start() {
234         // Start managed timers
235         statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, tickPeriod);
236
237         // Start statistics collector thread
238         statisticsCollector.start();
239
240         // Start bandwidth utilization computer thread
241         txRatesUpdater.start();
242
243         // OSGI console
244         registerWithOSGIConsole();
245     }
246
247     /**
248      * Function called by the dependency manager before the services exported by
249      * the component are unregistered, this will be followed by a "destroy ()"
250      * calls
251      * 
252      */
253     void stop() {
254         // Stop managed timers
255         statisticsTimer.cancel();
256     }
257
258     public void setStatisticsListener(IStatisticsListener s) {
259         this.descriptionListeners.add(s);
260     }
261
262     public void unsetStatisticsListener(IStatisticsListener s) {
263         if (s != null) {
264             this.descriptionListeners.remove(s);
265         }
266     }
267
268     private void registerWithOSGIConsole() {
269         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
270                 .getBundleContext();
271         bundleContext.registerService(CommandProvider.class.getName(), this,
272                 null);
273     }
274
275     private static class StatsRequest {
276         protected Long switchId;
277         protected OFStatisticsType type;
278
279         public StatsRequest(Long d, OFStatisticsType t) {
280             switchId = d;
281             type = t;
282         }
283
284         public String toString() {
285             return "SReq = {switchId=" + switchId + ", type=" + type + "}";
286         }
287
288         @Override
289         public int hashCode() {
290             final int prime = 31;
291             int result = 1;
292             result = prime * result
293                     + ((switchId == null) ? 0 : switchId.hashCode());
294             result = prime * result + ((type == null) ? 0 : type.ordinal());
295             return result;
296         }
297
298         @Override
299         public boolean equals(Object obj) {
300             if (this == obj) {
301                 return true;
302             }
303             if (obj == null) {
304                 return false;
305             }
306             if (getClass() != obj.getClass()) {
307                 return false;
308             }
309             StatsRequest other = (StatsRequest) obj;
310             if (switchId == null) {
311                 if (other.switchId != null) {
312                     return false;
313                 }
314             } else if (!switchId.equals(other.switchId)) {
315                 return false;
316             }
317             if (type != other.type) {
318                 return false;
319             }
320             return true;
321         }
322     }
323
324     private void addStatisticsTicks(Long switchId) {
325         switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
326                                                                   // switch
327                                                                   // supports
328                                                                   // Vendor
329                                                                   // extension
330                                                                   // stats
331         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
332         log.info("Added Switch {} to target pool",
333                 HexString.toHexString(switchId.longValue()));
334     }
335
336     protected static class StatisticsTicks {
337         private short flowStatisticsTicks;
338         private short descriptionTicks;
339         private short portStatisticsTicks;
340
341         public StatisticsTicks(boolean scattered) {
342             if (scattered) {
343                 // scatter bursts by statisticsTickPeriod
344                 if (++counter < 0) {
345                     counter = 0;
346                 } // being paranoid here
347                 flowStatisticsTicks = (short) (1 + counter
348                         % statisticsTickNumber);
349                 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
350                 portStatisticsTicks = (short) (1 + counter % portTickNumber);
351             } else {
352                 flowStatisticsTicks = statisticsTickNumber;
353                 descriptionTicks = descriptionTickNumber;
354                 portStatisticsTicks = portTickNumber;
355             }
356         }
357
358         public boolean decrementFlowTicksIsZero() {
359             // Please ensure no code is inserted between the if check and the
360             // flowStatisticsTicks reset
361             if (--flowStatisticsTicks == 0) {
362                 flowStatisticsTicks = statisticsTickNumber;
363                 return true;
364             }
365             return false;
366         }
367
368         public boolean decrementDescTicksIsZero() {
369             // Please ensure no code is inserted between the if check and the
370             // descriptionTicks reset
371             if (--descriptionTicks == 0) {
372                 descriptionTicks = descriptionTickNumber;
373                 return true;
374             }
375             return false;
376         }
377
378         public boolean decrementPortTicksIsZero() {
379             // Please ensure no code is inserted between the if check and the
380             // descriptionTicks reset
381             if (--portStatisticsTicks == 0) {
382                 portStatisticsTicks = portTickNumber;
383                 return true;
384             }
385             return false;
386         }
387
388         public String toString() {
389             return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
390                     + ",pT=" + portStatisticsTicks + "}";
391         }
392     }
393
394     private void printInfoMessage(String type, StatsRequest request) {
395         log.info(
396                 "{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
397                 new Object[] { type, HexString.toHexString(request.switchId),
398                         pendingStatsRequests.size(),
399                         statisticsCollector.getState().toString() });
400     }
401
402     protected void decrementTicks() {
403         StatsRequest request = null;
404         for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
405                 .entrySet()) {
406             StatisticsTicks clock = entry.getValue();
407             Long switchId = entry.getKey();
408             if (clock.decrementFlowTicksIsZero() == true) {
409                 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ? new StatsRequest(
410                         switchId, OFStatisticsType.VENDOR) : new StatsRequest(
411                         switchId, OFStatisticsType.FLOW);
412                 // If a request for this switch is already in the queue, skip to
413                 // add this new request
414                 if (!pendingStatsRequests.contains(request)
415                         && false == pendingStatsRequests.offer(request)) {
416                     printInfoMessage("Flow", request);
417                 }
418             }
419
420             if (clock.decrementDescTicksIsZero() == true) {
421                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
422                 // If a request for this switch is already in the queue, skip to
423                 // add this new request
424                 if (!pendingStatsRequests.contains(request)
425                         && false == pendingStatsRequests.offer(request)) {
426                     printInfoMessage("Description", request);
427                 }
428             }
429
430             if (clock.decrementPortTicksIsZero() == true) {
431                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
432                 // If a request for this switch is already in the queue, skip to
433                 // add this new request
434                 if (!pendingStatsRequests.contains(request)
435                         && false == pendingStatsRequests.offer(request)) {
436                     printInfoMessage("Port", request);
437                 }
438             }
439         }
440     }
441
442     private void removeStatsRequestTasks(Long switchId) {
443         log.info("Cleaning Statistics database for switch {}",
444                 HexEncode.longToHexString(switchId));
445         // To be safe, let's attempt removal of both VENDOR and FLOW request. It
446         // does not hurt
447         pendingStatsRequests.remove(new StatsRequest(switchId,
448                 OFStatisticsType.VENDOR));
449         pendingStatsRequests.remove(new StatsRequest(switchId,
450                 OFStatisticsType.FLOW));
451         pendingStatsRequests.remove(new StatsRequest(switchId,
452                 OFStatisticsType.DESC));
453         pendingStatsRequests.remove(new StatsRequest(switchId,
454                 OFStatisticsType.PORT));
455         // Take care of the TX rate databases
456         switchPortStatsUpdated.remove(switchId);
457         txRates.remove(switchId);
458     }
459
460     private void clearFlowStatsAndTicks(Long switchId) {
461         statisticsTimerTicks.remove(switchId);
462         removeStatsRequestTasks(switchId);
463         flowStatistics.remove(switchId);
464         log.info("Statistics removed for switch {}",
465                 HexString.toHexString(switchId));
466     }
467
468     private void acquireStatistics(Long switchId, OFStatisticsType statType) {
469
470         // Query the switch on all matches
471         List<OFStatistics> values = this.acquireStatistics(switchId, statType,
472                 null);
473
474         // Update local caching database if got a valid response
475         if (values != null && !values.isEmpty()) {
476             if ((statType == OFStatisticsType.FLOW)
477                     || (statType == OFStatisticsType.VENDOR)) {
478                 flowStatistics.put(switchId, values);
479             } else if (statType == OFStatisticsType.DESC) {
480                 // Notify who may be interested in a description change
481                 notifyDescriptionListeners(switchId, values);
482
483                 // Overwrite cache
484                 descStatistics.put(switchId, values);
485             } else if (statType == OFStatisticsType.PORT) {
486                 // Overwrite cache with new port statistics for this switch
487                 portStatistics.put(switchId, values);
488
489                 // Wake up the thread which maintains the TX byte counters for
490                 // each port
491                 switchPortStatsUpdated.offer(switchId);
492             }
493         }
494     }
495
496     private void notifyDescriptionListeners(Long switchId,
497             List<OFStatistics> values) {
498         for (IStatisticsListener l : this.descriptionListeners) {
499             l.descriptionRefreshed(switchId,
500                     ((OFDescriptionStatistics) values.get(0)));
501         }
502     }
503
504     /*
505      * Generic function to get the statistics form a OF switch
506      */
507     @SuppressWarnings("unchecked")
508     private List<OFStatistics> acquireStatistics(Long switchId,
509             OFStatisticsType statsType, Object target) {
510         List<OFStatistics> values = null;
511         String type = null;
512         ISwitch sw = controller.getSwitch(switchId);
513
514         if (sw != null) {
515             OFStatisticsRequest req = new OFStatisticsRequest();
516             req.setStatisticType(statsType);
517             int requestLength = req.getLengthU();
518
519             if (statsType == OFStatisticsType.FLOW) {
520                 OFMatch match = null;
521                 if (target == null) {
522                     // All flows request
523                     match = new OFMatch();
524                     match.setWildcards(0xffffffff);
525                 } else if (!(target instanceof OFMatch)) {
526                     // Malformed request
527                     log.warn("Invalid target type for Flow stats request: {}",
528                             target.getClass());
529                     return null;
530                 } else {
531                     // Specific flow request
532                     match = (OFMatch) target;
533                 }
534                 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
535                 specificReq.setMatch(match);
536                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
537                 specificReq.setTableId((byte) 0xff);
538                 req.setStatistics(Collections
539                         .singletonList((OFStatistics) specificReq));
540                 requestLength += specificReq.getLength();
541                 type = "FLOW";
542             } else if (statsType == OFStatisticsType.VENDOR) {
543                 V6StatsRequest specificReq = new V6StatsRequest();
544                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
545                 specificReq.setTableId((byte) 0xff);
546                 req.setStatistics(Collections
547                         .singletonList((OFStatistics) specificReq));
548                 requestLength += specificReq.getLength();
549                 type = "VENDOR";
550             } else if (statsType == OFStatisticsType.AGGREGATE) {
551                 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
552                 OFMatch match = new OFMatch();
553                 match.setWildcards(0xffffffff);
554                 specificReq.setMatch(match);
555                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
556                 specificReq.setTableId((byte) 0xff);
557                 req.setStatistics(Collections
558                         .singletonList((OFStatistics) specificReq));
559                 requestLength += specificReq.getLength();
560                 type = "AGGREGATE";
561             } else if (statsType == OFStatisticsType.PORT) {
562                 short targetPort;
563                 if (target == null) {
564                     // All ports request
565                     targetPort = (short) OFPort.OFPP_NONE.getValue();
566                 } else if (!(target instanceof Short)) {
567                     // Malformed request
568                     log.warn("Invalid target type for Port stats request: {}",
569                             target.getClass());
570                     return null;
571                 } else {
572                     // Specific port request
573                     targetPort = (Short) target;
574                 }
575                 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
576                 specificReq.setPortNumber(targetPort);
577                 req.setStatistics(Collections
578                         .singletonList((OFStatistics) specificReq));
579                 requestLength += specificReq.getLength();
580                 type = "PORT";
581             } else if (statsType == OFStatisticsType.QUEUE) {
582                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
583                 specificReq.setPortNumber((short) OFPort.OFPP_ALL.getValue());
584                 specificReq.setQueueId(0xffffffff);
585                 req.setStatistics(Collections
586                         .singletonList((OFStatistics) specificReq));
587                 requestLength += specificReq.getLength();
588                 type = "QUEUE";
589             } else if (statsType == OFStatisticsType.DESC) {
590                 type = "DESC";
591             } else if (statsType == OFStatisticsType.TABLE) {
592                 type = "TABLE";
593             }
594             req.setLengthU(requestLength);
595             Object result = sw.getStatistics(req);
596
597             if (result == null) {
598                 log.warn("Request Timed Out for ({}) from switch {}", type,
599                         HexString.toHexString(switchId));
600             } else if (result instanceof OFError) {
601                 log.warn("Switch {} failed to handle ({}) stats request: {}",
602                         new Object[] { HexString.toHexString(switchId), type,
603                                 Utils.getOFErrorString((OFError) result) });
604                 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
605                     log.warn(
606                             "Switching back to regular Flow stats requests for switch {}",
607                             HexString.toHexString(switchId));
608                     this.switchSupportsVendorExtStats.put(switchId,
609                             Boolean.FALSE);
610                 }
611             } else {
612                 values = (List<OFStatistics>) result;
613             }
614         }
615         return values;
616     }
617
618     @Override
619     public List<OFStatistics> getOFFlowStatistics(Long switchId) {
620         List<OFStatistics> list = flowStatistics.get(switchId);
621
622         /*
623          * Check on emptiness as interference between add and get is still
624          * possible on the inner list (the concurrentMap entry's value)
625          */
626         return (list == null || list.isEmpty()) ? this.dummyList
627                 : (list.get(0) instanceof OFVendorStatistics) ? this
628                         .v6StatsListToOFStatsList(list) : list;
629     }
630
631     @Override
632     public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch) {
633         List<OFStatistics> statsList = flowStatistics.get(switchId);
634
635         /*
636          * Check on emptiness as interference between add and get is still
637          * possible on the inner list (the concurrentMap entry's value)
638          */
639         if (statsList == null || statsList.isEmpty()) {
640             return this.dummyList;
641         }
642
643         if (statsList.get(0) instanceof OFVendorStatistics) {
644             /*
645              * Caller could provide regular OF match when we instead pull the
646              * vendor statistics from this node Caller is not supposed to know
647              * whether this switch supports vendor extensions statistics
648              * requests
649              */
650             V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
651                     : new V6Match(ofMatch);
652
653             List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
654             for (OFStatistics stats : targetList) {
655                 V6StatsReply v6Stats = (V6StatsReply) stats;
656                 V6Match v6Match = v6Stats.getMatch();
657                 if (v6Match.equals(targetMatch)) {
658                     List<OFStatistics> list = new ArrayList<OFStatistics>();
659                     list.add(stats);
660                     return list;
661                 }
662             }
663         } else {
664             for (OFStatistics stats : statsList) {
665                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
666                 if (flowStats.getMatch().equals(ofMatch)) {
667                     List<OFStatistics> list = new ArrayList<OFStatistics>();
668                     list.add(stats);
669                     return list;
670                 }
671             }
672         }
673         return this.dummyList;
674     }
675
676     /*
677      * Converts the v6 vendor statistics to the OFStatistics
678      */
679     private List<OFStatistics> v6StatsListToOFStatsList(
680             List<OFStatistics> statistics) {
681         List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
682         if (statistics != null && !statistics.isEmpty()) {
683             for (OFStatistics stats : statistics) {
684                 if (stats instanceof OFVendorStatistics) {
685                     List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
686                     if (r != null) {
687                         v6statistics.addAll(r);
688                     }
689                 }
690             }
691         }
692         return v6statistics;
693     }
694
695     private static List<OFStatistics> getV6ReplyStatistics(
696             OFVendorStatistics stat) {
697         int length = stat.getLength();
698         List<OFStatistics> results = new ArrayList<OFStatistics>();
699         if (length < 12)
700             return null; // Nicira Hdr is 12 bytes. We need atleast that much
701         ByteBuffer data = ByteBuffer.allocate(length);
702         stat.writeTo(data);
703         data.rewind();
704         log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
705                 HexString.toHexString(data.array()));
706
707         int vendor = data.getInt(); // first 4 bytes is vendor id.
708         if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
709             log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
710             return null;
711         } else {
712             // go ahead by 8 bytes which is 8 bytes of 0
713             data.getLong(); // should be all 0's
714             length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
715                           // been consumed
716         }
717
718         V6StatsReply v6statsreply;
719         int min_len;
720         while (length > 0) {
721             v6statsreply = new V6StatsReply();
722             min_len = v6statsreply.getLength();
723             if (length < v6statsreply.getLength())
724                 break;
725             v6statsreply.setActionFactory(stat.getActionFactory());
726             v6statsreply.readFrom(data);
727             if (v6statsreply.getLength() < min_len)
728                 break;
729             v6statsreply.setVendorId(vendor);
730             log.trace("V6StatsReply: {}", v6statsreply);
731             length -= v6statsreply.getLength();
732             results.add(v6statsreply);
733         }
734         return results;
735     }
736
737     @Override
738     public List<OFStatistics> queryStatistics(Long switchId,
739             OFStatisticsType statType, Object target) {
740         /*
741          * Caller does not know and it is not supposed to know whether this
742          * switch supports vendor extension. We adjust the target for him
743          */
744         if (statType == OFStatisticsType.FLOW) {
745             if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
746                 statType = OFStatisticsType.VENDOR;
747             }
748         }
749
750         List<OFStatistics> list = this.acquireStatistics(switchId, statType,
751                 target);
752
753         return (list == null) ? null
754                 : (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list)
755                         : list;
756     }
757
758     @Override
759     public List<OFStatistics> getOFDescStatistics(Long switchId) {
760         if (!descStatistics.containsKey(switchId))
761             return this.dummyList;
762
763         return descStatistics.get(switchId);
764     }
765
766     @Override
767     public List<OFStatistics> getOFPortStatistics(Long switchId) {
768         if (!portStatistics.containsKey(switchId)) {
769             return this.dummyList;
770         }
771
772         return portStatistics.get(switchId);
773     }
774
775     @Override
776     public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
777         if (!portStatistics.containsKey(switchId)) {
778             return this.dummyList;
779         }
780         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
781         for (OFStatistics stats : portStatistics.get(switchId)) {
782             if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
783                 list.add(stats);
784                 break;
785             }
786         }
787         return list;
788     }
789
790     @Override
791     public int getFlowsNumber(long switchId) {
792         return this.flowStatistics.get(switchId).size();
793     }
794
795     /*
796      * InventoryShim replay for us all the switch addition which happened before
797      * we were brought up
798      */
799     @Override
800     public void updateNode(Node node, UpdateType type, Set<Property> props) {
801         Long switchId = (Long) node.getID();
802         switch (type) {
803         case ADDED:
804             addStatisticsTicks(switchId);
805             break;
806         case REMOVED:
807             clearFlowStatsAndTicks(switchId);
808         default:
809         }
810     }
811
812     @Override
813     public void updateNodeConnector(NodeConnector nodeConnector,
814             UpdateType type, Set<Property> props) {
815         // No action
816     }
817
818     /**
819      * Update the cached port rates for this switch with the latest retrieved
820      * port transmit byte count
821      * 
822      * @param switchId
823      */
824     private synchronized void updatePortsTxRate(long switchId) {
825         List<OFStatistics> newPortStatistics = this.portStatistics
826                 .get(switchId);
827         if (newPortStatistics == null) {
828             return;
829         }
830         Map<Short, TxRates> rates = this.txRates.get(switchId);
831         if (rates == null) {
832             // First time rates for this switch are added
833             rates = new HashMap<Short, TxRates>();
834             txRates.put(switchId, rates);
835         }
836         for (OFStatistics stats : newPortStatistics) {
837             OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
838             short port = newPortStat.getPortNumber();
839             TxRates portRatesHolder = rates.get(port);
840             if (portRatesHolder == null) {
841                 // First time rates for this port are added
842                 portRatesHolder = new TxRates();
843                 rates.put(port, portRatesHolder);
844             }
845             // Get and store the number of transmitted bytes for this port
846             // And handle the case where agent does not support the counter
847             long transmitBytes = newPortStat.getTransmitBytes();
848             long value = (transmitBytes < 0) ? 0 : transmitBytes;
849             portRatesHolder.update(value);
850         }
851     }
852
853     @Override
854     public synchronized long getTransmitRate(Long switchId, Short port) {
855         long average = 0;
856         if (switchId == null || port == null) {
857             return average;
858         }
859         Map<Short, TxRates> perSwitch = txRates.get(switchId);
860         if (perSwitch == null) {
861             return average;
862         }
863         TxRates portRates = perSwitch.get(port);
864         if (portRates == null) {
865             return average;
866         }
867         return portRates.getAverageTxRate();
868     }
869
870     /*
871      * Manual switch name configuration code
872      */
873     @Override
874     public String getHelp() {
875         StringBuffer help = new StringBuffer();
876         help.append("---OF Statistics Manager utilities---\n");
877         help.append("\t ofdumpstatsmgr         - "
878                 + "Print Internal Stats Mgr db\n");
879         return help.toString();
880     }
881
882     private boolean isValidSwitchId(String switchId) {
883         String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
884         String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
885
886         return (switchId != null && (switchId.matches(regexDatapathID) || switchId
887                 .matches(regexDatapathIDLong)));
888     }
889
890     public long getSwitchIDLong(String switchId) {
891         int radix = 16;
892         String switchString = "0";
893
894         if (isValidSwitchId(switchId)) {
895             if (switchId.contains(":")) {
896                 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
897                 switchString = switchId.replace(":", "");
898             } else if (switchId.contains("-")) {
899                 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
900                 switchString = switchId.replace("-", "");
901             } else {
902                 // Handle the 0123456789ABCDEF notation
903                 switchString = switchId;
904             }
905         }
906         return Long.parseLong(switchString, radix);
907     }
908
909     /*
910      * Internal information dump code
911      */
912     private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
913         StringBuffer buffer = new StringBuffer();
914         buffer.append("{");
915         for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
916             buffer.append(HexString.toHexString(entry.getKey()) + "="
917                     + entry.getValue().toString() + " ");
918         }
919         buffer.append("}");
920         return buffer.toString();
921     }
922
923     public void _ofdumpstatsmgr(CommandInterpreter ci) {
924         ci.println("Global Counter: " + counter);
925         ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
926         ci.println("PendingStatsQueue: " + pendingStatsRequests);
927         ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
928         ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
929         ci.println("Stats Collector State: "
930                 + statisticsCollector.getState().toString());
931         ci.println("StatsTimer: " + statisticsTimer.toString());
932         ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
933         ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
934         ci.println("Port Stats Period: " + portTickNumber + " s");
935     }
936
937     public void _resetSwitchCapability(CommandInterpreter ci) {
938         String sidString = ci.nextArgument();
939         Long sid = null;
940         if (sidString == null) {
941             ci.println("Insert the switch id (numeric value)");
942             return;
943         }
944         try {
945             sid = Long.valueOf(sidString);
946             this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
947             ci.println("Vendor capability for switch " + sid + " set to "
948                     + this.switchSupportsVendorExtStats.get(sid));
949         } catch (NumberFormatException e) {
950             ci.println("Invalid switch id. Has to be numeric.");
951         }
952
953     }
954
955     public void _ofbw(CommandInterpreter ci) {
956         String sidString = ci.nextArgument();
957         Long sid = null;
958         if (sidString == null) {
959             ci.println("Insert the switch id (numeric value)");
960             return;
961         }
962         try {
963             sid = Long.valueOf(sidString);
964         } catch (NumberFormatException e) {
965             ci.println("Invalid switch id. Has to be numeric.");
966         }
967         if (sid != null) {
968             Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
969             ci.println("Bandwidth utilization (" + factoredSamples
970                     * portTickNumber + " sec average) for switch "
971                     + HexEncode.longToHexString(sid) + ":");
972             if (thisSwitchRates == null) {
973                 ci.println("Not available");
974             } else {
975                 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
976                     ci.println("Port: " + entry.getKey() + ": "
977                             + entry.getValue().getAverageTxRate() + " bps");
978                 }
979             }
980         }
981     }
982
983     public void _txratewindow(CommandInterpreter ci) {
984         String averageWindow = ci.nextArgument();
985         short seconds = 0;
986         if (averageWindow == null) {
987             ci.println("Insert the length in seconds of the median "
988                     + "window for tx rate");
989             ci.println("Current: " + factoredSamples * portTickNumber + " secs");
990             return;
991         }
992         try {
993             seconds = Short.valueOf(averageWindow);
994         } catch (NumberFormatException e) {
995             ci.println("Invalid period.");
996         }
997         OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
998         ci.println("New: " + factoredSamples * portTickNumber + " secs");
999     }
1000
1001     public void _ofstatsmgrintervals(CommandInterpreter ci) {
1002         String flowStatsInterv = ci.nextArgument();
1003         String portStatsInterv = ci.nextArgument();
1004
1005         if (flowStatsInterv == null || portStatsInterv == null) {
1006
1007             ci.println("Usage: ostatsmgrintervals <fP> <pP> (in seconds)");
1008             ci.println("Current Values: fP=" + statisticsTickNumber + "s pP="
1009                     + portTickNumber + "s");
1010             return;
1011         }
1012         Short fP, pP;
1013         try {
1014             fP = Short.parseShort(flowStatsInterv);
1015             pP = Short.parseShort(portStatsInterv);
1016         } catch (Exception e) {
1017             ci.println("Invalid format values: " + e.getMessage());
1018             return;
1019         }
1020
1021         if (pP <= 1 || fP <= 1) {
1022             ci.println("Invalid values. fP and pP have to be greater than 1.");
1023             return;
1024         }
1025
1026         statisticsTickNumber = fP;
1027         portTickNumber = pP;
1028
1029         ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1030                 + portTickNumber + "s");
1031     }
1032
1033 }