2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.internal;
11 import java.nio.ByteBuffer;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.Deque;
15 import java.util.HashMap;
16 import java.util.HashSet;
17 import java.util.List;
19 import java.util.Map.Entry;
21 import java.util.Timer;
22 import java.util.TimerTask;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.LinkedBlockingDeque;
27 import java.util.concurrent.LinkedBlockingQueue;
29 import org.eclipse.osgi.framework.console.CommandInterpreter;
30 import org.eclipse.osgi.framework.console.CommandProvider;
31 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
32 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
33 import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
39 import org.opendaylight.controller.sal.core.Node;
40 import org.opendaylight.controller.sal.core.NodeConnector;
41 import org.opendaylight.controller.sal.core.Property;
42 import org.opendaylight.controller.sal.core.UpdateType;
43 import org.opendaylight.controller.sal.utils.HexEncode;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFPort;
47 import org.openflow.protocol.OFStatisticsRequest;
48 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
49 import org.openflow.protocol.statistics.OFDescriptionStatistics;
50 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
51 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
52 import org.openflow.protocol.statistics.OFPortStatisticsReply;
53 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
54 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
55 import org.openflow.protocol.statistics.OFStatistics;
56 import org.openflow.protocol.statistics.OFStatisticsType;
57 import org.openflow.protocol.statistics.OFVendorStatistics;
58 import org.openflow.util.HexString;
59 import org.osgi.framework.BundleContext;
60 import org.osgi.framework.FrameworkUtil;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
 * It periodically polls the different OF statistics from the OF switches and
 * caches them for quick retrieval by the modules of the layers above. It also
 * provides an API to directly query the switch about the statistics.
