3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.protocol_plugin.openflow.internal;
12 import java.nio.ByteBuffer;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.Deque;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
20 import java.util.Map.Entry;
22 import java.util.Timer;
23 import java.util.TimerTask;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.LinkedBlockingDeque;
28 import java.util.concurrent.LinkedBlockingQueue;
30 import org.eclipse.osgi.framework.console.CommandInterpreter;
31 import org.eclipse.osgi.framework.console.CommandProvider;
32 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
33 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
34 import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
36 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
37 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
38 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
39 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
40 import org.opendaylight.controller.sal.core.Node;
41 import org.opendaylight.controller.sal.core.NodeConnector;
42 import org.opendaylight.controller.sal.core.Property;
43 import org.opendaylight.controller.sal.core.UpdateType;
44 import org.opendaylight.controller.sal.utils.HexEncode;
45 import org.openflow.protocol.OFError;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFPort;
48 import org.openflow.protocol.OFStatisticsRequest;
49 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
50 import org.openflow.protocol.statistics.OFDescriptionStatistics;
51 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
52 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
53 import org.openflow.protocol.statistics.OFPortStatisticsReply;
54 import org.openflow.protocol.statistics.OFPortStatisticsRequest;
55 import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
56 import org.openflow.protocol.statistics.OFStatistics;
57 import org.openflow.protocol.statistics.OFStatisticsType;
58 import org.openflow.protocol.statistics.OFVendorStatistics;
59 import org.openflow.util.HexString;
60 import org.osgi.framework.BundleContext;
61 import org.osgi.framework.FrameworkUtil;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
66 * It periodically polls the different OF statistics from the OF switches
67 * and caches them for quick retrieval by the upper-layer modules.
68 * It also provides an API to directly query the switch about the statistics
70 public class OFStatisticsManager implements IOFStatisticsManager,
71 IInventoryShimExternalListener, CommandProvider {
72 private static final Logger log = LoggerFactory
73 .getLogger(OFStatisticsManager.class);
74 private static final int initialSize = 64;
75 private static final long flowStatsPeriod = 10000;
76 private static final long descriptionStatsPeriod = 60000;
77 private static final long portStatsPeriod = 5000;
78 private static final long tickPeriod = 1000;
79 private static short statisticsTickNumber = (short) (flowStatsPeriod / tickPeriod);
80 private static short descriptionTickNumber = (short) (descriptionStatsPeriod / tickPeriod);
81 private static short portTickNumber = (short) (portStatsPeriod / tickPeriod);
82 private static short factoredSamples = (short) 2;
83 private static short counter = 1;
84 private IController controller = null;
85 private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
86 private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
87 private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
88 private List<OFStatistics> dummyList;
89 private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
90 protected BlockingQueue<StatsRequest> pendingStatsRequests;
91 protected BlockingQueue<Long> switchPortStatsUpdated;
92 private Thread statisticsCollector;
93 private Thread txRatesUpdater;
94 private Timer statisticsTimer;
95 private TimerTask statisticsTimerTask;
96 private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
97 private Map<Long, Map<Short, TxRates>> txRates; // Per port sampled (every portStatsPeriod) transmit rate
98 private Set<IStatisticsListener> descriptionListeners;
101 * The object containing the latest factoredSamples tx rate samples
102 * for a given switch port
104 protected class TxRates {
105 Deque<Long> sampledTxBytes; // contains the latest factoredSamples sampled transmitted bytes
108 sampledTxBytes = new LinkedBlockingDeque<Long>();
111 public void update(Long txBytes) {
113 * Based on how many samples our average works on,
114 * we might have to remove the oldest sample
116 if (sampledTxBytes.size() == factoredSamples) {
117 sampledTxBytes.removeLast();
120 // Add the latest sample to the top of the queue
121 sampledTxBytes.addFirst(txBytes);
125 * Returns the average transmit rate in bps
126 * @return the average transmit rate [bps]
128 public long getAverageTxRate() {
131 * If we cannot provide the value for the time window length set
133 if (sampledTxBytes.size() < factoredSamples) {
136 long increment = (long) (sampledTxBytes.getFirst() - sampledTxBytes
138 long timePeriod = (long) (factoredSamples * portStatsPeriod)
140 average = (8 * increment) / timePeriod;
145 public void setController(IController core) {
146 this.controller = core;
149 public void unsetController(IController core) {
150 if (this.controller == core) {
151 this.controller = null;
156 * Function called by the dependency manager when all the required
157 * dependencies are satisfied
161 flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
162 descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
163 portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
164 dummyList = new ArrayList<OFStatistics>(1);
165 statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(
167 pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(
169 switchPortStatsUpdated = new LinkedBlockingQueue<Long>(initialSize);
170 switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(
172 txRates = new HashMap<Long, Map<Short, TxRates>>(initialSize);
173 descriptionListeners = new HashSet<IStatisticsListener>();
175 // Initialize managed timers
176 statisticsTimer = new Timer();
177 statisticsTimerTask = new TimerTask() {
184 // Initialize Statistics collector thread
185 statisticsCollector = new Thread(new Runnable() {
190 StatsRequest req = pendingStatsRequests.take();
191 acquireStatistics(req.switchId, req.type);
192 } catch (InterruptedException e) {
193 log.warn("Flow Statistics Collector thread " +
198 }, "Statistics Collector");
200 // Initialize Tx Rate Updater thread
201 txRatesUpdater = new Thread(new Runnable() {
206 long switchId = switchPortStatsUpdated.take();
207 updatePortsTxRate(switchId);
208 } catch (InterruptedException e) {
209 log.warn("TX Rate Updater thread interrupted");
213 }, "TX Rate Updater");
217 * Function called by the dependency manager when at least one
218 * dependency becomes unsatisfied or when the component is shutting
219 * down because, for example, the bundle is being stopped.
226 * Function called by dependency manager after "init ()" is called
227 * and after the services provided by the class are registered in
228 * the service registry
232 // Start managed timers
233 statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, tickPeriod);
235 // Start statistics collector thread
236 statisticsCollector.start();
238 // Start bandwidth utilization computer thread
239 txRatesUpdater.start();
242 registerWithOSGIConsole();
246 * Function called by the dependency manager before the services
247 * exported by the component are unregistered, this will be
248 * followed by a "destroy ()" call
252 // Stop managed timers
253 statisticsTimer.cancel();
256 public void setStatisticsListener(IStatisticsListener s) {
257 this.descriptionListeners.add(s);
260 public void unsetStatisticsListener(IStatisticsListener s) {
262 this.descriptionListeners.remove(s);
266 private void registerWithOSGIConsole() {
267 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
269 bundleContext.registerService(CommandProvider.class.getName(), this,
273 private static class StatsRequest {
274 protected Long switchId;
275 protected OFStatisticsType type;
277 public StatsRequest(Long d, OFStatisticsType t) {
282 public String toString() {
283 return "SReq = {switchId=" + switchId + ", type=" + type + "}";
287 public int hashCode() {
288 final int prime = 31;
290 result = prime * result
291 + ((switchId == null) ? 0 : switchId.hashCode());
292 result = prime * result + ((type == null) ? 0 : type.ordinal());
297 public boolean equals(Object obj) {
304 if (getClass() != obj.getClass()) {
307 StatsRequest other = (StatsRequest) obj;
308 if (switchId == null) {
309 if (other.switchId != null) {
312 } else if (!switchId.equals(other.switchId)) {
315 if (type != other.type) {
322 private void addStatisticsTicks(Long switchId) {
323 switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume switch supports Vendor extension stats
324 statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
325 log.info("Added Switch {} to target pool", HexString
326 .toHexString(switchId.longValue()));
329 protected static class StatisticsTicks {
330 private short flowStatisticsTicks;
331 private short descriptionTicks;
332 private short portStatisticsTicks;
334 public StatisticsTicks(boolean scattered) {
336 // scatter bursts by statisticsTickPeriod
339 } // being paranoid here
340 flowStatisticsTicks = (short) (1 + counter
341 % statisticsTickNumber);
342 descriptionTicks = (short) (1 + counter % descriptionTickNumber);
343 portStatisticsTicks = (short) (1 + counter % portTickNumber);
345 flowStatisticsTicks = statisticsTickNumber;
346 descriptionTicks = descriptionTickNumber;
347 portStatisticsTicks = portTickNumber;
351 public boolean decrementFlowTicksIsZero() {
352 // Please ensure no code is inserted between the if check and the flowStatisticsTicks reset
353 if (--flowStatisticsTicks == 0) {
354 flowStatisticsTicks = statisticsTickNumber;
360 public boolean decrementDescTicksIsZero() {
361 // Please ensure no code is inserted between the if check and the descriptionTicks reset
362 if (--descriptionTicks == 0) {
363 descriptionTicks = descriptionTickNumber;
369 public boolean decrementPortTicksIsZero() {
370 // Please ensure no code is inserted between the if check and the portStatisticsTicks reset
371 if (--portStatisticsTicks == 0) {
372 portStatisticsTicks = portTickNumber;
378 public String toString() {
379 return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
380 + ",pT=" + portStatisticsTicks + "}";
384 private void printInfoMessage(String type, StatsRequest request) {
388 + " stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
389 new Object[] { HexString.toHexString(request.switchId),
390 pendingStatsRequests.size(),
391 statisticsCollector.getState().toString() });
394 protected void decrementTicks() {
395 StatsRequest request = null;
396 for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
398 StatisticsTicks clock = entry.getValue();
399 Long switchId = entry.getKey();
400 if (clock.decrementFlowTicksIsZero() == true) {
401 request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ? new StatsRequest(
402 switchId, OFStatisticsType.VENDOR)
403 : new StatsRequest(switchId, OFStatisticsType.FLOW);
404 // If a request for this switch is already in the queue, skip to add this new request
405 if (!pendingStatsRequests.contains(request)
406 && false == pendingStatsRequests.offer(request)) {
407 printInfoMessage("Flow", request);
411 if (clock.decrementDescTicksIsZero() == true) {
412 request = new StatsRequest(switchId, OFStatisticsType.DESC);
413 // If a request for this switch is already in the queue, skip to add this new request
414 if (!pendingStatsRequests.contains(request)
415 && false == pendingStatsRequests.offer(request)) {
416 printInfoMessage("Description", request);
420 if (clock.decrementPortTicksIsZero() == true) {
421 request = new StatsRequest(switchId, OFStatisticsType.PORT);
422 // If a request for this switch is already in the queue, skip to add this new request
423 if (!pendingStatsRequests.contains(request)
424 && false == pendingStatsRequests.offer(request)) {
425 printInfoMessage("Port", request);
431 private void removeStatsRequestTasks(Long switchId) {
432 log.info("Cleaning Statistics database for switch "
433 + HexEncode.longToHexString(switchId));
434 // To be safe, let's attempt removal of both VENDOR and FLOW request. It does not hurt
435 pendingStatsRequests.remove(new StatsRequest(switchId,
436 OFStatisticsType.VENDOR));
437 pendingStatsRequests.remove(new StatsRequest(switchId,
438 OFStatisticsType.FLOW));
439 pendingStatsRequests.remove(new StatsRequest(switchId,
440 OFStatisticsType.DESC));
441 pendingStatsRequests.remove(new StatsRequest(switchId,
442 OFStatisticsType.PORT));
443 // Take care of the TX rate databases
444 switchPortStatsUpdated.remove(switchId);
445 txRates.remove(switchId);
448 private void clearFlowStatsAndTicks(Long switchId) {
449 statisticsTimerTicks.remove(switchId);
450 removeStatsRequestTasks(switchId);
451 flowStatistics.remove(switchId);
452 log.info("Statistics removed for switch "
453 + HexString.toHexString(switchId));
456 private void acquireStatistics(Long switchId, OFStatisticsType statType) {
458 // Query the switch on all matches
459 List<OFStatistics> values = this.acquireStatistics(switchId, statType,
462 // Update local caching database if got a valid response
463 if (values != null && !values.isEmpty()) {
464 if ((statType == OFStatisticsType.FLOW)
465 || (statType == OFStatisticsType.VENDOR)) {
466 flowStatistics.put(switchId, values);
467 } else if (statType == OFStatisticsType.DESC) {
468 // Notify who may be interested in a description change
469 notifyDescriptionListeners(switchId, values);
472 descStatistics.put(switchId, values);
473 } else if (statType == OFStatisticsType.PORT) {
474 // Overwrite cache with new port statistics for this switch
475 portStatistics.put(switchId, values);
477 // Wake up the thread which maintains the TX byte counters for each port
478 switchPortStatsUpdated.offer(switchId);
483 private void notifyDescriptionListeners(Long switchId,
484 List<OFStatistics> values) {
485 for (IStatisticsListener l : this.descriptionListeners) {
486 l.descriptionRefreshed(switchId,
487 ((OFDescriptionStatistics)values.get(0)));
492 * Generic function to get the statistics from an OF switch
494 @SuppressWarnings("unchecked")
495 private List<OFStatistics> acquireStatistics(Long switchId,
496 OFStatisticsType statsType, Object target) {
497 List<OFStatistics> values = null;
499 ISwitch sw = controller.getSwitch(switchId);
502 OFStatisticsRequest req = new OFStatisticsRequest();
503 req.setStatisticType(statsType);
504 int requestLength = req.getLengthU();
506 if (statsType == OFStatisticsType.FLOW) {
507 OFMatch match = null;
508 if (target == null) {
510 match = new OFMatch();
511 match.setWildcards(0xffffffff);
512 } else if (!(target instanceof OFMatch)) {
514 log.warn("Invalid target type for Flow stats request: "
515 + target.getClass());
518 // Specific flow request
519 match = (OFMatch) target;
521 OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
522 specificReq.setMatch(match);
523 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
524 specificReq.setTableId((byte) 0xff);
525 req.setStatistics(Collections
526 .singletonList((OFStatistics) specificReq));
527 requestLength += specificReq.getLength();
529 } else if (statsType == OFStatisticsType.VENDOR) {
530 V6StatsRequest specificReq = new V6StatsRequest();
531 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
532 specificReq.setTableId((byte) 0xff);
533 req.setStatistics(Collections
534 .singletonList((OFStatistics) specificReq));
535 requestLength += specificReq.getLength();
537 } else if (statsType == OFStatisticsType.AGGREGATE) {
538 OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
539 OFMatch match = new OFMatch();
540 match.setWildcards(0xffffffff);
541 specificReq.setMatch(match);
542 specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
543 specificReq.setTableId((byte) 0xff);
544 req.setStatistics(Collections
545 .singletonList((OFStatistics) specificReq));
546 requestLength += specificReq.getLength();
548 } else if (statsType == OFStatisticsType.PORT) {
550 if (target == null) {
552 targetPort = (short) OFPort.OFPP_NONE.getValue();
553 } else if (!(target instanceof Short)) {
555 log.warn("Invalid target type for Port stats request: "
556 + target.getClass());
559 // Specific port request
560 targetPort = (Short) target;
562 OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
563 specificReq.setPortNumber(targetPort);
564 req.setStatistics(Collections
565 .singletonList((OFStatistics) specificReq));
566 requestLength += specificReq.getLength();
568 } else if (statsType == OFStatisticsType.QUEUE) {
569 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
570 specificReq.setPortNumber((short) OFPort.OFPP_ALL.getValue());
571 specificReq.setQueueId(0xffffffff);
572 req.setStatistics(Collections
573 .singletonList((OFStatistics) specificReq));
574 requestLength += specificReq.getLength();
576 } else if (statsType == OFStatisticsType.DESC) {
578 } else if (statsType == OFStatisticsType.TABLE) {
581 req.setLengthU(requestLength);
582 Object result = sw.getStatistics(req);
584 if (result == null) {
585 log.warn("Request Timed Out for ({}) from switch {}", type,
586 HexString.toHexString(switchId));
587 } else if (result instanceof OFError) {
588 log.warn("Switch {} failed to handle ({}) stats request: "
589 + Utils.getOFErrorString((OFError) result), HexString
590 .toHexString(switchId), type);
591 if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
594 "Switching back to regular Flow stats requests for switch {}",
595 HexString.toHexString(switchId));
596 this.switchSupportsVendorExtStats.put(switchId,
600 values = (List<OFStatistics>) result;
607 public List<OFStatistics> getOFFlowStatistics(Long switchId) {
608 List<OFStatistics> list = flowStatistics.get(switchId);
611 * Check on emptiness as interference between add and get is still
612 * possible on the inner list (the concurrentMap entry's value)
614 return (list == null || list.isEmpty()) ? this.dummyList
615 : (list.get(0) instanceof OFVendorStatistics) ? this
616 .v6StatsListToOFStatsList(list) : list;
620 public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch) {
621 List<OFStatistics> statsList = flowStatistics.get(switchId);
624 * Check on emptiness as interference between add and get is still
625 * possible on the inner list (the concurrentMap entry's value)
627 if (statsList == null || statsList.isEmpty()) {
628 return this.dummyList;
631 if (statsList.get(0) instanceof OFVendorStatistics) {
633 * Caller could provide regular OF match when we
634 * instead pull the vendor statistics from this node
635 * Caller is not supposed to know whether this switch supports
636 * vendor extensions statistics requests
638 V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
639 : new V6Match(ofMatch);
641 List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
642 for (OFStatistics stats : targetList) {
643 V6StatsReply v6Stats = (V6StatsReply) stats;
644 V6Match v6Match = v6Stats.getMatch();
645 if (v6Match.equals(targetMatch)) {
646 List<OFStatistics> list = new ArrayList<OFStatistics>();
652 for (OFStatistics stats : statsList) {
653 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
654 if (flowStats.getMatch().equals(ofMatch)) {
655 List<OFStatistics> list = new ArrayList<OFStatistics>();
661 return this.dummyList;
665 * Converts the v6 vendor statistics to the OFStatistics
667 private List<OFStatistics> v6StatsListToOFStatsList(
668 List<OFStatistics> statistics) {
669 List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
670 if (statistics != null && !statistics.isEmpty()) {
671 for (OFStatistics stats : statistics) {
672 if (stats instanceof OFVendorStatistics) {
673 List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
675 v6statistics.addAll(r);
683 private static List<OFStatistics> getV6ReplyStatistics(
684 OFVendorStatistics stat) {
685 int length = stat.getLength();
686 List<OFStatistics> results = new ArrayList<OFStatistics>();
688 return null; // Nicira Hdr is 12 bytes. We need atleast that much
689 ByteBuffer data = ByteBuffer.allocate(length);
692 log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}", HexString
693 .toHexString(data.array()));
695 int vendor = data.getInt(); //first 4 bytes is vendor id.
696 if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
698 .debug("Unexpected vendor id: 0x{}", Integer
699 .toHexString(vendor));
702 //go ahead by 8 bytes which is 8 bytes of 0
703 data.getLong(); //should be all 0's
704 length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have been consumed
707 V6StatsReply v6statsreply;
710 v6statsreply = new V6StatsReply();
711 min_len = v6statsreply.getLength();
712 if (length < v6statsreply.getLength())
714 v6statsreply.setActionFactory(stat.getActionFactory());
715 v6statsreply.readFrom(data);
716 if (v6statsreply.getLength() < min_len)
718 v6statsreply.setVendorId(vendor);
719 log.trace("V6StatsReply: {}", v6statsreply);
720 length -= v6statsreply.getLength();
721 results.add(v6statsreply);
727 public List<OFStatistics> queryStatistics(Long switchId,
728 OFStatisticsType statType, Object target) {
730 * Caller does not know and it is not supposed to know whether
731 * this switch supports vendor extension. We adjust the target for him
733 if (statType == OFStatisticsType.FLOW) {
734 if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
735 statType = OFStatisticsType.VENDOR;
739 List<OFStatistics> list = this.acquireStatistics(switchId, statType,
742 return (list == null) ? null
743 : (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list)
748 public List<OFStatistics> getOFDescStatistics(Long switchId) {
749 if (!descStatistics.containsKey(switchId))
750 return this.dummyList;
752 return descStatistics.get(switchId);
756 public List<OFStatistics> getOFPortStatistics(Long switchId) {
757 if (!portStatistics.containsKey(switchId)) {
758 return this.dummyList;
761 return portStatistics.get(switchId);
765 public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
766 if (!portStatistics.containsKey(switchId)) {
767 return this.dummyList;
769 List<OFStatistics> list = new ArrayList<OFStatistics>(1);
770 for (OFStatistics stats : portStatistics.get(switchId)) {
771 if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
780 public int getFlowsNumber(long switchId) {
781 return this.flowStatistics.get(switchId).size();
785 * InventoryShim replays for us all the switch additions which happened before we were brought up
788 public void updateNode(Node node, UpdateType type, Set<Property> props) {
789 Long switchId = (Long) node.getID();
792 addStatisticsTicks(switchId);
795 clearFlowStatsAndTicks(switchId);
801 public void updateNodeConnector(NodeConnector nodeConnector,
802 UpdateType type, Set<Property> props) {
807 * Update the cached port rates for this switch with the latest
808 * retrieved port transmit byte count
811 private synchronized void updatePortsTxRate(long switchId) {
812 List<OFStatistics> newPortStatistics = this.portStatistics
814 if (newPortStatistics == null) {
817 Map<Short, TxRates> rates = this.txRates.get(switchId);
819 // First time rates for this switch are added
820 rates = new HashMap<Short, TxRates>();
821 txRates.put(switchId, rates);
823 for (OFStatistics stats : newPortStatistics) {
824 OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
825 short port = newPortStat.getPortNumber();
826 TxRates portRatesHolder = rates.get(port);
827 if (portRatesHolder == null) {
828 // First time rates for this port are added
829 portRatesHolder = new TxRates();
830 rates.put(port, portRatesHolder);
832 // Get and store the number of transmitted bytes for this port
833 // And handle the case where agent does not support the counter
834 long transmitBytes = newPortStat.getTransmitBytes();
835 long value = (transmitBytes < 0) ? 0 : transmitBytes;
836 portRatesHolder.update(value);
841 public synchronized long getTransmitRate(Long switchId, Short port) {
843 if (switchId == null || port == null) {
846 Map<Short, TxRates> perSwitch = txRates.get(switchId);
847 if (perSwitch == null) {
850 TxRates portRates = perSwitch.get(port);
851 if (portRates == null) {
854 return portRates.getAverageTxRate();
858 * Manual switch name configuration code
861 public String getHelp() {
862 StringBuffer help = new StringBuffer();
863 help.append("---OF Statistics Manager utilities---\n");
864 help.append("\t ofdumpstatsmgr - " +
865 "Print Internal Stats Mgr db\n");
866 return help.toString();
869 private boolean isValidSwitchId(String switchId) {
870 String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
871 String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
873 return (switchId != null && (switchId.matches(regexDatapathID) || switchId
874 .matches(regexDatapathIDLong)));
877 public long getSwitchIDLong(String switchId) {
879 String switchString = "0";
881 if (isValidSwitchId(switchId)) {
882 if (switchId.contains(":")) {
883 // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
884 switchString = switchId.replace(":", "");
885 } else if (switchId.contains("-")) {
886 // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
887 switchString = switchId.replace("-", "");
889 // Handle the 0123456789ABCDEF notation
890 switchString = switchId;
893 return Long.parseLong(switchString, radix);
897 * Internal information dump code
899 private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
900 StringBuffer buffer = new StringBuffer();
902 for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
903 buffer.append(HexString.toHexString(entry.getKey()) + "="
904 + entry.getValue().toString() + " ");
907 return buffer.toString();
910 public void _ofdumpstatsmgr(CommandInterpreter ci) {
911 ci.println("Global Counter: " + counter);
913 .println("Timer Ticks: "
914 + prettyPrintSwitchMap(statisticsTimerTicks));
915 ci.println("PendingStatsQueue: " + pendingStatsRequests);
916 ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
917 ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
918 ci.println("Stats Collector State: "
919 + statisticsCollector.getState().toString());
920 ci.println("StatsTimer: " + statisticsTimer.toString());
921 ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
922 ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
923 ci.println("Port Stats Period: " + portTickNumber + " s");
926 public void _resetSwitchCapability(CommandInterpreter ci) {
927 String sidString = ci.nextArgument();
929 if (sidString == null) {
930 ci.println("Insert the switch id (numeric value)");
934 sid = Long.valueOf(sidString);
935 this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
936 ci.println("Vendor capability for switch " + sid + " set to "
937 + this.switchSupportsVendorExtStats.get(sid));
938 } catch (NumberFormatException e) {
939 ci.println("Invalid switch id. Has to be numeric.");
944 public void _ofbw(CommandInterpreter ci) {
945 String sidString = ci.nextArgument();
947 if (sidString == null) {
948 ci.println("Insert the switch id (numeric value)");
952 sid = Long.valueOf(sidString);
953 } catch (NumberFormatException e) {
954 ci.println("Invalid switch id. Has to be numeric.");
957 Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
958 ci.println("Bandwidth utilization (" + factoredSamples
959 * portTickNumber + " sec average) for switch "
960 + HexEncode.longToHexString(sid) + ":");
961 if (thisSwitchRates == null) {
962 ci.println("Not available");
964 for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
965 ci.println("Port: " + entry.getKey() + ": "
966 + entry.getValue().getAverageTxRate() + " bps");
972 public void _txratewindow(CommandInterpreter ci) {
973 String averageWindow = ci.nextArgument();
975 if (averageWindow == null) {
976 ci.println("Insert the length in seconds of the median " +
977 "window for tx rate");
978 ci.println("Current: " + factoredSamples * portTickNumber
983 seconds = Short.valueOf(averageWindow);
984 } catch (NumberFormatException e) {
985 ci.println("Invalid period.");
987 OFStatisticsManager.factoredSamples = (short) (seconds/portTickNumber);
988 ci.println("New: " + factoredSamples * portTickNumber + " secs");
991 public void _ofstatsmgrintervals(CommandInterpreter ci) {
992 String flowStatsInterv = ci.nextArgument();
993 String portStatsInterv = ci.nextArgument();
995 if (flowStatsInterv == null || portStatsInterv == null) {
997 ci.println("Usage: ostatsmgrintervals <fP> <pP> (in seconds)");
998 ci.println("Current Values: fP=" + statisticsTickNumber +
999 "s pP=" + portTickNumber + "s");
1004 fP = Short.parseShort(flowStatsInterv);
1005 pP = Short.parseShort(portStatsInterv);
1006 } catch (Exception e) {
1007 ci.println("Invalid format values: " + e.getMessage());
1011 if (pP <= 1 || fP <=1) {
1012 ci.println("Invalid values. fP and pP have to be greater than 1.");
1016 statisticsTickNumber = fP;
1017 portTickNumber = pP;
1019 ci.println("New Values: fP=" + statisticsTickNumber +
1020 "s pP=" + portTickNumber + "s");