2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.List;
18 import java.util.Map.Entry;
20 import java.util.Timer;
21 import java.util.TimerTask;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.CopyOnWriteArraySet;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
33 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
40 import org.opendaylight.controller.sal.core.Node;
41 import org.opendaylight.controller.sal.core.NodeConnector;
42 import org.opendaylight.controller.sal.core.Property;
43 import org.opendaylight.controller.sal.core.UpdateType;
44 import org.opendaylight.controller.sal.utils.HexEncode;
45 import org.openflow.protocol.OFError;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFPort;
48 import org.openflow.protocol.OFStatisticsRequest;
49 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
50 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
51 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
52 import org.openflow.protocol.statistics.OFPortStatisticsReply;
53 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
54 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
55 import org.openflow.protocol.statistics.OFStatistics;
56 import org.openflow.protocol.statistics.OFStatisticsType;
57 import org.openflow.protocol.statistics.OFTableStatistics;
58 import org.openflow.protocol.statistics.OFVendorStatistics;
59 import org.openflow.util.HexString;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
66 * Periodically polls the different OF statistics from the OF switches, caches
67 * them, and publishes results towards SAL. It also provides an API to directly
68 * query the switch for any specific statistics.
70 public class OFStatisticsManager implements IOFStatisticsManager, IInventoryShimExternalListener, CommandProvider {
71 private static final Logger log = LoggerFactory.getLogger(OFStatisticsManager.class);
72 private static final int INITIAL_SIZE = 64;
73 private static final long FLOW_STATS_PERIOD = 10000;
74 private static final long DESC_STATS_PERIOD = 60000;
75 private static final long PORT_STATS_PERIOD = 5000;
76 private static final long TABLE_STATS_PERIOD = 10000;
77 private static final long TICK = 1000;
78 private static short statisticsTickNumber = (short) (FLOW_STATS_PERIOD / TICK);
79 private static short descriptionTickNumber = (short) (DESC_STATS_PERIOD / TICK);
80 private static short portTickNumber = (short) (PORT_STATS_PERIOD / TICK);
81 private static short tableTickNumber = (short) (TABLE_STATS_PERIOD / TICK);
82 private static short factoredSamples = (short) 2;
83 private static short counter = 1;
84 private IController controller = null;
85 private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
86 private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
87 private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
88 private ConcurrentMap<Long, List<OFStatistics>> tableStatistics;
89 private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
90 protected BlockingQueue<StatsRequest> pendingStatsRequests;
91 protected BlockingQueue<Long> switchPortStatsUpdated;
92 private Thread statisticsCollector;
93 private Thread txRatesUpdater;
94 private Timer statisticsTimer;
95 private TimerTask statisticsTimerTask;
96 private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
97 // Per port sampled (every portStatsPeriod) transmit rate
98 private Map<Long, Map<Short, TxRates>> txRates;
99 private Set<IOFStatisticsListener> statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
102 * The object containing the latest factoredSamples tx rate samples for a
105 protected class TxRates {
106 // contains the latest factoredSamples sampled transmitted bytes
107 Deque<Long> sampledTxBytes;
110 sampledTxBytes = new LinkedBlockingDeque<Long>();
113 public void update(Long txBytes) {
115 * Based on how many samples our average works on, we might have to
116 * remove the oldest sample
118 if (sampledTxBytes.size() == factoredSamples) {
119 sampledTxBytes.removeLast();
122 // Add the latest sample to the top of the queue
123 sampledTxBytes.addFirst(txBytes);
127 * Returns the average transmit rate in bps
129 * @return the average transmit rate [bps]
131 public long getAverageTxRate() {
134 * If we cannot provide the value for the time window length set
136 if (sampledTxBytes.size() < factoredSamples) {
139 long increment = sampledTxBytes.getFirst() - sampledTxBytes
141 long timePeriod = factoredSamples * PORT_STATS_PERIOD / TICK;
142 average = (8L * increment) / timePeriod;
147 public void setController(IController core) {
148 this.controller = core;
151 public void unsetController(IController core) {
152 if (this.controller == core) {
153 this.controller = null;
157 private short getStatsQueueSize() {
158 String statsQueueSizeStr = System.getProperty("of.statsQueueSize");
159 short statsQueueSize = INITIAL_SIZE;
160 if (statsQueueSizeStr != null) {
162 statsQueueSize = Short.parseShort(statsQueueSizeStr);
163 if (statsQueueSize <= 0) {
164 statsQueueSize = INITIAL_SIZE;
166 } catch (Exception e) {
169 return statsQueueSize;
172 IPluginOutConnectionService connectionPluginOutService;
173 void setIPluginOutConnectionService(IPluginOutConnectionService s) {
174 connectionPluginOutService = s;
177 void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
178 if (connectionPluginOutService == s) {
179 connectionPluginOutService = null;
184 * Function called by the dependency manager when all the required
185 * dependencies are satisfied
189 flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
190 descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
191 portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
192 tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
193 pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(getStatsQueueSize());
194 statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(INITIAL_SIZE);
195 switchPortStatsUpdated = new LinkedBlockingQueue<Long>(INITIAL_SIZE);
196 switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(INITIAL_SIZE);
197 txRates = new HashMap<Long, Map<Short, TxRates>>(INITIAL_SIZE);
199 configStatsPollIntervals();
201 // Initialize managed timers
202 statisticsTimer = new Timer("Statistics Timer Ticks");
203 statisticsTimerTask = new TimerTask() {
210 // Initialize Statistics collector thread
211 statisticsCollector = new Thread(new Runnable() {
216 StatsRequest req = pendingStatsRequests.take();
217 queryStatisticsInternal(req.switchId, req.type);
218 } catch (InterruptedException e) {
219 log.warn("Flow Statistics Collector thread "
225 }, "Statistics Collector");
227 // Initialize Tx Rate Updater thread
228 txRatesUpdater = new Thread(new Runnable() {
233 long switchId = switchPortStatsUpdated.take();
234 updatePortsTxRate(switchId);
235 } catch (InterruptedException e) {
236 log.warn("TX Rate Updater thread interrupted", e);
241 }, "TX Rate Updater");
245 * Function called by the dependency manager when at least one dependency
246 * become unsatisfied or when the component is shutting down because for
247 * example bundle is being stopped.
251 statisticsListeners.clear();
255 * Function called by dependency manager after "init ()" is called and after
256 * the services provided by the class are registered in the service registry
260 // Start managed timers
261 statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, TICK);
263 // Start statistics collector thread
264 statisticsCollector.start();
266 // Start bandwidth utilization computer thread
267 txRatesUpdater.start();
270 registerWithOSGIConsole();
274 * Function called by the dependency manager before the services exported by
275 * the component are unregistered, this will be followed by a "destroy ()"
280 // Stop managed timers
281 statisticsTimer.cancel();
284 public void setStatisticsListener(IOFStatisticsListener s) {
285 this.statisticsListeners.add(s);
288 public void unsetStatisticsListener(IOFStatisticsListener s) {
290 this.statisticsListeners.remove(s);
294 private void registerWithOSGIConsole() {
295 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
296 bundleContext.registerService(CommandProvider.class.getName(), this, null);
299 private static class StatsRequest {
300 protected Long switchId;
301 protected OFStatisticsType type;
303 public StatsRequest(Long d, OFStatisticsType t) {
309 public String toString() {
310 return "SReq = {switchId=" + switchId + ", type=" + type + "}";
314 public int hashCode() {
315 final int prime = 31;
317 result = prime * result
318 + ((switchId == null) ? 0 : switchId.hashCode());
319 result = prime * result + ((type == null) ? 0 : type.ordinal());
324 public boolean equals(Object obj) {
331 if (getClass() != obj.getClass()) {
334 StatsRequest other = (StatsRequest) obj;
335 if (switchId == null) {
336 if (other.switchId != null) {
339 } else if (!switchId.equals(other.switchId)) {
342 if (type != other.type) {
349 private void addStatisticsTicks(Long switchId) {
350 switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
356 statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
357 log.debug("Added Switch {} to target pool",
358 HexString.toHexString(switchId.longValue()));
361 protected static class StatisticsTicks {
362 private short flowStatisticsTicks;
363 private short descriptionTicks;
364 private short portStatisticsTicks;
365 private short tableStatisticsTicks;
367 public StatisticsTicks(boolean scattered) {
369 // scatter bursts by statisticsTickPeriod
372 } // being paranoid here
373 flowStatisticsTicks = (short) (1 + counter
374 % statisticsTickNumber);
375 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
376 portStatisticsTicks = (short) (1 + counter % portTickNumber);
377 tableStatisticsTicks = (short) (1 + counter % tableTickNumber);
379 flowStatisticsTicks = statisticsTickNumber;
380 descriptionTicks = descriptionTickNumber;
381 portStatisticsTicks = portTickNumber;
382 tableStatisticsTicks = tableTickNumber;
386 public boolean decrementFlowTicksIsZero() {
387 // Please ensure no code is inserted between the if check and the
388 // flowStatisticsTicks reset
389 if (--flowStatisticsTicks == 0) {
390 flowStatisticsTicks = statisticsTickNumber;
396 public boolean decrementDescTicksIsZero() {
397 // Please ensure no code is inserted between the if check and the
398 // descriptionTicks reset
399 if (--descriptionTicks == 0) {
400 descriptionTicks = descriptionTickNumber;
406 public boolean decrementPortTicksIsZero() {
407 // Please ensure no code is inserted between the if check and the
408 // descriptionTicks reset
409 if (--portStatisticsTicks == 0) {
410 portStatisticsTicks = portTickNumber;
416 public boolean decrementTableTicksIsZero() {
417 // Please ensure no code is inserted between the if check and the
418 // descriptionTicks reset
419 if(--tableStatisticsTicks == 0) {
420 tableStatisticsTicks = tableTickNumber;
427 public String toString() {
428 return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
429 + ",pT=" + portStatisticsTicks + ",tT=" + tableStatisticsTicks + "}";
433 private void printInfoMessage(String type, StatsRequest request) {
434 log.info("{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
435 new Object[] {type, HexString.toHexString(request.switchId), pendingStatsRequests.size(),
436 statisticsCollector.getState().toString() });
439 protected void decrementTicks() {
440 StatsRequest request = null;
441 for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
443 StatisticsTicks clock = entry.getValue();
444 Long switchId = entry.getKey();
445 if (clock.decrementFlowTicksIsZero()) {
446 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ?
447 new StatsRequest(switchId, OFStatisticsType.VENDOR) :
448 new StatsRequest(switchId, OFStatisticsType.FLOW);
449 // If a request for this switch is already in the queue, skip to
450 // add this new request
451 if (!pendingStatsRequests.contains(request)
452 && false == pendingStatsRequests.offer(request)) {
453 printInfoMessage("Flow", request);
457 if (clock.decrementDescTicksIsZero()) {
458 request = new StatsRequest(switchId, OFStatisticsType.DESC);
459 // If a request for this switch is already in the queue, skip to
460 // add this new request
461 if (!pendingStatsRequests.contains(request)
462 && false == pendingStatsRequests.offer(request)) {
463 printInfoMessage("Description", request);
467 if (clock.decrementPortTicksIsZero()) {
468 request = new StatsRequest(switchId, OFStatisticsType.PORT);
469 // If a request for this switch is already in the queue, skip to
470 // add this new request
471 if (!pendingStatsRequests.contains(request)
472 && false == pendingStatsRequests.offer(request)) {
473 printInfoMessage("Port", request);
477 if(clock.decrementTableTicksIsZero()) {
478 request = new StatsRequest(switchId, OFStatisticsType.TABLE);
479 // If a request for this switch is already in the queue, skip to
480 // add this new request
481 if (!pendingStatsRequests.contains(request)
482 && false == pendingStatsRequests.offer(request)) {
483 printInfoMessage("Table", request);
489 private void removeStatsRequestTasks(Long switchId) {
490 log.debug("Cleaning Statistics database for switch {}",
491 HexEncode.longToHexString(switchId));
492 // To be safe, let's attempt removal of both VENDOR and FLOW request. It
494 pendingStatsRequests.remove(new StatsRequest(switchId,
495 OFStatisticsType.VENDOR));
496 pendingStatsRequests.remove(new StatsRequest(switchId,
497 OFStatisticsType.FLOW));
498 pendingStatsRequests.remove(new StatsRequest(switchId,
499 OFStatisticsType.DESC));
500 pendingStatsRequests.remove(new StatsRequest(switchId,
501 OFStatisticsType.PORT));
502 pendingStatsRequests.remove(new StatsRequest(switchId,
503 OFStatisticsType.TABLE));
504 // Take care of the TX rate databases
505 switchPortStatsUpdated.remove(switchId);
506 txRates.remove(switchId);
509 private void clearFlowStatsAndTicks(Long switchId) {
510 statisticsTimerTicks.remove(switchId);
511 removeStatsRequestTasks(switchId);
512 flowStatistics.remove(switchId);
513 log.debug("Statistics removed for switch {}",
514 HexString.toHexString(switchId));
517 private void queryStatisticsInternal(Long switchId, OFStatisticsType statType) {
519 // Query the switch on all matches
520 List<OFStatistics> values = this.fetchStatisticsFromSwitch(switchId, statType, null);
522 // If got a valid response update local cache and notify listeners
523 if (!values.isEmpty()) {
527 flowStatistics.put(switchId, values);
528 notifyFlowUpdate(switchId, values);
532 descStatistics.put(switchId, values);
533 // Notify who may be interested in a description change
534 notifyDescriptionUpdate(switchId, values);
537 // Overwrite cache with new port statistics for this switch
538 portStatistics.put(switchId, values);
540 // Wake up the thread which maintains the TX byte counters for
542 switchPortStatsUpdated.offer(switchId);
543 notifyPortUpdate(switchId, values);
547 tableStatistics.put(switchId, values);
548 notifyTableUpdate(switchId, values);
555 private void notifyDescriptionUpdate(Long switchId, List<OFStatistics> values) {
556 for (IOFStatisticsListener l : this.statisticsListeners) {
557 l.descriptionStatisticsRefreshed(switchId, values);
561 private void notifyFlowUpdate(Long switchId, List<OFStatistics> values) {
562 if (values.get(0) instanceof OFVendorStatistics) {
563 values = this.v6StatsListToOFStatsList(values);
566 for (IOFStatisticsListener l : this.statisticsListeners) {
567 l.flowStatisticsRefreshed(switchId, values);
572 private void notifyPortUpdate(Long switchId, List<OFStatistics> values) {
573 for (IOFStatisticsListener l : this.statisticsListeners) {
574 l.portStatisticsRefreshed(switchId, values);
578 private void notifyTableUpdate(Long switchId, List<OFStatistics> values) {
579 for (IOFStatisticsListener l : this.statisticsListeners) {
580 l.tableStatisticsRefreshed(switchId, values);
585 * Generic function to get the statistics form an OF switch
587 @SuppressWarnings("unchecked")
588 private List<OFStatistics> fetchStatisticsFromSwitch(Long switchId,
589 OFStatisticsType statsType, Object target) {
590 List<OFStatistics> values = Collections.emptyList();
592 ISwitch sw = controller.getSwitch(switchId);
595 OFStatisticsRequest req = new OFStatisticsRequest();
596 req.setStatisticType(statsType);
597 int requestLength = req.getLengthU();
599 if (statsType == OFStatisticsType.FLOW) {
600 OFMatch match = null;
601 if (target == null) {
603 match = new OFMatch();
604 match.setWildcards(0xffffffff);
605 } else if (!(target instanceof OFMatch)) {
607 log.warn("Invalid target type for Flow stats request: {}",
609 return Collections.emptyList();
611 // Specific flow request
612 match = (OFMatch) target;
614 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
615 specificReq.setMatch(match);
616 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
617 specificReq.setTableId((byte) 0xff);
618 req.setStatistics(Collections
619 .singletonList((OFStatistics) specificReq));
620 requestLength += specificReq.getLength();
622 } else if (statsType == OFStatisticsType.VENDOR) {
623 V6StatsRequest specificReq = new V6StatsRequest();
624 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
625 specificReq.setTableId((byte) 0xff);
626 req.setStatistics(Collections
627 .singletonList((OFStatistics) specificReq));
628 requestLength += specificReq.getLength();
630 } else if (statsType == OFStatisticsType.AGGREGATE) {
631 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
632 OFMatch match = new OFMatch();
633 match.setWildcards(0xffffffff);
634 specificReq.setMatch(match);
635 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
636 specificReq.setTableId((byte) 0xff);
637 req.setStatistics(Collections
638 .singletonList((OFStatistics) specificReq));
639 requestLength += specificReq.getLength();
641 } else if (statsType == OFStatisticsType.PORT) {
643 if (target == null) {
645 targetPort = OFPort.OFPP_NONE.getValue();
646 } else if (!(target instanceof Short)) {
648 log.warn("Invalid target type for Port stats request: {}",
650 return Collections.emptyList();
652 // Specific port request
653 targetPort = (Short) target;
655 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
656 specificReq.setPortNumber(targetPort);
657 req.setStatistics(Collections
658 .singletonList((OFStatistics) specificReq));
659 requestLength += specificReq.getLength();
661 } else if (statsType == OFStatisticsType.QUEUE) {
662 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
663 specificReq.setPortNumber(OFPort.OFPP_ALL.getValue());
664 specificReq.setQueueId(0xffffffff);
665 req.setStatistics(Collections
666 .singletonList((OFStatistics) specificReq));
667 requestLength += specificReq.getLength();
669 } else if (statsType == OFStatisticsType.DESC) {
671 } else if (statsType == OFStatisticsType.TABLE) {
673 if (!(target instanceof Byte)) {
675 log.warn("Invalid table id for table stats request: {}",
677 return Collections.emptyList();
679 byte targetTable = (Byte) target;
680 OFTableStatistics specificReq = new OFTableStatistics();
681 specificReq.setTableId(targetTable);
682 req.setStatistics(Collections
683 .singletonList((OFStatistics) specificReq));
684 requestLength += specificReq.getLength();
688 req.setLengthU(requestLength);
689 Object result = sw.getStatistics(req);
691 if (result == null) {
692 log.warn("Request Timed Out for ({}) from switch {}", type,
693 HexString.toHexString(switchId));
694 } else if (result instanceof OFError) {
695 log.warn("Switch {} failed to handle ({}) stats request: {}",
696 new Object[] { HexString.toHexString(switchId), type,
697 Utils.getOFErrorString((OFError) result) });
698 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
700 "Switching back to regular Flow stats requests for switch {}",
701 HexString.toHexString(switchId));
702 this.switchSupportsVendorExtStats.put(switchId,
706 values = (List<OFStatistics>) result;
713 public List<OFStatistics> getOFFlowStatistics(Long switchId) {
714 List<OFStatistics> list = flowStatistics.get(switchId);
717 * Check on emptiness as interference between add and get is still
718 * possible on the inner list (the concurrentMap entry's value)
720 return (list == null || list.isEmpty()) ? Collections.<OFStatistics>emptyList()
721 : (list.get(0) instanceof OFVendorStatistics) ? this
722 .v6StatsListToOFStatsList(list) : list;
726 public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch, short priority) {
727 List<OFStatistics> statsList = flowStatistics.get(switchId);
730 * Check on emptiness as interference between add and get is still
731 * possible on the inner list (the concurrentMap entry's value)
733 if (statsList == null || statsList.isEmpty()) {
734 return Collections.emptyList();
737 if (statsList.get(0) instanceof OFVendorStatistics) {
739 * Caller could provide regular OF match when we instead pull the
740 * vendor statistics from this node Caller is not supposed to know
741 * whether this switch supports vendor extensions statistics
744 V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
745 : new V6Match(ofMatch);
747 List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
748 for (OFStatistics stats : targetList) {
749 V6StatsReply v6Stats = (V6StatsReply) stats;
750 V6Match v6Match = v6Stats.getMatch();
751 if (v6Stats.getPriority() == priority && targetMatch.equals(v6Match)) {
752 List<OFStatistics> list = new ArrayList<OFStatistics>();
758 for (OFStatistics stats : statsList) {
759 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
760 if (flowStats.getPriority() == priority && ofMatch.equals(flowStats.getMatch())) {
761 List<OFStatistics> list = new ArrayList<OFStatistics>();
767 return Collections.emptyList();
771 * Converts the v6 vendor statistics to the OFStatistics
773 private List<OFStatistics> v6StatsListToOFStatsList(List<OFStatistics> statistics) {
774 if (statistics == null || statistics.isEmpty()) {
775 return Collections.emptyList();
777 List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
778 for (OFStatistics stats : statistics) {
779 if (stats instanceof OFVendorStatistics) {
780 List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
782 v6statistics.addAll(r);
789 private static List<OFStatistics> getV6ReplyStatistics(
790 OFVendorStatistics stat) {
791 int length = stat.getLength();
792 List<OFStatistics> results = new ArrayList<OFStatistics>();
794 // Nicira Hdr is 12 bytes. We need at least that much
795 return Collections.emptyList();
797 ByteBuffer data = ByteBuffer.allocate(length);
800 if (log.isTraceEnabled()) {
801 log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
802 HexString.toHexString(data.array()));
805 int vendor = data.getInt(); // first 4 bytes is vendor id.
806 if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
807 log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
808 return Collections.emptyList();
810 // go ahead by 8 bytes which is 8 bytes of 0
811 data.getLong(); // should be all 0's
812 length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
816 V6StatsReply v6statsreply;
819 v6statsreply = new V6StatsReply();
820 min_len = v6statsreply.getLength();
821 if (length < v6statsreply.getLength()) {
824 v6statsreply.setActionFactory(stat.getActionFactory());
825 v6statsreply.readFrom(data);
826 if (v6statsreply.getLength() < min_len) {
829 v6statsreply.setVendorId(vendor);
830 log.trace("V6StatsReply: {}", v6statsreply);
831 length -= v6statsreply.getLength();
832 results.add(v6statsreply);
838 public List<OFStatistics> queryStatistics(Long switchId,
839 OFStatisticsType statType, Object target) {
841 * Caller does not know and it is not supposed to know whether this
842 * switch supports vendor extension. We adjust the target for him
844 if (statType == OFStatisticsType.FLOW) {
845 if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
846 statType = OFStatisticsType.VENDOR;
850 List<OFStatistics> list = this.fetchStatisticsFromSwitch(switchId, statType, target);
852 return (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list) : list;
856 public List<OFStatistics> getOFDescStatistics(Long switchId) {
857 if (!descStatistics.containsKey(switchId)) {
858 return Collections.emptyList();
861 return descStatistics.get(switchId);
865 public List<OFStatistics> getOFPortStatistics(Long switchId) {
866 if (!portStatistics.containsKey(switchId)) {
867 return Collections.emptyList();
870 return portStatistics.get(switchId);
874 public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
875 if (!portStatistics.containsKey(switchId)) {
876 return Collections.emptyList();
878 List<OFStatistics> list = new ArrayList<OFStatistics>(1);
879 for (OFStatistics stats : portStatistics.get(switchId)) {
880 if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
889 public List<OFStatistics> getOFTableStatistics(Long switchId) {
890 if (!tableStatistics.containsKey(switchId)) {
891 return Collections.emptyList();
894 return tableStatistics.get(switchId);
898 public List<OFStatistics> getOFTableStatistics(Long switchId, Byte tableId) {
899 if (!tableStatistics.containsKey(switchId)) {
900 return Collections.emptyList();
903 List<OFStatistics> list = new ArrayList<OFStatistics>(1);
904 for (OFStatistics stats : tableStatistics.get(switchId)) {
905 if (((OFTableStatistics) stats).getTableId() == tableId) {
914 public int getFlowsNumber(long switchId) {
915 return this.flowStatistics.get(switchId).size();
919 * InventoryShim replay for us all the switch addition which happened before
923 public void updateNode(Node node, UpdateType type, Set<Property> props) {
924 Long switchId = (Long) node.getID();
927 addStatisticsTicks(switchId);
930 clearFlowStatsAndTicks(switchId);
936 public void updateNodeConnector(NodeConnector nodeConnector,
937 UpdateType type, Set<Property> props) {
942 * Update the cached port rates for this switch with the latest retrieved
943 * port transmit byte count
947 private synchronized void updatePortsTxRate(long switchId) {
948 List<OFStatistics> newPortStatistics = this.portStatistics.get(switchId);
949 if (newPortStatistics == null) {
952 Map<Short, TxRates> rates = this.txRates.get(switchId);
954 // First time rates for this switch are added
955 rates = new HashMap<Short, TxRates>();
956 txRates.put(switchId, rates);
958 for (OFStatistics stats : newPortStatistics) {
959 OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
960 short port = newPortStat.getPortNumber();
961 TxRates portRatesHolder = rates.get(port);
962 if (portRatesHolder == null) {
963 // First time rates for this port are added
964 portRatesHolder = new TxRates();
965 rates.put(port, portRatesHolder);
967 // Get and store the number of transmitted bytes for this port
968 // And handle the case where agent does not support the counter
969 long transmitBytes = newPortStat.getTransmitBytes();
970 long value = (transmitBytes < 0) ? 0 : transmitBytes;
971 portRatesHolder.update(value);
976 public synchronized long getTransmitRate(Long switchId, Short port) {
978 if (switchId == null || port == null) {
981 Map<Short, TxRates> perSwitch = txRates.get(switchId);
982 if (perSwitch == null) {
985 TxRates portRates = perSwitch.get(port);
986 if (portRates == null) {
989 return portRates.getAverageTxRate();
993 * Manual switch name configuration code
996 public String getHelp() {
997 StringBuffer help = new StringBuffer();
998 help.append("---OF Statistics Manager utilities---\n");
999 help.append("\t ofdumpstatsmgr - "
1000 + "Print Internal Stats Mgr db\n");
1001 help.append("\t ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds) - "
1002 + "Set/Show flow/port/dedscription stats poll intervals\n");
1003 return help.toString();
1006 private boolean isValidSwitchId(String switchId) {
1007 String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
1008 String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
1010 return (switchId != null && (switchId.matches(regexDatapathID) || switchId
1011 .matches(regexDatapathIDLong)));
1014 public long getSwitchIDLong(String switchId) {
1016 String switchString = "0";
1018 if (isValidSwitchId(switchId)) {
1019 if (switchId.contains(":")) {
1020 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
1021 switchString = switchId.replace(":", "");
1022 } else if (switchId.contains("-")) {
1023 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
1024 switchString = switchId.replace("-", "");
1026 // Handle the 0123456789ABCDEF notation
1027 switchString = switchId;
1030 return Long.parseLong(switchString, radix);
1034 * Internal information dump code
1036 private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
1037 StringBuffer buffer = new StringBuffer();
1039 for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
1040 buffer.append(HexString.toHexString(entry.getKey()) + "="
1041 + entry.getValue().toString() + " ");
1044 return buffer.toString();
1047 public void _ofdumpstatsmgr(CommandInterpreter ci) {
1048 ci.println("Global Counter: " + counter);
1049 ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
1050 ci.println("PendingStatsQueue: " + pendingStatsRequests);
1051 ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
1052 ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
1053 ci.println("Stats Collector State: "
1054 + statisticsCollector.getState().toString());
1055 ci.println("StatsTimer: " + statisticsTimer.toString());
1056 ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
1057 ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
1058 ci.println("Port Stats Period: " + portTickNumber + " s");
1059 ci.println("Table Stats Period: " + tableTickNumber + " s");
1062 public void _resetSwitchCapability(CommandInterpreter ci) {
1063 String sidString = ci.nextArgument();
1065 if (sidString == null) {
1066 ci.println("Insert the switch id (numeric value)");
1070 sid = Long.valueOf(sidString);
1071 this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
1072 ci.println("Vendor capability for switch " + sid + " set to "
1073 + this.switchSupportsVendorExtStats.get(sid));
1074 } catch (NumberFormatException e) {
1075 ci.println("Invalid switch id. Has to be numeric.");
1080 public void _ofbw(CommandInterpreter ci) {
1081 String sidString = ci.nextArgument();
1083 if (sidString == null) {
1084 ci.println("Insert the switch id (numeric value)");
1088 sid = Long.valueOf(sidString);
1089 } catch (NumberFormatException e) {
1090 ci.println("Invalid switch id. Has to be numeric.");
1093 Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
1094 ci.println("Bandwidth utilization (" + factoredSamples
1095 * portTickNumber + " sec average) for switch "
1096 + HexEncode.longToHexString(sid) + ":");
1097 if (thisSwitchRates == null) {
1098 ci.println("Not available");
1100 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
1101 ci.println("Port: " + entry.getKey() + ": "
1102 + entry.getValue().getAverageTxRate() + " bps");
1108 public void _txratewindow(CommandInterpreter ci) {
1109 String averageWindow = ci.nextArgument();
1111 if (averageWindow == null) {
1112 ci.println("Insert the length in seconds of the median "
1113 + "window for tx rate");
1114 ci.println("Current: " + factoredSamples * portTickNumber + " secs");
1118 seconds = Short.valueOf(averageWindow);
1119 } catch (NumberFormatException e) {
1120 ci.println("Invalid period.");
1122 OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1123 ci.println("New: " + factoredSamples * portTickNumber + " secs");
1126 public void _ofstatsmgrintervals(CommandInterpreter ci) {
1127 String flowStatsInterv = ci.nextArgument();
1128 String portStatsInterv = ci.nextArgument();
1129 String descStatsInterv = ci.nextArgument();
1130 String tableStatsInterv = ci.nextArgument();
1132 if (flowStatsInterv == null || portStatsInterv == null
1133 || descStatsInterv == null) {
1134 ci.println("Usage: ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds)");
1135 ci.println("Current Values: fP=" + statisticsTickNumber + "sec pP="
1136 + portTickNumber + "sec dP=" + descriptionTickNumber + "sec tP=" + tableTickNumber + " sec");
1139 Short fP, pP, dP, tP;
1141 fP = Short.parseShort(flowStatsInterv);
1142 pP = Short.parseShort(portStatsInterv);
1143 dP = Short.parseShort(descStatsInterv);
1144 tP = Short.parseShort(tableStatsInterv);
1145 } catch (Exception e) {
1146 ci.println("Invalid format values: " + e.getMessage());
1150 if (pP <= 1 || fP <= 1 || dP <= 1 || tP <= 1) {
1151 ci.println("Invalid values. fP, pP, dP, tP have to be greater than 1.");
1155 statisticsTickNumber = fP;
1156 portTickNumber = pP;
1157 descriptionTickNumber = dP;
1158 tableTickNumber = tP;
1160 ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1161 + portTickNumber + "s dP=" + descriptionTickNumber + "s tP="
1162 + tableTickNumber + "s");
1166 * This method retrieves user configurations from config.ini and updates
1167 * statisticsTickNumber/portTickNumber/descriptionTickNumber accordingly.
1169 private void configStatsPollIntervals() {
1170 String fsStr = System.getProperty("of.flowStatsPollInterval");
1171 String psStr = System.getProperty("of.portStatsPollInterval");
1172 String dsStr = System.getProperty("of.descStatsPollInterval");
1173 String tsStr = System.getProperty("of.tableStatsPollInterval");
1174 Short fs, ps, ds, ts;
1176 if (fsStr != null) {
1178 fs = Short.parseShort(fsStr);
1180 statisticsTickNumber = fs;
1182 } catch (Exception e) {
1186 if (psStr != null) {
1188 ps = Short.parseShort(psStr);
1190 portTickNumber = ps;
1192 } catch (Exception e) {
1196 if (dsStr != null) {
1198 ds = Short.parseShort(dsStr);
1200 descriptionTickNumber = ds;
1202 } catch (Exception e) {
1206 if (tsStr != null) {
1208 ts = Short.parseShort(tsStr);
1210 tableTickNumber = ts;
1212 } catch (Exception e) {