Add 'TableStatistics' to SAL and Northbound Statistics API.
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
10
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.Map.Entry;
20 import java.util.Set;
21 import java.util.Timer;
22 import java.util.TimerTask;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
28
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
33 import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.core.Node;
40 import org.opendaylight.controller.sal.core.NodeConnector;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.utils.HexEncode;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFPort;
47 import org.openflow.protocol.OFStatisticsRequest;
48 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
49 import org.openflow.protocol.statistics.OFDescriptionStatistics;
50 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
51 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
52 import org.openflow.protocol.statistics.OFPortStatisticsReply;
53 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
54 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
55 import org.openflow.protocol.statistics.OFStatistics;
56 import org.openflow.protocol.statistics.OFStatisticsType;
57 import org.openflow.protocol.statistics.OFTableStatistics;
58 import org.openflow.protocol.statistics.OFVendorStatistics;
59 import org.openflow.util.HexString;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 /**
66  * It periodically polls the different OF statistics from the OF switches and
67  * caches them for quick retrieval for the above layers' modules It also
68  * provides an API to directly query the switch about the statistics
69  */
70 public class OFStatisticsManager implements IOFStatisticsManager,
71 IInventoryShimExternalListener, CommandProvider {
72     private static final Logger log = LoggerFactory
73             .getLogger(OFStatisticsManager.class);
74     private static final int initialSize = 64;
75     private static final long flowStatsPeriod = 10000;
76     private static final long descriptionStatsPeriod = 60000;
77     private static final long portStatsPeriod = 5000;
78     private static final long tableStatsPeriod = 10000;
79     private static final long tickPeriod = 1000;
80     private static short statisticsTickNumber = (short) (flowStatsPeriod / tickPeriod);
81     private static short descriptionTickNumber = (short) (descriptionStatsPeriod / tickPeriod);
82     private static short portTickNumber = (short) (portStatsPeriod / tickPeriod);
83     private static short tableTickNumber = (short) (tableStatsPeriod / tickPeriod);
84     private static short factoredSamples = (short) 2;
85     private static short counter = 1;
86     private IController controller = null;
87     private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
88     private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
89     private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
90     private ConcurrentMap<Long, List<OFStatistics>> tableStatistics;
91     private List<OFStatistics> dummyList;
92     private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
93     protected BlockingQueue<StatsRequest> pendingStatsRequests;
94     protected BlockingQueue<Long> switchPortStatsUpdated;
95     private Thread statisticsCollector;
96     private Thread txRatesUpdater;
97     private Timer statisticsTimer;
98     private TimerTask statisticsTimerTask;
99     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
100     private Map<Long, Map<Short, TxRates>> txRates; // Per port sampled (every
101                                                     // portStatsPeriod) transmit
102                                                     // rate
103     private Set<IStatisticsListener> descriptionListeners;
104
105     /**
106      * The object containing the latest factoredSamples tx rate samples for a
107      * given switch port
108      */
109     protected class TxRates {
110         Deque<Long> sampledTxBytes; // contains the latest factoredSamples
111                                     // sampled transmitted bytes
112
113         public TxRates() {
114             sampledTxBytes = new LinkedBlockingDeque<Long>();
115         }
116
117         public void update(Long txBytes) {
118             /*
119              * Based on how many samples our average works on, we might have to
120              * remove the oldest sample
121              */
122             if (sampledTxBytes.size() == factoredSamples) {
123                 sampledTxBytes.removeLast();
124             }
125
126             // Add the latest sample to the top of the queue
127             sampledTxBytes.addFirst(txBytes);
128         }
129
130         /**
131          * Returns the average transmit rate in bps
132          * 
133          * @return the average transmit rate [bps]
134          */
135         public long getAverageTxRate() {
136             long average = 0;
137             /*
138              * If we cannot provide the value for the time window length set
139              */
140             if (sampledTxBytes.size() < factoredSamples) {
141                 return average;
142             }
143             long increment = (long) (sampledTxBytes.getFirst() - sampledTxBytes
144                     .getLast());
145             long timePeriod = (long) (factoredSamples * portStatsPeriod)
146                     / (long) tickPeriod;
147             average = (8L * increment) / timePeriod;
148             return average;
149         }
150     }
151
152     public void setController(IController core) {
153         this.controller = core;
154     }
155
156     public void unsetController(IController core) {
157         if (this.controller == core) {
158             this.controller = null;
159         }
160     }
161
162     /**
163      * Function called by the dependency manager when all the required
164      * dependencies are satisfied
165      * 
166      */
167     void init() {
168         flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
169         descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
170         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
171         tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
172         dummyList = new ArrayList<OFStatistics>(1);
173         statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(
174                 initialSize);
175         pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(
176                 initialSize);
177         switchPortStatsUpdated = new LinkedBlockingQueue<Long>(initialSize);
178         switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(
179                 initialSize);
180         txRates = new HashMap<Long, Map<Short, TxRates>>(initialSize);
181         descriptionListeners = new HashSet<IStatisticsListener>();
182
183         configStatsPollIntervals();
184
185         // Initialize managed timers
186         statisticsTimer = new Timer();
187         statisticsTimerTask = new TimerTask() {
188             @Override
189             public void run() {
190                 decrementTicks();
191             }
192         };
193
194         // Initialize Statistics collector thread
195         statisticsCollector = new Thread(new Runnable() {
196             @Override
197             public void run() {
198                 while (true) {
199                     try {
200                         StatsRequest req = pendingStatsRequests.take();
201                         acquireStatistics(req.switchId, req.type);
202                     } catch (InterruptedException e) {
203                         log.warn("Flow Statistics Collector thread "
204                                 + "interrupted", e);
205                     }
206                 }
207             }
208         }, "Statistics Collector");
209
210         // Initialize Tx Rate Updater thread
211         txRatesUpdater = new Thread(new Runnable() {
212             @Override
213             public void run() {
214                 while (true) {
215                     try {
216                         long switchId = switchPortStatsUpdated.take();
217                         updatePortsTxRate(switchId);
218                     } catch (InterruptedException e) {
219                         log.warn("TX Rate Updater thread interrupted", e);
220                     }
221                 }
222             }
223         }, "TX Rate Updater");
224     }
225
226     /**
227      * Function called by the dependency manager when at least one dependency
228      * become unsatisfied or when the component is shutting down because for
229      * example bundle is being stopped.
230      * 
231      */
232     void destroy() {
233     }
234
235     /**
236      * Function called by dependency manager after "init ()" is called and after
237      * the services provided by the class are registered in the service registry
238      * 
239      */
240     void start() {
241         // Start managed timers
242         statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, tickPeriod);
243
244         // Start statistics collector thread
245         statisticsCollector.start();
246
247         // Start bandwidth utilization computer thread
248         txRatesUpdater.start();
249
250         // OSGI console
251         registerWithOSGIConsole();
252     }
253
254     /**
255      * Function called by the dependency manager before the services exported by
256      * the component are unregistered, this will be followed by a "destroy ()"
257      * calls
258      * 
259      */
260     void stop() {
261         // Stop managed timers
262         statisticsTimer.cancel();
263     }
264
265     public void setStatisticsListener(IStatisticsListener s) {
266         this.descriptionListeners.add(s);
267     }
268
269     public void unsetStatisticsListener(IStatisticsListener s) {
270         if (s != null) {
271             this.descriptionListeners.remove(s);
272         }
273     }
274
275     private void registerWithOSGIConsole() {
276         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
277                 .getBundleContext();
278         bundleContext.registerService(CommandProvider.class.getName(), this,
279                 null);
280     }
281
282     private static class StatsRequest {
283         protected Long switchId;
284         protected OFStatisticsType type;
285
286         public StatsRequest(Long d, OFStatisticsType t) {
287             switchId = d;
288             type = t;
289         }
290
291         @Override
292         public String toString() {
293             return "SReq = {switchId=" + switchId + ", type=" + type + "}";
294         }
295
296         @Override
297         public int hashCode() {
298             final int prime = 31;
299             int result = 1;
300             result = prime * result
301                     + ((switchId == null) ? 0 : switchId.hashCode());
302             result = prime * result + ((type == null) ? 0 : type.ordinal());
303             return result;
304         }
305
306         @Override
307         public boolean equals(Object obj) {
308             if (this == obj) {
309                 return true;
310             }
311             if (obj == null) {
312                 return false;
313             }
314             if (getClass() != obj.getClass()) {
315                 return false;
316             }
317             StatsRequest other = (StatsRequest) obj;
318             if (switchId == null) {
319                 if (other.switchId != null) {
320                     return false;
321                 }
322             } else if (!switchId.equals(other.switchId)) {
323                 return false;
324             }
325             if (type != other.type) {
326                 return false;
327             }
328             return true;
329         }
330     }
331
332     private void addStatisticsTicks(Long switchId) {
333         switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
334                                                                   // switch
335                                                                   // supports
336                                                                   // Vendor
337                                                                   // extension
338                                                                   // stats
339         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
340         log.info("Added Switch {} to target pool",
341                 HexString.toHexString(switchId.longValue()));
342     }
343
344     protected static class StatisticsTicks {
345         private short flowStatisticsTicks;
346         private short descriptionTicks;
347         private short portStatisticsTicks;
348         private short tableStatisticsTicks;
349
350         public StatisticsTicks(boolean scattered) {
351             if (scattered) {
352                 // scatter bursts by statisticsTickPeriod
353                 if (++counter < 0) {
354                     counter = 0;
355                 } // being paranoid here
356                 flowStatisticsTicks = (short) (1 + counter
357                         % statisticsTickNumber);
358                 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
359                 portStatisticsTicks = (short) (1 + counter % portTickNumber);
360                 tableStatisticsTicks = (short) (1 + counter % tableTickNumber);
361             } else {
362                 flowStatisticsTicks = statisticsTickNumber;
363                 descriptionTicks = descriptionTickNumber;
364                 portStatisticsTicks = portTickNumber;
365                 tableStatisticsTicks = tableTickNumber;
366             }
367         }
368
369         public boolean decrementFlowTicksIsZero() {
370             // Please ensure no code is inserted between the if check and the
371             // flowStatisticsTicks reset
372             if (--flowStatisticsTicks == 0) {
373                 flowStatisticsTicks = statisticsTickNumber;
374                 return true;
375             }
376             return false;
377         }
378
379         public boolean decrementDescTicksIsZero() {
380             // Please ensure no code is inserted between the if check and the
381             // descriptionTicks reset
382             if (--descriptionTicks == 0) {
383                 descriptionTicks = descriptionTickNumber;
384                 return true;
385             }
386             return false;
387         }
388
389         public boolean decrementPortTicksIsZero() {
390             // Please ensure no code is inserted between the if check and the
391             // descriptionTicks reset
392             if (--portStatisticsTicks == 0) {
393                 portStatisticsTicks = portTickNumber;
394                 return true;
395             }
396             return false;
397         }
398
399         public boolean decrementTableTicksIsZero() {
400             // Please ensure no code is inserted between the if check and the
401             // descriptionTicks reset
402             if(--tableStatisticsTicks == 0) {
403                 tableStatisticsTicks = tableTickNumber;
404                 return true;
405             }
406             return false;
407         }
408
409         @Override
410         public String toString() {
411             return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
412                     + ",pT=" + portStatisticsTicks + ",tT=" + tableStatisticsTicks + "}";
413         }
414     }
415
416     private void printInfoMessage(String type, StatsRequest request) {
417         log.info(
418                 "{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
419                 new Object[] { type, HexString.toHexString(request.switchId),
420                         pendingStatsRequests.size(),
421                         statisticsCollector.getState().toString() });
422     }
423
424     protected void decrementTicks() {
425         StatsRequest request = null;
426         for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
427                 .entrySet()) {
428             StatisticsTicks clock = entry.getValue();
429             Long switchId = entry.getKey();
430             if (clock.decrementFlowTicksIsZero() == true) {
431                 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ? new StatsRequest(
432                         switchId, OFStatisticsType.VENDOR) : new StatsRequest(
433                         switchId, OFStatisticsType.FLOW);
434                 // If a request for this switch is already in the queue, skip to
435                 // add this new request
436                 if (!pendingStatsRequests.contains(request)
437                         && false == pendingStatsRequests.offer(request)) {
438                     printInfoMessage("Flow", request);
439                 }
440             }
441
442             if (clock.decrementDescTicksIsZero() == true) {
443                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
444                 // If a request for this switch is already in the queue, skip to
445                 // add this new request
446                 if (!pendingStatsRequests.contains(request)
447                         && false == pendingStatsRequests.offer(request)) {
448                     printInfoMessage("Description", request);
449                 }
450             }
451
452             if (clock.decrementPortTicksIsZero() == true) {
453                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
454                 // If a request for this switch is already in the queue, skip to
455                 // add this new request
456                 if (!pendingStatsRequests.contains(request)
457                         && false == pendingStatsRequests.offer(request)) {
458                     printInfoMessage("Port", request);
459                 }
460             }
461
462             if(clock.decrementTableTicksIsZero() == true) {
463                 request = new StatsRequest(switchId, OFStatisticsType.TABLE);
464                 // If a request for this switch is already in the queue, skip to
465                 // add this new request
466                 if (!pendingStatsRequests.contains(request)
467                         && false == pendingStatsRequests.offer(request)) {
468                     printInfoMessage("Table", request);
469                 }
470             }
471         }
472     }
473
474     private void removeStatsRequestTasks(Long switchId) {
475         log.info("Cleaning Statistics database for switch {}",
476                 HexEncode.longToHexString(switchId));
477         // To be safe, let's attempt removal of both VENDOR and FLOW request. It
478         // does not hurt
479         pendingStatsRequests.remove(new StatsRequest(switchId,
480                 OFStatisticsType.VENDOR));
481         pendingStatsRequests.remove(new StatsRequest(switchId,
482                 OFStatisticsType.FLOW));
483         pendingStatsRequests.remove(new StatsRequest(switchId,
484                 OFStatisticsType.DESC));
485         pendingStatsRequests.remove(new StatsRequest(switchId,
486                 OFStatisticsType.PORT));
487         pendingStatsRequests.remove(new StatsRequest(switchId,
488                 OFStatisticsType.TABLE));
489         // Take care of the TX rate databases
490         switchPortStatsUpdated.remove(switchId);
491         txRates.remove(switchId);
492     }
493
494     private void clearFlowStatsAndTicks(Long switchId) {
495         statisticsTimerTicks.remove(switchId);
496         removeStatsRequestTasks(switchId);
497         flowStatistics.remove(switchId);
498         log.info("Statistics removed for switch {}",
499                 HexString.toHexString(switchId));
500     }
501
502     private void acquireStatistics(Long switchId, OFStatisticsType statType) {
503
504         // Query the switch on all matches
505         List<OFStatistics> values = this.acquireStatistics(switchId, statType,
506                 null);
507
508         // Update local caching database if got a valid response
509         if (values != null && !values.isEmpty()) {
510             if ((statType == OFStatisticsType.FLOW)
511                     || (statType == OFStatisticsType.VENDOR)) {
512                 flowStatistics.put(switchId, values);
513             } else if (statType == OFStatisticsType.DESC) {
514                 // Notify who may be interested in a description change
515                 notifyDescriptionListeners(switchId, values);
516
517                 // Overwrite cache
518                 descStatistics.put(switchId, values);
519             } else if (statType == OFStatisticsType.PORT) {
520                 // Overwrite cache with new port statistics for this switch
521                 portStatistics.put(switchId, values);
522
523                 // Wake up the thread which maintains the TX byte counters for
524                 // each port
525                 switchPortStatsUpdated.offer(switchId);
526             } else if (statType == OFStatisticsType.TABLE) {
527                 // Overwrite cache
528                 tableStatistics.put(switchId, values);
529             }
530         }
531     }
532
533     private void notifyDescriptionListeners(Long switchId,
534             List<OFStatistics> values) {
535         for (IStatisticsListener l : this.descriptionListeners) {
536             l.descriptionRefreshed(switchId,
537                     ((OFDescriptionStatistics) values.get(0)));
538         }
539     }
540
541     /*
542      * Generic function to get the statistics form a OF switch
543      */
544     @SuppressWarnings("unchecked")
545     private List<OFStatistics> acquireStatistics(Long switchId,
546             OFStatisticsType statsType, Object target) {
547         List<OFStatistics> values = null;
548         String type = null;
549         ISwitch sw = controller.getSwitch(switchId);
550
551         if (sw != null) {
552             OFStatisticsRequest req = new OFStatisticsRequest();
553             req.setStatisticType(statsType);
554             int requestLength = req.getLengthU();
555
556             if (statsType == OFStatisticsType.FLOW) {
557                 OFMatch match = null;
558                 if (target == null) {
559                     // All flows request
560                     match = new OFMatch();
561                     match.setWildcards(0xffffffff);
562                 } else if (!(target instanceof OFMatch)) {
563                     // Malformed request
564                     log.warn("Invalid target type for Flow stats request: {}",
565                             target.getClass());
566                     return null;
567                 } else {
568                     // Specific flow request
569                     match = (OFMatch) target;
570                 }
571                 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
572                 specificReq.setMatch(match);
573                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
574                 specificReq.setTableId((byte) 0xff);
575                 req.setStatistics(Collections
576                         .singletonList((OFStatistics) specificReq));
577                 requestLength += specificReq.getLength();
578                 type = "FLOW";
579             } else if (statsType == OFStatisticsType.VENDOR) {
580                 V6StatsRequest specificReq = new V6StatsRequest();
581                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
582                 specificReq.setTableId((byte) 0xff);
583                 req.setStatistics(Collections
584                         .singletonList((OFStatistics) specificReq));
585                 requestLength += specificReq.getLength();
586                 type = "VENDOR";
587             } else if (statsType == OFStatisticsType.AGGREGATE) {
588                 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
589                 OFMatch match = new OFMatch();
590                 match.setWildcards(0xffffffff);
591                 specificReq.setMatch(match);
592                 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
593                 specificReq.setTableId((byte) 0xff);
594                 req.setStatistics(Collections
595                         .singletonList((OFStatistics) specificReq));
596                 requestLength += specificReq.getLength();
597                 type = "AGGREGATE";
598             } else if (statsType == OFStatisticsType.PORT) {
599                 short targetPort;
600                 if (target == null) {
601                     // All ports request
602                     targetPort = (short) OFPort.OFPP_NONE.getValue();
603                 } else if (!(target instanceof Short)) {
604                     // Malformed request
605                     log.warn("Invalid target type for Port stats request: {}",
606                             target.getClass());
607                     return null;
608                 } else {
609                     // Specific port request
610                     targetPort = (Short) target;
611                 }
612                 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
613                 specificReq.setPortNumber(targetPort);
614                 req.setStatistics(Collections
615                         .singletonList((OFStatistics) specificReq));
616                 requestLength += specificReq.getLength();
617                 type = "PORT";
618             } else if (statsType == OFStatisticsType.QUEUE) {
619                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
620                 specificReq.setPortNumber((short) OFPort.OFPP_ALL.getValue());
621                 specificReq.setQueueId(0xffffffff);
622                 req.setStatistics(Collections
623                         .singletonList((OFStatistics) specificReq));
624                 requestLength += specificReq.getLength();
625                 type = "QUEUE";
626             } else if (statsType == OFStatisticsType.DESC) {
627                 type = "DESC";
628             } else if (statsType == OFStatisticsType.TABLE) {
629                 if(target != null){
630                     if (!(target instanceof Byte)) {
631                         // Malformed request
632                         log.warn("Invalid table id for table stats request: {}",
633                                 target.getClass());
634                         return null;
635                     }
636                     byte targetTable = (Byte) target;
637                     OFTableStatistics specificReq = new OFTableStatistics();
638                     specificReq.setTableId(targetTable);
639                     req.setStatistics(Collections
640                             .singletonList((OFStatistics) specificReq));
641                     requestLength += specificReq.getLength();
642                 }
643                 type = "TABLE";
644             }
645             req.setLengthU(requestLength);
646             Object result = sw.getStatistics(req);
647
648             if (result == null) {
649                 log.warn("Request Timed Out for ({}) from switch {}", type,
650                         HexString.toHexString(switchId));
651             } else if (result instanceof OFError) {
652                 log.warn("Switch {} failed to handle ({}) stats request: {}",
653                         new Object[] { HexString.toHexString(switchId), type,
654                         Utils.getOFErrorString((OFError) result) });
655                 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
656                     log.warn(
657                             "Switching back to regular Flow stats requests for switch {}",
658                             HexString.toHexString(switchId));
659                     this.switchSupportsVendorExtStats.put(switchId,
660                             Boolean.FALSE);
661                 }
662             } else {
663                 values = (List<OFStatistics>) result;
664             }
665         }
666         return values;
667     }
668
669     @Override
670     public List<OFStatistics> getOFFlowStatistics(Long switchId) {
671         List<OFStatistics> list = flowStatistics.get(switchId);
672
673         /*
674          * Check on emptiness as interference between add and get is still
675          * possible on the inner list (the concurrentMap entry's value)
676          */
677         return (list == null || list.isEmpty()) ? this.dummyList
678                 : (list.get(0) instanceof OFVendorStatistics) ? this
679                         .v6StatsListToOFStatsList(list) : list;
680     }
681
682     @Override
683     public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch) {
684         List<OFStatistics> statsList = flowStatistics.get(switchId);
685
686         /*
687          * Check on emptiness as interference between add and get is still
688          * possible on the inner list (the concurrentMap entry's value)
689          */
690         if (statsList == null || statsList.isEmpty()) {
691             return this.dummyList;
692         }
693
694         if (statsList.get(0) instanceof OFVendorStatistics) {
695             /*
696              * Caller could provide regular OF match when we instead pull the
697              * vendor statistics from this node Caller is not supposed to know
698              * whether this switch supports vendor extensions statistics
699              * requests
700              */
701             V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
702                     : new V6Match(ofMatch);
703
704             List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
705             for (OFStatistics stats : targetList) {
706                 V6StatsReply v6Stats = (V6StatsReply) stats;
707                 V6Match v6Match = v6Stats.getMatch();
708                 if (v6Match.equals(targetMatch)) {
709                     List<OFStatistics> list = new ArrayList<OFStatistics>();
710                     list.add(stats);
711                     return list;
712                 }
713             }
714         } else {
715             for (OFStatistics stats : statsList) {
716                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
717                 if (flowStats.getMatch().equals(ofMatch)) {
718                     List<OFStatistics> list = new ArrayList<OFStatistics>();
719                     list.add(stats);
720                     return list;
721                 }
722             }
723         }
724         return this.dummyList;
725     }
726
727     /*
728      * Converts the v6 vendor statistics to the OFStatistics
729      */
730     private List<OFStatistics> v6StatsListToOFStatsList(
731             List<OFStatistics> statistics) {
732         List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
733         if (statistics != null && !statistics.isEmpty()) {
734             for (OFStatistics stats : statistics) {
735                 if (stats instanceof OFVendorStatistics) {
736                     List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
737                     if (r != null) {
738                         v6statistics.addAll(r);
739                     }
740                 }
741             }
742         }
743         return v6statistics;
744     }
745
746     private static List<OFStatistics> getV6ReplyStatistics(
747             OFVendorStatistics stat) {
748         int length = stat.getLength();
749         List<OFStatistics> results = new ArrayList<OFStatistics>();
750         if (length < 12)
751             return null; // Nicira Hdr is 12 bytes. We need atleast that much
752         ByteBuffer data = ByteBuffer.allocate(length);
753         stat.writeTo(data);
754         data.rewind();
755         if (log.isTraceEnabled()) {
756             log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
757                     HexString.toHexString(data.array()));
758         }
759
760         int vendor = data.getInt(); // first 4 bytes is vendor id.
761         if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
762             log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
763             return null;
764         } else {
765             // go ahead by 8 bytes which is 8 bytes of 0
766             data.getLong(); // should be all 0's
767             length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
768                           // been consumed
769         }
770
771         V6StatsReply v6statsreply;
772         int min_len;
773         while (length > 0) {
774             v6statsreply = new V6StatsReply();
775             min_len = v6statsreply.getLength();
776             if (length < v6statsreply.getLength())
777                 break;
778             v6statsreply.setActionFactory(stat.getActionFactory());
779             v6statsreply.readFrom(data);
780             if (v6statsreply.getLength() < min_len)
781                 break;
782             v6statsreply.setVendorId(vendor);
783             log.trace("V6StatsReply: {}", v6statsreply);
784             length -= v6statsreply.getLength();
785             results.add(v6statsreply);
786         }
787         return results;
788     }
789
790     @Override
791     public List<OFStatistics> queryStatistics(Long switchId,
792             OFStatisticsType statType, Object target) {
793         /*
794          * Caller does not know and it is not supposed to know whether this
795          * switch supports vendor extension. We adjust the target for him
796          */
797         if (statType == OFStatisticsType.FLOW) {
798             if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
799                 statType = OFStatisticsType.VENDOR;
800             }
801         }
802
803         List<OFStatistics> list = this.acquireStatistics(switchId, statType,
804                 target);
805
806         return (list == null) ? null
807                 : (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list)
808                         : list;
809     }
810
811     @Override
812     public List<OFStatistics> getOFDescStatistics(Long switchId) {
813         if (!descStatistics.containsKey(switchId))
814             return this.dummyList;
815
816         return descStatistics.get(switchId);
817     }
818
819     @Override
820     public List<OFStatistics> getOFPortStatistics(Long switchId) {
821         if (!portStatistics.containsKey(switchId)) {
822             return this.dummyList;
823         }
824
825         return portStatistics.get(switchId);
826     }
827
828     @Override
829     public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
830         if (!portStatistics.containsKey(switchId)) {
831             return this.dummyList;
832         }
833         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
834         for (OFStatistics stats : portStatistics.get(switchId)) {
835             if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
836                 list.add(stats);
837                 break;
838             }
839         }
840         return list;
841     }
842
843     @Override
844     public List<OFStatistics> getOFTableStatistics(Long switchId) {
845         if (!tableStatistics.containsKey(switchId)) {
846             return this.dummyList;
847         }
848
849         return tableStatistics.get(switchId);
850     }
851
852     @Override
853     public List<OFStatistics> getOFTableStatistics(Long switchId, Byte tableId) {
854         if (!tableStatistics.containsKey(switchId)) {
855             return this.dummyList;
856         }
857
858         List<OFStatistics> list = new ArrayList<OFStatistics>(1);
859         for (OFStatistics stats : tableStatistics.get(switchId)) {
860             if (((OFTableStatistics) stats).getTableId() == tableId) {
861                 list.add(stats);
862                 break;
863             }
864         }
865         return list;
866     }
867
868     @Override
869     public int getFlowsNumber(long switchId) {
870         return this.flowStatistics.get(switchId).size();
871     }
872
873     /*
874      * InventoryShim replay for us all the switch addition which happened before
875      * we were brought up
876      */
877     @Override
878     public void updateNode(Node node, UpdateType type, Set<Property> props) {
879         Long switchId = (Long) node.getID();
880         switch (type) {
881         case ADDED:
882             addStatisticsTicks(switchId);
883             break;
884         case REMOVED:
885             clearFlowStatsAndTicks(switchId);
886         default:
887         }
888     }
889
890     @Override
891     public void updateNodeConnector(NodeConnector nodeConnector,
892             UpdateType type, Set<Property> props) {
893         // No action
894     }
895
896     /**
897      * Update the cached port rates for this switch with the latest retrieved
898      * port transmit byte count
899      * 
900      * @param switchId
901      */
902     private synchronized void updatePortsTxRate(long switchId) {
903         List<OFStatistics> newPortStatistics = this.portStatistics
904                 .get(switchId);
905         if (newPortStatistics == null) {
906             return;
907         }
908         Map<Short, TxRates> rates = this.txRates.get(switchId);
909         if (rates == null) {
910             // First time rates for this switch are added
911             rates = new HashMap<Short, TxRates>();
912             txRates.put(switchId, rates);
913         }
914         for (OFStatistics stats : newPortStatistics) {
915             OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
916             short port = newPortStat.getPortNumber();
917             TxRates portRatesHolder = rates.get(port);
918             if (portRatesHolder == null) {
919                 // First time rates for this port are added
920                 portRatesHolder = new TxRates();
921                 rates.put(port, portRatesHolder);
922             }
923             // Get and store the number of transmitted bytes for this port
924             // And handle the case where agent does not support the counter
925             long transmitBytes = newPortStat.getTransmitBytes();
926             long value = (transmitBytes < 0) ? 0 : transmitBytes;
927             portRatesHolder.update(value);
928         }
929     }
930
931     @Override
932     public synchronized long getTransmitRate(Long switchId, Short port) {
933         long average = 0;
934         if (switchId == null || port == null) {
935             return average;
936         }
937         Map<Short, TxRates> perSwitch = txRates.get(switchId);
938         if (perSwitch == null) {
939             return average;
940         }
941         TxRates portRates = perSwitch.get(port);
942         if (portRates == null) {
943             return average;
944         }
945         return portRates.getAverageTxRate();
946     }
947
948     /*
949      * Manual switch name configuration code
950      */
951     @Override
952     public String getHelp() {
953         StringBuffer help = new StringBuffer();
954         help.append("---OF Statistics Manager utilities---\n");
955         help.append("\t ofdumpstatsmgr         - "
956                 + "Print Internal Stats Mgr db\n");
957         help.append("\t ofstatsmgrintervals <fP> <pP> <dP>(in seconds) - "
958                 + "Set/Show flow/port/dedscription stats poll intervals\n");
959         return help.toString();
960     }
961
962     private boolean isValidSwitchId(String switchId) {
963         String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
964         String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
965
966         return (switchId != null && (switchId.matches(regexDatapathID) || switchId
967                 .matches(regexDatapathIDLong)));
968     }
969
970     public long getSwitchIDLong(String switchId) {
971         int radix = 16;
972         String switchString = "0";
973
974         if (isValidSwitchId(switchId)) {
975             if (switchId.contains(":")) {
976                 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
977                 switchString = switchId.replace(":", "");
978             } else if (switchId.contains("-")) {
979                 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
980                 switchString = switchId.replace("-", "");
981             } else {
982                 // Handle the 0123456789ABCDEF notation
983                 switchString = switchId;
984             }
985         }
986         return Long.parseLong(switchString, radix);
987     }
988
989     /*
990      * Internal information dump code
991      */
992     private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
993         StringBuffer buffer = new StringBuffer();
994         buffer.append("{");
995         for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
996             buffer.append(HexString.toHexString(entry.getKey()) + "="
997                     + entry.getValue().toString() + " ");
998         }
999         buffer.append("}");
1000         return buffer.toString();
1001     }
1002
1003     public void _ofdumpstatsmgr(CommandInterpreter ci) {
1004         ci.println("Global Counter: " + counter);
1005         ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
1006         ci.println("PendingStatsQueue: " + pendingStatsRequests);
1007         ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
1008         ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
1009         ci.println("Stats Collector State: "
1010                 + statisticsCollector.getState().toString());
1011         ci.println("StatsTimer: " + statisticsTimer.toString());
1012         ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
1013         ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
1014         ci.println("Port Stats Period: " + portTickNumber + " s");
1015         ci.println("Table Stats Period: " + tableTickNumber + " s");
1016     }
1017
1018     public void _resetSwitchCapability(CommandInterpreter ci) {
1019         String sidString = ci.nextArgument();
1020         Long sid = null;
1021         if (sidString == null) {
1022             ci.println("Insert the switch id (numeric value)");
1023             return;
1024         }
1025         try {
1026             sid = Long.valueOf(sidString);
1027             this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
1028             ci.println("Vendor capability for switch " + sid + " set to "
1029                     + this.switchSupportsVendorExtStats.get(sid));
1030         } catch (NumberFormatException e) {
1031             ci.println("Invalid switch id. Has to be numeric.");
1032         }
1033
1034     }
1035
1036     public void _ofbw(CommandInterpreter ci) {
1037         String sidString = ci.nextArgument();
1038         Long sid = null;
1039         if (sidString == null) {
1040             ci.println("Insert the switch id (numeric value)");
1041             return;
1042         }
1043         try {
1044             sid = Long.valueOf(sidString);
1045         } catch (NumberFormatException e) {
1046             ci.println("Invalid switch id. Has to be numeric.");
1047         }
1048         if (sid != null) {
1049             Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
1050             ci.println("Bandwidth utilization (" + factoredSamples
1051                     * portTickNumber + " sec average) for switch "
1052                     + HexEncode.longToHexString(sid) + ":");
1053             if (thisSwitchRates == null) {
1054                 ci.println("Not available");
1055             } else {
1056                 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
1057                     ci.println("Port: " + entry.getKey() + ": "
1058                             + entry.getValue().getAverageTxRate() + " bps");
1059                 }
1060             }
1061         }
1062     }
1063
1064     public void _txratewindow(CommandInterpreter ci) {
1065         String averageWindow = ci.nextArgument();
1066         short seconds = 0;
1067         if (averageWindow == null) {
1068             ci.println("Insert the length in seconds of the median "
1069                     + "window for tx rate");
1070             ci.println("Current: " + factoredSamples * portTickNumber + " secs");
1071             return;
1072         }
1073         try {
1074             seconds = Short.valueOf(averageWindow);
1075         } catch (NumberFormatException e) {
1076             ci.println("Invalid period.");
1077         }
1078         OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1079         ci.println("New: " + factoredSamples * portTickNumber + " secs");
1080     }
1081
1082     public void _ofstatsmgrintervals(CommandInterpreter ci) {
1083         String flowStatsInterv = ci.nextArgument();
1084         String portStatsInterv = ci.nextArgument();
1085         String descStatsInterv = ci.nextArgument();
1086         String tableStatsInterv = ci.nextArgument();
1087
1088         if (flowStatsInterv == null || portStatsInterv == null
1089                 || descStatsInterv == null) {
1090             ci.println("Usage: ostatsmgrintervals <fP> <pP> <dP>(in seconds)");
1091             ci.println("Current Values: fP=" + statisticsTickNumber + "s pP="
1092                     + portTickNumber + "s dP=" + descriptionTickNumber + "s");
1093             return;
1094         }
1095         Short fP, pP, dP, tP;
1096         try {
1097             fP = Short.parseShort(flowStatsInterv);
1098             pP = Short.parseShort(portStatsInterv);
1099             dP = Short.parseShort(descStatsInterv);
1100             tP = Short.parseShort(tableStatsInterv);
1101         } catch (Exception e) {
1102             ci.println("Invalid format values: " + e.getMessage());
1103             return;
1104         }
1105
1106         if (pP <= 1 || fP <= 1 || dP <= 1 || tP <= 1) {
1107             ci.println("Invalid values. fP, pP, dP, tP have to be greater than 1.");
1108             return;
1109         }
1110
1111         statisticsTickNumber = fP;
1112         portTickNumber = pP;
1113         descriptionTickNumber = dP;
1114         tableTickNumber = tP;
1115
1116         ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1117                 + portTickNumber + "s dP=" + descriptionTickNumber + "s tP="
1118                 + tableTickNumber + "s");
1119     }
1120
1121     /**
1122      * This method retrieves user configurations from config.ini and updates
1123      * statisticsTickNumber/portTickNumber/descriptionTickNumber accordingly.
1124      */
1125     private void configStatsPollIntervals() {
1126         String fsStr = System.getProperty("of.flowStatsPollInterval");
1127         String psStr = System.getProperty("of.portStatsPollInterval");
1128         String dsStr = System.getProperty("of.descStatsPollInterval");
1129         String tsStr = System.getProperty("of.tableStatsPollInterval");
1130         Short fs, ps, ds, ts;
1131
1132         if (fsStr != null) {
1133             try {
1134                 fs = Short.parseShort(fsStr);
1135                 if (fs > 0) {
1136                     statisticsTickNumber = fs;
1137                 }
1138             } catch (Exception e) {
1139             }
1140         }
1141
1142         if (psStr != null) {
1143             try {
1144                 ps = Short.parseShort(psStr);
1145                 if (ps > 0) {
1146                     portTickNumber = ps;
1147                 }
1148             } catch (Exception e) {
1149             }
1150         }
1151
1152         if (dsStr != null) {
1153             try {
1154                 ds = Short.parseShort(dsStr);
1155                 if (ds > 0) {
1156                     descriptionTickNumber = ds;
1157                 }
1158             } catch (Exception e) {
1159             }
1160         }
1161
1162         if (tsStr != null) {
1163             try{
1164                 ts = Short.parseShort(tsStr);
1165                 if (ts > 0) {
1166                     tableTickNumber = ts;
1167                 }
1168             } catch (Exception e) {
1169             }
1170         }
1171     }
1172 }