Performacne improvements via adding a netty-based openflowj and openflow plugin;...
[controller.git] / opendaylight / protocol_plugins / openflow_netty / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / internal / OFStatisticsManager.java
diff --git a/opendaylight/protocol_plugins/openflow_netty/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/OFStatisticsManager.java b/opendaylight/protocol_plugins/openflow_netty/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/OFStatisticsManager.java
new file mode 100644 (file)
index 0000000..cf9a2c5
--- /dev/null
@@ -0,0 +1,1255 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.protocol_plugin.openflow.internal;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.eclipse.osgi.framework.console.CommandInterpreter;
+import org.eclipse.osgi.framework.console.CommandProvider;
+import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
+import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimInternalListener;
+import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
+import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener;
+import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsServiceShimListener;
+import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
+import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
+import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
+import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
+import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.core.NodeConnector;
+import org.opendaylight.controller.sal.core.Property;
+import org.opendaylight.controller.sal.core.UpdateType;
+import org.opendaylight.controller.sal.utils.GlobalConstants;
+import org.opendaylight.controller.sal.utils.HexEncode;
+import org.openflow.protocol.OFError;
+import org.openflow.protocol.OFMatch;
+import org.openflow.protocol.OFPort;
+import org.openflow.protocol.OFStatisticsRequest;
+import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
+import org.openflow.protocol.statistics.OFDescriptionStatistics;
+import org.openflow.protocol.statistics.OFFlowStatisticsReply;
+import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
+import org.openflow.protocol.statistics.OFPortStatisticsReply;
+import org.openflow.protocol.statistics.OFPortStatisticsRequest;
+import org.openflow.protocol.statistics.OFQueueStatisticsRequest;
+import org.openflow.protocol.statistics.OFStatistics;
+import org.openflow.protocol.statistics.OFStatisticsType;
+import org.openflow.protocol.statistics.OFVendorStatistics;
+import org.openflow.util.HexString;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * It periodically polls the different OF statistics from the OF switches and
+ * caches them for quick retrieval for the above layers' modules It also
+ * provides an API to directly query the switch about the statistics
+ */
+public class OFStatisticsManager implements IOFStatisticsManager,
+        IInventoryShimExternalListener, CommandProvider {
+    private static final Logger log = LoggerFactory
+            .getLogger(OFStatisticsManager.class);
+    private static final int initialSize = 64;
+    private static final long flowStatsPeriod = 10000;
+    private static final long descriptionStatsPeriod = 60000;
+    private static final long portStatsPeriod = 5000;
+    private static final long flowTableStatsPeriod = 60000;
+    private static final long tickPeriod = 1000;
+    private static short statisticsTickNumber = (short) (flowStatsPeriod / tickPeriod);
+    private static short descriptionTickNumber = (short) (descriptionStatsPeriod / tickPeriod);
+    private static short portTickNumber = (short) (portStatsPeriod / tickPeriod);
+    private static short flowTableTickNumber = (short) (flowTableStatsPeriod / tickPeriod);
+    private static short factoredSamples = (short) 2;
+    private static short counter = 1;
+    private IController controller = null;
+    private ConcurrentMap<Long, List<OFStatistics>> flowStatistics;
+    private ConcurrentMap<Long, List<OFStatistics>> descStatistics;
+    private ConcurrentMap<Long, List<OFStatistics>> portStatistics;
+    private ConcurrentMap<Long, List<OFStatistics>> flowTableStatistics;
+    private List<OFStatistics> dummyList;
+    private ConcurrentMap<Long, StatisticsTicks> statisticsTimerTicks;
+    protected BlockingQueue<StatsRequest> pendingStatsRequests;
+    protected BlockingQueue<Long> switchPortStatsUpdated;
+    private Thread statisticsCollector;
+    private Thread txRatesUpdater;
+    private Timer statisticsTimer;
+    private TimerTask statisticsTimerTask;
+    private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
+    private Map<Long, Map<Short, TxRates>> txRates; // Per port sampled (every
+                                                    // portStatsPeriod) transmit
+                                                    // rate
+    private Set<IStatisticsListener> descriptionListeners;
+    private ConcurrentMap<String, IStatisticsServiceShimListener> statisticsServiceShimListener;
+
+    /**
+     * The object containing the latest factoredSamples tx rate samples for a
+     * given switch port
+     */
+    protected class TxRates {
+        Deque<Long> sampledTxBytes; // contains the latest factoredSamples
+                                    // sampled transmitted bytes
+
+        public TxRates() {
+            sampledTxBytes = new LinkedBlockingDeque<Long>();
+        }
+
+        public void update(Long txBytes) {
+            /*
+             * Based on how many samples our average works on, we might have to
+             * remove the oldest sample
+             */
+            if (sampledTxBytes.size() == factoredSamples) {
+                sampledTxBytes.removeLast();
+            }
+
+            // Add the latest sample to the top of the queue
+            sampledTxBytes.addFirst(txBytes);
+        }
+
+        /**
+         * Returns the average transmit rate in bps
+         *
+         * @return the average transmit rate [bps]
+         */
+        public long getAverageTxRate() {
+            long average = 0;
+            /*
+             * If we cannot provide the value for the time window length set
+             */
+            if (sampledTxBytes.size() < factoredSamples) {
+                return average;
+            }
+            long increment = (long) (sampledTxBytes.getFirst() - sampledTxBytes
+                    .getLast());
+            long timePeriod = (long) (factoredSamples * portStatsPeriod)
+                    / (long) tickPeriod;
+            average = (8L * increment) / timePeriod;
+            return average;
+        }
+    }
+
+    public void setController(IController core) {
+        this.controller = core;
+    }
+
+    public void unsetController(IController core) {
+        if (this.controller == core) {
+            this.controller = null;
+        }
+    }
+
+    /**
+     * Function called by the dependency manager when all the required
+     * dependencies are satisfied
+     *
+     */
+    void init() {
+        flowStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
+        descStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
+        portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
+        flowTableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
+        dummyList = new ArrayList<OFStatistics>(1);
+        statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(
+                initialSize);
+        pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(
+                initialSize);
+        switchPortStatsUpdated = new LinkedBlockingQueue<Long>(initialSize);
+        switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(
+                initialSize);
+        txRates = new HashMap<Long, Map<Short, TxRates>>(initialSize);
+        descriptionListeners = new HashSet<IStatisticsListener>();
+
+        statisticsServiceShimListener =
+                new ConcurrentHashMap<String, IStatisticsServiceShimListener>();
+        configStatsPollIntervals();
+
+        // Initialize managed timers
+        statisticsTimer = new Timer();
+        statisticsTimerTask = new TimerTask() {
+            @Override
+            public void run() {
+                decrementTicks();
+            }
+        };
+
+        // Initialize Statistics collector thread
+        statisticsCollector = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (true) {
+                    try {
+                        StatsRequest req = pendingStatsRequests.take();
+                        acquireStatistics(req.switchId, req.type);
+                    } catch (InterruptedException e) {
+                        log.warn("Flow Statistics Collector thread "
+                                + "interrupted", e);
+                    }
+                }
+            }
+        }, "Statistics Collector");
+
+        // Initialize Tx Rate Updater thread
+        txRatesUpdater = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (true) {
+                    try {
+                        long switchId = switchPortStatsUpdated.take();
+                        updatePortsTxRate(switchId);
+                    } catch (InterruptedException e) {
+                        log.warn("TX Rate Updater thread interrupted", e);
+                    }
+                }
+            }
+        }, "TX Rate Updater");
+    }
+
+    /**
+     * Function called by the dependency manager when at least one dependency
+     * become unsatisfied or when the component is shutting down because for
+     * example bundle is being stopped.
+     *
+     */
+    void destroy() {
+        this.statisticsServiceShimListener = null;
+    }
+
+    /**
+     * Function called by dependency manager after "init ()" is called and after
+     * the services provided by the class are registered in the service registry
+     *
+     */
+    void start() {
+        // Start managed timers
+        statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, tickPeriod);
+
+        // Start statistics collector thread
+        statisticsCollector.start();
+
+        // Start bandwidth utilization computer thread
+        txRatesUpdater.start();
+
+        // OSGI console
+        registerWithOSGIConsole();
+    }
+
+    /**
+     * Function called by the dependency manager before the services exported by
+     * the component are unregistered, this will be followed by a "destroy ()"
+     * calls
+     *
+     */
+    void stop() {
+        // Stop managed timers
+        statisticsTimer.cancel();
+    }
+
+    public void setStatisticsListener(IStatisticsListener s) {
+        this.descriptionListeners.add(s);
+    }
+
+    public void unsetStatisticsListener(IStatisticsListener s) {
+        if (s != null) {
+            this.descriptionListeners.remove(s);
+        }
+    }
+
+    private void registerWithOSGIConsole() {
+        BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
+                .getBundleContext();
+        bundleContext.registerService(CommandProvider.class.getName(), this,
+                null);
+    }
+
+    private static class StatsRequest {
+        protected Long switchId;
+        protected OFStatisticsType type;
+
+        public StatsRequest(Long d, OFStatisticsType t) {
+            switchId = d;
+            type = t;
+        }
+
+        public String toString() {
+            return "SReq = {switchId=" + switchId + ", type=" + type + "}";
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result
+                    + ((switchId == null) ? 0 : switchId.hashCode());
+            result = prime * result + ((type == null) ? 0 : type.ordinal());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            StatsRequest other = (StatsRequest) obj;
+            if (switchId == null) {
+                if (other.switchId != null) {
+                    return false;
+                }
+            } else if (!switchId.equals(other.switchId)) {
+                return false;
+            }
+            if (type != other.type) {
+                return false;
+            }
+            return true;
+        }
+    }
+
+    public void setStatisticsServiceShimListener(Map<?, ?> props,
+            IStatisticsServiceShimListener s) {
+        if (props == null) {
+            log.error("Didn't receive the service properties");
+            return;
+        }
+        String containerName = (String) props.get("containerName");
+        if (containerName == null) {
+            log.error("containerName not supplied");
+            return;
+        }
+        if ((this.statisticsServiceShimListener != null)
+                && !this.statisticsServiceShimListener.containsValue(s)) {
+            this.statisticsServiceShimListener.put(containerName, s);
+            log.trace("Added inventoryShimInternalListener for container:"
+                    + containerName);
+        }
+
+    }
+
+    public void unsetStatisticsServiceShimListener(Map<?, ?> props,
+            IStatisticsServiceShimListener s) {
+        if (props == null) {
+            log.error("Didn't receive the service properties");
+            return;
+        }
+        String containerName = (String) props.get("containerName");
+        if (containerName == null) {
+            log.error("containerName not supplied");
+            return;
+        }
+        if ((this.statisticsServiceShimListener != null)
+                && this.statisticsServiceShimListener
+                    .get(containerName) != null
+                && this.statisticsServiceShimListener
+                    .get(containerName).equals(s)) {
+            this.statisticsServiceShimListener.remove(containerName);
+            log.trace("Removed inventoryShimInternalListener for container: "
+                            + containerName);
+        }
+    }
+
+
+    private void addStatisticsTicks(Long switchId) {
+        // Start of Change
+
+        // By default, ODL assumes that switch supports Vendor statistics.
+        // It is designed in such a way that if the switch doesn't support Vendor Stats,
+        // then "Flow Stats" would be acquired.
+
+        // Behavior is overridden NOT to assume default support for Vendor Stats
+
+        //switchSupportsVendorExtStats.put(switchId, Boolean.TRUE); // Assume switch supports Vendor extension stats
+        switchSupportsVendorExtStats.put(switchId, Boolean.FALSE);
+
+        // End of Change
+
+        statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
+        log.info("Added Switch {} to target pool",
+                HexString.toHexString(switchId.longValue()));
+    }
+
+    protected static class StatisticsTicks {
+        private short flowStatisticsTicks;
+        private short descriptionTicks;
+        private short portStatisticsTicks;
+        private short flowTableStatisticsTicks;
+
+        public StatisticsTicks(boolean scattered) {
+            if (scattered) {
+                // scatter bursts by statisticsTickPeriod
+                if (++counter < 0) {
+                    counter = 0;
+                } // being paranoid here
+                flowStatisticsTicks = (short) (1 + counter
+                        % statisticsTickNumber);
+                descriptionTicks = (short) (1 + counter % descriptionTickNumber);
+                portStatisticsTicks = (short) (1 + counter % portTickNumber);
+                flowTableStatisticsTicks = (short) (1 + counter % flowTableTickNumber);
+            } else {
+                flowStatisticsTicks = statisticsTickNumber;
+                descriptionTicks = descriptionTickNumber;
+                portStatisticsTicks = portTickNumber;
+                flowTableStatisticsTicks = flowTableTickNumber;
+            }
+        }
+
+        public boolean decrementFlowTicksIsZero() {
+            // Please ensure no code is inserted between the if check and the
+            // flowStatisticsTicks reset
+            if (--flowStatisticsTicks == 0) {
+                flowStatisticsTicks = statisticsTickNumber;
+                return true;
+            }
+            return false;
+        }
+
+        public boolean decrementDescTicksIsZero() {
+            // Please ensure no code is inserted between the if check and the
+            // descriptionTicks reset
+            if (--descriptionTicks == 0) {
+                descriptionTicks = descriptionTickNumber;
+                return true;
+            }
+            return false;
+        }
+
+        public boolean decrementPortTicksIsZero() {
+            // Please ensure no code is inserted between the if check and the
+            // descriptionTicks reset
+            if (--portStatisticsTicks == 0) {
+                portStatisticsTicks = portTickNumber;
+                return true;
+            }
+            return false;
+        }
+
+        public boolean decrementFlowTableTicksIsZero() {
+            // Please ensure no code is inserted between the if check and the descriptionTicks reset
+            if (--flowTableStatisticsTicks == 0) {
+                flowTableStatisticsTicks = flowTableTickNumber;
+                return true;
+            }
+            return false;
+        }
+
+        public String toString() {
+            return "{fT=" + flowStatisticsTicks + ",dT=" + descriptionTicks
+                    + ",pT=" + portStatisticsTicks + ",tT=" + flowTableStatisticsTicks + "}";
+        }
+    }
+
+    private void printInfoMessage(String type, StatsRequest request) {
+        log.info(
+                "{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
+                new Object[] { type, HexString.toHexString(request.switchId),
+                        pendingStatsRequests.size(),
+                        statisticsCollector.getState().toString() });
+    }
+
+    protected void decrementTicks() {
+        StatsRequest request = null;
+        for (Map.Entry<Long, StatisticsTicks> entry : statisticsTimerTicks
+                .entrySet()) {
+            StatisticsTicks clock = entry.getValue();
+            Long switchId = entry.getKey();
+            if (clock.decrementFlowTicksIsZero() == true) {
+                request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ? new StatsRequest(
+                        switchId, OFStatisticsType.VENDOR) : new StatsRequest(
+                        switchId, OFStatisticsType.FLOW);
+                // If a request for this switch is already in the queue, skip to
+                // add this new request
+                if (!pendingStatsRequests.contains(request)
+                        && false == pendingStatsRequests.offer(request)) {
+                    printInfoMessage("Flow", request);
+                }
+            }
+
+            if (clock.decrementDescTicksIsZero() == true) {
+                request = new StatsRequest(switchId, OFStatisticsType.DESC);
+                // If a request for this switch is already in the queue, skip to
+                // add this new request
+                if (!pendingStatsRequests.contains(request)
+                        && false == pendingStatsRequests.offer(request)) {
+                    printInfoMessage("Description", request);
+                }
+            }
+
+            if (clock.decrementPortTicksIsZero() == true) {
+                request = new StatsRequest(switchId, OFStatisticsType.PORT);
+                // If a request for this switch is already in the queue, skip to
+                // add this new request
+                if (!pendingStatsRequests.contains(request)
+                        && false == pendingStatsRequests.offer(request)) {
+                    printInfoMessage("Port", request);
+                }
+            }
+
+            if (clock.decrementFlowTableTicksIsZero() == true) {
+                request = new StatsRequest(switchId, OFStatisticsType.TABLE);
+                // If a request for this switch is already in the queue, skip to add this new request
+                if (!pendingStatsRequests.contains(request)
+                        && false == pendingStatsRequests.offer(request)) {
+                    printInfoMessage("Flow Table", request);
+                }
+            }
+        }
+    }
+
+    private void removeStatsRequestTasks(Long switchId) {
+        log.info("Cleaning Statistics database for switch {}",
+                HexEncode.longToHexString(switchId));
+        // To be safe, let's attempt removal of both VENDOR and FLOW request. It
+        // does not hurt
+        pendingStatsRequests.remove(new StatsRequest(switchId,
+                OFStatisticsType.VENDOR));
+        pendingStatsRequests.remove(new StatsRequest(switchId,
+                OFStatisticsType.FLOW));
+        pendingStatsRequests.remove(new StatsRequest(switchId,
+                OFStatisticsType.DESC));
+        pendingStatsRequests.remove(new StatsRequest(switchId,
+                OFStatisticsType.PORT));
+        pendingStatsRequests.remove(new StatsRequest(switchId,
+                OFStatisticsType.TABLE));
+        // Take care of the TX rate databases
+        switchPortStatsUpdated.remove(switchId);
+        txRates.remove(switchId);
+    }
+
+    private void clearFlowStatsAndTicks(Long switchId) {
+        statisticsTimerTicks.remove(switchId);
+        removeStatsRequestTasks(switchId);
+        flowStatistics.remove(switchId);
+        log.info("Statistics removed for switch {}",
+                HexString.toHexString(switchId));
+    }
+
+    private void acquireStatistics(Long switchId, OFStatisticsType statType) {
+
+        // Query the switch on all matches
+        List<OFStatistics> values = this.acquireStatistics(switchId, statType,
+                null);
+
+        String containerName = GlobalConstants.DEFAULT.toString();
+
+        // Update local caching database if got a valid response
+        if (values != null && !values.isEmpty()) {
+            if ((statType == OFStatisticsType.FLOW)
+                    || (statType == OFStatisticsType.VENDOR)) {
+                flowStatistics.put(switchId, values);
+
+                notifyStatisticsListener(containerName, statType, switchId, values);
+
+            } else if (statType == OFStatisticsType.DESC) {
+                // Notify who may be interested in a description change
+                   notifyDescriptionListeners(switchId, values);
+
+                // Overwrite cache
+                descStatistics.put(switchId, values);
+
+                notifyStatisticsListener(containerName, statType, switchId, values);
+            } else if (statType == OFStatisticsType.PORT) {
+                // Overwrite cache with new port statistics for this switch
+                portStatistics.put(switchId, values);
+
+                // Wake up the thread which maintains the TX byte counters for
+                // each port
+                switchPortStatsUpdated.offer(switchId);
+
+                notifyStatisticsListener(containerName, statType, switchId, values);
+            } else if (statType == OFStatisticsType.TABLE) {
+
+                // Overwrite cache
+                   flowTableStatistics.put(switchId, values);
+
+                notifyStatisticsListener(containerName, statType, switchId, values);
+            }
+        }
+    }
+
+    private void notifyStatisticsListener(String containerName, OFStatisticsType type, Long switchId, List<OFStatistics> values){
+
+        IStatisticsServiceShimListener statisticsServiceShimListener =
+                                            this.statisticsServiceShimListener.get(containerName);
+
+        if (statisticsServiceShimListener != null) {
+
+            switch (type) {
+             case FLOW:
+                 statisticsServiceShimListener.
+                                     flowStatisticsUpdate(switchId, values, containerName);
+                 break;
+             case PORT:
+                 statisticsServiceShimListener.
+                                     portStatisticsUpdate(switchId, values, containerName);
+                 break;
+             case DESC:
+                 statisticsServiceShimListener.
+                                     descStatisticsUpdate(switchId, values, containerName);
+                 break;
+             case TABLE:
+                 statisticsServiceShimListener.
+                                     flowTableStatisticsUpdate(switchId, values, containerName);
+                 break;
+             default:
+                 break;
+            }
+
+            log.trace(type + " " + switchId + " on container "
+                                            + containerName);
+        }
+    }
+
+    private void notifyDescriptionListeners(Long switchId,
+            List<OFStatistics> values) {
+        for (IStatisticsListener l : this.descriptionListeners) {
+            l.descriptionRefreshed(switchId,
+                    ((OFDescriptionStatistics) values.get(0)));
+        }
+    }
+
+    /*
+     * Generic function to get the statistics form a OF switch
+     */
+    @SuppressWarnings("unchecked")
+    private List<OFStatistics> acquireStatistics(Long switchId,
+            OFStatisticsType statsType, Object target) {
+        List<OFStatistics> values = null;
+        String type = null;
+        ISwitch sw = controller.getSwitch(switchId);
+
+        if (sw != null) {
+            OFStatisticsRequest req = new OFStatisticsRequest();
+            req.setStatisticType(statsType);
+            int requestLength = req.getLengthU();
+
+            if (statsType == OFStatisticsType.FLOW) {
+                OFMatch match = null;
+                if (target == null) {
+                    // All flows request
+                    match = new OFMatch();
+                    match.setWildcards(0xffffffff);
+                } else if (!(target instanceof OFMatch)) {
+                    // Malformed request
+                    log.warn("Invalid target type for Flow stats request: {}",
+                            target.getClass());
+                    return null;
+                } else {
+                    // Specific flow request
+                    match = (OFMatch) target;
+                }
+                OFFlowStatisticsRequest specificReq = new OFFlowStatisticsRequest();
+                specificReq.setMatch(match);
+                specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
+                specificReq.setTableId((byte) 0xff);
+                req.setStatistics(Collections
+                        .singletonList((OFStatistics) specificReq));
+                requestLength += specificReq.getLength();
+                type = "FLOW";
+            } else if (statsType == OFStatisticsType.VENDOR) {
+                V6StatsRequest specificReq = new V6StatsRequest();
+                specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
+                specificReq.setTableId((byte) 0xff);
+                req.setStatistics(Collections
+                        .singletonList((OFStatistics) specificReq));
+                requestLength += specificReq.getLength();
+                type = "VENDOR";
+            } else if (statsType == OFStatisticsType.AGGREGATE) {
+                OFAggregateStatisticsRequest specificReq = new OFAggregateStatisticsRequest();
+                OFMatch match = new OFMatch();
+                match.setWildcards(0xffffffff);
+                specificReq.setMatch(match);
+                specificReq.setOutPort(OFPort.OFPP_NONE.getValue());
+                specificReq.setTableId((byte) 0xff);
+                req.setStatistics(Collections
+                        .singletonList((OFStatistics) specificReq));
+                requestLength += specificReq.getLength();
+                type = "AGGREGATE";
+            } else if (statsType == OFStatisticsType.PORT) {
+                short targetPort;
+                if (target == null) {
+                    // All ports request
+                    targetPort = (short) OFPort.OFPP_NONE.getValue();
+                } else if (!(target instanceof Short)) {
+                    // Malformed request
+                    log.warn("Invalid target type for Port stats request: {}",
+                            target.getClass());
+                    return null;
+                } else {
+                    // Specific port request
+                    targetPort = (Short) target;
+                }
+                OFPortStatisticsRequest specificReq = new OFPortStatisticsRequest();
+                specificReq.setPortNumber(targetPort);
+                req.setStatistics(Collections
+                        .singletonList((OFStatistics) specificReq));
+                requestLength += specificReq.getLength();
+                type = "PORT";
+            } else if (statsType == OFStatisticsType.QUEUE) {
+                OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
+                specificReq.setPortNumber((short) OFPort.OFPP_ALL.getValue());
+                specificReq.setQueueId(0xffffffff);
+                req.setStatistics(Collections
+                        .singletonList((OFStatistics) specificReq));
+                requestLength += specificReq.getLength();
+                type = "QUEUE";
+            } else if (statsType == OFStatisticsType.DESC) {
+                type = "DESC";
+            } else if (statsType == OFStatisticsType.TABLE) {
+                type = "TABLE";
+            }
+            req.setLengthU(requestLength);
+            Object result = sw.getStatistics(req);
+
+            if (result == null) {
+                log.warn("Request Timed Out for ({}) from switch {}", type,
+                        HexString.toHexString(switchId));
+            } else if (result instanceof OFError) {
+                log.warn("Switch {} failed to handle ({}) stats request: {}",
+                        new Object[] { HexString.toHexString(switchId), type,
+                                Utils.getOFErrorString((OFError) result) });
+                if (this.switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
+                    log.warn(
+                            "Switching back to regular Flow stats requests for switch {}",
+                            HexString.toHexString(switchId));
+                    this.switchSupportsVendorExtStats.put(switchId,
+                            Boolean.FALSE);
+                }
+            } else {
+                values = (List<OFStatistics>) result;
+            }
+        }
+        return values;
+    }
+
+    @Override
+    public List<OFStatistics> getOFFlowStatistics(Long switchId) {
+        List<OFStatistics> list = flowStatistics.get(switchId);
+
+        /*
+         * Check on emptiness as interference between add and get is still
+         * possible on the inner list (the concurrentMap entry's value)
+         */
+
+        return list;
+
+        /*
+        return (list == null || list.isEmpty()) ? this.dummyList
+                : (list.get(0) instanceof OFVendorStatistics) ? this
+                        .v6StatsListToOFStatsList(list) : list;
+                        */
+    }
+
+    @Override
+    public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch) {
+        List<OFStatistics> statsList = flowStatistics.get(switchId);
+
+        /*
+         * Check on emptiness as interference between add and get is still
+         * possible on the inner list (the concurrentMap entry's value)
+         */
+        if (statsList == null || statsList.isEmpty()) {
+            return this.dummyList;
+        }
+
+        if (statsList.get(0) instanceof OFVendorStatistics) {
+            /*
+             * Caller could provide regular OF match when we instead pull the
+             * vendor statistics from this node Caller is not supposed to know
+             * whether this switch supports vendor extensions statistics
+             * requests
+             */
+            /*
+            V6Match targetMatch = (ofMatch instanceof V6Match) ? (V6Match) ofMatch
+                    : new V6Match(ofMatch);
+
+            List<OFStatistics> targetList = v6StatsListToOFStatsList(statsList);
+            for (OFStatistics stats : targetList) {
+                V6StatsReply v6Stats = (V6StatsReply) stats;
+                V6Match v6Match = v6Stats.getMatch();
+                if (v6Match.equals(targetMatch)) {
+                    List<OFStatistics> list = new ArrayList<OFStatistics>();
+                    list.add(stats);
+                    return list;
+                }
+            }
+            */
+        } else {
+            for (OFStatistics stats : statsList) {
+                OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
+                if (flowStats.getMatch().equals(ofMatch)) {
+                    List<OFStatistics> list = new ArrayList<OFStatistics>();
+                    list.add(stats);
+                    return list;
+                }
+            }
+        }
+        return this.dummyList;
+    }
+
+    /*
+     * Converts the v6 vendor statistics to the OFStatistics
+     */
+    /*
+    private List<OFStatistics> v6StatsListToOFStatsList(
+            List<OFStatistics> statistics) {
+        List<OFStatistics> v6statistics = new ArrayList<OFStatistics>();
+        if (statistics != null && !statistics.isEmpty()) {
+            for (OFStatistics stats : statistics) {
+                if (stats instanceof OFVendorStatistics) {
+                    List<OFStatistics> r = getV6ReplyStatistics((OFVendorStatistics) stats);
+                    if (r != null) {
+                        v6statistics.addAll(r);
+                    }
+                }
+            }
+        }
+        return v6statistics;
+    }
+    */
+
+    /*
+
+    private static List<OFStatistics> getV6ReplyStatistics(
+            OFVendorStatistics stat) {
+        int length = stat.getLength();
+        List<OFStatistics> results = new ArrayList<OFStatistics>();
+        if (length < 12)
+            return null; // Nicira Hdr is 12 bytes. We need atleast that much
+        ByteBuffer data = ByteBuffer.allocate(length);
+        stat.writeTo(data);
+        data.rewind();
+        log.trace("getV6ReplyStatistics: Buffer BYTES ARE {}",
+                HexString.toHexString(data.array()));
+
+        int vendor = data.getInt(); // first 4 bytes is vendor id.
+        if (vendor != V6StatsRequest.NICIRA_VENDOR_ID) {
+            log.warn("Unexpected vendor id: 0x{}", Integer.toHexString(vendor));
+            return null;
+        } else {
+            // go ahead by 8 bytes which is 8 bytes of 0
+            data.getLong(); // should be all 0's
+            length -= 12; // 4 bytes Nicira Hdr + 8 bytes from above line have
+                          // been consumed
+        }
+
+        V6StatsReply v6statsreply;
+        int min_len;
+        while (length > 0) {
+            v6statsreply = new V6StatsReply();
+            min_len = v6statsreply.getLength();
+            if (length < v6statsreply.getLength())
+                break;
+            v6statsreply.setActionFactory(stat.getActionFactory());
+            v6statsreply.readFrom(data);
+            if (v6statsreply.getLength() < min_len)
+                break;
+            v6statsreply.setVendorId(vendor);
+            log.trace("V6StatsReply: {}", v6statsreply);
+            length -= v6statsreply.getLength();
+            results.add(v6statsreply);
+        }
+        return results;
+    }
+    */
+
+    @Override
+    public List<OFStatistics> queryStatistics(Long switchId,
+            OFStatisticsType statType, Object target) {
+        /*
+         * Caller does not know and it is not supposed to know whether this
+         * switch supports vendor extension. We adjust the target for him
+         */
+        if (statType == OFStatisticsType.FLOW) {
+            if (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) {
+                statType = OFStatisticsType.VENDOR;
+            }
+        }
+
+        List<OFStatistics> list = this.acquireStatistics(switchId, statType,
+                target);
+
+        return list;
+
+        /*
+        return (list == null) ? null
+                : (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list)
+                        : list;
+                        */
+    }
+
+    @Override
+    public List<OFStatistics> getOFDescStatistics(Long switchId) {
+        if (!descStatistics.containsKey(switchId))
+            return this.dummyList;
+
+        return descStatistics.get(switchId);
+    }
+
+    @Override
+    public List<OFStatistics> getOFPortStatistics(Long switchId) {
+        if (!portStatistics.containsKey(switchId)) {
+            return this.dummyList;
+        }
+
+        return portStatistics.get(switchId);
+    }
+
+    @Override
+    public List<OFStatistics> getOFPortStatistics(Long switchId, short portId) {
+        if (!portStatistics.containsKey(switchId)) {
+            return this.dummyList;
+        }
+        List<OFStatistics> list = new ArrayList<OFStatistics>(1);
+        for (OFStatistics stats : portStatistics.get(switchId)) {
+            if (((OFPortStatisticsReply) stats).getPortNumber() == portId) {
+                list.add(stats);
+                break;
+            }
+        }
+        return list;
+    }
+
+    @Override
+    public List<OFStatistics> getOFTableStatistics(Long switchId) {
+        if (!flowTableStatistics.containsKey(switchId))
+            return this.dummyList;
+
+        return flowTableStatistics.get(switchId);
+    }
+
+    @Override
+    public int getFlowsNumber(long switchId) {
+        return this.flowStatistics.get(switchId).size();
+    }
+
+    /*
+     * InventoryShim replay for us all the switch addition which happened before
+     * we were brought up
+     */
+    @Override
+    public void updateNode(Node node, UpdateType type, Set<Property> props) {
+        Long switchId = (Long) node.getID();
+        switch (type) {
+        case ADDED:
+            addStatisticsTicks(switchId);
+            break;
+        case REMOVED:
+            clearFlowStatsAndTicks(switchId);
+        default:
+        }
+    }
+
+    @Override
+    public void updateNodeConnector(NodeConnector nodeConnector,
+            UpdateType type, Set<Property> props) {
+        // No action
+    }
+
+    /**
+     * Update the cached port rates for this switch with the latest retrieved
+     * port transmit byte count
+     *
+     * @param switchId
+     */
+    private synchronized void updatePortsTxRate(long switchId) {
+        List<OFStatistics> newPortStatistics = this.portStatistics
+                .get(switchId);
+        if (newPortStatistics == null) {
+            return;
+        }
+        Map<Short, TxRates> rates = this.txRates.get(switchId);
+        if (rates == null) {
+            // First time rates for this switch are added
+            rates = new HashMap<Short, TxRates>();
+            txRates.put(switchId, rates);
+        }
+        for (OFStatistics stats : newPortStatistics) {
+            OFPortStatisticsReply newPortStat = (OFPortStatisticsReply) stats;
+            short port = newPortStat.getPortNumber();
+            TxRates portRatesHolder = rates.get(port);
+            if (portRatesHolder == null) {
+                // First time rates for this port are added
+                portRatesHolder = new TxRates();
+                rates.put(port, portRatesHolder);
+            }
+            // Get and store the number of transmitted bytes for this port
+            // And handle the case where agent does not support the counter
+            long transmitBytes = newPortStat.getTransmitBytes();
+            long value = (transmitBytes < 0) ? 0 : transmitBytes;
+            portRatesHolder.update(value);
+        }
+    }
+
+    @Override
+    public synchronized long getTransmitRate(Long switchId, Short port) {
+        long average = 0;
+        if (switchId == null || port == null) {
+            return average;
+        }
+        Map<Short, TxRates> perSwitch = txRates.get(switchId);
+        if (perSwitch == null) {
+            return average;
+        }
+        TxRates portRates = perSwitch.get(port);
+        if (portRates == null) {
+            return average;
+        }
+        return portRates.getAverageTxRate();
+    }
+
+    /*
+     * Manual switch name configuration code
+     */
+    @Override
+    public String getHelp() {
+        StringBuffer help = new StringBuffer();
+        help.append("---OF Statistics Manager utilities---\n");
+        help.append("\t ofdumpstatsmgr         - "
+                + "Print Internal Stats Mgr db\n");
+        help.append("\t ofstatsmgrintervals <fP> <pP> <dP> <tP>(in seconds) - "
+                + "Set/Show flow/port/dedscription/table stats poll intervals\n");
+        return help.toString();
+    }
+
+    private boolean isValidSwitchId(String switchId) {
+        String regexDatapathID = "^([0-9a-fA-F]{1,2}[:-]){7}[0-9a-fA-F]{1,2}$";
+        String regexDatapathIDLong = "^[0-9a-fA-F]{1,16}$";
+
+        return (switchId != null && (switchId.matches(regexDatapathID) || switchId
+                .matches(regexDatapathIDLong)));
+    }
+
+    public long getSwitchIDLong(String switchId) {
+        int radix = 16;
+        String switchString = "0";
+
+        if (isValidSwitchId(switchId)) {
+            if (switchId.contains(":")) {
+                // Handle the 00:00:AA:BB:CC:DD:EE:FF notation
+                switchString = switchId.replace(":", "");
+            } else if (switchId.contains("-")) {
+                // Handle the 00-00-AA-BB-CC-DD-EE-FF notation
+                switchString = switchId.replace("-", "");
+            } else {
+                // Handle the 0123456789ABCDEF notation
+                switchString = switchId;
+            }
+        }
+        return Long.parseLong(switchString, radix);
+    }
+
+    /*
+     * Internal information dump code
+     */
+    private String prettyPrintSwitchMap(ConcurrentMap<Long, StatisticsTicks> map) {
+        StringBuffer buffer = new StringBuffer();
+        buffer.append("{");
+        for (Entry<Long, StatisticsTicks> entry : map.entrySet()) {
+            buffer.append(HexString.toHexString(entry.getKey()) + "="
+                    + entry.getValue().toString() + " ");
+        }
+        buffer.append("}");
+        return buffer.toString();
+    }
+
+    public void _ofdumpstatsmgr(CommandInterpreter ci) {
+        ci.println("Global Counter: " + counter);
+        ci.println("Timer Ticks: " + prettyPrintSwitchMap(statisticsTimerTicks));
+        ci.println("PendingStatsQueue: " + pendingStatsRequests);
+        ci.println("PendingStatsQueue size: " + pendingStatsRequests.size());
+        ci.println("Stats Collector alive: " + statisticsCollector.isAlive());
+        ci.println("Stats Collector State: "
+                + statisticsCollector.getState().toString());
+        ci.println("StatsTimer: " + statisticsTimer.toString());
+        ci.println("Flow Stats Period: " + statisticsTickNumber + " s");
+        ci.println("Desc Stats Period: " + descriptionTickNumber + " s");
+        ci.println("Port Stats Period: " + portTickNumber + " s");
+        ci.println("Flow Table Stats Period: " + flowTableTickNumber + " s");
+    }
+
+    public void _resetSwitchCapability(CommandInterpreter ci) {
+        String sidString = ci.nextArgument();
+        Long sid = null;
+        if (sidString == null) {
+            ci.println("Insert the switch id (numeric value)");
+            return;
+        }
+        try {
+            sid = Long.valueOf(sidString);
+            this.switchSupportsVendorExtStats.put(sid, Boolean.TRUE);
+            ci.println("Vendor capability for switch " + sid + " set to "
+                    + this.switchSupportsVendorExtStats.get(sid));
+        } catch (NumberFormatException e) {
+            ci.println("Invalid switch id. Has to be numeric.");
+        }
+
+    }
+
+    public void _ofbw(CommandInterpreter ci) {
+        String sidString = ci.nextArgument();
+        Long sid = null;
+        if (sidString == null) {
+            ci.println("Insert the switch id (numeric value)");
+            return;
+        }
+        try {
+            sid = Long.valueOf(sidString);
+        } catch (NumberFormatException e) {
+            ci.println("Invalid switch id. Has to be numeric.");
+        }
+        if (sid != null) {
+            Map<Short, TxRates> thisSwitchRates = txRates.get(sid);
+            ci.println("Bandwidth utilization (" + factoredSamples
+                    * portTickNumber + " sec average) for switch "
+                    + HexEncode.longToHexString(sid) + ":");
+            if (thisSwitchRates == null) {
+                ci.println("Not available");
+            } else {
+                for (Entry<Short, TxRates> entry : thisSwitchRates.entrySet()) {
+                    ci.println("Port: " + entry.getKey() + ": "
+                            + entry.getValue().getAverageTxRate() + " bps");
+                }
+            }
+        }
+    }
+
+    public void _txratewindow(CommandInterpreter ci) {
+        String averageWindow = ci.nextArgument();
+        short seconds = 0;
+        if (averageWindow == null) {
+            ci.println("Insert the length in seconds of the median "
+                    + "window for tx rate");
+            ci.println("Current: " + factoredSamples * portTickNumber + " secs");
+            return;
+        }
+        try {
+            seconds = Short.valueOf(averageWindow);
+        } catch (NumberFormatException e) {
+            ci.println("Invalid period.");
+        }
+        OFStatisticsManager.factoredSamples = (short) (seconds / portTickNumber);
+        ci.println("New: " + factoredSamples * portTickNumber + " secs");
+    }
+
+    public void _ofstatsmgrintervals(CommandInterpreter ci) {
+        String flowStatsInterv = ci.nextArgument();
+        String portStatsInterv = ci.nextArgument();
+        String descStatsInterv = ci.nextArgument();
+        String flowTableStatsInterv = ci.nextArgument();
+
+        if (flowStatsInterv == null || portStatsInterv == null
+                || descStatsInterv == null || flowTableStatsInterv == null) {
+            ci.println("Usage: ostatsmgrintervals <fP> <pP> <dP> <tP>(in seconds)");
+            ci.println("Current Values: fP=" + statisticsTickNumber + "s pP="
+                    + portTickNumber + "s dP=" + descriptionTickNumber +
+                    "s tP=" + flowTableTickNumber + "s");
+            return;
+        }
+        Short fP, pP, dP, tP;
+        try {
+            fP = Short.parseShort(flowStatsInterv);
+            pP = Short.parseShort(portStatsInterv);
+            dP = Short.parseShort(descStatsInterv);
+            tP = Short.parseShort(flowTableStatsInterv);
+        } catch (Exception e) {
+            ci.println("Invalid format values: " + e.getMessage());
+            return;
+        }
+
+        if (pP <= 1 || fP <= 1 || dP <= 1 || tP <= 1) {
+            ci.println("Invalid values. fP, pP, dP, tP have to be greater than 1.");
+            return;
+        }
+
+        statisticsTickNumber = fP;
+        portTickNumber = pP;
+        descriptionTickNumber = dP;
+        flowTableTickNumber = tP;
+
+
+        ci.println("New Values: fP=" + statisticsTickNumber + "s pP="
+                + portTickNumber + "s dP=" + descriptionTickNumber +
+                "s tP=" + flowTableTickNumber + "s");
+    }
+
+    /**
+     * This method retrieves user configurations from config.ini and updates
+     * statisticsTickNumber/portTickNumber/descriptionTickNumber/flowTableTickNumber accordingly.
+     */
+    private void configStatsPollIntervals() {
+        String fsStr = System.getProperty("of.flowStatsPollInterval");
+        String psStr = System.getProperty("of.portStatsPollInterval");
+        String dsStr = System.getProperty("of.descStatsPollInterval");
+        String tsStr = System.getProperty("of.flowTableStatsPollInterval");
+        Short fs, ps, ds, ts;
+
+        if (fsStr != null) {
+            try {
+                fs = Short.parseShort(fsStr);
+                if (fs > 0) {
+                    statisticsTickNumber = fs;
+                }
+            } catch (Exception e) {
+            }
+        }
+
+        if (psStr != null) {
+            try {
+                ps = Short.parseShort(psStr);
+                if (ps > 0) {
+                    portTickNumber = ps;
+                }
+            } catch (Exception e) {
+            }
+        }
+
+        if (dsStr != null) {
+            try {
+                ds = Short.parseShort(dsStr);
+                if (ds > 0) {
+                    descriptionTickNumber = ds;
+                }
+            } catch (Exception e) {
+            }
+        }
+        if (tsStr != null) {
+            try {
+                ts = Short.parseShort(dsStr);
+                if (ts > 0) {
+                    flowTableTickNumber = ts;
+                }
+            } catch (Exception e) {
+            }
+        }
+
+    }
+}