Merge "Refactored yang-maven-plugin."
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Map.Entry;
20 import java.util.Set;
21 import java.util.Timer;
22 import java.util.TimerTask;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
28
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
33 import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.core.Node;
40 import org.opendaylight.controller.sal.core.NodeConnector;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.utils.HexEncode;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFPort;
47 import org.openflow.protocol.OFStatisticsRequest;
48 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
49 import org.openflow.protocol.statistics.OFDescriptionStatistics;
50 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
51 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
52 import org.openflow.protocol.statistics.OFPortStatisticsReply;
53 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
54 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
55 import org.openflow.protocol.statistics.OFStatistics;
56 import org.openflow.protocol.statistics.OFStatisticsType;
57 import org.openflow.protocol.statistics.OFVendorStatistics;
58 import org.openflow.util.HexString;
59 import org.osgi.framework.BundleContext;
60 import org.osgi.framework.FrameworkUtil;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 /**
65  * It periodically polls the different OF statistics from the OF switches and
66  * caches them for quick retrieval for the above layers' modules It also
67  * provides an API to directly query the switch about the statistics
68  */
69 public class OFStatisticsManager implements IOFStatisticsManager,
70         IInventoryShimExternalListener, CommandProvider {
71     private static final Logger log = LoggerFactory
72             .getLogger(OFStatisticsManager.class);
73     private static final int initialSize = 64;
74     private static final long flowStatsPeriod = 10000;
75     private static final long descriptionStatsPeriod = 60000;
76     private static final long portStatsPeriod = 5000;
77     private static final long tickPeriod = 1000;
78     private static short statisticsTickNumber = (short) (flowStatsPeriod / tickPeriod);
79     private static short descriptionTickNumber = (short) (descriptionStatsPeriod / tickPeriod);
80     private static short portTickNumber = (short) (portStatsPeriod / tickPeriod);
81     private static short factoredSamples = (short) 2;
82     private static short counter = 1;
83     private IController controller = null;
84     private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
85     private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
86     private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
87     private List<OFStatistics> dummyList;
88     private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
89     protected BlockingQueue<StatsRequest> pendingStatsRequests;
90     protected BlockingQueue<Long> switchPortStatsUpdated;
91     private Thread statisticsCollector;
92     private Thread txRatesUpdater;
93     private Timer statisticsTimer;
94     private TimerTask statisticsTimerTask;
95     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
96     private Map<Long, Map<Short, TxRates>> txRates; // Per port sampled (every
97                                                     // portStatsPeriod) transmit
98                                                     // rate
99     private Set<IStatisticsListener> descriptionListeners;
100
101     /**
102      * The object containing the latest factoredSamples tx rate samples for a
103      * given switch port
104      */
105     protected class TxRates {
106         Deque<Long> sampledTxBytes; // contains the latest factoredSamples
107                                     // sampled transmitted bytes
108
109         public TxRates() {
110             sampledTxBytes = new LinkedBlockingDeque<Long>();
111         }
112
113         public void update(Long txBytes) {
114             /*
115              * Based on how many samples our average works on, we might have to
116              * remove the oldest sample
117              */
118             if (sampledTxBytes.size() == factoredSamples) {
119                 sampledTxBytes.removeLast();
120             }
121
122             // Add the latest sample to the top of the queue
123             sampledTxBytes.addFirst(txBytes);
124         }
125
126         /**
127          * Returns the average transmit rate in bps
128          * 
129          * @return the average transmit rate [bps]
130          */
131         public long getAverageTxRate() {
132             long average = 0;
133             /*
134              * If we cannot provide the value for the time window length set
135              */
136             if (sampledTxBytes.size() < factoredSamples) {
137                 return average;
138             }
139             long increment = (long) (sampledTxBytes.getFirst() - sampledTxBytes
140                     .getLast());
141             long timePeriod = (long) (factoredSamples * portStatsPeriod)
142                     / (long) tickPeriod;
143             average = (8L * increment) / timePeriod;
144             return average;
145         }
146     }
147
148     public void setController(IController core) {
149         this.controller = core;
150     }
151
152     public void unsetController(IController core) {
153         if (this.controller == core) {
154             this.controller = null;
155         }
156     }
157
158     /**
159      * Function called by the dependency manager when all the required
160      * dependencies are satisfied
161      * 
162      */
163     void init() {
164         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
165         descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
166         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
167         dummyList = new ArrayList<OFStatistics>(1);
168         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(
169                 initialSize);
170         pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(
171                 initialSize);
172         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(initialSize);
173         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(
174                 initialSize);
175         txRates = new HashMap<Long, Map<Short, TxRates>>(initialSize);
176         descriptionListeners = new HashSet<IStatisticsListener>();
177
178         configStatsPollIntervals();
179
180         // Initialize managed timers
181         statisticsTimer = new Timer();
182         statisticsTimerTask = new TimerTask() {
183             @Override
184             public void run() {
185                 decrementTicks();
186             }
187         };
188
189         // Initialize Statistics collector thread
190         statisticsCollector = new Thread(new Runnable() {
191             @Override
192             public void run() {
193                 while (true) {
194                     try {
195                         StatsRequest req = pendingStatsRequests.take();
196                         acquireStatistics(req.switchId, req.type);
197                     } catch (InterruptedException e) {
198                         log.warn("Flow Statistics Collector thread "
199                                 + "interrupted", e);
200                     }
201                 }
202             }
203         }, "Statistics Collector");
204
205         // Initialize Tx Rate Updater thread
206         txRatesUpdater = new Thread(new Runnable() {
207             @Override
208             public void run() {
209                 while (true) {
210                     try {
211                         long switchId = switchPortStatsUpdated.take();
212                         updatePortsTxRate(switchId);
213                     } catch (InterruptedException e) {
214                         log.warn("TX Rate Updater thread interrupted", e);
215                     }
216                 }
217             }
218         }, "TX Rate Updater");
219     }
220
221     /**
222      * Function called by the dependency manager when at least one dependency
223      * become unsatisfied or when the component is shutting down because for
224      * example bundle is being stopped.
225      * 
226      */
227     void destroy() {
228     }
229
230     /**
231      * Function called by dependency manager after "init ()" is called and after
232      * the services provided by the class are registered in the service registry
233      * 
234      */
235     void start() {
236         // Start managed timers
237         statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, tickPeriod);
238
239         // Start statistics collector thread
240         statisticsCollector.start();
241
242         // Start bandwidth utilization computer thread
243         txRatesUpdater.start();
244
245         // OSGI console
246         registerWithOSGIConsole();
247     }
248
249     /**
250      * Function called by the dependency manager before the services exported by
251      * the component are unregistered, this will be followed by a "destroy ()"
252      * calls
253      * 
254      */
255     void stop() {
256         // Stop managed timers
257         statisticsTimer.cancel();
258     }
259
260     public void setStatisticsListener(IStatisticsListener s) {
261         this.descriptionListeners.add(s);
262     }
263
264     public void unsetStatisticsListener(IStatisticsListener s) {
265         if (s != null) {
266             this.descriptionListeners.remove(s);
267         }
268     }
269
270     private void registerWithOSGIConsole() {
271         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
272                 .getBundleContext();
273         bundleContext.registerService(CommandProvider.class.getName(), this,
274                 null);
275     }
276
277     private static class StatsRequest {
278         protected Long switchId;
279         protected OFStatisticsType type;
280
281         public StatsRequest(Long d, OFStatisticsType t) {
282             switchId = d;
283             type = t;
284         }
285
286         public String toString() {
287             return "SReq = {switchId=" + switchId + ", type=" + type + "}";
288         }
289
290         @Override
291         public int hashCode() {
292             final int prime = 31;
293             int result = 1;
294             result = prime * result
295                     + ((switchId == null) ? 0 : switchId.hashCode());
296             result = prime * result + ((type == null) ? 0 : type.ordinal());
297             return result;
298         }
299
300         @Override
301         public boolean equals(Object obj) {
302             if (this == obj) {
303                 return true;
304             }
305             if (obj == null) {
306                 return false;
307             }
308             if (getClass() != obj.getClass()) {
309                 return false;
310             }
311             StatsRequest other = (StatsRequest) obj;
312             if (switchId == null) {
313                 if (other.switchId != null) {
314                     return false;
315                 }
316             } else if (!switchId.equals(other.switchId)) {
317                 return false;
318             }
319             if (type != other.type) {
320                 return false;
321             }
322             return true;
323         }
324     }
325
326     private void addStatisticsTicks(Long switchId) {
327         switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
328                                                                   // switch
329                                                                   // supports
330                                                                   // Vendor
331                                                                   // extension
332                                                                   // stats
333         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
334         log.info("Added Switch {} to target pool",
335                 HexString.toHexString(switchId.longValue()));
336     }
337
338     protected static class StatisticsTicks {
339         private short flowStatisticsTicks;
340         private short descriptionTicks;
341         private short portStatisticsTicks;
342
343         public StatisticsTicks(boolean scattered) {
344             if (scattered) {
345                 // scatter bursts by statisticsTickPeriod
346                 if (++counter < 0) {
347                     counter = 0;
348                 } // being paranoid here
349                 flowStatisticsTicks = (short) (1 + counter
350                         % statisticsTickNumber);
351                 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
352                 portStatisticsTicks = (short) (1 + counter % portTickNumber);
353             } else {
354                 flowStatisticsTicks = statisticsTickNumber;
355                 descriptionTicks = descriptionTickNumber;
356                 portStatisticsTicks = portTickNumber;
357             }
358         }
359
360         public boolean decrementFlowTicksIsZero() {
361             // Please ensure no code is inserted between the if check and the
362             // flowStatisticsTicks reset
363             if (--flowStatisticsTicks == 0) {
364                 flowStatisticsTicks = statisticsTickNumber;
365                 return true;
366             }
367             return false;
368         }
369
370         public boolean decrementDescTicksIsZero() {
371             // Please ensure no code is inserted between the if check and the
372             // descriptionTicks reset
373             if (--descriptionTicks == 0) {
374                 descriptionTicks = descriptionTickNumber;
375                 return true;
376             }
377             return false;
378         }
379
380         public boolean decrementPortTicksIsZero() {
381             // Please ensure no code is inserted between the if check and the
382             // descriptionTicks reset
383             if (--portStatisticsTicks == 0) {
384                 portStatisticsTicks = portTickNumber;
385                 return true;
386             }
387             return false;
388         }
389
390         public String toString() {
391             return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
392                     + ",pT=" + portStatisticsTicks + "}";
393         }
394     }
395
396     private void printInfoMessage(String type, StatsRequest request) {
397         log.info(
398                 "{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
399                 new Object[] { type, HexString.toHexString(request.switchId),
400                         pendingStatsRequests.size(),
401                         statisticsCollector.getState().toString() });
402     }
403
404     protected void decrementTicks() {
405         StatsRequest request = null;
406         for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
407                 .entrySet()) {
408             StatisticsTicks clock = entry.getValue();
409             Long switchId = entry.getKey();
410             if (clock.decrementFlowTicksIsZero() == true) {
411                 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ? new StatsRequest(
412                         switchId, OFStatisticsType.VENDOR) : new StatsRequest(
413                         switchId, OFStatisticsType.FLOW);
414                 // If a request for this switch is already in the queue, skip to
415                 // add this new request
416                 if (!pendingStatsRequests.contains(request)
417                         && false == pendingStatsRequests.offer(request)) {
418                     printInfoMessage("Flow", request);
419                 }
420             }
421
422             if (clock.decrementDescTicksIsZero() == true) {
423                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
424                 // If a request for this switch is already in the queue, skip to
425                 // add this new request
426                 if (!pendingStatsRequests.contains(request)
427                         && false == pendingStatsRequests.offer(request)) {
428                     printInfoMessage("Description", request);
429                 }
430             }
431
432             if (clock.decrementPortTicksIsZero() == true) {
433                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
434                 // If a request for this switch is already in the queue, skip to
435                 // add this new request
436                 if (!pendingStatsRequests.contains(request)
437                         && false == pendingStatsRequests.offer(request)) {
438                     printInfoMessage("Port", request);
439                 }
440             }
441         }
442     }
443
444     private void removeStatsRequestTasks(Long switchId) {
445         log.info("Cleaning Statistics database for switch {}",
446                 HexEncode.longToHexString(switchId));
447         // To be safe, let's attempt removal of both VENDOR and FLOW request. It
448         // does not hurt
449         pendingStatsRequests.remove(new StatsRequest(switchId,
450                 OFStatisticsType.VENDOR));
451         pendingStatsRequests.remove(new StatsRequest(switchId,
452                 OFStatisticsType.FLOW));
453         pendingStatsRequests.remove(new StatsRequest(switchId,
454                 OFStatisticsType.DESC));
455         pendingStatsRequests.remove(new StatsRequest(switchId,
456                 OFStatisticsType.PORT));
457         // Take care of the TX rate databases
458         switchPortStatsUpdated.remove(switchId);
459         txRates.remove(switchId);
460     }
461
462     private void clearFlowStatsAndTicks(Long switchId) {
463         statisticsTimerTicks.remove(switchId);
464         removeStatsRequestTasks(switchId);
465         flowStatistics.remove(switchId);
466         log.info("Statistics removed for switch {}",
467                 HexString.toHexString(switchId));
468     }
469
470     private void acquireStatistics(Long switchId, OFStatisticsType statType) {
471
472         // Query the switch on all matches
473         List<OFStatistics> values = this.acquireStatistics(switchId, statType,
474                 null);
475
476         // Update local caching database if got a valid response
477         if (values != null && !values.isEmpty()) {
478             if ((statType == OFStatisticsType.FLOW)
479                     || (statType == OFStatisticsType.VENDOR)) {
480                 flowStatistics.put(switchId, values);
481             } else if (statType == OFStatisticsType.DESC) {
482                 // Notify who may be interested in a description change
483                 notifyDescriptionListeners(switchId, values);
484
485                 // Overwrite cache
486                 descStatistics.put(switchId, values);
487             } else if (statType == OFStatisticsType.PORT) {
488                 // Overwrite cache with new port statistics for this switch
489                 portStatistics.put(switchId, values);
490
491                 // Wake up the thread which maintains the TX byte counters for
492                 // each port
493                 switchPortStatsUpdated.offer(switchId);
494             }
495         }
496     }
497
498     private void notifyDescriptionListeners(Long switchId,
499             List<OFStatistics> values) {
500         for (IStatisticsListener l : this.descriptionListeners) {
501             l.descriptionRefreshed(switchId,
502                     ((OFDescriptionStatistics) values.get(0)));
503         }
504     }
505
506     /*
507      * Generic function to get the statistics form a OF switch
508      */
509     @SuppressWarnings("unchecked")
510     private List<OFStatistics> acquireStatistics(Long switchId,
511             OFStatisticsType statsType, Object target) {
512         List<OFStatistics> values = null;
513         String type = null;
514         ISwitch sw = controller.getSwitch(switchId);
515
516         if (sw != null) {
517             OFStatisticsRequest req = new OFStatisticsRequest();
518             req.setStatisticType(statsType);
519             int requestLength = req.getLengthU();
520
521             if (statsType == OFStatisticsType.FLOW) {
522                 OFMatch match = null;
523                 if (target == null) {
524                     // All flows request
525                     match = new OFMatch();
526                     match.setWildcards(0xffffffff);
527                 } else if (!(target instanceof OFMatch)) {
528                     // Malformed request
529                     log.warn("Invalid target type for Flow stats request: {}",
530                             target.getClass());
531                     return null;
532                 } else {
533                     // Specific flow request
534                     match = (OFMatch) target;
535                 }
536                 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
537                 specificReq.setMatch(match);
538                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
539                 specificReq.setTableId((byte) 0xff);
540                 req.setStatistics(Collections
541                         .singletonList((OFStatistics) specificReq));
542                 requestLength += specificReq.getLength();
543                 type = "FLOW";
544             } else if (statsType == OFStatisticsType.VENDOR) {
545                 V6StatsRequest specificReq = new V6StatsRequest();
546                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
547                 specificReq.setTableId((byte) 0xff);
548                 req.setStatistics(Collections
549                         .singletonList((OFStatistics) specificReq));
550                 requestLength += specificReq.getLength();
551                 type = "VENDOR";
552             } else if (statsType == OFStatisticsType.AGGREGATE) {
553                 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
554                 OFMatch match = new OFMatch();
555                 match.setWildcards(0xffffffff);
556                 specificReq.setMatch(match);
557                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
558                 specificReq.setTableId((byte) 0xff);
559                 req.setStatistics(Collections
560                         .singletonList((OFStatistics) specificReq));
561                 requestLength += specificReq.getLength();
562                 type = "AGGREGATE";
563             } else if (statsType == OFStatisticsType.PORT) {
564                 short targetPort;
565                 if (target == null) {
566                     // All ports request
567                     targetPort = (short) OFPort.OFPP_NONE.getValue();
568                 } else if (!(target instanceof Short)) {
569                     // Malformed request
570                     log.warn("Invalid target type for Port stats request: {}",
571                             target.getClass());
572                     return null;
573                 } else {
574                     // Specific port request
575                     targetPort = (Short) target;
576                 }
577                 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
578                 specificReq.setPortNumber(targetPort);
579                 req.setStatistics(Collections
580                         .singletonList((OFStatistics) specificReq));
581                 requestLength += specificReq.getLength();
582                 type = "PORT";
583             } else if (statsType == OFStatisticsType.QUEUE) {
584                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
585                 specificReq.setPortNumber((short) OFPort.OFPP_ALL.getValue());
586                 specificReq.setQueueId(0xffffffff);
587                 req.setStatistics(Collections
588                         .singletonList((OFStatistics) specificReq));
589                 requestLength += specificReq.getLength();
590                 type = "QUEUE";
591             } else if (statsType == OFStatisticsType.DESC) {
592                 type = "DESC";
593             } else if (statsType == OFStatisticsType.TABLE) {
594                 type = "TABLE";
595             }
596             req.setLengthU(requestLength);
597             Object result = sw.getStatistics(req);
598
599             if (result == null) {
600                 log.warn("Request Timed Out for ({}) from switch {}", type,
601                         HexString.toHexString(switchId));
602             } else if (result instanceof OFError) {
603                 log.warn("Switch {} failed to handle ({}) stats request: {}",
604                         new Object[] { HexString.toHexString(switchId), type,
605                                 Utils.getOFErrorString((OFError) result) });
606                 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
607                     log.warn(
608                             "Switching back to regular Flow stats requests for switch {}",
609                             HexString.toHexString(switchId));
610                     this.switchSupportsVendorExtStats.put(switchId,
611                             Boolean.FALSE);
612                 }
613             } else {
614                 values = (List<OFStatistics>) result;
615             }
616         }
617         return values;
618     }
619
620     @Override
621     public List<OFStatistics> getOFFlowStatistics(Long switchId) {
622         List<OFStatistics> list = flowStatistics.get(switchId);
623
624         /*
625          * Check on emptiness as interference between add and get is still
626          * possible on the inner list (the concurrentMap entry's value)
627          */
628         return (list == null || list.isEmpty()) ? this.dummyList
629                 : (list.get(0) instanceof OFVendorStatistics) ? this
630                         .v6StatsListToOFStatsList(list) : list;
631     }
632
633     @Override
634     public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch) {
635         List<OFStatistics> statsList = flowStatistics.get(switchId);
636
637         /*
638          * Check on emptiness as interference between add and get is still
639          * possible on the inner list (the concurrentMap entry's value)
640          */
641         if (statsList == null || statsList.isEmpty()) {
642             return this.dummyList;
643         }
644
645         if (statsList.get(0) instanceof OFVendorStatistics) {
646             /*
647              * Caller could provide regular OF match when we instead pull the
648              * vendor statistics from this node Caller is not supposed to know
649              * whether this switch supports vendor extensions statistics
650              * requests
651              */
652             V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
653                     : new V6Match(ofMatch);
654
655             List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
656             for (OFStatistics stats : targetList) {
657                 V6StatsReply v6Stats = (V6StatsReply) stats;
658                 V6Match v6Match = v6Stats.getMatch();
659                 if (v6Match.equals(targetMatch)) {
660                     List<OFStatistics> list = new ArrayList<OFStatistics>();
661                     list.add(stats);
662                     return list;
663                 }
664             }
665         } else {
666             for (OFStatistics stats : statsList) {
667                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
668                 if (flowStats.getMatch().equals(ofMatch)) {
669                     List<OFStatistics> list = new ArrayList<OFStatistics>();
670                     list.add(stats);
671                     return list;
672                 }
673             }
674         }
675         return this.dummyList;
676     }
677
678     /*
679      * Converts the v6 vendor statistics to the OFStatistics
680      */
681     private List<OFStatistics> v6StatsListToOFStatsList(
682             List<OFStatistics> statistics) {
683         List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
684         if (statistics != null && !statistics.isEmpty()) {
685             for (OFStatistics stats : statistics) {
686                 if (stats instanceof OFVendorStatistics) {
687                     List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
688                     if (r != null) {
689                         v6statistics.addAll(r);
690                     }
691                 }
692             }
693         }
694         return v6statistics;
695     }
696
697     private static List<OFStatistics> getV6ReplyStatistics(
698             OFVendorStatistics stat) {
699         int length = stat.getLength();
700         List<OFStatistics> results = new ArrayList<OFStatistics>();
701         if (length < 12)
702             return null; // Nicira Hdr is 12 bytes. We need atleast that much
703         ByteBuffer data = ByteBuffer.allocate(length);
704         stat.writeTo(data);
705         data.rewind();
706         if (log.isTraceEnabled()) {
707             log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
708                     HexString.toHexString(data.array()));
709         }
710
711         int vendor = data.getInt(); // first 4 bytes is vendor id.
712         if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
713             log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
714             return null;
715         } else {
716             // go ahead by 8 bytes which is 8 bytes of 0
717             data.getLong(); // should be all 0's
718             length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
719                           // been consumed
720         }
721
722         V6StatsReply v6statsreply;
723         int min_len;
724         while (length > 0) {
725             v6statsreply = new V6StatsReply();
726             min_len = v6statsreply.getLength();
727             if (length < v6statsreply.getLength())
728                 break;
729             v6statsreply.setActionFactory(stat.getActionFactory());
730             v6statsreply.readFrom(data);
731             if (v6statsreply.getLength() < min_len)
732                 break;
733             v6statsreply.setVendorId(vendor);
734             log.trace("V6StatsReply: {}", v6statsreply);
735             length -= v6statsreply.getLength();
736             results.add(v6statsreply);
737         }
738         return results;
739     }
740
741     @Override
742     public List<OFStatistics> queryStatistics(Long switchId,
743             OFStatisticsType statType, Object target) {
744         /*
745          * Caller does not know and it is not supposed to know whether this
746          * switch supports vendor extension. We adjust the target for him
747          */
748         if (statType == OFStatisticsType.FLOW) {
749             if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
750                 statType = OFStatisticsType.VENDOR;
751             }
752         }
753
754         List<OFStatistics> list = this.acquireStatistics(switchId, statType,
755                 target);
756
757         return (list == null) ? null
758                 : (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list)
759                         : list;
760     }
761
762     @Override
763     public List<OFStatistics> getOFDescStatistics(Long switchId) {
764         if (!descStatistics.containsKey(switchId))
765             return this.dummyList;
766
767         return descStatistics.get(switchId);
768     }
769
770     @Override
771     public List<OFStatistics> getOFPortStatistics(Long switchId) {
772         if (!portStatistics.containsKey(switchId)) {
773             return this.dummyList;
774         }
775
776         return portStatistics.get(switchId);
777     }
778
779     @Override
780     public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
781         if (!portStatistics.containsKey(switchId)) {
782             return this.dummyList;
783         }
784         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
785         for (OFStatistics stats : portStatistics.get(switchId)) {
786             if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
787                 list.add(stats);
788                 break;
789             }
790         }
791         return list;
792     }
793
794     @Override
795     public int getFlowsNumber(long switchId) {
796         return this.flowStatistics.get(switchId).size();
797     }
798
799     /*
800      * InventoryShim replay for us all the switch addition which happened before
801      * we were brought up
802      */
803     @Override
804     public void updateNode(Node node, UpdateType type, Set<Property> props) {
805         Long switchId = (Long) node.getID();
806         switch (type) {
807         case ADDED:
808             addStatisticsTicks(switchId);
809             break;
810         case REMOVED:
811             clearFlowStatsAndTicks(switchId);
812         default:
813         }
814     }
815
816     @Override
817     public void updateNodeConnector(NodeConnector nodeConnector,
818             UpdateType type, Set<Property> props) {
819         // No action
820     }
821
822     /**
823      * Update the cached port rates for this switch with the latest retrieved
824      * port transmit byte count
825      * 
826      * @param switchId
827      */
828     private synchronized void updatePortsTxRate(long switchId) {
829         List<OFStatistics> newPortStatistics = this.portStatistics
830                 .get(switchId);
831         if (newPortStatistics == null) {
832             return;
833         }
834         Map<Short, TxRates> rates = this.txRates.get(switchId);
835         if (rates == null) {
836             // First time rates for this switch are added
837             rates = new HashMap<Short, TxRates>();
838             txRates.put(switchId, rates);
839         }
840         for (OFStatistics stats : newPortStatistics) {
841             OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
842             short port = newPortStat.getPortNumber();
843             TxRates portRatesHolder = rates.get(port);
844             if (portRatesHolder == null) {
845                 // First time rates for this port are added
846                 portRatesHolder = new TxRates();
847                 rates.put(port, portRatesHolder);
848             }
849             // Get and store the number of transmitted bytes for this port
850             // And handle the case where agent does not support the counter
851             long transmitBytes = newPortStat.getTransmitBytes();
852             long value = (transmitBytes < 0) ? 0 : transmitBytes;
853             portRatesHolder.update(value);
854         }
855     }
856
857     @Override
858     public synchronized long getTransmitRate(Long switchId, Short port) {
859         long average = 0;
860         if (switchId == null || port == null) {
861             return average;
862         }
863         Map<Short, TxRates> perSwitch = txRates.get(switchId);
864         if (perSwitch == null) {
865             return average;
866         }
867         TxRates portRates = perSwitch.get(port);
868         if (portRates == null) {
869             return average;
870         }
871         return portRates.getAverageTxRate();
872     }
873
874     /*
875      * Manual switch name configuration code
876      */
877     @Override
878     public String getHelp() {
879         StringBuffer help = new StringBuffer();
880         help.append("---OF Statistics Manager utilities---\n");
881         help.append("\t ofdumpstatsmgr         - "
882                 + "Print Internal Stats Mgr db\n");
883         help.append("\t ofstatsmgrintervals <fP> <pP> <dP>(in seconds) - "
884                 + "Set/Show flow/port/dedscription stats poll intervals\n");
885         return help.toString();
886     }
887
888     private boolean isValidSwitchId(String switchId) {
889         String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
890         String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
891
892         return (switchId != null && (switchId.matches(regexDatapathID) || switchId
893                 .matches(regexDatapathIDLong)));
894     }
895
896     public long getSwitchIDLong(String switchId) {
897         int radix = 16;
898         String switchString = "0";
899
900         if (isValidSwitchId(switchId)) {
901             if (switchId.contains(":")) {
902                 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
903                 switchString = switchId.replace(":", "");
904             } else if (switchId.contains("-")) {
905                 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
906                 switchString = switchId.replace("-", "");
907             } else {
908                 // Handle the 0123456789ABCDEF notation
909                 switchString = switchId;
910             }
911         }
912         return Long.parseLong(switchString, radix);
913     }
914
915     /*
916      * Internal information dump code
917      */
918     private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
919         StringBuffer buffer = new StringBuffer();
920         buffer.append("{");
921         for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
922             buffer.append(HexString.toHexString(entry.getKey()) + "="
923                     + entry.getValue().toString() + " ");
924         }
925         buffer.append("}");
926         return buffer.toString();
927     }
928
929     public void _ofdumpstatsmgr(CommandInterpreter ci) {
930         ci.println("Global Counter: " + counter);
931         ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
932         ci.println("PendingStatsQueue: " + pendingStatsRequests);
933         ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
934         ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
935         ci.println("Stats Collector State: "
936                 + statisticsCollector.getState().toString());
937         ci.println("StatsTimer: " + statisticsTimer.toString());
938         ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
939         ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
940         ci.println("Port Stats Period: " + portTickNumber + " s");
941     }
942
943     public void _resetSwitchCapability(CommandInterpreter ci) {
944         String sidString = ci.nextArgument();
945         Long sid = null;
946         if (sidString == null) {
947             ci.println("Insert the switch id (numeric value)");
948             return;
949         }
950         try {
951             sid = Long.valueOf(sidString);
952             this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
953             ci.println("Vendor capability for switch " + sid + " set to "
954                     + this.switchSupportsVendorExtStats.get(sid));
955         } catch (NumberFormatException e) {
956             ci.println("Invalid switch id. Has to be numeric.");
957         }
958
959     }
960
961     public void _ofbw(CommandInterpreter ci) {
962         String sidString = ci.nextArgument();
963         Long sid = null;
964         if (sidString == null) {
965             ci.println("Insert the switch id (numeric value)");
966             return;
967         }
968         try {
969             sid = Long.valueOf(sidString);
970         } catch (NumberFormatException e) {
971             ci.println("Invalid switch id. Has to be numeric.");
972         }
973         if (sid != null) {
974             Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
975             ci.println("Bandwidth utilization (" + factoredSamples
976                     * portTickNumber + " sec average) for switch "
977                     + HexEncode.longToHexString(sid) + ":");
978             if (thisSwitchRates == null) {
979                 ci.println("Not available");
980             } else {
981                 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
982                     ci.println("Port: " + entry.getKey() + ": "
983                             + entry.getValue().getAverageTxRate() + " bps");
984                 }
985             }
986         }
987     }
988
989     public void _txratewindow(CommandInterpreter ci) {
990         String averageWindow = ci.nextArgument();
991         short seconds = 0;
992         if (averageWindow == null) {
993             ci.println("Insert the length in seconds of the median "
994                     + "window for tx rate");
995             ci.println("Current: " + factoredSamples * portTickNumber + " secs");
996             return;
997         }
998         try {
999             seconds = Short.valueOf(averageWindow);
1000         } catch (NumberFormatException e) {
1001             ci.println("Invalid period.");
1002         }
1003         OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1004         ci.println("New: " + factoredSamples * portTickNumber + " secs");
1005     }
1006
1007     public void _ofstatsmgrintervals(CommandInterpreter ci) {
1008         String flowStatsInterv = ci.nextArgument();
1009         String portStatsInterv = ci.nextArgument();
1010         String descStatsInterv = ci.nextArgument();
1011
1012         if (flowStatsInterv == null || portStatsInterv == null
1013                 || descStatsInterv == null) {
1014             ci.println("Usage: ostatsmgrintervals <fP> <pP> <dP>(in seconds)");
1015             ci.println("Current Values: fP=" + statisticsTickNumber + "s pP="
1016                     + portTickNumber + "s dP=" + descriptionTickNumber + "s");
1017             return;
1018         }
1019         Short fP, pP, dP;
1020         try {
1021             fP = Short.parseShort(flowStatsInterv);
1022             pP = Short.parseShort(portStatsInterv);
1023             dP = Short.parseShort(descStatsInterv);
1024         } catch (Exception e) {
1025             ci.println("Invalid format values: " + e.getMessage());
1026             return;
1027         }
1028
1029         if (pP <= 1 || fP <= 1 || dP <= 1) {
1030             ci.println("Invalid values. fP, pP, dP have to be greater than 1.");
1031             return;
1032         }
1033
1034         statisticsTickNumber = fP;
1035         portTickNumber = pP;
1036         descriptionTickNumber = dP;
1037
1038         ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1039                 + portTickNumber + "s dP=" + descriptionTickNumber + "s");
1040     }
1041
1042     /**
1043      * This method retrieves user configurations from config.ini and updates
1044      * statisticsTickNumber/portTickNumber/descriptionTickNumber accordingly.
1045      */
1046     private void configStatsPollIntervals() {
1047         String fsStr = System.getProperty("of.flowStatsPollInterval");
1048         String psStr = System.getProperty("of.portStatsPollInterval");
1049         String dsStr = System.getProperty("of.descStatsPollInterval");
1050         Short fs, ps, ds;
1051
1052         if (fsStr != null) {
1053             try {
1054                 fs = Short.parseShort(fsStr);
1055                 if (fs > 0) {
1056                     statisticsTickNumber = fs;
1057                 }
1058             } catch (Exception e) {
1059             }
1060         }
1061
1062         if (psStr != null) {
1063             try {
1064                 ps = Short.parseShort(psStr);
1065                 if (ps > 0) {
1066                     portTickNumber = ps;
1067                 }
1068             } catch (Exception e) {
1069             }
1070         }
1071
1072         if (dsStr != null) {
1073             try {
1074                 ds = Short.parseShort(dsStr);
1075                 if (ds > 0) {
1076                     descriptionTickNumber = ds;
1077                 }
1078             } catch (Exception e) {
1079             }
1080         }
1081     }
1082 }