X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow_netty%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Finternal%2FOFStatisticsManager.java;fp=opendaylight%2Fprotocol_plugins%2Fopenflow_netty%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Finternal%2FOFStatisticsManager.java;h=cf9a2c517dec43bb44c05e6da4edbe6efd89c67e;hb=85073423c6069e4b58fffde7cf19c806b2b52dd5;hp=0000000000000000000000000000000000000000;hpb=c5630f2945eb5370f9829514ef72de41d41eb2be;p=controller.git diff --git a/opendaylight/protocol_plugins/openflow_netty/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/OFStatisticsManager.java b/opendaylight/protocol_plugins/openflow_netty/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/OFStatisticsManager.java new file mode 100644 index 0000000000..cf9a2c517d --- /dev/null +++ b/opendaylight/protocol_plugins/openflow_netty/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/OFStatisticsManager.java @@ -0,0 +1,1255 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.protocol_plugin.openflow.internal; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.Dictionary; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; + +import org.eclipse.osgi.framework.console.CommandInterpreter; +import org.eclipse.osgi.framework.console.CommandProvider; +import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener; +import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimInternalListener; +import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager; +import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener; +import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsServiceShimListener; +import org.opendaylight.controller.protocol_plugin.openflow.core.IController; +import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; +import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match; +import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply; +import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest; +import org.opendaylight.controller.sal.core.Node; +import org.opendaylight.controller.sal.core.NodeConnector; +import org.opendaylight.controller.sal.core.Property; +import org.opendaylight.controller.sal.core.UpdateType; +import org.opendaylight.controller.sal.utils.GlobalConstants; +import org.opendaylight.controller.sal.utils.HexEncode; +import org.openflow.protocol.OFError; +import org.openflow.protocol.OFMatch; +import org.openflow.protocol.OFPort; +import org.openflow.protocol.OFStatisticsRequest; +import org.openflow.protocol.statistics.OFAggregateStatisticsRequest; +import org.openflow.protocol.statistics.OFDescriptionStatistics; +import org.openflow.protocol.statistics.OFFlowStatisticsReply; +import org.openflow.protocol.statistics.OFFlowStatisticsRequest; +import org.openflow.protocol.statistics.OFPortStatisticsReply; +import org.openflow.protocol.statistics.OFPortStatisticsRequest; +import org.openflow.protocol.statistics.OFQueueStatisticsRequest; +import org.openflow.protocol.statistics.OFStatistics; +import org.openflow.protocol.statistics.OFStatisticsType; +import org.openflow.protocol.statistics.OFVendorStatistics; +import org.openflow.util.HexString; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * It periodically polls the different OF statistics from the OF switches and + * caches them for quick retrieval for the above layers' modules It also + * provides an API to directly query the switch about the statistics + */ +public class OFStatisticsManager implements IOFStatisticsManager, + IInventoryShimExternalListener, CommandProvider { + private static final Logger log = LoggerFactory + .getLogger(OFStatisticsManager.class); + private static final int initialSize = 64; + private static final long flowStatsPeriod = 10000; + private static final long descriptionStatsPeriod = 60000; + private static final long portStatsPeriod = 5000; + private static final long flowTableStatsPeriod = 60000; + private static final long tickPeriod = 1000; + private static short statisticsTickNumber = (short) (flowStatsPeriod / tickPeriod); + private static short descriptionTickNumber = (short) (descriptionStatsPeriod / tickPeriod); + private static short portTickNumber = (short) (portStatsPeriod / tickPeriod); + private static short flowTableTickNumber = (short) (flowTableStatsPeriod / tickPeriod); + private static short factoredSamples = (short) 2; + private static short counter = 1; + private IController controller = null; + private ConcurrentMap> flowStatistics; + private ConcurrentMap> descStatistics; + private ConcurrentMap> portStatistics; + private ConcurrentMap> flowTableStatistics; + private List dummyList; + private ConcurrentMap statisticsTimerTicks; + protected BlockingQueue pendingStatsRequests; + protected BlockingQueue switchPortStatsUpdated; + private Thread statisticsCollector; + private Thread txRatesUpdater; + private Timer statisticsTimer; + private TimerTask statisticsTimerTask; + private ConcurrentMap switchSupportsVendorExtStats; + private Map> txRates; // Per port sampled (every + // portStatsPeriod) transmit + // rate + private Set descriptionListeners; + private ConcurrentMap statisticsServiceShimListener; + + /** + * The object containing the latest factoredSamples tx rate samples for a + * given switch port + */ + protected class TxRates { + Deque sampledTxBytes; // contains the latest factoredSamples + // sampled transmitted bytes + + public TxRates() { + sampledTxBytes = new LinkedBlockingDeque(); + } + + public void update(Long txBytes) { + /* + * Based on how many samples our average works on, we might have to + * remove the oldest sample + */ + if (sampledTxBytes.size() == factoredSamples) { + sampledTxBytes.removeLast(); + } + + // Add the latest sample to the top of the queue + sampledTxBytes.addFirst(txBytes); + } + + /** + * Returns the average transmit rate in bps + * + * @return the average transmit rate [bps] + */ + public long getAverageTxRate() { + long average = 0; + /* + * If we cannot provide the value for the time window length set + */ + if (sampledTxBytes.size() < factoredSamples) { + return average; + } + long increment = (long) (sampledTxBytes.getFirst() - sampledTxBytes + .getLast()); + long timePeriod = (long) (factoredSamples * portStatsPeriod) + / (long) tickPeriod; + average = (8L * increment) / timePeriod; + return average; + } + } + + public void setController(IController core) { + this.controller = core; + } + + public void unsetController(IController core) { + if (this.controller == core) { + this.controller = null; + } + } + + /** + * Function called by the dependency manager when all the required + * dependencies are satisfied + * + */ + void init() { + flowStatistics = new ConcurrentHashMap>(); + descStatistics = new ConcurrentHashMap>(); + portStatistics = new ConcurrentHashMap>(); + flowTableStatistics = new ConcurrentHashMap>(); + dummyList = new ArrayList(1); + statisticsTimerTicks = new ConcurrentHashMap( + initialSize); + pendingStatsRequests = new LinkedBlockingQueue( + initialSize); + switchPortStatsUpdated = new LinkedBlockingQueue(initialSize); + switchSupportsVendorExtStats = new ConcurrentHashMap( + initialSize); + txRates = new HashMap>(initialSize); + descriptionListeners = new HashSet(); + + statisticsServiceShimListener = + new ConcurrentHashMap(); + configStatsPollIntervals(); + + // Initialize managed timers + statisticsTimer = new Timer(); + statisticsTimerTask = new TimerTask() { + @Override + public void run() { + decrementTicks(); + } + }; + + // Initialize Statistics collector thread + statisticsCollector = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + try { + StatsRequest req = pendingStatsRequests.take(); + acquireStatistics(req.switchId, req.type); + } catch (InterruptedException e) { + log.warn("Flow Statistics Collector thread " + + "interrupted", e); + } + } + } + }, "Statistics Collector"); + + // Initialize Tx Rate Updater thread + txRatesUpdater = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + try { + long switchId = switchPortStatsUpdated.take(); + updatePortsTxRate(switchId); + } catch (InterruptedException e) { + log.warn("TX Rate Updater thread interrupted", e); + } + } + } + }, "TX Rate Updater"); + } + + /** + * Function called by the dependency manager when at least one dependency + * become unsatisfied or when the component is shutting down because for + * example bundle is being stopped. + * + */ + void destroy() { + this.statisticsServiceShimListener = null; + } + + /** + * Function called by dependency manager after "init ()" is called and after + * the services provided by the class are registered in the service registry + * + */ + void start() { + // Start managed timers + statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, tickPeriod); + + // Start statistics collector thread + statisticsCollector.start(); + + // Start bandwidth utilization computer thread + txRatesUpdater.start(); + + // OSGI console + registerWithOSGIConsole(); + } + + /** + * Function called by the dependency manager before the services exported by + * the component are unregistered, this will be followed by a "destroy ()" + * calls + * + */ + void stop() { + // Stop managed timers + statisticsTimer.cancel(); + } + + public void setStatisticsListener(IStatisticsListener s) { + this.descriptionListeners.add(s); + } + + public void unsetStatisticsListener(IStatisticsListener s) { + if (s != null) { + this.descriptionListeners.remove(s); + } + } + + private void registerWithOSGIConsole() { + BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()) + .getBundleContext(); + bundleContext.registerService(CommandProvider.class.getName(), this, + null); + } + + private static class StatsRequest { + protected Long switchId; + protected OFStatisticsType type; + + public StatsRequest(Long d, OFStatisticsType t) { + switchId = d; + type = t; + } + + public String toString() { + return "SReq = {switchId=" + switchId + ", type=" + type + "}"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((switchId == null) ? 0 : switchId.hashCode()); + result = prime * result + ((type == null) ? 0 : type.ordinal()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + StatsRequest other = (StatsRequest) obj; + if (switchId == null) { + if (other.switchId != null) { + return false; + } + } else if (!switchId.equals(other.switchId)) { + return false; + } + if (type != other.type) { + return false; + } + return true; + } + } + + public void setStatisticsServiceShimListener(Map props, + IStatisticsServiceShimListener s) { + if (props == null) { + log.error("Didn't receive the service properties"); + return; + } + String containerName = (String) props.get("containerName"); + if (containerName == null) { + log.error("containerName not supplied"); + return; + } + if ((this.statisticsServiceShimListener != null) + && !this.statisticsServiceShimListener.containsValue(s)) { + this.statisticsServiceShimListener.put(containerName, s); + log.trace("Added inventoryShimInternalListener for container:" + + containerName); + } + + } + + public void unsetStatisticsServiceShimListener(Map props, + IStatisticsServiceShimListener s) { + if (props == null) { + log.error("Didn't receive the service properties"); + return; + } + String containerName = (String) props.get("containerName"); + if (containerName == null) { + log.error("containerName not supplied"); + return; + } + if ((this.statisticsServiceShimListener != null) + && this.statisticsServiceShimListener + .get(containerName) != null + && this.statisticsServiceShimListener + .get(containerName).equals(s)) { + this.statisticsServiceShimListener.remove(containerName); + log.trace("Removed inventoryShimInternalListener for container: " + + containerName); + } + } + + + private void addStatisticsTicks(Long switchId) { + // Start of Change + + // By default, ODL assumes that switch supports Vendor statistics. + // It is designed in such a way that if the switch doesn't support Vendor Stats, + // then "Flow Stats" would be acquired. + + // Behavior is overridden NOT to assume default support for Vendor Stats + + //switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume switch supports Vendor extension stats + switchSupportsVendorExtStats.put(switchId, Boolean.FALSE); + + // End of Change + + statisticsTimerTicks.put(switchId, new StatisticsTicks(true)); + log.info("Added Switch {} to target pool", + HexString.toHexString(switchId.longValue())); + } + + protected static class StatisticsTicks { + private short flowStatisticsTicks; + private short descriptionTicks; + private short portStatisticsTicks; + private short flowTableStatisticsTicks; + + public StatisticsTicks(boolean scattered) { + if (scattered) { + // scatter bursts by statisticsTickPeriod + if (++counter < 0) { + counter = 0; + } // being paranoid here + flowStatisticsTicks = (short) (1 + counter + % statisticsTickNumber); + descriptionTicks = (short) (1 + counter % descriptionTickNumber); + portStatisticsTicks = (short) (1 + counter % portTickNumber); + flowTableStatisticsTicks = (short) (1 + counter % flowTableTickNumber); + } else { + flowStatisticsTicks = statisticsTickNumber; + descriptionTicks = descriptionTickNumber; + portStatisticsTicks = portTickNumber; + flowTableStatisticsTicks = flowTableTickNumber; + } + } + + public boolean decrementFlowTicksIsZero() { + // Please ensure no code is inserted between the if check and the + // flowStatisticsTicks reset + if (--flowStatisticsTicks == 0) { + flowStatisticsTicks = statisticsTickNumber; + return true; + } + return false; + } + + public boolean decrementDescTicksIsZero() { + // Please ensure no code is inserted between the if check and the + // descriptionTicks reset + if (--descriptionTicks == 0) { + descriptionTicks = descriptionTickNumber; + return true; + } + return false; + } + + public boolean decrementPortTicksIsZero() { + // Please ensure no code is inserted between the if check and the + // descriptionTicks reset + if (--portStatisticsTicks == 0) { + portStatisticsTicks = portTickNumber; + return true; + } + return false; + } + + public boolean decrementFlowTableTicksIsZero() { + // Please ensure no code is inserted between the if check and the descriptionTicks reset + if (--flowTableStatisticsTicks == 0) { + flowTableStatisticsTicks = flowTableTickNumber; + return true; + } + return false; + } + + public String toString() { + return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks + + ",pT=" + portStatisticsTicks + ",tT=" + flowTableStatisticsTicks + "}"; + } + } + + private void printInfoMessage(String type, StatsRequest request) { + log.info( + "{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.", + new Object[] { type, HexString.toHexString(request.switchId), + pendingStatsRequests.size(), + statisticsCollector.getState().toString() }); + } + + protected void decrementTicks() { + StatsRequest request = null; + for (Map.Entry entry : statisticsTimerTicks + .entrySet()) { + StatisticsTicks clock = entry.getValue(); + Long switchId = entry.getKey(); + if (clock.decrementFlowTicksIsZero() == true) { + request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ? new StatsRequest( + switchId, OFStatisticsType.VENDOR) : new StatsRequest( + switchId, OFStatisticsType.FLOW); + // If a request for this switch is already in the queue, skip to + // add this new request + if (!pendingStatsRequests.contains(request) + && false == pendingStatsRequests.offer(request)) { + printInfoMessage("Flow", request); + } + } + + if (clock.decrementDescTicksIsZero() == true) { + request = new StatsRequest(switchId, OFStatisticsType.DESC); + // If a request for this switch is already in the queue, skip to + // add this new request + if (!pendingStatsRequests.contains(request) + && false == pendingStatsRequests.offer(request)) { + printInfoMessage("Description", request); + } + } + + if (clock.decrementPortTicksIsZero() == true) { + request = new StatsRequest(switchId, OFStatisticsType.PORT); + // If a request for this switch is already in the queue, skip to + // add this new request + if (!pendingStatsRequests.contains(request) + && false == pendingStatsRequests.offer(request)) { + printInfoMessage("Port", request); + } + } + + if (clock.decrementFlowTableTicksIsZero() == true) { + request = new StatsRequest(switchId, OFStatisticsType.TABLE); + // If a request for this switch is already in the queue, skip to add this new request + if (!pendingStatsRequests.contains(request) + && false == pendingStatsRequests.offer(request)) { + printInfoMessage("Flow Table", request); + } + } + } + } + + private void removeStatsRequestTasks(Long switchId) { + log.info("Cleaning Statistics database for switch {}", + HexEncode.longToHexString(switchId)); + // To be safe, let's attempt removal of both VENDOR and FLOW request. It + // does not hurt + pendingStatsRequests.remove(new StatsRequest(switchId, + OFStatisticsType.VENDOR)); + pendingStatsRequests.remove(new StatsRequest(switchId, + OFStatisticsType.FLOW)); + pendingStatsRequests.remove(new StatsRequest(switchId, + OFStatisticsType.DESC)); + pendingStatsRequests.remove(new StatsRequest(switchId, + OFStatisticsType.PORT)); + pendingStatsRequests.remove(new StatsRequest(switchId, + OFStatisticsType.TABLE)); + // Take care of the TX rate databases + switchPortStatsUpdated.remove(switchId); + txRates.remove(switchId); + } + + private void clearFlowStatsAndTicks(Long switchId) { + statisticsTimerTicks.remove(switchId); + removeStatsRequestTasks(switchId); + flowStatistics.remove(switchId); + log.info("Statistics removed for switch {}", + HexString.toHexString(switchId)); + } + + private void acquireStatistics(Long switchId, OFStatisticsType statType) { + + // Query the switch on all matches + List values = this.acquireStatistics(switchId, statType, + null); + + String containerName = GlobalConstants.DEFAULT.toString(); + + // Update local caching database if got a valid response + if (values != null && !values.isEmpty()) { + if ((statType == OFStatisticsType.FLOW) + || (statType == OFStatisticsType.VENDOR)) { + flowStatistics.put(switchId, values); + + notifyStatisticsListener(containerName, statType, switchId, values); + + } else if (statType == OFStatisticsType.DESC) { + // Notify who may be interested in a description change + notifyDescriptionListeners(switchId, values); + + // Overwrite cache + descStatistics.put(switchId, values); + + notifyStatisticsListener(containerName, statType, switchId, values); + } else if (statType == OFStatisticsType.PORT) { + // Overwrite cache with new port statistics for this switch + portStatistics.put(switchId, values); + + // Wake up the thread which maintains the TX byte counters for + // each port + switchPortStatsUpdated.offer(switchId); + + notifyStatisticsListener(containerName, statType, switchId, values); + } else if (statType == OFStatisticsType.TABLE) { + + // Overwrite cache + flowTableStatistics.put(switchId, values); + + notifyStatisticsListener(containerName, statType, switchId, values); + } + } + } + + private void notifyStatisticsListener(String containerName, OFStatisticsType type, Long switchId, List values){ + + IStatisticsServiceShimListener statisticsServiceShimListener = + this.statisticsServiceShimListener.get(containerName); + + if (statisticsServiceShimListener != null) { + + switch (type) { + case FLOW: + statisticsServiceShimListener. + flowStatisticsUpdate(switchId, values, containerName); + break; + case PORT: + statisticsServiceShimListener. + portStatisticsUpdate(switchId, values, containerName); + break; + case DESC: + statisticsServiceShimListener. + descStatisticsUpdate(switchId, values, containerName); + break; + case TABLE: + statisticsServiceShimListener. + flowTableStatisticsUpdate(switchId, values, containerName); + break; + default: + break; + } + + log.trace(type + " " + switchId + " on container " + + containerName); + } + } + + private void notifyDescriptionListeners(Long switchId, + List values) { + for (IStatisticsListener l : this.descriptionListeners) { + l.descriptionRefreshed(switchId, + ((OFDescriptionStatistics) values.get(0))); + } + } + + /* + * Generic function to get the statistics form a OF switch + */ + @SuppressWarnings("unchecked") + private List acquireStatistics(Long switchId, + OFStatisticsType statsType, Object target) { + List values = null; + String type = null; + ISwitch sw = controller.getSwitch(switchId); + + if (sw != null) { + OFStatisticsRequest req = new OFStatisticsRequest(); + req.setStatisticType(statsType); + int requestLength = req.getLengthU(); + + if (statsType == OFStatisticsType.FLOW) { + OFMatch match = null; + if (target == null) { + // All flows request + match = new OFMatch(); + match.setWildcards(0xffffffff); + } else if (!(target instanceof OFMatch)) { + // Malformed request + log.warn("Invalid target type for Flow stats request: {}", + target.getClass()); + return null; + } else { + // Specific flow request + match = (OFMatch) target; + } + OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest(); + specificReq.setMatch(match); + specificReq.setOutPort(OFPort.OFPP_NONE.getValue()); + specificReq.setTableId((byte) 0xff); + req.setStatistics(Collections + .singletonList((OFStatistics) specificReq)); + requestLength += specificReq.getLength(); + type = "FLOW"; + } else if (statsType == OFStatisticsType.VENDOR) { + V6StatsRequest specificReq = new V6StatsRequest(); + specificReq.setOutPort(OFPort.OFPP_NONE.getValue()); + specificReq.setTableId((byte) 0xff); + req.setStatistics(Collections + .singletonList((OFStatistics) specificReq)); + requestLength += specificReq.getLength(); + type = "VENDOR"; + } else if (statsType == OFStatisticsType.AGGREGATE) { + OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest(); + OFMatch match = new OFMatch(); + match.setWildcards(0xffffffff); + specificReq.setMatch(match); + specificReq.setOutPort(OFPort.OFPP_NONE.getValue()); + specificReq.setTableId((byte) 0xff); + req.setStatistics(Collections + .singletonList((OFStatistics) specificReq)); + requestLength += specificReq.getLength(); + type = "AGGREGATE"; + } else if (statsType == OFStatisticsType.PORT) { + short targetPort; + if (target == null) { + // All ports request + targetPort = (short) OFPort.OFPP_NONE.getValue(); + } else if (!(target instanceof Short)) { + // Malformed request + log.warn("Invalid target type for Port stats request: {}", + target.getClass()); + return null; + } else { + // Specific port request + targetPort = (Short) target; + } + OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest(); + specificReq.setPortNumber(targetPort); + req.setStatistics(Collections + .singletonList((OFStatistics) specificReq)); + requestLength += specificReq.getLength(); + type = "PORT"; + } else if (statsType == OFStatisticsType.QUEUE) { + OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest(); + specificReq.setPortNumber((short) OFPort.OFPP_ALL.getValue()); + specificReq.setQueueId(0xffffffff); + req.setStatistics(Collections + .singletonList((OFStatistics) specificReq)); + requestLength += specificReq.getLength(); + type = "QUEUE"; + } else if (statsType == OFStatisticsType.DESC) { + type = "DESC"; + } else if (statsType == OFStatisticsType.TABLE) { + type = "TABLE"; + } + req.setLengthU(requestLength); + Object result = sw.getStatistics(req); + + if (result == null) { + log.warn("Request Timed Out for ({}) from switch {}", type, + HexString.toHexString(switchId)); + } else if (result instanceof OFError) { + log.warn("Switch {} failed to handle ({}) stats request: {}", + new Object[] { HexString.toHexString(switchId), type, + Utils.getOFErrorString((OFError) result) }); + if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) { + log.warn( + "Switching back to regular Flow stats requests for switch {}", + HexString.toHexString(switchId)); + this.switchSupportsVendorExtStats.put(switchId, + Boolean.FALSE); + } + } else { + values = (List) result; + } + } + return values; + } + + @Override + public List getOFFlowStatistics(Long switchId) { + List list = flowStatistics.get(switchId); + + /* + * Check on emptiness as interference between add and get is still + * possible on the inner list (the concurrentMap entry's value) + */ + + return list; + + /* + return (list == null || list.isEmpty()) ? this.dummyList + : (list.get(0) instanceof OFVendorStatistics) ? this + .v6StatsListToOFStatsList(list) : list; + */ + } + + @Override + public List getOFFlowStatistics(Long switchId, OFMatch ofMatch) { + List statsList = flowStatistics.get(switchId); + + /* + * Check on emptiness as interference between add and get is still + * possible on the inner list (the concurrentMap entry's value) + */ + if (statsList == null || statsList.isEmpty()) { + return this.dummyList; + } + + if (statsList.get(0) instanceof OFVendorStatistics) { + /* + * Caller could provide regular OF match when we instead pull the + * vendor statistics from this node Caller is not supposed to know + * whether this switch supports vendor extensions statistics + * requests + */ + /* + V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch + : new V6Match(ofMatch); + + List targetList = v6StatsListToOFStatsList(statsList); + for (OFStatistics stats : targetList) { + V6StatsReply v6Stats = (V6StatsReply) stats; + V6Match v6Match = v6Stats.getMatch(); + if (v6Match.equals(targetMatch)) { + List list = new ArrayList(); + list.add(stats); + return list; + } + } + */ + } else { + for (OFStatistics stats : statsList) { + OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats; + if (flowStats.getMatch().equals(ofMatch)) { + List list = new ArrayList(); + list.add(stats); + return list; + } + } + } + return this.dummyList; + } + + /* + * Converts the v6 vendor statistics to the OFStatistics + */ + /* + private List v6StatsListToOFStatsList( + List statistics) { + List v6statistics = new ArrayList(); + if (statistics != null && !statistics.isEmpty()) { + for (OFStatistics stats : statistics) { + if (stats instanceof OFVendorStatistics) { + List r = getV6ReplyStatistics((OFVendorStatistics) stats); + if (r != null) { + v6statistics.addAll(r); + } + } + } + } + return v6statistics; + } + */ + + /* + + private static List getV6ReplyStatistics( + OFVendorStatistics stat) { + int length = stat.getLength(); + List results = new ArrayList(); + if (length < 12) + return null; // Nicira Hdr is 12 bytes. We need atleast that much + ByteBuffer data = ByteBuffer.allocate(length); + stat.writeTo(data); + data.rewind(); + log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}", + HexString.toHexString(data.array())); + + int vendor = data.getInt(); // first 4 bytes is vendor id. + if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) { + log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor)); + return null; + } else { + // go ahead by 8 bytes which is 8 bytes of 0 + data.getLong(); // should be all 0's + length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have + // been consumed + } + + V6StatsReply v6statsreply; + int min_len; + while (length > 0) { + v6statsreply = new V6StatsReply(); + min_len = v6statsreply.getLength(); + if (length < v6statsreply.getLength()) + break; + v6statsreply.setActionFactory(stat.getActionFactory()); + v6statsreply.readFrom(data); + if (v6statsreply.getLength() < min_len) + break; + v6statsreply.setVendorId(vendor); + log.trace("V6StatsReply: {}", v6statsreply); + length -= v6statsreply.getLength(); + results.add(v6statsreply); + } + return results; + } + */ + + @Override + public List queryStatistics(Long switchId, + OFStatisticsType statType, Object target) { + /* + * Caller does not know and it is not supposed to know whether this + * switch supports vendor extension. We adjust the target for him + */ + if (statType == OFStatisticsType.FLOW) { + if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) { + statType = OFStatisticsType.VENDOR; + } + } + + List list = this.acquireStatistics(switchId, statType, + target); + + return list; + + /* + return (list == null) ? null + : (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list) + : list; + */ + } + + @Override + public List getOFDescStatistics(Long switchId) { + if (!descStatistics.containsKey(switchId)) + return this.dummyList; + + return descStatistics.get(switchId); + } + + @Override + public List getOFPortStatistics(Long switchId) { + if (!portStatistics.containsKey(switchId)) { + return this.dummyList; + } + + return portStatistics.get(switchId); + } + + @Override + public List getOFPortStatistics(Long switchId, short portId) { + if (!portStatistics.containsKey(switchId)) { + return this.dummyList; + } + List list = new ArrayList(1); + for (OFStatistics stats : portStatistics.get(switchId)) { + if (((OFPortStatisticsReply) stats).getPortNumber() == portId) { + list.add(stats); + break; + } + } + return list; + } + + @Override + public List getOFTableStatistics(Long switchId) { + if (!flowTableStatistics.containsKey(switchId)) + return this.dummyList; + + return flowTableStatistics.get(switchId); + } + + @Override + public int getFlowsNumber(long switchId) { + return this.flowStatistics.get(switchId).size(); + } + + /* + * InventoryShim replay for us all the switch addition which happened before + * we were brought up + */ + @Override + public void updateNode(Node node, UpdateType type, Set props) { + Long switchId = (Long) node.getID(); + switch (type) { + case ADDED: + addStatisticsTicks(switchId); + break; + case REMOVED: + clearFlowStatsAndTicks(switchId); + default: + } + } + + @Override + public void updateNodeConnector(NodeConnector nodeConnector, + UpdateType type, Set props) { + // No action + } + + /** + * Update the cached port rates for this switch with the latest retrieved + * port transmit byte count + * + * @param switchId + */ + private synchronized void updatePortsTxRate(long switchId) { + List newPortStatistics = this.portStatistics + .get(switchId); + if (newPortStatistics == null) { + return; + } + Map rates = this.txRates.get(switchId); + if (rates == null) { + // First time rates for this switch are added + rates = new HashMap(); + txRates.put(switchId, rates); + } + for (OFStatistics stats : newPortStatistics) { + OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats; + short port = newPortStat.getPortNumber(); + TxRates portRatesHolder = rates.get(port); + if (portRatesHolder == null) { + // First time rates for this port are added + portRatesHolder = new TxRates(); + rates.put(port, portRatesHolder); + } + // Get and store the number of transmitted bytes for this port + // And handle the case where agent does not support the counter + long transmitBytes = newPortStat.getTransmitBytes(); + long value = (transmitBytes < 0) ? 0 : transmitBytes; + portRatesHolder.update(value); + } + } + + @Override + public synchronized long getTransmitRate(Long switchId, Short port) { + long average = 0; + if (switchId == null || port == null) { + return average; + } + Map perSwitch = txRates.get(switchId); + if (perSwitch == null) { + return average; + } + TxRates portRates = perSwitch.get(port); + if (portRates == null) { + return average; + } + return portRates.getAverageTxRate(); + } + + /* + * Manual switch name configuration code + */ + @Override + public String getHelp() { + StringBuffer help = new StringBuffer(); + help.append("---OF Statistics Manager utilities---\n"); + help.append("\t ofdumpstatsmgr - " + + "Print Internal Stats Mgr db\n"); + help.append("\t ofstatsmgrintervals (in seconds) - " + + "Set/Show flow/port/dedscription/table stats poll intervals\n"); + return help.toString(); + } + + private boolean isValidSwitchId(String switchId) { + String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$"; + String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$"; + + return (switchId != null && (switchId.matches(regexDatapathID) || switchId + .matches(regexDatapathIDLong))); + } + + public long getSwitchIDLong(String switchId) { + int radix = 16; + String switchString = "0"; + + if (isValidSwitchId(switchId)) { + if (switchId.contains(":")) { + // Handle the 00:00:AA:BB:CC:DD:EE:FF notation + switchString = switchId.replace(":", ""); + } else if (switchId.contains("-")) { + // Handle the 00-00-AA-BB-CC-DD-EE-FF notation + switchString = switchId.replace("-", ""); + } else { + // Handle the 0123456789ABCDEF notation + switchString = switchId; + } + } + return Long.parseLong(switchString, radix); + } + + /* + * Internal information dump code + */ + private String prettyPrintSwitchMap(ConcurrentMap map) { + StringBuffer buffer = new StringBuffer(); + buffer.append("{"); + for (Entry entry : map.entrySet()) { + buffer.append(HexString.toHexString(entry.getKey()) + "=" + + entry.getValue().toString() + " "); + } + buffer.append("}"); + return buffer.toString(); + } + + public void _ofdumpstatsmgr(CommandInterpreter ci) { + ci.println("Global Counter: " + counter); + ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks)); + ci.println("PendingStatsQueue: " + pendingStatsRequests); + ci.println("PendingStatsQueue size: " + pendingStatsRequests.size()); + ci.println("Stats Collector alive: " + statisticsCollector.isAlive()); + ci.println("Stats Collector State: " + + statisticsCollector.getState().toString()); + ci.println("StatsTimer: " + statisticsTimer.toString()); + ci.println("Flow Stats Period: " + statisticsTickNumber + " s"); + ci.println("Desc Stats Period: " + descriptionTickNumber + " s"); + ci.println("Port Stats Period: " + portTickNumber + " s"); + ci.println("Flow Table Stats Period: " + flowTableTickNumber + " s"); + } + + public void _resetSwitchCapability(CommandInterpreter ci) { + String sidString = ci.nextArgument(); + Long sid = null; + if (sidString == null) { + ci.println("Insert the switch id (numeric value)"); + return; + } + try { + sid = Long.valueOf(sidString); + this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE); + ci.println("Vendor capability for switch " + sid + " set to " + + this.switchSupportsVendorExtStats.get(sid)); + } catch (NumberFormatException e) { + ci.println("Invalid switch id. Has to be numeric."); + } + + } + + public void _ofbw(CommandInterpreter ci) { + String sidString = ci.nextArgument(); + Long sid = null; + if (sidString == null) { + ci.println("Insert the switch id (numeric value)"); + return; + } + try { + sid = Long.valueOf(sidString); + } catch (NumberFormatException e) { + ci.println("Invalid switch id. Has to be numeric."); + } + if (sid != null) { + Map thisSwitchRates = txRates.get(sid); + ci.println("Bandwidth utilization (" + factoredSamples + * portTickNumber + " sec average) for switch " + + HexEncode.longToHexString(sid) + ":"); + if (thisSwitchRates == null) { + ci.println("Not available"); + } else { + for (Entry entry : thisSwitchRates.entrySet()) { + ci.println("Port: " + entry.getKey() + ": " + + entry.getValue().getAverageTxRate() + " bps"); + } + } + } + } + + public void _txratewindow(CommandInterpreter ci) { + String averageWindow = ci.nextArgument(); + short seconds = 0; + if (averageWindow == null) { + ci.println("Insert the length in seconds of the median " + + "window for tx rate"); + ci.println("Current: " + factoredSamples * portTickNumber + " secs"); + return; + } + try { + seconds = Short.valueOf(averageWindow); + } catch (NumberFormatException e) { + ci.println("Invalid period."); + } + OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber); + ci.println("New: " + factoredSamples * portTickNumber + " secs"); + } + + public void _ofstatsmgrintervals(CommandInterpreter ci) { + String flowStatsInterv = ci.nextArgument(); + String portStatsInterv = ci.nextArgument(); + String descStatsInterv = ci.nextArgument(); + String flowTableStatsInterv = ci.nextArgument(); + + if (flowStatsInterv == null || portStatsInterv == null + || descStatsInterv == null || flowTableStatsInterv == null) { + ci.println("Usage: ostatsmgrintervals (in seconds)"); + ci.println("Current Values: fP=" + statisticsTickNumber + "s pP=" + + portTickNumber + "s dP=" + descriptionTickNumber + + "s tP=" + flowTableTickNumber + "s"); + return; + } + Short fP, pP, dP, tP; + try { + fP = Short.parseShort(flowStatsInterv); + pP = Short.parseShort(portStatsInterv); + dP = Short.parseShort(descStatsInterv); + tP = Short.parseShort(flowTableStatsInterv); + } catch (Exception e) { + ci.println("Invalid format values: " + e.getMessage()); + return; + } + + if (pP <= 1 || fP <= 1 || dP <= 1 || tP <= 1) { + ci.println("Invalid values. fP, pP, dP, tP have to be greater than 1."); + return; + } + + statisticsTickNumber = fP; + portTickNumber = pP; + descriptionTickNumber = dP; + flowTableTickNumber = tP; + + + ci.println("New Values: fP=" + statisticsTickNumber + "s pP=" + + portTickNumber + "s dP=" + descriptionTickNumber + + "s tP=" + flowTableTickNumber + "s"); + } + + /** + * This method retrieves user configurations from config.ini and updates + * statisticsTickNumber/portTickNumber/descriptionTickNumber/flowTableTickNumber accordingly. + */ + private void configStatsPollIntervals() { + String fsStr = System.getProperty("of.flowStatsPollInterval"); + String psStr = System.getProperty("of.portStatsPollInterval"); + String dsStr = System.getProperty("of.descStatsPollInterval"); + String tsStr = System.getProperty("of.flowTableStatsPollInterval"); + Short fs, ps, ds, ts; + + if (fsStr != null) { + try { + fs = Short.parseShort(fsStr); + if (fs > 0) { + statisticsTickNumber = fs; + } + } catch (Exception e) { + } + } + + if (psStr != null) { + try { + ps = Short.parseShort(psStr); + if (ps > 0) { + portTickNumber = ps; + } + } catch (Exception e) { + } + } + + if (dsStr != null) { + try { + ds = Short.parseShort(dsStr); + if (ds > 0) { + descriptionTickNumber = ds; + } + } catch (Exception e) { + } + } + if (tsStr != null) { + try { + ts = Short.parseShort(dsStr); + if (ts > 0) { + flowTableTickNumber = ts; + } + } catch (Exception e) { + } + } + + } +}