2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.List;
18 import java.util.Map.Entry;
20 import java.util.Timer;
21 import java.util.TimerTask;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.CopyOnWriteArraySet;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
33 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
40 import org.opendaylight.controller.sal.core.Node;
41 import org.opendaylight.controller.sal.core.NodeConnector;
42 import org.opendaylight.controller.sal.core.Property;
43 import org.opendaylight.controller.sal.core.UpdateType;
44 import org.opendaylight.controller.sal.utils.HexEncode;
45 import org.openflow.protocol.OFError;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFPort;
48 import org.openflow.protocol.OFStatisticsRequest;
49 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
50 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
51 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
52 import org.openflow.protocol.statistics.OFPortStatisticsReply;
53 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
54 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
55 import org.openflow.protocol.statistics.OFStatistics;
56 import org.openflow.protocol.statistics.OFStatisticsType;
57 import org.openflow.protocol.statistics.OFTableStatistics;
58 import org.openflow.protocol.statistics.OFVendorStatistics;
59 import org.openflow.util.HexString;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
66 * It periodically polls the different OF statistics from the OF switches and
67 * caches them for quick retrieval for the above layers' modules It also
68 * provides an API to directly query the switch about the statistics
70 public class OFStatisticsManager implements IOFStatisticsManager,
71 IInventoryShimExternalListener, CommandProvider {
72 private static final Logger log = LoggerFactory.getLogger(OFStatisticsManager.class);
73 private static final int INITIAL_SIZE = 64;
74 private static final long FLOW_STATS_PERIOD = 10000;
75 private static final long DESC_STATS_PERIOD = 60000;
76 private static final long PORT_STATS_PERIOD = 5000;
77 private static final long TABLE_STATS_PERIOD = 10000;
78 private static final long TICK = 1000;
79 private static short statisticsTickNumber = (short) (FLOW_STATS_PERIOD / TICK);
80 private static short descriptionTickNumber = (short) (DESC_STATS_PERIOD / TICK);
81 private static short portTickNumber = (short) (PORT_STATS_PERIOD / TICK);
82 private static short tableTickNumber = (short) (TABLE_STATS_PERIOD / TICK);
83 private static short factoredSamples = (short) 2;
84 private static short counter = 1;
85 private IController controller = null;
86 private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
87 private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
88 private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
89 private ConcurrentMap<Long, List<OFStatistics>> tableStatistics;
90 private List<OFStatistics> dummyList;
91 private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
92 protected BlockingQueue<StatsRequest> pendingStatsRequests;
93 protected BlockingQueue<Long> switchPortStatsUpdated;
94 private Thread statisticsCollector;
95 private Thread txRatesUpdater;
96 private Timer statisticsTimer;
97 private TimerTask statisticsTimerTask;
98 private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
99 // Per port sampled (every portStatsPeriod) transmit rate
100 private Map<Long, Map<Short, TxRates>> txRates;
101 private Set<IOFStatisticsListener> statisticsListeners;
104 * The object containing the latest factoredSamples tx rate samples for a
107 protected class TxRates {
108 // contains the latest factoredSamples sampled transmitted bytes
109 Deque<Long> sampledTxBytes;
112 sampledTxBytes = new LinkedBlockingDeque<Long>();
115 public void update(Long txBytes) {
117 * Based on how many samples our average works on, we might have to
118 * remove the oldest sample
120 if (sampledTxBytes.size() == factoredSamples) {
121 sampledTxBytes.removeLast();
124 // Add the latest sample to the top of the queue
125 sampledTxBytes.addFirst(txBytes);
129 * Returns the average transmit rate in bps
131 * @return the average transmit rate [bps]
133 public long getAverageTxRate() {
136 * If we cannot provide the value for the time window length set
138 if (sampledTxBytes.size() < factoredSamples) {
141 long increment = sampledTxBytes.getFirst() - sampledTxBytes
143 long timePeriod = factoredSamples * PORT_STATS_PERIOD / TICK;
144 average = (8L * increment) / timePeriod;
149 public void setController(IController core) {
150 this.controller = core;
153 public void unsetController(IController core) {
154 if (this.controller == core) {
155 this.controller = null;
159 private short getStatsQueueSize() {
160 String statsQueueSizeStr = System.getProperty("of.statsQueueSize");
161 short statsQueueSize = INITIAL_SIZE;
162 if (statsQueueSizeStr != null) {
164 statsQueueSize = Short.parseShort(statsQueueSizeStr);
165 if (statsQueueSize <= 0) {
166 statsQueueSize = INITIAL_SIZE;
168 } catch (Exception e) {
171 return statsQueueSize;
174 IPluginOutConnectionService connectionPluginOutService;
175 void setIPluginOutConnectionService(IPluginOutConnectionService s) {
176 connectionPluginOutService = s;
179 void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
180 if (connectionPluginOutService == s) {
181 connectionPluginOutService = null;
186 * Function called by the dependency manager when all the required
187 * dependencies are satisfied
191 flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
192 descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
193 portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
194 tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
195 dummyList = new ArrayList<OFStatistics>(1);
196 pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(getStatsQueueSize());
197 statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(INITIAL_SIZE);
198 switchPortStatsUpdated = new LinkedBlockingQueue<Long>(INITIAL_SIZE);
199 switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(INITIAL_SIZE);
200 txRates = new HashMap<Long, Map<Short, TxRates>>(INITIAL_SIZE);
201 statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
203 configStatsPollIntervals();
205 // Initialize managed timers
206 statisticsTimer = new Timer();
207 statisticsTimerTask = new TimerTask() {
214 // Initialize Statistics collector thread
215 statisticsCollector = new Thread(new Runnable() {
220 StatsRequest req = pendingStatsRequests.take();
221 queryStatisticsInternal(req.switchId, req.type);
222 } catch (InterruptedException e) {
223 log.warn("Flow Statistics Collector thread "
228 }, "Statistics Collector");
230 // Initialize Tx Rate Updater thread
231 txRatesUpdater = new Thread(new Runnable() {
236 long switchId = switchPortStatsUpdated.take();
237 updatePortsTxRate(switchId);
238 } catch (InterruptedException e) {
239 log.warn("TX Rate Updater thread interrupted", e);
243 }, "TX Rate Updater");
247 * Function called by the dependency manager when at least one dependency
248 * become unsatisfied or when the component is shutting down because for
249 * example bundle is being stopped.
256 * Function called by dependency manager after "init ()" is called and after
257 * the services provided by the class are registered in the service registry
261 // Start managed timers
262 statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, TICK);
264 // Start statistics collector thread
265 statisticsCollector.start();
267 // Start bandwidth utilization computer thread
268 txRatesUpdater.start();
271 registerWithOSGIConsole();
275 * Function called by the dependency manager before the services exported by
276 * the component are unregistered, this will be followed by a "destroy ()"
281 // Stop managed timers
282 statisticsTimer.cancel();
285 public void setStatisticsListener(IOFStatisticsListener s) {
286 this.statisticsListeners.add(s);
289 public void unsetStatisticsListener(IOFStatisticsListener s) {
291 this.statisticsListeners.remove(s);
295 private void registerWithOSGIConsole() {
296 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
297 bundleContext.registerService(CommandProvider.class.getName(), this, null);
300 private static class StatsRequest {
301 protected Long switchId;
302 protected OFStatisticsType type;
304 public StatsRequest(Long d, OFStatisticsType t) {
310 public String toString() {
311 return "SReq = {switchId=" + switchId + ", type=" + type + "}";
315 public int hashCode() {
316 final int prime = 31;
318 result = prime * result
319 + ((switchId == null) ? 0 : switchId.hashCode());
320 result = prime * result + ((type == null) ? 0 : type.ordinal());
325 public boolean equals(Object obj) {
332 if (getClass() != obj.getClass()) {
335 StatsRequest other = (StatsRequest) obj;
336 if (switchId == null) {
337 if (other.switchId != null) {
340 } else if (!switchId.equals(other.switchId)) {
343 if (type != other.type) {
350 private void addStatisticsTicks(Long switchId) {
351 switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
357 statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
358 log.debug("Added Switch {} to target pool",
359 HexString.toHexString(switchId.longValue()));
362 protected static class StatisticsTicks {
363 private short flowStatisticsTicks;
364 private short descriptionTicks;
365 private short portStatisticsTicks;
366 private short tableStatisticsTicks;
368 public StatisticsTicks(boolean scattered) {
370 // scatter bursts by statisticsTickPeriod
373 } // being paranoid here
374 flowStatisticsTicks = (short) (1 + counter
375 % statisticsTickNumber);
376 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
377 portStatisticsTicks = (short) (1 + counter % portTickNumber);
378 tableStatisticsTicks = (short) (1 + counter % tableTickNumber);
380 flowStatisticsTicks = statisticsTickNumber;
381 descriptionTicks = descriptionTickNumber;
382 portStatisticsTicks = portTickNumber;
383 tableStatisticsTicks = tableTickNumber;
387 public boolean decrementFlowTicksIsZero() {
388 // Please ensure no code is inserted between the if check and the
389 // flowStatisticsTicks reset
390 if (--flowStatisticsTicks == 0) {
391 flowStatisticsTicks = statisticsTickNumber;
397 public boolean decrementDescTicksIsZero() {
398 // Please ensure no code is inserted between the if check and the
399 // descriptionTicks reset
400 if (--descriptionTicks == 0) {
401 descriptionTicks = descriptionTickNumber;
407 public boolean decrementPortTicksIsZero() {
408 // Please ensure no code is inserted between the if check and the
409 // descriptionTicks reset
410 if (--portStatisticsTicks == 0) {
411 portStatisticsTicks = portTickNumber;
417 public boolean decrementTableTicksIsZero() {
418 // Please ensure no code is inserted between the if check and the
419 // descriptionTicks reset
420 if(--tableStatisticsTicks == 0) {
421 tableStatisticsTicks = tableTickNumber;
428 public String toString() {
429 return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
430 + ",pT=" + portStatisticsTicks + ",tT=" + tableStatisticsTicks + "}";
434 private void printInfoMessage(String type, StatsRequest request) {
435 log.info("{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
436 new Object[] {type, HexString.toHexString(request.switchId), pendingStatsRequests.size(),
437 statisticsCollector.getState().toString() });
440 protected void decrementTicks() {
441 StatsRequest request = null;
442 for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
444 StatisticsTicks clock = entry.getValue();
445 Long switchId = entry.getKey();
446 if (clock.decrementFlowTicksIsZero()) {
447 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ?
448 new StatsRequest(switchId, OFStatisticsType.VENDOR) :
449 new StatsRequest(switchId, OFStatisticsType.FLOW);
450 // If a request for this switch is already in the queue, skip to
451 // add this new request
452 if (!pendingStatsRequests.contains(request)
453 && false == pendingStatsRequests.offer(request)) {
454 printInfoMessage("Flow", request);
458 if (clock.decrementDescTicksIsZero()) {
459 request = new StatsRequest(switchId, OFStatisticsType.DESC);
460 // If a request for this switch is already in the queue, skip to
461 // add this new request
462 if (!pendingStatsRequests.contains(request)
463 && false == pendingStatsRequests.offer(request)) {
464 printInfoMessage("Description", request);
468 if (clock.decrementPortTicksIsZero()) {
469 request = new StatsRequest(switchId, OFStatisticsType.PORT);
470 // If a request for this switch is already in the queue, skip to
471 // add this new request
472 if (!pendingStatsRequests.contains(request)
473 && false == pendingStatsRequests.offer(request)) {
474 printInfoMessage("Port", request);
478 if(clock.decrementTableTicksIsZero()) {
479 request = new StatsRequest(switchId, OFStatisticsType.TABLE);
480 // If a request for this switch is already in the queue, skip to
481 // add this new request
482 if (!pendingStatsRequests.contains(request)
483 && false == pendingStatsRequests.offer(request)) {
484 printInfoMessage("Table", request);
490 private void removeStatsRequestTasks(Long switchId) {
491 log.debug("Cleaning Statistics database for switch {}",
492 HexEncode.longToHexString(switchId));
493 // To be safe, let's attempt removal of both VENDOR and FLOW request. It
495 pendingStatsRequests.remove(new StatsRequest(switchId,
496 OFStatisticsType.VENDOR));
497 pendingStatsRequests.remove(new StatsRequest(switchId,
498 OFStatisticsType.FLOW));
499 pendingStatsRequests.remove(new StatsRequest(switchId,
500 OFStatisticsType.DESC));
501 pendingStatsRequests.remove(new StatsRequest(switchId,
502 OFStatisticsType.PORT));
503 pendingStatsRequests.remove(new StatsRequest(switchId,
504 OFStatisticsType.TABLE));
505 // Take care of the TX rate databases
506 switchPortStatsUpdated.remove(switchId);
507 txRates.remove(switchId);
510 private void clearFlowStatsAndTicks(Long switchId) {
511 statisticsTimerTicks.remove(switchId);
512 removeStatsRequestTasks(switchId);
513 flowStatistics.remove(switchId);
514 log.debug("Statistics removed for switch {}",
515 HexString.toHexString(switchId));
518 private void queryStatisticsInternal(Long switchId, OFStatisticsType statType) {
520 // Query the switch on all matches
521 List<OFStatistics> values = this.fetchStatisticsFromSwitch(switchId, statType, null);
523 // If got a valid response update local cache and notify listeners
524 if (values != null && !values.isEmpty()) {
528 flowStatistics.put(switchId, values);
529 notifyFlowUpdate(switchId, values);
533 descStatistics.put(switchId, values);
534 // Notify who may be interested in a description change
535 notifyDescriptionUpdate(switchId, values);
538 // Overwrite cache with new port statistics for this switch
539 portStatistics.put(switchId, values);
541 // Wake up the thread which maintains the TX byte counters for
543 switchPortStatsUpdated.offer(switchId);
544 notifyPortUpdate(switchId, values);
548 tableStatistics.put(switchId, values);
549 notifyTableUpdate(switchId, values);
556 private void notifyDescriptionUpdate(Long switchId, List<OFStatistics> values) {
557 for (IOFStatisticsListener l : this.statisticsListeners) {
558 l.descriptionStatisticsRefreshed(switchId, values);
562 private void notifyFlowUpdate(Long switchId, List<OFStatistics> values) {
563 if (values.get(0) instanceof OFVendorStatistics) {
564 values = this.v6StatsListToOFStatsList(values);
567 for (IOFStatisticsListener l : this.statisticsListeners) {
568 l.flowStatisticsRefreshed(switchId, values);
573 private void notifyPortUpdate(Long switchId, List<OFStatistics> values) {
574 for (IOFStatisticsListener l : this.statisticsListeners) {
575 l.portStatisticsRefreshed(switchId, values);
579 private void notifyTableUpdate(Long switchId, List<OFStatistics> values) {
580 for (IOFStatisticsListener l : this.statisticsListeners) {
581 l.tableStatisticsRefreshed(switchId, values);
586 * Generic function to get the statistics form an OF switch
588 @SuppressWarnings("unchecked")
589 private List<OFStatistics> fetchStatisticsFromSwitch(Long switchId,
590 OFStatisticsType statsType, Object target) {
591 List<OFStatistics> values = null;
593 ISwitch sw = controller.getSwitch(switchId);
596 OFStatisticsRequest req = new OFStatisticsRequest();
597 req.setStatisticType(statsType);
598 int requestLength = req.getLengthU();
600 if (statsType == OFStatisticsType.FLOW) {
601 OFMatch match = null;
602 if (target == null) {
604 match = new OFMatch();
605 match.setWildcards(0xffffffff);
606 } else if (!(target instanceof OFMatch)) {
608 log.warn("Invalid target type for Flow stats request: {}",
612 // Specific flow request
613 match = (OFMatch) target;
615 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
616 specificReq.setMatch(match);
617 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
618 specificReq.setTableId((byte) 0xff);
619 req.setStatistics(Collections
620 .singletonList((OFStatistics) specificReq));
621 requestLength += specificReq.getLength();
623 } else if (statsType == OFStatisticsType.VENDOR) {
624 V6StatsRequest specificReq = new V6StatsRequest();
625 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
626 specificReq.setTableId((byte) 0xff);
627 req.setStatistics(Collections
628 .singletonList((OFStatistics) specificReq));
629 requestLength += specificReq.getLength();
631 } else if (statsType == OFStatisticsType.AGGREGATE) {
632 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
633 OFMatch match = new OFMatch();
634 match.setWildcards(0xffffffff);
635 specificReq.setMatch(match);
636 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
637 specificReq.setTableId((byte) 0xff);
638 req.setStatistics(Collections
639 .singletonList((OFStatistics) specificReq));
640 requestLength += specificReq.getLength();
642 } else if (statsType == OFStatisticsType.PORT) {
644 if (target == null) {
646 targetPort = OFPort.OFPP_NONE.getValue();
647 } else if (!(target instanceof Short)) {
649 log.warn("Invalid target type for Port stats request: {}",
653 // Specific port request
654 targetPort = (Short) target;
656 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
657 specificReq.setPortNumber(targetPort);
658 req.setStatistics(Collections
659 .singletonList((OFStatistics) specificReq));
660 requestLength += specificReq.getLength();
662 } else if (statsType == OFStatisticsType.QUEUE) {
663 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
664 specificReq.setPortNumber(OFPort.OFPP_ALL.getValue());
665 specificReq.setQueueId(0xffffffff);
666 req.setStatistics(Collections
667 .singletonList((OFStatistics) specificReq));
668 requestLength += specificReq.getLength();
670 } else if (statsType == OFStatisticsType.DESC) {
672 } else if (statsType == OFStatisticsType.TABLE) {
674 if (!(target instanceof Byte)) {
676 log.warn("Invalid table id for table stats request: {}",
680 byte targetTable = (Byte) target;
681 OFTableStatistics specificReq = new OFTableStatistics();
682 specificReq.setTableId(targetTable);
683 req.setStatistics(Collections
684 .singletonList((OFStatistics) specificReq));
685 requestLength += specificReq.getLength();
689 req.setLengthU(requestLength);
690 Object result = sw.getStatistics(req);
692 if (result == null) {
693 log.warn("Request Timed Out for ({}) from switch {}", type,
694 HexString.toHexString(switchId));
695 } else if (result instanceof OFError) {
696 log.warn("Switch {} failed to handle ({}) stats request: {}",
697 new Object[] { HexString.toHexString(switchId), type,
698 Utils.getOFErrorString((OFError) result) });
699 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
701 "Switching back to regular Flow stats requests for switch {}",
702 HexString.toHexString(switchId));
703 this.switchSupportsVendorExtStats.put(switchId,
707 values = (List<OFStatistics>) result;
714 public List<OFStatistics> getOFFlowStatistics(Long switchId) {
715 List<OFStatistics> list = flowStatistics.get(switchId);
718 * Check on emptiness as interference between add and get is still
719 * possible on the inner list (the concurrentMap entry's value)
721 return (list == null || list.isEmpty()) ? this.dummyList
722 : (list.get(0) instanceof OFVendorStatistics) ? this
723 .v6StatsListToOFStatsList(list) : list;
727 public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch, short priority) {
728 List<OFStatistics> statsList = flowStatistics.get(switchId);
731 * Check on emptiness as interference between add and get is still
732 * possible on the inner list (the concurrentMap entry's value)
734 if (statsList == null || statsList.isEmpty()) {
735 return this.dummyList;
738 if (statsList.get(0) instanceof OFVendorStatistics) {
740 * Caller could provide regular OF match when we instead pull the
741 * vendor statistics from this node Caller is not supposed to know
742 * whether this switch supports vendor extensions statistics
745 V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
746 : new V6Match(ofMatch);
748 List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
749 for (OFStatistics stats : targetList) {
750 V6StatsReply v6Stats = (V6StatsReply) stats;
751 V6Match v6Match = v6Stats.getMatch();
752 if (v6Stats.getPriority() == priority && v6Match.equals(targetMatch)) {
753 List<OFStatistics> list = new ArrayList<OFStatistics>();
759 for (OFStatistics stats : statsList) {
760 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
761 if (flowStats.getPriority() == priority && flowStats.getMatch().equals(ofMatch)) {
762 List<OFStatistics> list = new ArrayList<OFStatistics>();
768 return this.dummyList;
772 * Converts the v6 vendor statistics to the OFStatistics
774 private List<OFStatistics> v6StatsListToOFStatsList(
775 List<OFStatistics> statistics) {
776 List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
777 if (statistics != null && !statistics.isEmpty()) {
778 for (OFStatistics stats : statistics) {
779 if (stats instanceof OFVendorStatistics) {
780 List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
782 v6statistics.addAll(r);
790 private static List<OFStatistics> getV6ReplyStatistics(
791 OFVendorStatistics stat) {
792 int length = stat.getLength();
793 List<OFStatistics> results = new ArrayList<OFStatistics>();
795 return null; // Nicira Hdr is 12 bytes. We need atleast that much
796 ByteBuffer data = ByteBuffer.allocate(length);
799 if (log.isTraceEnabled()) {
800 log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
801 HexString.toHexString(data.array()));
804 int vendor = data.getInt(); // first 4 bytes is vendor id.
805 if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
806 log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
809 // go ahead by 8 bytes which is 8 bytes of 0
810 data.getLong(); // should be all 0's
811 length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
815 V6StatsReply v6statsreply;
818 v6statsreply = new V6StatsReply();
819 min_len = v6statsreply.getLength();
820 if (length < v6statsreply.getLength())
822 v6statsreply.setActionFactory(stat.getActionFactory());
823 v6statsreply.readFrom(data);
824 if (v6statsreply.getLength() < min_len)
826 v6statsreply.setVendorId(vendor);
827 log.trace("V6StatsReply: {}", v6statsreply);
828 length -= v6statsreply.getLength();
829 results.add(v6statsreply);
835 public List<OFStatistics> queryStatistics(Long switchId,
836 OFStatisticsType statType, Object target) {
838 * Caller does not know and it is not supposed to know whether this
839 * switch supports vendor extension. We adjust the target for him
841 if (statType == OFStatisticsType.FLOW) {
842 if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
843 statType = OFStatisticsType.VENDOR;
847 List<OFStatistics> list = this.fetchStatisticsFromSwitch(switchId, statType,
850 return (list == null) ? null :
851 (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list) : list;
855 public List<OFStatistics> getOFDescStatistics(Long switchId) {
856 if (!descStatistics.containsKey(switchId))
857 return this.dummyList;
859 return descStatistics.get(switchId);
863 public List<OFStatistics> getOFPortStatistics(Long switchId) {
864 if (!portStatistics.containsKey(switchId)) {
865 return this.dummyList;
868 return portStatistics.get(switchId);
872 public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
873 if (!portStatistics.containsKey(switchId)) {
874 return this.dummyList;
876 List<OFStatistics> list = new ArrayList<OFStatistics>(1);
877 for (OFStatistics stats : portStatistics.get(switchId)) {
878 if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
887 public List<OFStatistics> getOFTableStatistics(Long switchId) {
888 if (!tableStatistics.containsKey(switchId)) {
889 return this.dummyList;
892 return tableStatistics.get(switchId);
896 public List<OFStatistics> getOFTableStatistics(Long switchId, Byte tableId) {
897 if (!tableStatistics.containsKey(switchId)) {
898 return this.dummyList;
901 List<OFStatistics> list = new ArrayList<OFStatistics>(1);
902 for (OFStatistics stats : tableStatistics.get(switchId)) {
903 if (((OFTableStatistics) stats).getTableId() == tableId) {
912 public int getFlowsNumber(long switchId) {
913 return this.flowStatistics.get(switchId).size();
917 * InventoryShim replay for us all the switch addition which happened before
921 public void updateNode(Node node, UpdateType type, Set<Property> props) {
922 Long switchId = (Long) node.getID();
925 addStatisticsTicks(switchId);
928 clearFlowStatsAndTicks(switchId);
934 public void updateNodeConnector(NodeConnector nodeConnector,
935 UpdateType type, Set<Property> props) {
940 * Update the cached port rates for this switch with the latest retrieved
941 * port transmit byte count
945 private synchronized void updatePortsTxRate(long switchId) {
946 List<OFStatistics> newPortStatistics = this.portStatistics.get(switchId);
947 if (newPortStatistics == null) {
950 Map<Short, TxRates> rates = this.txRates.get(switchId);
952 // First time rates for this switch are added
953 rates = new HashMap<Short, TxRates>();
954 txRates.put(switchId, rates);
956 for (OFStatistics stats : newPortStatistics) {
957 OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
958 short port = newPortStat.getPortNumber();
959 TxRates portRatesHolder = rates.get(port);
960 if (portRatesHolder == null) {
961 // First time rates for this port are added
962 portRatesHolder = new TxRates();
963 rates.put(port, portRatesHolder);
965 // Get and store the number of transmitted bytes for this port
966 // And handle the case where agent does not support the counter
967 long transmitBytes = newPortStat.getTransmitBytes();
968 long value = (transmitBytes < 0) ? 0 : transmitBytes;
969 portRatesHolder.update(value);
974 public synchronized long getTransmitRate(Long switchId, Short port) {
976 if (switchId == null || port == null) {
979 Map<Short, TxRates> perSwitch = txRates.get(switchId);
980 if (perSwitch == null) {
983 TxRates portRates = perSwitch.get(port);
984 if (portRates == null) {
987 return portRates.getAverageTxRate();
991 * Manual switch name configuration code
994 public String getHelp() {
995 StringBuffer help = new StringBuffer();
996 help.append("---OF Statistics Manager utilities---\n");
997 help.append("\t ofdumpstatsmgr - "
998 + "Print Internal Stats Mgr db\n");
999 help.append("\t ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds) - "
1000 + "Set/Show flow/port/dedscription stats poll intervals\n");
1001 return help.toString();
1004 private boolean isValidSwitchId(String switchId) {
1005 String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
1006 String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
1008 return (switchId != null && (switchId.matches(regexDatapathID) || switchId
1009 .matches(regexDatapathIDLong)));
1012 public long getSwitchIDLong(String switchId) {
1014 String switchString = "0";
1016 if (isValidSwitchId(switchId)) {
1017 if (switchId.contains(":")) {
1018 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
1019 switchString = switchId.replace(":", "");
1020 } else if (switchId.contains("-")) {
1021 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
1022 switchString = switchId.replace("-", "");
1024 // Handle the 0123456789ABCDEF notation
1025 switchString = switchId;
1028 return Long.parseLong(switchString, radix);
1032 * Internal information dump code
1034 private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
1035 StringBuffer buffer = new StringBuffer();
1037 for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
1038 buffer.append(HexString.toHexString(entry.getKey()) + "="
1039 + entry.getValue().toString() + " ");
1042 return buffer.toString();
1045 public void _ofdumpstatsmgr(CommandInterpreter ci) {
1046 ci.println("Global Counter: " + counter);
1047 ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
1048 ci.println("PendingStatsQueue: " + pendingStatsRequests);
1049 ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
1050 ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
1051 ci.println("Stats Collector State: "
1052 + statisticsCollector.getState().toString());
1053 ci.println("StatsTimer: " + statisticsTimer.toString());
1054 ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
1055 ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
1056 ci.println("Port Stats Period: " + portTickNumber + " s");
1057 ci.println("Table Stats Period: " + tableTickNumber + " s");
1060 public void _resetSwitchCapability(CommandInterpreter ci) {
1061 String sidString = ci.nextArgument();
1063 if (sidString == null) {
1064 ci.println("Insert the switch id (numeric value)");
1068 sid = Long.valueOf(sidString);
1069 this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
1070 ci.println("Vendor capability for switch " + sid + " set to "
1071 + this.switchSupportsVendorExtStats.get(sid));
1072 } catch (NumberFormatException e) {
1073 ci.println("Invalid switch id. Has to be numeric.");
1078 public void _ofbw(CommandInterpreter ci) {
1079 String sidString = ci.nextArgument();
1081 if (sidString == null) {
1082 ci.println("Insert the switch id (numeric value)");
1086 sid = Long.valueOf(sidString);
1087 } catch (NumberFormatException e) {
1088 ci.println("Invalid switch id. Has to be numeric.");
1091 Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
1092 ci.println("Bandwidth utilization (" + factoredSamples
1093 * portTickNumber + " sec average) for switch "
1094 + HexEncode.longToHexString(sid) + ":");
1095 if (thisSwitchRates == null) {
1096 ci.println("Not available");
1098 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
1099 ci.println("Port: " + entry.getKey() + ": "
1100 + entry.getValue().getAverageTxRate() + " bps");
1106 public void _txratewindow(CommandInterpreter ci) {
1107 String averageWindow = ci.nextArgument();
1109 if (averageWindow == null) {
1110 ci.println("Insert the length in seconds of the median "
1111 + "window for tx rate");
1112 ci.println("Current: " + factoredSamples * portTickNumber + " secs");
1116 seconds = Short.valueOf(averageWindow);
1117 } catch (NumberFormatException e) {
1118 ci.println("Invalid period.");
1120 OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1121 ci.println("New: " + factoredSamples * portTickNumber + " secs");
1124 public void _ofstatsmgrintervals(CommandInterpreter ci) {
1125 String flowStatsInterv = ci.nextArgument();
1126 String portStatsInterv = ci.nextArgument();
1127 String descStatsInterv = ci.nextArgument();
1128 String tableStatsInterv = ci.nextArgument();
1130 if (flowStatsInterv == null || portStatsInterv == null
1131 || descStatsInterv == null) {
1132 ci.println("Usage: ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds)");
1133 ci.println("Current Values: fP=" + statisticsTickNumber + "sec pP="
1134 + portTickNumber + "sec dP=" + descriptionTickNumber + "sec tP=" + tableTickNumber + " sec");
1137 Short fP, pP, dP, tP;
1139 fP = Short.parseShort(flowStatsInterv);
1140 pP = Short.parseShort(portStatsInterv);
1141 dP = Short.parseShort(descStatsInterv);
1142 tP = Short.parseShort(tableStatsInterv);
1143 } catch (Exception e) {
1144 ci.println("Invalid format values: " + e.getMessage());
1148 if (pP <= 1 || fP <= 1 || dP <= 1 || tP <= 1) {
1149 ci.println("Invalid values. fP, pP, dP, tP have to be greater than 1.");
1153 statisticsTickNumber = fP;
1154 portTickNumber = pP;
1155 descriptionTickNumber = dP;
1156 tableTickNumber = tP;
1158 ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1159 + portTickNumber + "s dP=" + descriptionTickNumber + "s tP="
1160 + tableTickNumber + "s");
1164 * This method retrieves user configurations from config.ini and updates
1165 * statisticsTickNumber/portTickNumber/descriptionTickNumber accordingly.
1167 private void configStatsPollIntervals() {
1168 String fsStr = System.getProperty("of.flowStatsPollInterval");
1169 String psStr = System.getProperty("of.portStatsPollInterval");
1170 String dsStr = System.getProperty("of.descStatsPollInterval");
1171 String tsStr = System.getProperty("of.tableStatsPollInterval");
1172 Short fs, ps, ds, ts;
1174 if (fsStr != null) {
1176 fs = Short.parseShort(fsStr);
1178 statisticsTickNumber = fs;
1180 } catch (Exception e) {
1184 if (psStr != null) {
1186 ps = Short.parseShort(psStr);
1188 portTickNumber = ps;
1190 } catch (Exception e) {
1194 if (dsStr != null) {
1196 ds = Short.parseShort(dsStr);
1198 descriptionTickNumber = ds;
1200 } catch (Exception e) {
1204 if (tsStr != null) {
1206 ts = Short.parseShort(tsStr);
1208 tableTickNumber = ts;
1210 } catch (Exception e) {