69 public class OFStatisticsManager implements IOFStatisticsManager,
70 IInventoryShimExternalListener, CommandProvider {
71 private static final Logger log = LoggerFactory
72 .getLogger(OFStatisticsManager.class);
73 private static final int initialSize = 64;
74 private static final long flowStatsPeriod = 10000;
75 private static final long descriptionStatsPeriod = 60000;
76 private static final long portStatsPeriod = 5000;
77 private static final long tickPeriod = 1000;
78 private static short statisticsTickNumber = (short) (flowStatsPeriod / tickPeriod);
79 private static short descriptionTickNumber = (short) (descriptionStatsPeriod / tickPeriod);
80 private static short portTickNumber = (short) (portStatsPeriod / tickPeriod);
81 private static short factoredSamples = (short) 2;
82 private static short counter = 1;
83 private IController controller = null;
84 private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
85 private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
86 private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
87 private List<OFStatistics> dummyList;
88 private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
89 protected BlockingQueue<StatsRequest> pendingStatsRequests;
90 protected BlockingQueue<Long> switchPortStatsUpdated;
91 private Thread statisticsCollector;
92 private Thread txRatesUpdater;
93 private Timer statisticsTimer;
94 private TimerTask statisticsTimerTask;
95 private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
96 private Map<Long, Map<Short, TxRates>> txRates; // Per port sampled (every
97 // portStatsPeriod) transmit
99 private Set<IStatisticsListener> descriptionListeners;
102 * The object containing the latest factoredSamples tx rate samples for a
105 protected class TxRates {
106 Deque<Long> sampledTxBytes; // contains the latest factoredSamples
107 // sampled transmitted bytes
110 sampledTxBytes = new LinkedBlockingDeque<Long>();
113 public void update(Long txBytes) {
115 * Based on how many samples our average works on, we might have to
116 * remove the oldest sample
118 if (sampledTxBytes.size() == factoredSamples) {
119 sampledTxBytes.removeLast();
122 // Add the latest sample to the top of the queue
123 sampledTxBytes.addFirst(txBytes);
127 * Returns the average transmit rate in bps
129 * @return the average transmit rate [bps]
131 public long getAverageTxRate() {
134 * If we cannot provide the value for the time window length set
136 if (sampledTxBytes.size() < factoredSamples) {
139 long increment = (long) (sampledTxBytes.getFirst() - sampledTxBytes
141 long timePeriod = (long) (factoredSamples * portStatsPeriod)
143 average = (8L * increment) / timePeriod;
148 public void setController(IController core) {
149 this.controller = core;
152 public void unsetController(IController core) {
153 if (this.controller == core) {
154 this.controller = null;
159 * Function called by the dependency manager when all the required
160 * dependencies are satisfied
164 flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
165 descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
166 portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
167 dummyList = new ArrayList<OFStatistics>(1);
168 statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(
170 pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(
172 switchPortStatsUpdated = new LinkedBlockingQueue<Long>(initialSize);
173 switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(
175 txRates = new HashMap<Long, Map<Short, TxRates>>(initialSize);
176 descriptionListeners = new HashSet<IStatisticsListener>();
178 configStatsPollIntervals();
180 // Initialize managed timers
181 statisticsTimer = new Timer();
182 statisticsTimerTask = new TimerTask() {
189 // Initialize Statistics collector thread
190 statisticsCollector = new Thread(new Runnable() {
195 StatsRequest req = pendingStatsRequests.take();
196 acquireStatistics(req.switchId, req.type);
197 } catch (InterruptedException e) {
198 log.warn("Flow Statistics Collector thread "
203 }, "Statistics Collector");
205 // Initialize Tx Rate Updater thread
206 txRatesUpdater = new Thread(new Runnable() {
211 long switchId = switchPortStatsUpdated.take();
212 updatePortsTxRate(switchId);
213 } catch (InterruptedException e) {
214 log.warn("TX Rate Updater thread interrupted", e);
218 }, "TX Rate Updater");
222 * Function called by the dependency manager when at least one dependency
223 * become unsatisfied or when the component is shutting down because for
224 * example bundle is being stopped.
231 * Function called by dependency manager after "init ()" is called and after
232 * the services provided by the class are registered in the service registry
236 // Start managed timers
237 statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, tickPeriod);
239 // Start statistics collector thread
240 statisticsCollector.start();
242 // Start bandwidth utilization computer thread
243 txRatesUpdater.start();
246 registerWithOSGIConsole();
250 * Function called by the dependency manager before the services exported by
251 * the component are unregistered, this will be followed by a "destroy ()"
256 // Stop managed timers
257 statisticsTimer.cancel();
260 public void setStatisticsListener(IStatisticsListener s) {
261 this.descriptionListeners.add(s);
264 public void unsetStatisticsListener(IStatisticsListener s) {
266 this.descriptionListeners.remove(s);
270 private void registerWithOSGIConsole() {
271 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
273 bundleContext.registerService(CommandProvider.class.getName(), this,
277 private static class StatsRequest {
278 protected Long switchId;
279 protected OFStatisticsType type;
281 public StatsRequest(Long d, OFStatisticsType t) {
286 public String toString() {
287 return "SReq = {switchId=" + switchId + ", type=" + type + "}";
291 public int hashCode() {
292 final int prime = 31;
294 result = prime * result
295 + ((switchId == null) ? 0 : switchId.hashCode());
296 result = prime * result + ((type == null) ? 0 : type.ordinal());
301 public boolean equals(Object obj) {
308 if (getClass() != obj.getClass()) {
311 StatsRequest other = (StatsRequest) obj;
312 if (switchId == null) {
313 if (other.switchId != null) {
316 } else if (!switchId.equals(other.switchId)) {
319 if (type != other.type) {
326 private void addStatisticsTicks(Long switchId) {
327 switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume
333 statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
334 log.info("Added Switch {} to target pool",
335 HexString.toHexString(switchId.longValue()));
338 protected static class StatisticsTicks {
339 private short flowStatisticsTicks;
340 private short descriptionTicks;
341 private short portStatisticsTicks;
343 public StatisticsTicks(boolean scattered) {
345 // scatter bursts by statisticsTickPeriod
348 } // being paranoid here
349 flowStatisticsTicks = (short) (1 + counter
350 % statisticsTickNumber);
351 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
352 portStatisticsTicks = (short) (1 + counter % portTickNumber);
354 flowStatisticsTicks = statisticsTickNumber;
355 descriptionTicks = descriptionTickNumber;
356 portStatisticsTicks = portTickNumber;
360 public boolean decrementFlowTicksIsZero() {
361 // Please ensure no code is inserted between the if check and the
362 // flowStatisticsTicks reset
363 if (--flowStatisticsTicks == 0) {
364 flowStatisticsTicks = statisticsTickNumber;
370 public boolean decrementDescTicksIsZero() {
371 // Please ensure no code is inserted between the if check and the
372 // descriptionTicks reset
373 if (--descriptionTicks == 0) {
374 descriptionTicks = descriptionTickNumber;
380 public boolean decrementPortTicksIsZero() {
381 // Please ensure no code is inserted between the if check and the
382 // descriptionTicks reset
383 if (--portStatisticsTicks == 0) {
384 portStatisticsTicks = portTickNumber;
390 public String toString() {
391 return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
392 + ",pT=" + portStatisticsTicks + "}";
396 private void printInfoMessage(String type, StatsRequest request) {
398 "{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
399 new Object[] { type, HexString.toHexString(request.switchId),
400 pendingStatsRequests.size(),
401 statisticsCollector.getState().toString() });
404 protected void decrementTicks() {
405 StatsRequest request = null;
406 for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
408 StatisticsTicks clock = entry.getValue();
409 Long switchId = entry.getKey();
410 if (clock.decrementFlowTicksIsZero() == true) {
411 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ? new StatsRequest(
412 switchId, OFStatisticsType.VENDOR) : new StatsRequest(
413 switchId, OFStatisticsType.FLOW);
414 // If a request for this switch is already in the queue, skip to
415 // add this new request
416 if (!pendingStatsRequests.contains(request)
417 && false == pendingStatsRequests.offer(request)) {
418 printInfoMessage("Flow", request);
422 if (clock.decrementDescTicksIsZero() == true) {
423 request = new StatsRequest(switchId, OFStatisticsType.DESC);
424 // If a request for this switch is already in the queue, skip to
425 // add this new request
426 if (!pendingStatsRequests.contains(request)
427 && false == pendingStatsRequests.offer(request)) {
428 printInfoMessage("Description", request);
432 if (clock.decrementPortTicksIsZero() == true) {
433 request = new StatsRequest(switchId, OFStatisticsType.PORT);
434 // If a request for this switch is already in the queue, skip to
435 // add this new request
436 if (!pendingStatsRequests.contains(request)
437 && false == pendingStatsRequests.offer(request)) {
438 printInfoMessage("Port", request);
444 private void removeStatsRequestTasks(Long switchId) {
445 log.info("Cleaning Statistics database for switch {}",
446 HexEncode.longToHexString(switchId));
447 // To be safe, let's attempt removal of both VENDOR and FLOW request. It
449 pendingStatsRequests.remove(new StatsRequest(switchId,
450 OFStatisticsType.VENDOR));
451 pendingStatsRequests.remove(new StatsRequest(switchId,
452 OFStatisticsType.FLOW));
453 pendingStatsRequests.remove(new StatsRequest(switchId,
454 OFStatisticsType.DESC));
455 pendingStatsRequests.remove(new StatsRequest(switchId,
456 OFStatisticsType.PORT));
457 // Take care of the TX rate databases
458 switchPortStatsUpdated.remove(switchId);
459 txRates.remove(switchId);
462 private void clearFlowStatsAndTicks(Long switchId) {
463 statisticsTimerTicks.remove(switchId);
464 removeStatsRequestTasks(switchId);
465 flowStatistics.remove(switchId);
466 log.info("Statistics removed for switch {}",
467 HexString.toHexString(switchId));
470 private void acquireStatistics(Long switchId, OFStatisticsType statType) {
472 // Query the switch on all matches
473 List<OFStatistics> values = this.acquireStatistics(switchId, statType,
476 // Update local caching database if got a valid response
477 if (values != null && !values.isEmpty()) {
478 if ((statType == OFStatisticsType.FLOW)
479 || (statType == OFStatisticsType.VENDOR)) {
480 flowStatistics.put(switchId, values);
481 } else if (statType == OFStatisticsType.DESC) {
482 // Notify who may be interested in a description change
483 notifyDescriptionListeners(switchId, values);
486 descStatistics.put(switchId, values);
487 } else if (statType == OFStatisticsType.PORT) {
488 // Overwrite cache with new port statistics for this switch
489 portStatistics.put(switchId, values);
491 // Wake up the thread which maintains the TX byte counters for
493 switchPortStatsUpdated.offer(switchId);
498 private void notifyDescriptionListeners(Long switchId,
499 List<OFStatistics> values) {
500 for (IStatisticsListener l : this.descriptionListeners) {
501 l.descriptionRefreshed(switchId,
502 ((OFDescriptionStatistics) values.get(0)));
 * Generic function to get the statistics from an OF switch
509 @SuppressWarnings("unchecked")
510 private List<OFStatistics> acquireStatistics(Long switchId,
511 OFStatisticsType statsType, Object target) {
512 List<OFStatistics> values = null;
514 ISwitch sw = controller.getSwitch(switchId);
517 OFStatisticsRequest req = new OFStatisticsRequest();
518 req.setStatisticType(statsType);
519 int requestLength = req.getLengthU();
521 if (statsType == OFStatisticsType.FLOW) {
522 OFMatch match = null;
523 if (target == null) {
525 match = new OFMatch();
526 match.setWildcards(0xffffffff);
527 } else if (!(target instanceof OFMatch)) {
529 log.warn("Invalid target type for Flow stats request: {}",
533 // Specific flow request
534 match = (OFMatch) target;
536 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
537 specificReq.setMatch(match);
538 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
539 specificReq.setTableId((byte) 0xff);
540 req.setStatistics(Collections
541 .singletonList((OFStatistics) specificReq));
542 requestLength += specificReq.getLength();
544 } else if (statsType == OFStatisticsType.VENDOR) {
545 V6StatsRequest specificReq = new V6StatsRequest();
546 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
547 specificReq.setTableId((byte) 0xff);
548 req.setStatistics(Collections
549 .singletonList((OFStatistics) specificReq));
550 requestLength += specificReq.getLength();
552 } else if (statsType == OFStatisticsType.AGGREGATE) {
553 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
554 OFMatch match = new OFMatch();
555 match.setWildcards(0xffffffff);
556 specificReq.setMatch(match);
557 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
558 specificReq.setTableId((byte) 0xff);
559 req.setStatistics(Collections
560 .singletonList((OFStatistics) specificReq));
561 requestLength += specificReq.getLength();
563 } else if (statsType == OFStatisticsType.PORT) {
565 if (target == null) {
567 targetPort = (short) OFPort.OFPP_NONE.getValue();
568 } else if (!(target instanceof Short)) {
570 log.warn("Invalid target type for Port stats request: {}",
574 // Specific port request
575 targetPort = (Short) target;
577 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
578 specificReq.setPortNumber(targetPort);
579 req.setStatistics(Collections
580 .singletonList((OFStatistics) specificReq));
581 requestLength += specificReq.getLength();
583 } else if (statsType == OFStatisticsType.QUEUE) {
584 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
585 specificReq.setPortNumber((short) OFPort.OFPP_ALL.getValue());
586 specificReq.setQueueId(0xffffffff);
587 req.setStatistics(Collections
588 .singletonList((OFStatistics) specificReq));
589 requestLength += specificReq.getLength();
591 } else if (statsType == OFStatisticsType.DESC) {
593 } else if (statsType == OFStatisticsType.TABLE) {
596 req.setLengthU(requestLength);
597 Object result = sw.getStatistics(req);
599 if (result == null) {
600 log.warn("Request Timed Out for ({}) from switch {}", type,
601 HexString.toHexString(switchId));
602 } else if (result instanceof OFError) {
603 log.warn("Switch {} failed to handle ({}) stats request: {}",
604 new Object[] { HexString.toHexString(switchId), type,
605 Utils.getOFErrorString((OFError) result) });
606 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
608 "Switching back to regular Flow stats requests for switch {}",
609 HexString.toHexString(switchId));
610 this.switchSupportsVendorExtStats.put(switchId,
614 values = (List<OFStatistics>) result;
621 public List<OFStatistics> getOFFlowStatistics(Long switchId) {
622 List<OFStatistics> list = flowStatistics.get(switchId);
625 * Check on emptiness as interference between add and get is still
626 * possible on the inner list (the concurrentMap entry's value)
628 return (list == null || list.isEmpty()) ? this.dummyList
629 : (list.get(0) instanceof OFVendorStatistics) ? this
630 .v6StatsListToOFStatsList(list) : list;
634 public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch) {
635 List<OFStatistics> statsList = flowStatistics.get(switchId);
638 * Check on emptiness as interference between add and get is still
639 * possible on the inner list (the concurrentMap entry's value)
641 if (statsList == null || statsList.isEmpty()) {
642 return this.dummyList;
645 if (statsList.get(0) instanceof OFVendorStatistics) {
647 * Caller could provide regular OF match when we instead pull the
648 * vendor statistics from this node Caller is not supposed to know
649 * whether this switch supports vendor extensions statistics
652 V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
653 : new V6Match(ofMatch);
655 List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
656 for (OFStatistics stats : targetList) {
657 V6StatsReply v6Stats = (V6StatsReply) stats;
658 V6Match v6Match = v6Stats.getMatch();
659 if (v6Match.equals(targetMatch)) {
660 List<OFStatistics> list = new ArrayList<OFStatistics>();
666 for (OFStatistics stats : statsList) {
667 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
668 if (flowStats.getMatch().equals(ofMatch)) {
669 List<OFStatistics> list = new ArrayList<OFStatistics>();
675 return this.dummyList;
679 * Converts the v6 vendor statistics to the OFStatistics
681 private List<OFStatistics> v6StatsListToOFStatsList(
682 List<OFStatistics> statistics) {
683 List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
684 if (statistics != null && !statistics.isEmpty()) {
685 for (OFStatistics stats : statistics) {
686 if (stats instanceof OFVendorStatistics) {
687 List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
689 v6statistics.addAll(r);
697 private static List<OFStatistics> getV6ReplyStatistics(
698 OFVendorStatistics stat) {
699 int length = stat.getLength();
700 List<OFStatistics> results = new ArrayList<OFStatistics>();
702 return null; // Nicira Hdr is 12 bytes. We need atleast that much
703 ByteBuffer data = ByteBuffer.allocate(length);
706 if (log.isTraceEnabled()) {
707 log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
708 HexString.toHexString(data.array()));
711 int vendor = data.getInt(); // first 4 bytes is vendor id.
712 if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
713 log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
716 // go ahead by 8 bytes which is 8 bytes of 0
717 data.getLong(); // should be all 0's
718 length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
722 V6StatsReply v6statsreply;
725 v6statsreply = new V6StatsReply();
726 min_len = v6statsreply.getLength();
727 if (length < v6statsreply.getLength())
729 v6statsreply.setActionFactory(stat.getActionFactory());
730 v6statsreply.readFrom(data);
731 if (v6statsreply.getLength() < min_len)
733 v6statsreply.setVendorId(vendor);
734 log.trace("V6StatsReply: {}", v6statsreply);
735 length -= v6statsreply.getLength();
736 results.add(v6statsreply);
742 public List<OFStatistics> queryStatistics(Long switchId,
743 OFStatisticsType statType, Object target) {
745 * Caller does not know and it is not supposed to know whether this
746 * switch supports vendor extension. We adjust the target for him
748 if (statType == OFStatisticsType.FLOW) {
749 if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
750 statType = OFStatisticsType.VENDOR;
754 List<OFStatistics> list = this.acquireStatistics(switchId, statType,
757 return (list == null) ? null
758 : (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list)
763 public List<OFStatistics> getOFDescStatistics(Long switchId) {
764 if (!descStatistics.containsKey(switchId))
765 return this.dummyList;
767 return descStatistics.get(switchId);
771 public List<OFStatistics> getOFPortStatistics(Long switchId) {
772 if (!portStatistics.containsKey(switchId)) {
773 return this.dummyList;
776 return portStatistics.get(switchId);
780 public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
781 if (!portStatistics.containsKey(switchId)) {
782 return this.dummyList;
784 List<OFStatistics> list = new ArrayList<OFStatistics>(1);
785 for (OFStatistics stats : portStatistics.get(switchId)) {
786 if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
795 public int getFlowsNumber(long switchId) {
796 return this.flowStatistics.get(switchId).size();
800 * InventoryShim replay for us all the switch addition which happened before
804 public void updateNode(Node node, UpdateType type, Set<Property> props) {
805 Long switchId = (Long) node.getID();
808 addStatisticsTicks(switchId);
811 clearFlowStatsAndTicks(switchId);
817 public void updateNodeConnector(NodeConnector nodeConnector,
818 UpdateType type, Set<Property> props) {
823 * Update the cached port rates for this switch with the latest retrieved
824 * port transmit byte count
828 private synchronized void updatePortsTxRate(long switchId) {
829 List<OFStatistics> newPortStatistics = this.portStatistics
831 if (newPortStatistics == null) {
834 Map<Short, TxRates> rates = this.txRates.get(switchId);
836 // First time rates for this switch are added
837 rates = new HashMap<Short, TxRates>();
838 txRates.put(switchId, rates);
840 for (OFStatistics stats : newPortStatistics) {
841 OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
842 short port = newPortStat.getPortNumber();
843 TxRates portRatesHolder = rates.get(port);
844 if (portRatesHolder == null) {
845 // First time rates for this port are added
846 portRatesHolder = new TxRates();
847 rates.put(port, portRatesHolder);
849 // Get and store the number of transmitted bytes for this port
850 // And handle the case where agent does not support the counter
851 long transmitBytes = newPortStat.getTransmitBytes();
852 long value = (transmitBytes < 0) ? 0 : transmitBytes;
853 portRatesHolder.update(value);
858 public synchronized long getTransmitRate(Long switchId, Short port) {
860 if (switchId == null || port == null) {
863 Map<Short, TxRates> perSwitch = txRates.get(switchId);
864 if (perSwitch == null) {
867 TxRates portRates = perSwitch.get(port);
868 if (portRates == null) {
871 return portRates.getAverageTxRate();
875 * Manual switch name configuration code
878 public String getHelp() {
879 StringBuffer help = new StringBuffer();
880 help.append("---OF Statistics Manager utilities---\n");
881 help.append("\t ofdumpstatsmgr - "
882 + "Print Internal Stats Mgr db\n");
883 help.append("\t ofstatsmgrintervals <fP> <pP> <dP>(in seconds) - "
884 + "Set/Show flow/port/dedscription stats poll intervals\n");
885 return help.toString();
888 private boolean isValidSwitchId(String switchId) {
889 String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
890 String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
892 return (switchId != null && (switchId.matches(regexDatapathID) || switchId
893 .matches(regexDatapathIDLong)));
896 public long getSwitchIDLong(String switchId) {
898 String switchString = "0";
900 if (isValidSwitchId(switchId)) {
901 if (switchId.contains(":")) {
902 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
903 switchString = switchId.replace(":", "");
904 } else if (switchId.contains("-")) {
905 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
906 switchString = switchId.replace("-", "");
908 // Handle the 0123456789ABCDEF notation
909 switchString = switchId;
912 return Long.parseLong(switchString, radix);
916 * Internal information dump code
918 private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
919 StringBuffer buffer = new StringBuffer();
921 for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
922 buffer.append(HexString.toHexString(entry.getKey()) + "="
923 + entry.getValue().toString() + " ");
926 return buffer.toString();
929 public void _ofdumpstatsmgr(CommandInterpreter ci) {
930 ci.println("Global Counter: " + counter);
931 ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
932 ci.println("PendingStatsQueue: " + pendingStatsRequests);
933 ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
934 ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
935 ci.println("Stats Collector State: "
936 + statisticsCollector.getState().toString());
937 ci.println("StatsTimer: " + statisticsTimer.toString());
938 ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
939 ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
940 ci.println("Port Stats Period: " + portTickNumber + " s");
943 public void _resetSwitchCapability(CommandInterpreter ci) {
944 String sidString = ci.nextArgument();
946 if (sidString == null) {
947 ci.println("Insert the switch id (numeric value)");
951 sid = Long.valueOf(sidString);
952 this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
953 ci.println("Vendor capability for switch " + sid + " set to "
954 + this.switchSupportsVendorExtStats.get(sid));
955 } catch (NumberFormatException e) {
956 ci.println("Invalid switch id. Has to be numeric.");
961 public void _ofbw(CommandInterpreter ci) {
962 String sidString = ci.nextArgument();
964 if (sidString == null) {
965 ci.println("Insert the switch id (numeric value)");
969 sid = Long.valueOf(sidString);
970 } catch (NumberFormatException e) {
971 ci.println("Invalid switch id. Has to be numeric.");
974 Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
975 ci.println("Bandwidth utilization (" + factoredSamples
976 * portTickNumber + " sec average) for switch "
977 + HexEncode.longToHexString(sid) + ":");
978 if (thisSwitchRates == null) {
979 ci.println("Not available");
981 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
982 ci.println("Port: " + entry.getKey() + ": "
983 + entry.getValue().getAverageTxRate() + " bps");
989 public void _txratewindow(CommandInterpreter ci) {
990 String averageWindow = ci.nextArgument();
992 if (averageWindow == null) {
993 ci.println("Insert the length in seconds of the median "
994 + "window for tx rate");
995 ci.println("Current: " + factoredSamples * portTickNumber + " secs");
999 seconds = Short.valueOf(averageWindow);
1000 } catch (NumberFormatException e) {
1001 ci.println("Invalid period.");
1003 OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
1004 ci.println("New: " + factoredSamples * portTickNumber + " secs");
1007 public void _ofstatsmgrintervals(CommandInterpreter ci) {
1008 String flowStatsInterv = ci.nextArgument();
1009 String portStatsInterv = ci.nextArgument();
1010 String descStatsInterv = ci.nextArgument();
1012 if (flowStatsInterv == null || portStatsInterv == null
1013 || descStatsInterv == null) {
1014 ci.println("Usage: ostatsmgrintervals <fP> <pP> <dP>(in seconds)");
1015 ci.println("Current Values: fP=" + statisticsTickNumber + "s pP="
1016 + portTickNumber + "s dP=" + descriptionTickNumber + "s");
1021 fP = Short.parseShort(flowStatsInterv);
1022 pP = Short.parseShort(portStatsInterv);
1023 dP = Short.parseShort(descStatsInterv);
1024 } catch (Exception e) {
1025 ci.println("Invalid format values: " + e.getMessage());
1029 if (pP <= 1 || fP <= 1 || dP <= 1) {
1030 ci.println("Invalid values. fP, pP, dP have to be greater than 1.");
1034 statisticsTickNumber = fP;
1035 portTickNumber = pP;
1036 descriptionTickNumber = dP;
1038 ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
1039 + portTickNumber + "s dP=" + descriptionTickNumber + "s");
1043 * This method retrieves user configurations from config.ini and updates
1044 * statisticsTickNumber/portTickNumber/descriptionTickNumber accordingly.
1046 private void configStatsPollIntervals() {
1047 String fsStr = System.getProperty("of.flowStatsPollInterval");
1048 String psStr = System.getProperty("of.portStatsPollInterval");
1049 String dsStr = System.getProperty("of.descStatsPollInterval");
1052 if (fsStr != null) {
1054 fs = Short.parseShort(fsStr);
1056 statisticsTickNumber = fs;
1058 } catch (Exception e) {
1062 if (psStr != null) {
1064 ps = Short.parseShort(psStr);
1066 portTickNumber = ps;
1068 } catch (Exception e) {
1072 if (dsStr != null) {
1074 ds = Short.parseShort(dsStr);
1076 descriptionTickNumber = ds;
1078 } catch (Exception e) {