Move stats caching to FM StatisticsManager 75/575/7
authorYevgeny Khodorkovsky <ykhodork@cisco.com>
Mon, 8 Jul 2013 23:36:57 +0000 (16:36 -0700)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 10 Jul 2013 17:23:51 +0000 (17:23 +0000)
This commit is towards HA functionality and includes:
- Cache flow, port, table and description statistics in a cluster
  allocated cache in functional module Statistics Manager
- Add necessary interfaces to allow plugin stats updates
  to propagate up to Stats Manager FM:
  * IOFStatisticsListener - notifies plugin internal components
  * IReadFilterInternalListener - notifies plugin internal
    components (demuxed to containers).
  * IPluginOutReadService - SAL hook for plugins to publish reader
    notifications.
  * IReadServiceListener - SAL service, publishes reader notifications
- Change statistics manager implementation to return cluster
  cached stats instead of calling reader service (Reader service is
  still available).
- Style fixes
- Bug fix: getOFFlowStatistics(switch,match) should consider priority.

Change-Id: I7c7a32102ea43bd4e05444527ad73d2610958603
Signed-off-by: Yevgeny Khodorkovsky <ykhodork@cisco.com>
31 files changed:
opendaylight/northbound/integrationtest/src/test/java/org/opendaylight/controller/northbound/integrationtest/NorthboundIT.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IOFStatisticsListener.java [new file with mode: 0644]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IOFStatisticsManager.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IReadFilterInternalListener.java [new file with mode: 0644]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IReadServiceFilter.java [moved from opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IPluginReadServiceFilter.java with 98% similarity]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IStatisticsListener.java [deleted file]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/Activator.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DescStatisticsConverter.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/InventoryService.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/InventoryServiceShim.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/OFStatisticsManager.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/ReadService.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/ReadServiceFilter.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/Utils.java
opendaylight/protocol_plugins/stub/src/main/java/org/opendaylight/controller/protocol_plugins/stub/internal/ReadService.java
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/reader/FlowOnNode.java
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/reader/IPluginOutReadService.java [new file with mode: 0644]
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/reader/IReadServiceListener.java [new file with mode: 0644]
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/reader/NodeConnectorStatistics.java
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/reader/NodeDescription.java
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/reader/NodeTableStatistics.java
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/utils/NodeCreator.java
opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/Activator.java
opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/ReadService.java
opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/Topology.java
opendaylight/statisticsmanager/api/src/main/java/org/opendaylight/controller/statisticsmanager/IStatisticsManager.java
opendaylight/statisticsmanager/implementation/pom.xml
opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/Activator.java
opendaylight/statisticsmanager/implementation/src/main/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManager.java
opendaylight/statisticsmanager/integrationtest/pom.xml
opendaylight/statisticsmanager/integrationtest/src/test/java/org/opendaylight/controller/statisticsmanager/internal/StatisticsManagerIT.java

index 797bca798f7d811c491e318bc4fddda1d9710899..66fa52c5ab93df569d7487a3879177ad9dc53769 100644 (file)
@@ -1,22 +1,13 @@
 package org.opendaylight.controller.northbound.integrationtest;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.osgi.framework.ServiceReference;
-import org.osgi.framework.Bundle;
-import javax.inject.Inject;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.ops4j.pax.exam.CoreOptions.junitBundles;
+import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
+import static org.ops4j.pax.exam.CoreOptions.options;
+import static org.ops4j.pax.exam.CoreOptions.systemPackages;
+import static org.ops4j.pax.exam.CoreOptions.systemProperty;
 
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.Before;
-import org.junit.runner.RunWith;
-import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.BundleContext;
-import static org.junit.Assert.*;
-import org.ops4j.pax.exam.junit.Configuration;
-import static org.ops4j.pax.exam.CoreOptions.*;
-import org.ops4j.pax.exam.Option;
-import org.ops4j.pax.exam.util.PathUtils;
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -30,13 +21,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.commons.codec.binary.Base64;
+import javax.inject.Inject;
 
+import org.apache.commons.codec.binary.Base64;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.codehaus.jettison.json.JSONTokener;
-
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.opendaylight.controller.hosttracker.IfIptoHost;
 import org.opendaylight.controller.sal.core.Bandwidth;
 import org.opendaylight.controller.sal.core.ConstructionException;
@@ -51,6 +46,15 @@ import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
 import org.opendaylight.controller.switchmanager.IInventoryListener;
 import org.opendaylight.controller.usermanager.IUserManager;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.Configuration;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.util.PathUtils;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @RunWith(PaxExam.class)
 public class NorthboundIT {
@@ -58,7 +62,7 @@ public class NorthboundIT {
     // get the OSGI bundle context
     @Inject
     private BundleContext bc;
-    private IUserManager users = null;
+    private IUserManager userManager = null;
     private IInventoryListener invtoryListener = null;
     private IListenTopoUpdates topoUpdates = null;
 
@@ -99,10 +103,10 @@ public class NorthboundIT {
 
         ServiceReference r = bc.getServiceReference(IUserManager.class.getName());
         if (r != null) {
-            this.users = (IUserManager) bc.getService(r);
+            this.userManager = (IUserManager) bc.getService(r);
         }
         // If UserManager is null, cannot login to run tests.
-        assertNotNull(this.users);
+        assertNotNull(this.userManager);
 
         r = bc.getServiceReference(IfIptoHost.class.getName());
         if (r != null) {
@@ -145,8 +149,8 @@ public class NorthboundIT {
 
         try {
             URL url = new URL(restUrl);
-            this.users.getAuthorizationList();
-            this.users.authenticate("admin", "admin");
+            this.userManager.getAuthorizationList();
+            this.userManager.authenticate("admin", "admin");
             String authString = "admin:admin";
             byte[] authEncBytes = Base64.encodeBase64(authString.getBytes());
             String authStringEnc = new String(authEncBytes);
@@ -574,7 +578,7 @@ public class NorthboundIT {
 
     @Test
     public void testStatistics() throws JSONException {
-        String actionTypes[] = { "drop", "loopback", "flood", "floodAll", "controller", "swPath", "hwPath", "output",
+        final String actionTypes[] = { "drop", "loopback", "flood", "floodAll", "controller", "swPath", "hwPath", "output",
                 "setDlSrc", "setDlDst", "setDlType", "setVlanId", "setVlanPcp", "setVlanCfi", "popVlan", "pushVlan",
                 "setNwSrc", "setNwDst", "setNwTos", "setTpSrc", "setTpDst" };
         System.out.println("Starting Statistics JAXB client.");
@@ -595,7 +599,7 @@ public class NorthboundIT {
         for (int i = 0; i < flowStats.length(); i++) {
 
             JSONObject flowStat = flowStats.getJSONObject(i);
-            testFlowStat(flowStat, actionTypes[i]);
+            testFlowStat(flowStat, actionTypes[i], i);
 
         }
 
@@ -637,7 +641,7 @@ public class NorthboundIT {
         flowStats = json.getJSONArray("flowStat");
         for (int i = 0; i < flowStats.length(); i++) {
             JSONObject flowStat = flowStats.getJSONObject(i);
-            testFlowStat(flowStat, actionTypes[i]);
+            testFlowStat(flowStat, actionTypes[i], i);
         }
 
         result = getJsonResult(baseURL + "portstats/STUB/51966");
@@ -664,7 +668,7 @@ public class NorthboundIT {
         Assert.assertTrue(portStat.getInt("collisionCount") == 4);
     }
 
-    private void testFlowStat(JSONObject flowStat, String actionType) throws JSONException {
+    private void testFlowStat(JSONObject flowStat, String actionType, int actIndex) throws JSONException {
         Assert.assertTrue(flowStat.getInt("tableId") == 1);
         Assert.assertTrue(flowStat.getInt("durationSeconds") == 40);
         Assert.assertTrue(flowStat.getInt("durationNanoseconds") == 400);
@@ -673,7 +677,7 @@ public class NorthboundIT {
 
         // test that flow information is correct
         JSONObject flow = flowStat.getJSONObject("flow");
-        Assert.assertTrue(flow.getInt("priority") == 3500);
+        Assert.assertTrue(flow.getInt("priority") == (3500 + actIndex));
         Assert.assertTrue(flow.getInt("idleTimeout") == 1000);
         Assert.assertTrue(flow.getInt("hardTimeout") == 2000);
         Assert.assertTrue(flow.getInt("id") == 12345);
@@ -1258,33 +1262,31 @@ public class NorthboundIT {
                 mavenBundle("org.opendaylight.controller", "protocol_plugins.stub", "0.4.0-SNAPSHOT"),
 
                 // List all the opendaylight modules
-                mavenBundle("org.opendaylight.controller", "security", "0.4.0-SNAPSHOT").noStart(),
-                mavenBundle("org.opendaylight.controller", "sal", "0.5.0-SNAPSHOT"),
-                mavenBundle("org.opendaylight.controller", "sal.implementation", "0.4.0-SNAPSHOT"),
-                mavenBundle("org.opendaylight.controller", "statisticsmanager", "0.4.0-SNAPSHOT"),
-                mavenBundle("org.opendaylight.controller", "statisticsmanager.implementation", "0.4.0-SNAPSHOT"),
+                mavenBundle("org.opendaylight.controller", "configuration", "0.4.0-SNAPSHOT"),
+                mavenBundle("org.opendaylight.controller", "configuration.implementation", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "containermanager", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "containermanager.implementation", "0.4.0-SNAPSHOT"),
-                mavenBundle("org.opendaylight.controller", "forwardingrulesmanager", "0.4.0-SNAPSHOT"),
-                mavenBundle("org.opendaylight.controller", "forwardingrulesmanager.implementation", "0.4.0-SNAPSHOT"),
-                mavenBundle("org.opendaylight.controller", "arphandler", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "clustering.services", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "clustering.services-implementation", "0.4.0-SNAPSHOT"),
+                mavenBundle("org.opendaylight.controller", "security", "0.4.0-SNAPSHOT").noStart(),
+                mavenBundle("org.opendaylight.controller", "sal", "0.5.0-SNAPSHOT"),
+                mavenBundle("org.opendaylight.controller", "sal.implementation", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "switchmanager", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "switchmanager.implementation", "0.4.0-SNAPSHOT"),
-                mavenBundle("org.opendaylight.controller", "configuration", "0.4.0-SNAPSHOT"),
-                mavenBundle("org.opendaylight.controller", "configuration.implementation", "0.4.0-SNAPSHOT"),
+                mavenBundle("org.opendaylight.controller", "forwardingrulesmanager", "0.4.0-SNAPSHOT"),
+                mavenBundle("org.opendaylight.controller", "forwardingrulesmanager.implementation", "0.4.0-SNAPSHOT"),
+                mavenBundle("org.opendaylight.controller", "statisticsmanager", "0.4.0-SNAPSHOT"),
+                mavenBundle("org.opendaylight.controller", "statisticsmanager.implementation", "0.4.0-SNAPSHOT"),
+                mavenBundle("org.opendaylight.controller", "arphandler", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "hosttracker", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "hosttracker.implementation", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "arphandler", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "routing.dijkstra_implementation", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "topologymanager", "0.4.0-SNAPSHOT"),
-
                 mavenBundle("org.opendaylight.controller", "usermanager", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "usermanager.implementation", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "logging.bridge", "0.4.0-SNAPSHOT"),
                 mavenBundle("org.opendaylight.controller", "clustering.test", "0.4.0-SNAPSHOT"),
-
                 mavenBundle("org.opendaylight.controller", "forwarding.staticrouting", "0.4.0-SNAPSHOT"),
 
                 // Northbound bundles
diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IOFStatisticsListener.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IOFStatisticsListener.java
new file mode 100644 (file)
index 0000000..1c22966
--- /dev/null
@@ -0,0 +1,20 @@
+package org.opendaylight.controller.protocol_plugin.openflow;
+
+import java.util.List;
+
+import org.openflow.protocol.statistics.OFStatistics;
+
+/**
+ * Interface defines the api which gets called when the information
+ * contained in the OF statistics reply message from a network is updated with
+ * new one.
+ */
+public interface IOFStatisticsListener {
+    public void descriptionStatisticsRefreshed(Long switchId, List<OFStatistics> description);
+
+    public void flowStatisticsRefreshed(Long switchId, List<OFStatistics> flows);
+
+    public void portStatisticsRefreshed(Long switchId, List<OFStatistics> ports);
+
+    public void tableStatisticsRefreshed(Long switchId, List<OFStatistics> tables);
+}
index d619b7377672616e538ecd161473fc8453a91015..fa225a8d3ee9e77cf5c034c73c383e26c518d3a6 100644 (file)
@@ -32,9 +32,10 @@ public interface IOFStatisticsManager {
      *
      * @param switchId the openflow datapath id
      * @param ofMatch the openflow match to query. If null, the query is intended for all the flows
+     * @param priority Priority of the wanted flow
      * @return the list of openflow statistics
      */
-    List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch);
+    List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch, short priority);
 
     /**
      * Return the description statistics for the specified switch.
diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IReadFilterInternalListener.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IReadFilterInternalListener.java
new file mode 100644 (file)
index 0000000..d5f4d31
--- /dev/null
@@ -0,0 +1,46 @@
+package org.opendaylight.controller.protocol_plugin.openflow;
+
+import java.util.List;
+
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.reader.FlowOnNode;
+import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
+import org.opendaylight.controller.sal.reader.NodeDescription;
+import org.opendaylight.controller.sal.reader.NodeTableStatistics;
+
+/**
+ * The Interface provides statistics updates to ReaderFilter listeners within
+ * the protocol plugin
+ */
+public interface IReadFilterInternalListener {
+
+    /**
+     * Notifies the hardware view of all the flow installed on the specified network node
+     * @param node
+     * @return
+     */
+    public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList);
+
+    /**
+     * Notifies the hardware view of the specified network node connector
+     * @param node
+     * @return
+     */
+    public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList);
+
+    /**
+     * Notifies all the table statistics for a node
+     * @param node
+     * @return
+     */
+    public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList);
+
+    /**
+     * Notifies the hardware view of all the flow installed on the specified network node
+     * @param node
+     * @return
+     */
+    public void nodeDescriptionStatisticsUpdated(Node node, NodeDescription nodeDescription);
+
+
+}
@@ -25,7 +25,7 @@ import org.opendaylight.controller.sal.reader.NodeTableStatistics;
  * It is implemented by the respective OF1.0 plugin component
  *
  */
-public interface IPluginReadServiceFilter {
+public interface IReadServiceFilter {
     /**
      * Returns the hardware image for the specified flow
      * on the specified network node for the passed container
diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IStatisticsListener.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/IStatisticsListener.java
deleted file mode 100644 (file)
index d837e01..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.opendaylight.controller.protocol_plugin.openflow;
-
-import org.openflow.protocol.statistics.OFDescriptionStatistics;
-
-/**
- * Interface which defines the api which gets called when the information
- * contained in the OF description statistics reply message from a network
- * is updated with new one.
- */
-public interface IStatisticsListener {
-        public void descriptionRefreshed(Long switchId,
-                                        OFDescriptionStatistics description);
-}
index 54c61b89abc7b951076741b5cf37e8e3c9f3cd79..6c09abbdc7cd851df41e9e72b2bba987cd6171e9 100644 (file)
@@ -19,10 +19,11 @@ import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotif
 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryProvider;
 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimInternalListener;
+import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
-import org.opendaylight.controller.protocol_plugin.openflow.IPluginReadServiceFilter;
+import org.opendaylight.controller.protocol_plugin.openflow.IReadFilterInternalListener;
+import org.opendaylight.controller.protocol_plugin.openflow.IReadServiceFilter;
 import org.opendaylight.controller.protocol_plugin.openflow.IRefreshInternalProvider;
-import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener;
 import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShimListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
@@ -37,6 +38,7 @@ import org.opendaylight.controller.sal.inventory.IPluginOutInventoryService;
 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
 import org.opendaylight.controller.sal.reader.IPluginInReadService;
+import org.opendaylight.controller.sal.reader.IPluginOutReadService;
 import org.opendaylight.controller.sal.topology.IPluginInTopologyService;
 import org.opendaylight.controller.sal.topology.IPluginOutTopologyService;
 import org.opendaylight.controller.sal.utils.GlobalConstants;
@@ -104,8 +106,7 @@ public class Activator extends ComponentActivatorAbstractBase {
             // export the service to be used by SAL
             c.setInterface(
                     new String[] { IPluginInTopologyService.class.getName(),
-                            ITopologyServiceShimListener.class.getName() },
-                    null);
+                            ITopologyServiceShimListener.class.getName() }, null);
             // Hook the services coming in from SAL, as optional in
             // case SAL is not yet there, could happen
             c.add(createContainerServiceDependency(containerName)
@@ -121,7 +122,8 @@ public class Activator extends ComponentActivatorAbstractBase {
         if (imp.equals(InventoryService.class)) {
             // export the service
             c.setInterface(
-                    new String[] { IPluginInInventoryService.class.getName(),
+                    new String[] {
+                            IPluginInInventoryService.class.getName(),
                             IInventoryShimInternalListener.class.getName(),
                             IInventoryProvider.class.getName() }, null);
 
@@ -168,11 +170,19 @@ public class Activator extends ComponentActivatorAbstractBase {
             // Set the protocolPluginType property which will be used
             // by SAL
             props.put(GlobalConstants.PROTOCOLPLUGINTYPE.toString(), Node.NodeIDType.OPENFLOW);
-            c.setInterface(IPluginInReadService.class.getName(), props);
+            c.setInterface(new String[] {
+                    IReadFilterInternalListener.class.getName(),
+                    IPluginInReadService.class.getName() }, props);
+
             c.add(createServiceDependency()
-                    .setService(IPluginReadServiceFilter.class)
+                    .setService(IReadServiceFilter.class)
                     .setCallbacks("setService", "unsetService")
                     .setRequired(true));
+            c.add(createContainerServiceDependency(containerName)
+                    .setService(IPluginOutReadService.class)
+                    .setCallbacks("setPluginOutReadServices",
+                            "unsetPluginOutReadServices")
+                    .setRequired(false));
         }
 
         if (imp.equals(FlowProgrammerNotifier.class)) {
@@ -240,7 +250,7 @@ public class Activator extends ComponentActivatorAbstractBase {
                             IMessageListener.class.getName(),
                             IContainerListener.class.getName(),
                             IInventoryShimExternalListener.class.getName() },
-                    props);
+                            props);
 
             c.add(createServiceDependency()
                     .setService(IController.class, "(name=Controller)")
@@ -257,9 +267,10 @@ public class Activator extends ComponentActivatorAbstractBase {
 
         if (imp.equals(ReadServiceFilter.class)) {
 
-            c.setInterface(
-                    new String[] { IPluginReadServiceFilter.class.getName(),
-                            IContainerListener.class.getName() }, null);
+            c.setInterface(new String[] {
+                    IReadServiceFilter.class.getName(),
+                    IContainerListener.class.getName(),
+                    IOFStatisticsListener.class.getName() }, null);
 
             c.add(createServiceDependency()
                     .setService(IController.class, "(name=Controller)")
@@ -269,6 +280,12 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setService(IOFStatisticsManager.class)
                     .setCallbacks("setService", "unsetService")
                     .setRequired(true));
+            c.add(createServiceDependency()
+                    .setService(IReadFilterInternalListener.class)
+                    .setCallbacks("setReadFilterInternalListener",
+                            "unsetReadFilterInternalListener")
+                    .setRequired(false));
+
         }
 
         if (imp.equals(OFStatisticsManager.class)) {
@@ -281,7 +298,7 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setCallbacks("setController", "unsetController")
                     .setRequired(true));
             c.add(createServiceDependency()
-                    .setService(IStatisticsListener.class)
+                    .setService(IOFStatisticsListener.class)
                     .setCallbacks("setStatisticsListener",
                             "unsetStatisticsListener").setRequired(false));
         }
@@ -337,7 +354,7 @@ public class Activator extends ComponentActivatorAbstractBase {
 
         if (imp.equals(InventoryServiceShim.class)) {
             c.setInterface(new String[] { IContainerListener.class.getName(),
-                    IStatisticsListener.class.getName()}, null);
+                    IOFStatisticsListener.class.getName()}, null);
 
             c.add(createServiceDependency()
                     .setService(IController.class, "(name=Controller)")
index ffd5fd4315747e40c7bf16b298c98f7c1bbd34eb..28c3dd6b4f7b0df028291dfa9d8d481201806086 100644 (file)
@@ -31,6 +31,10 @@ public class DescStatisticsConverter {
         this.ofDesc = (statsList == null || statsList.isEmpty())?
                 null : (OFDescriptionStatistics) statsList.get(0);
     }
+    public DescStatisticsConverter(OFDescriptionStatistics desc) {
+        this.hwDesc = null;
+        this.ofDesc = desc;
+    }
 
     public NodeDescription getHwDescription() {
         if (hwDesc == null && ofDesc != null) {
index 4869a9500b489ca58c1eaf72359fb8aaeb72cd12..6e6cb00f899d13a94188f7c76c439a68b331f573 100644 (file)
@@ -8,8 +8,6 @@
 
 package org.opendaylight.controller.protocol_plugin.openflow.internal;
 
-import java.util.Collections;
-import java.util.Date;
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -17,15 +15,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.felix.dm.Component;
 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryProvider;
 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimInternalListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
-import org.opendaylight.controller.sal.core.ConstructionException;
 import org.opendaylight.controller.sal.core.Node;
-import org.opendaylight.controller.sal.core.Node.NodeIDType;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.core.UpdateType;
@@ -47,8 +44,7 @@ public class InventoryService implements IInventoryShimInternalListener,
         IPluginInInventoryService, IInventoryProvider {
     protected static final Logger logger = LoggerFactory
             .getLogger(InventoryService.class);
-    private Set<IPluginOutInventoryService> pluginOutInventoryServices = Collections
-            .synchronizedSet(new HashSet<IPluginOutInventoryService>());
+    private Set<IPluginOutInventoryService> pluginOutInventoryServices;
     private IController controller = null;
     private ConcurrentMap<Node, Map<String, Property>> nodeProps; // properties are maintained in global container only
     private ConcurrentMap<NodeConnector, Map<String, Property>> nodeConnectorProps; // properties are maintained in global container only
@@ -83,6 +79,7 @@ public class InventoryService implements IInventoryShimInternalListener,
 
         nodeProps = new ConcurrentHashMap<Node, Map<String, Property>>();
         nodeConnectorProps = new ConcurrentHashMap<NodeConnector, Map<String, Property>>();
+        pluginOutInventoryServices = new CopyOnWriteArraySet<IPluginOutInventoryService>();
     }
 
     /**
@@ -129,19 +126,6 @@ public class InventoryService implements IInventoryShimInternalListener,
         }
     }
 
-    protected Node OFSwitchToNode(ISwitch sw) {
-        Node node = null;
-        Object id = sw.getId();
-
-        try {
-            node = new Node(NodeIDType.OPENFLOW, id);
-        } catch (ConstructionException e) {
-            logger.error("", e);
-        }
-
-        return node;
-    }
-
     /**
      * Retrieve nodes from openflow
      */
@@ -206,11 +190,10 @@ public class InventoryService implements IInventoryShimInternalListener,
         }
 
         // update sal and discovery
-        synchronized (pluginOutInventoryServices) {
-            for (IPluginOutInventoryService service : pluginOutInventoryServices) {
-                service.updateNodeConnector(nodeConnector, type, props);
-            }
+        for (IPluginOutInventoryService service : pluginOutInventoryServices) {
+            service.updateNodeConnector(nodeConnector, type, props);
         }
+
     }
 
     private void addNode(Node node, Set<Property> props) {
@@ -242,10 +225,8 @@ public class InventoryService implements IInventoryShimInternalListener,
         nodeProps.put(node, propMap);
 
         // update sal
-        synchronized (pluginOutInventoryServices) {
-            for (IPluginOutInventoryService service : pluginOutInventoryServices) {
-                service.updateNode(node, UpdateType.ADDED, props);
-            }
+        for (IPluginOutInventoryService service : pluginOutInventoryServices) {
+            service.updateNode(node, UpdateType.ADDED, props);
         }
     }
 
@@ -268,10 +249,8 @@ public class InventoryService implements IInventoryShimInternalListener,
         }
 
         // update sal
-        synchronized (pluginOutInventoryServices) {
-            for (IPluginOutInventoryService service : pluginOutInventoryServices) {
-                service.updateNode(node, UpdateType.REMOVED, null);
-            }
+        for (IPluginOutInventoryService service : pluginOutInventoryServices) {
+            service.updateNode(node, UpdateType.REMOVED, null);
         }
     }
 
@@ -296,10 +275,8 @@ public class InventoryService implements IInventoryShimInternalListener,
 
         // Update SAL if we got new properties
         if (!newProperties.isEmpty()) {
-            synchronized (pluginOutInventoryServices) {
-                for (IPluginOutInventoryService service : pluginOutInventoryServices) {
-                    service.updateNode(node, UpdateType.CHANGED, newProperties);
-                }
+            for (IPluginOutInventoryService service : pluginOutInventoryServices) {
+                service.updateNode(node, UpdateType.CHANGED, newProperties);
             }
         }
     }
index e2d77b5b73f1b1c3c73da363b2f40949319c0a0a..a6d5a15313719829f92d8be37f2ec3766d7dea1e 100644 (file)
@@ -19,7 +19,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimInternalListener;
-import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener;
+import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
@@ -40,11 +40,13 @@ import org.opendaylight.controller.sal.core.Tables;
 import org.opendaylight.controller.sal.core.TimeStamp;
 import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.utils.GlobalConstants;
+import org.opendaylight.controller.sal.utils.NodeCreator;
 import org.openflow.protocol.OFMessage;
 import org.openflow.protocol.OFPortStatus;
 import org.openflow.protocol.OFPortStatus.OFPortReason;
 import org.openflow.protocol.OFType;
 import org.openflow.protocol.statistics.OFDescriptionStatistics;
+import org.openflow.protocol.statistics.OFStatistics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +58,7 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class InventoryServiceShim implements IContainerListener,
-        IMessageListener, ISwitchStateListener, IStatisticsListener {
+        IMessageListener, ISwitchStateListener, IOFStatisticsListener {
     protected static final Logger logger = LoggerFactory
             .getLogger(InventoryServiceShim.class);
     private IController controller = null;
@@ -178,7 +180,7 @@ public class InventoryServiceShim implements IContainerListener,
 
     protected void handlePortStatusMessage(ISwitch sw, OFPortStatus m)
             throws ConstructionException {
-        Node node = new Node(NodeIDType.OPENFLOW, sw.getId());
+        Node node = NodeCreator.createOFNode(sw.getId());
         NodeConnector nodeConnector = PortConverter.toNodeConnector(m.getDesc()
                 .getPortNumber(), node);
         UpdateType type = null;
@@ -391,14 +393,7 @@ public class InventoryServiceShim implements IContainerListener,
     }
 
     private void addNode(ISwitch sw) {
-        Node node;
-        try {
-            node = new Node(NodeIDType.OPENFLOW, sw.getId());
-        } catch (ConstructionException e) {
-            logger.error("{}", e.getMessage());
-            return;
-        }
-
+        Node node = NodeCreator.createOFNode(sw.getId());
         UpdateType type = UpdateType.ADDED;
 
         Set<Property> props = new HashSet<Property>();
@@ -460,19 +455,11 @@ public class InventoryServiceShim implements IContainerListener,
     }
 
     @Override
-    public void descriptionRefreshed(Long switchId,
-            OFDescriptionStatistics descriptionStats) {
-        Node node;
-        try {
-            node = new Node(NodeIDType.OPENFLOW, switchId);
-        } catch (ConstructionException e) {
-            logger.error("{}", e.getMessage());
-            return;
-        }
-
+    public void descriptionStatisticsRefreshed(Long switchId, List<OFStatistics> descriptionStats) {
+        Node node = NodeCreator.createOFNode(switchId);
         Set<Property> properties = new HashSet<Property>(1);
-        Description desc = new Description(
-                descriptionStats.getDatapathDescription());
+        OFDescriptionStatistics ofDesc = (OFDescriptionStatistics) descriptionStats.get(0);
+        Description desc = new Description(ofDesc.getDatapathDescription());
         properties.add(desc);
 
         // Notify all internal and external listeners
@@ -489,4 +476,19 @@ public class InventoryServiceShim implements IContainerListener,
 
         return mac;
     }
+
+    @Override
+    public void flowStatisticsRefreshed(Long switchId, List<OFStatistics> flows) {
+        // Nothing to do
+    }
+
+    @Override
+    public void portStatisticsRefreshed(Long switchId, List<OFStatistics> ports) {
+        // Nothing to do
+    }
+
+    @Override
+    public void tableStatisticsRefreshed(Long switchId, List<OFStatistics> tables) {
+        // Nothing to do
+    }
 }
index 21a632e1de0226dadfd035b432f2dec85f5a57f2..b63517b8add5d90a4df4aecd386c63767fd47e50 100644 (file)
@@ -13,7 +13,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -23,14 +22,15 @@ import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.eclipse.osgi.framework.console.CommandInterpreter;
 import org.eclipse.osgi.framework.console.CommandProvider;
 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExternalListener;
+import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
-import org.opendaylight.controller.protocol_plugin.openflow.IStatisticsListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
@@ -46,7 +46,6 @@ import org.openflow.protocol.OFMatch;
 import org.openflow.protocol.OFPort;
 import org.openflow.protocol.OFStatisticsRequest;
 import org.openflow.protocol.statistics.OFAggregateStatisticsRequest;
-import org.openflow.protocol.statistics.OFDescriptionStatistics;
 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
 import org.openflow.protocol.statistics.OFPortStatisticsReply;
@@ -69,18 +68,17 @@ import org.slf4j.LoggerFactory;
  */
 public class OFStatisticsManager implements IOFStatisticsManager,
 IInventoryShimExternalListener, CommandProvider {
-    private static final Logger log = LoggerFactory
-            .getLogger(OFStatisticsManager.class);
-    private static final int initialSize = 64;
-    private static final long flowStatsPeriod = 10000;
-    private static final long descriptionStatsPeriod = 60000;
-    private static final long portStatsPeriod = 5000;
-    private static final long tableStatsPeriod = 10000;
-    private static final long tickPeriod = 1000;
-    private static short statisticsTickNumber = (short) (flowStatsPeriod / tickPeriod);
-    private static short descriptionTickNumber = (short) (descriptionStatsPeriod / tickPeriod);
-    private static short portTickNumber = (short) (portStatsPeriod / tickPeriod);
-    private static short tableTickNumber = (short) (tableStatsPeriod / tickPeriod);
+    private static final Logger log = LoggerFactory.getLogger(OFStatisticsManager.class);
+    private static final int INITIAL_SIZE = 64;
+    private static final long FLOW_STATS_PERIOD = 10000;
+    private static final long DESC_STATS_PERIOD = 60000;
+    private static final long PORT_STATS_PERIOD = 5000;
+    private static final long TABLE_STATS_PERIOD = 10000;
+    private static final long TICK = 1000;
+    private static short statisticsTickNumber = (short) (FLOW_STATS_PERIOD / TICK);
+    private static short descriptionTickNumber = (short) (DESC_STATS_PERIOD / TICK);
+    private static short portTickNumber = (short) (PORT_STATS_PERIOD / TICK);
+    private static short tableTickNumber = (short) (TABLE_STATS_PERIOD / TICK);
     private static short factoredSamples = (short) 2;
     private static short counter = 1;
     private IController controller = null;
@@ -97,18 +95,17 @@ IInventoryShimExternalListener, CommandProvider {
     private Timer statisticsTimer;
     private TimerTask statisticsTimerTask;
     private ConcurrentMap<Long, Boolean> switchSupportsVendorExtStats;
-    private Map<Long, Map<Short, TxRates>> txRates; // Per port sampled (every
-                                                    // portStatsPeriod) transmit
-                                                    // rate
-    private Set<IStatisticsListener> descriptionListeners;
+    // Per port sampled (every portStatsPeriod) transmit rate
+    private Map<Long, Map<Short, TxRates>> txRates;
+    private Set<IOFStatisticsListener> statisticsListeners;
 
     /**
      * The object containing the latest factoredSamples tx rate samples for a
      * given switch port
      */
     protected class TxRates {
-        Deque<Long> sampledTxBytes; // contains the latest factoredSamples
-                                    // sampled transmitted bytes
+        // contains the latest factoredSamples sampled transmitted bytes
+        Deque<Long> sampledTxBytes;
 
         public TxRates() {
             sampledTxBytes = new LinkedBlockingDeque<Long>();
@@ -140,10 +137,9 @@ IInventoryShimExternalListener, CommandProvider {
             if (sampledTxBytes.size() < factoredSamples) {
                 return average;
             }
-            long increment = (long) (sampledTxBytes.getFirst() - sampledTxBytes
-                    .getLast());
-            long timePeriod = (long) (factoredSamples * portStatsPeriod)
-                    / (long) tickPeriod;
+            long increment = sampledTxBytes.getFirst() - sampledTxBytes
+                    .getLast();
+            long timePeriod = factoredSamples * PORT_STATS_PERIOD / TICK;
             average = (8L * increment) / timePeriod;
             return average;
         }
@@ -170,15 +166,12 @@ IInventoryShimExternalListener, CommandProvider {
         portStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
         tableStatistics = new ConcurrentHashMap<Long, List<OFStatistics>>();
         dummyList = new ArrayList<OFStatistics>(1);
-        statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(
-                initialSize);
-        pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(
-                initialSize);
-        switchPortStatsUpdated = new LinkedBlockingQueue<Long>(initialSize);
-        switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(
-                initialSize);
-        txRates = new HashMap<Long, Map<Short, TxRates>>(initialSize);
-        descriptionListeners = new HashSet<IStatisticsListener>();
+        statisticsTimerTicks = new ConcurrentHashMap<Long, StatisticsTicks>(INITIAL_SIZE);
+        pendingStatsRequests = new LinkedBlockingQueue<StatsRequest>(INITIAL_SIZE);
+        switchPortStatsUpdated = new LinkedBlockingQueue<Long>(INITIAL_SIZE);
+        switchSupportsVendorExtStats = new ConcurrentHashMap<Long, Boolean>(INITIAL_SIZE);
+        txRates = new HashMap<Long, Map<Short, TxRates>>(INITIAL_SIZE);
+        statisticsListeners = new CopyOnWriteArraySet<IOFStatisticsListener>();
 
         configStatsPollIntervals();
 
@@ -198,7 +191,7 @@ IInventoryShimExternalListener, CommandProvider {
                 while (true) {
                     try {
                         StatsRequest req = pendingStatsRequests.take();
-                        acquireStatistics(req.switchId, req.type);
+                        queryStatisticsInternal(req.switchId, req.type);
                     } catch (InterruptedException e) {
                         log.warn("Flow Statistics Collector thread "
                                 + "interrupted", e);
@@ -239,7 +232,7 @@ IInventoryShimExternalListener, CommandProvider {
      */
     void start() {
         // Start managed timers
-        statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, tickPeriod);
+        statisticsTimer.scheduleAtFixedRate(statisticsTimerTask, 0, TICK);
 
         // Start statistics collector thread
         statisticsCollector.start();
@@ -262,21 +255,19 @@ IInventoryShimExternalListener, CommandProvider {
         statisticsTimer.cancel();
     }
 
-    public void setStatisticsListener(IStatisticsListener s) {
-        this.descriptionListeners.add(s);
+    public void setStatisticsListener(IOFStatisticsListener s) {
+        this.statisticsListeners.add(s);
     }
 
-    public void unsetStatisticsListener(IStatisticsListener s) {
+    public void unsetStatisticsListener(IOFStatisticsListener s) {
         if (s != null) {
-            this.descriptionListeners.remove(s);
+            this.statisticsListeners.remove(s);
         }
     }
 
     private void registerWithOSGIConsole() {
-        BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
-                .getBundleContext();
-        bundleContext.registerService(CommandProvider.class.getName(), this,
-                null);
+        BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
+        bundleContext.registerService(CommandProvider.class.getName(), this, null);
     }
 
     private static class StatsRequest {
@@ -337,7 +328,7 @@ IInventoryShimExternalListener, CommandProvider {
                                                                   // extension
                                                                   // stats
         statisticsTimerTicks.put(switchId, new StatisticsTicks(true));
-        log.info("Added Switch {} to target pool",
+        log.debug("Added Switch {} to target pool",
                 HexString.toHexString(switchId.longValue()));
     }
 
@@ -414,11 +405,9 @@ IInventoryShimExternalListener, CommandProvider {
     }
 
     private void printInfoMessage(String type, StatsRequest request) {
-        log.info(
-                "{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
-                new Object[] { type, HexString.toHexString(request.switchId),
-                        pendingStatsRequests.size(),
-                        statisticsCollector.getState().toString() });
+        log.info("{} stats request not inserted for switch: {}. Queue size: {}. Collector state: {}.",
+                new Object[] {type, HexString.toHexString(request.switchId), pendingStatsRequests.size(),
+                statisticsCollector.getState().toString() });
     }
 
     protected void decrementTicks() {
@@ -427,10 +416,10 @@ IInventoryShimExternalListener, CommandProvider {
                 .entrySet()) {
             StatisticsTicks clock = entry.getValue();
             Long switchId = entry.getKey();
-            if (clock.decrementFlowTicksIsZero() == true) {
-                request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ? new StatsRequest(
-                        switchId, OFStatisticsType.VENDOR) : new StatsRequest(
-                        switchId, OFStatisticsType.FLOW);
+            if (clock.decrementFlowTicksIsZero()) {
+                request = (switchSupportsVendorExtStats.get(switchId) == Boolean.TRUE) ?
+                        new StatsRequest(switchId, OFStatisticsType.VENDOR) :
+                        new StatsRequest(switchId, OFStatisticsType.FLOW);
                 // If a request for this switch is already in the queue, skip to
                 // add this new request
                 if (!pendingStatsRequests.contains(request)
@@ -439,7 +428,7 @@ IInventoryShimExternalListener, CommandProvider {
                 }
             }
 
-            if (clock.decrementDescTicksIsZero() == true) {
+            if (clock.decrementDescTicksIsZero()) {
                 request = new StatsRequest(switchId, OFStatisticsType.DESC);
                 // If a request for this switch is already in the queue, skip to
                 // add this new request
@@ -449,7 +438,7 @@ IInventoryShimExternalListener, CommandProvider {
                 }
             }
 
-            if (clock.decrementPortTicksIsZero() == true) {
+            if (clock.decrementPortTicksIsZero()) {
                 request = new StatsRequest(switchId, OFStatisticsType.PORT);
                 // If a request for this switch is already in the queue, skip to
                 // add this new request
@@ -459,7 +448,7 @@ IInventoryShimExternalListener, CommandProvider {
                 }
             }
 
-            if(clock.decrementTableTicksIsZero() == true) {
+            if(clock.decrementTableTicksIsZero()) {
                 request = new StatsRequest(switchId, OFStatisticsType.TABLE);
                 // If a request for this switch is already in the queue, skip to
                 // add this new request
@@ -472,7 +461,7 @@ IInventoryShimExternalListener, CommandProvider {
     }
 
     private void removeStatsRequestTasks(Long switchId) {
-        log.info("Cleaning Statistics database for switch {}",
+        log.debug("Cleaning Statistics database for switch {}",
                 HexEncode.longToHexString(switchId));
         // To be safe, let's attempt removal of both VENDOR and FLOW request. It
         // does not hurt
@@ -495,54 +484,83 @@ IInventoryShimExternalListener, CommandProvider {
         statisticsTimerTicks.remove(switchId);
         removeStatsRequestTasks(switchId);
         flowStatistics.remove(switchId);
-        log.info("Statistics removed for switch {}",
+        log.debug("Statistics removed for switch {}",
                 HexString.toHexString(switchId));
     }
 
-    private void acquireStatistics(Long switchId, OFStatisticsType statType) {
+    private void queryStatisticsInternal(Long switchId, OFStatisticsType statType) {
 
         // Query the switch on all matches
-        List<OFStatistics> values = this.acquireStatistics(switchId, statType,
-                null);
+        List<OFStatistics> values = this.fetchStatisticsFromSwitch(switchId, statType, null);
 
-        // Update local caching database if got a valid response
+        // If got a valid response update local cache and notify listeners
         if (values != null && !values.isEmpty()) {
-            if ((statType == OFStatisticsType.FLOW)
-                    || (statType == OFStatisticsType.VENDOR)) {
-                flowStatistics.put(switchId, values);
-            } else if (statType == OFStatisticsType.DESC) {
-                // Notify who may be interested in a description change
-                notifyDescriptionListeners(switchId, values);
-
-                // Overwrite cache
-                descStatistics.put(switchId, values);
-            } else if (statType == OFStatisticsType.PORT) {
-                // Overwrite cache with new port statistics for this switch
-                portStatistics.put(switchId, values);
-
-                // Wake up the thread which maintains the TX byte counters for
-                // each port
-                switchPortStatsUpdated.offer(switchId);
-            } else if (statType == OFStatisticsType.TABLE) {
-                // Overwrite cache
-                tableStatistics.put(switchId, values);
+            switch (statType) {
+                case FLOW:
+                case VENDOR:
+                    flowStatistics.put(switchId, values);
+                    notifyFlowUpdate(switchId, values);
+                    break;
+                case DESC:
+                    // Overwrite cache
+                    descStatistics.put(switchId, values);
+                    // Notify who may be interested in a description change
+                    notifyDescriptionUpdate(switchId, values);
+                    break;
+                case PORT:
+                    // Overwrite cache with new port statistics for this switch
+                    portStatistics.put(switchId, values);
+
+                    // Wake up the thread which maintains the TX byte counters for
+                    // each port
+                    switchPortStatsUpdated.offer(switchId);
+                    notifyPortUpdate(switchId, values);
+                    break;
+                case TABLE:
+                    // Overwrite cache
+                    tableStatistics.put(switchId, values);
+                    notifyTableUpdate(switchId, values);
+                    break;
+                default:
             }
         }
     }
 
-    private void notifyDescriptionListeners(Long switchId,
-            List<OFStatistics> values) {
-        for (IStatisticsListener l : this.descriptionListeners) {
-            l.descriptionRefreshed(switchId,
-                    ((OFDescriptionStatistics) values.get(0)));
+    private void notifyDescriptionUpdate(Long switchId, List<OFStatistics> values) {
+        for (IOFStatisticsListener l : this.statisticsListeners) {
+            l.descriptionStatisticsRefreshed(switchId, values);
+        }
+    }
+
+    private void notifyFlowUpdate(Long switchId, List<OFStatistics> values) {
+        if (values.get(0) instanceof OFVendorStatistics) {
+            values = this.v6StatsListToOFStatsList(values);
+        }
+
+        if (!values.isEmpty()) { //possiblly filtered out by v6StatsListToOFStatsList()
+            for (IOFStatisticsListener l : this.statisticsListeners) {
+                l.flowStatisticsRefreshed(switchId, values);
+            }
+        }
+    }
+
+    private void notifyPortUpdate(Long switchId, List<OFStatistics> values) {
+        for (IOFStatisticsListener l : this.statisticsListeners) {
+            l.portStatisticsRefreshed(switchId, values);
+        }
+    }
+
+    private void notifyTableUpdate(Long switchId, List<OFStatistics> values) {
+        for (IOFStatisticsListener l : this.statisticsListeners) {
+            l.tableStatisticsRefreshed(switchId, values);
         }
     }
 
     /*
-     * Generic function to get the statistics form a OF switch
+     * Generic function to get the statistics form an OF switch
      */
     @SuppressWarnings("unchecked")
-    private List<OFStatistics> acquireStatistics(Long switchId,
+    private List<OFStatistics> fetchStatisticsFromSwitch(Long switchId,
             OFStatisticsType statsType, Object target) {
         List<OFStatistics> values = null;
         String type = null;
@@ -599,7 +617,7 @@ IInventoryShimExternalListener, CommandProvider {
                 short targetPort;
                 if (target == null) {
                     // All ports request
-                    targetPort = (short) OFPort.OFPP_NONE.getValue();
+                    targetPort = OFPort.OFPP_NONE.getValue();
                 } else if (!(target instanceof Short)) {
                     // Malformed request
                     log.warn("Invalid target type for Port stats request: {}",
@@ -617,7 +635,7 @@ IInventoryShimExternalListener, CommandProvider {
                 type = "PORT";
             } else if (statsType == OFStatisticsType.QUEUE) {
                 OFQueueStatisticsRequest specificReq = new OFQueueStatisticsRequest();
-                specificReq.setPortNumber((short) OFPort.OFPP_ALL.getValue());
+                specificReq.setPortNumber(OFPort.OFPP_ALL.getValue());
                 specificReq.setQueueId(0xffffffff);
                 req.setStatistics(Collections
                         .singletonList((OFStatistics) specificReq));
@@ -680,7 +698,7 @@ IInventoryShimExternalListener, CommandProvider {
     }
 
     @Override
-    public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch) {
+    public List<OFStatistics> getOFFlowStatistics(Long switchId, OFMatch ofMatch, short priority) {
         List<OFStatistics> statsList = flowStatistics.get(switchId);
 
         /*
@@ -705,7 +723,7 @@ IInventoryShimExternalListener, CommandProvider {
             for (OFStatistics stats : targetList) {
                 V6StatsReply v6Stats = (V6StatsReply) stats;
                 V6Match v6Match = v6Stats.getMatch();
-                if (v6Match.equals(targetMatch)) {
+                if (v6Stats.getPriority() == priority && v6Match.equals(targetMatch)) {
                     List<OFStatistics> list = new ArrayList<OFStatistics>();
                     list.add(stats);
                     return list;
@@ -714,7 +732,7 @@ IInventoryShimExternalListener, CommandProvider {
         } else {
             for (OFStatistics stats : statsList) {
                 OFFlowStatisticsReply flowStats = (OFFlowStatisticsReply) stats;
-                if (flowStats.getMatch().equals(ofMatch)) {
+                if (flowStats.getPriority() == priority && flowStats.getMatch().equals(ofMatch)) {
                     List<OFStatistics> list = new ArrayList<OFStatistics>();
                     list.add(stats);
                     return list;
@@ -800,12 +818,11 @@ IInventoryShimExternalListener, CommandProvider {
             }
         }
 
-        List<OFStatistics> list = this.acquireStatistics(switchId, statType,
+        List<OFStatistics> list = this.fetchStatisticsFromSwitch(switchId, statType,
                 target);
 
-        return (list == null) ? null
-                : (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list)
-                        : list;
+        return (list == null) ? null :
+            (statType == OFStatisticsType.VENDOR) ? v6StatsListToOFStatsList(list) : list;
     }
 
     @Override
@@ -900,8 +917,7 @@ IInventoryShimExternalListener, CommandProvider {
      * @param switchId
      */
     private synchronized void updatePortsTxRate(long switchId) {
-        List<OFStatistics> newPortStatistics = this.portStatistics
-                .get(switchId);
+        List<OFStatistics> newPortStatistics = this.portStatistics.get(switchId);
         if (newPortStatistics == null) {
             return;
         }
@@ -954,7 +970,7 @@ IInventoryShimExternalListener, CommandProvider {
         help.append("---OF Statistics Manager utilities---\n");
         help.append("\t ofdumpstatsmgr         - "
                 + "Print Internal Stats Mgr db\n");
-        help.append("\t ofstatsmgrintervals <fP> <pP> <dP>(in seconds) - "
+        help.append("\t ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds) - "
                 + "Set/Show flow/port/dedscription stats poll intervals\n");
         return help.toString();
     }
@@ -1087,9 +1103,9 @@ IInventoryShimExternalListener, CommandProvider {
 
         if (flowStatsInterv == null || portStatsInterv == null
                 || descStatsInterv == null) {
-            ci.println("Usage: ostatsmgrintervals <fP> <pP> <dP>(in seconds)");
-            ci.println("Current Values: fP=" + statisticsTickNumber + "s pP="
-                    + portTickNumber + "s dP=" + descriptionTickNumber + "s");
+            ci.println("Usage: ofstatsmgrintervals <fP> <pP> <dP> <tP> (all in seconds)");
+            ci.println("Current Values: fP=" + statisticsTickNumber + "sec pP="
+                    + portTickNumber + "sec dP=" + descriptionTickNumber + "sec tP=" + tableTickNumber + " sec");
             return;
         }
         Short fP, pP, dP, tP;
index 20d13b7a3dc9325244ff05f5f586cd1b8e31e1d9..585e4f32c46d83b1edbe758ad2b635655639dcc8 100644 (file)
@@ -11,12 +11,12 @@ package org.opendaylight.controller.protocol_plugin.openflow.internal;
 
 import java.util.Dictionary;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.apache.felix.dm.Component;
-import org.opendaylight.controller.protocol_plugin.openflow.IPluginReadServiceFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.opendaylight.controller.protocol_plugin.openflow.IReadFilterInternalListener;
+import org.opendaylight.controller.protocol_plugin.openflow.IReadServiceFilter;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.Node.NodeIDType;
 import org.opendaylight.controller.sal.core.NodeConnector;
@@ -24,20 +24,21 @@ import org.opendaylight.controller.sal.core.NodeTable;
 import org.opendaylight.controller.sal.flowprogrammer.Flow;
 import org.opendaylight.controller.sal.reader.FlowOnNode;
 import org.opendaylight.controller.sal.reader.IPluginInReadService;
+import org.opendaylight.controller.sal.reader.IPluginOutReadService;
 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
 import org.opendaylight.controller.sal.reader.NodeDescription;
 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Container Instance of IPluginInReadService implementation class
- *
- *
- *
  */
-public class ReadService implements IPluginInReadService {
+public class ReadService implements IPluginInReadService, IReadFilterInternalListener {
     private static final Logger logger = LoggerFactory
             .getLogger(ReadService.class);
-    private IPluginReadServiceFilter filter;
+    private IReadServiceFilter filter;
+    private Set<IPluginOutReadService> pluginOutReadServices;
     private String containerName;
 
     /**
@@ -50,6 +51,7 @@ public class ReadService implements IPluginInReadService {
         Dictionary<Object, Object> props = c.getServiceProperties();
         containerName = (props != null) ? (String) props.get("containerName")
                 : null;
+        pluginOutReadServices = new CopyOnWriteArraySet<IPluginOutReadService>();
     }
 
     /**
@@ -79,14 +81,28 @@ public class ReadService implements IPluginInReadService {
     void stop() {
     }
 
-    public void setService(IPluginReadServiceFilter filter) {
+    public void setService(IReadServiceFilter filter) {
         this.filter = filter;
     }
 
-    public void unsetService(IPluginReadServiceFilter filter) {
+    public void unsetService(IReadServiceFilter filter) {
         this.filter = null;
     }
 
+    public void setPluginOutReadServices(IPluginOutReadService service) {
+        logger.trace("Got a service set request {}", service);
+        if (this.pluginOutReadServices != null) {
+            this.pluginOutReadServices.add(service);
+        }
+    }
+
+    public void unsetPluginOutReadServices(
+            IPluginOutReadService service) {
+        logger.trace("Got a service UNset request");
+        if (this.pluginOutReadServices != null) {
+            this.pluginOutReadServices.remove(service);
+        }
+    }
     @Override
     public FlowOnNode readFlow(Node node, Flow flow, boolean cached) {
         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
@@ -168,4 +184,32 @@ public class ReadService implements IPluginInReadService {
 
         return filter.readAllNodeTable(containerName, node, cached);
     }
+
+    @Override
+    public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
+        for (IPluginOutReadService service : pluginOutReadServices) {
+            service.nodeFlowStatisticsUpdated(node, flowStatsList);
+        }
+    }
+
+    @Override
+    public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
+        for (IPluginOutReadService service : pluginOutReadServices) {
+            service.nodeConnectorStatisticsUpdated(node, ncStatsList);
+        }
+    }
+
+    @Override
+    public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
+        for (IPluginOutReadService service : pluginOutReadServices) {
+            service.nodeTableStatisticsUpdated(node, tableStatsList);
+        }
+    }
+
+    @Override
+    public void nodeDescriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
+        for (IPluginOutReadService service : pluginOutReadServices) {
+            service.descriptionStatisticsUpdated(node, nodeDescription);
+        }
+    }
 }
index 22e8a4dc74eb897847c0d2835f9060f3f506ea01..dbcd5c8f37d23b3f46abdbb5b4adad3e9392a15b 100644 (file)
@@ -15,17 +15,15 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 
+import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsListener;
 import org.opendaylight.controller.protocol_plugin.openflow.IOFStatisticsManager;
-import org.opendaylight.controller.protocol_plugin.openflow.IPluginReadServiceFilter;
+import org.opendaylight.controller.protocol_plugin.openflow.IReadFilterInternalListener;
+import org.opendaylight.controller.protocol_plugin.openflow.IReadServiceFilter;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
-import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.statistics.OFPortStatisticsReply;
-import org.openflow.protocol.statistics.OFStatistics;
-import org.openflow.protocol.statistics.OFStatisticsType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.opendaylight.controller.sal.action.Action;
 import org.opendaylight.controller.sal.action.ActionType;
 import org.opendaylight.controller.sal.action.Output;
@@ -46,7 +44,14 @@ import org.opendaylight.controller.sal.utils.GlobalConstants;
 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
 import org.opendaylight.controller.sal.utils.NodeCreator;
 import org.opendaylight.controller.sal.utils.NodeTableCreator;
+import org.openflow.protocol.OFMatch;
+import org.openflow.protocol.statistics.OFFlowStatisticsReply;
+import org.openflow.protocol.statistics.OFPortStatisticsReply;
+import org.openflow.protocol.statistics.OFStatistics;
+import org.openflow.protocol.statistics.OFStatisticsType;
 import org.openflow.protocol.statistics.OFTableStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * Read Service shim layer which is in charge of filtering the flow statistics
  * based on container. It is a Global instance.
@@ -54,14 +59,15 @@ import org.openflow.protocol.statistics.OFTableStatistics;
  *
  *
  */
-public class ReadServiceFilter implements IPluginReadServiceFilter,
-        IContainerListener {
+public class ReadServiceFilter implements IReadServiceFilter, IContainerListener, IOFStatisticsListener {
     private static final Logger logger = LoggerFactory
             .getLogger(ReadServiceFilter.class);
     private IController controller = null;
     private IOFStatisticsManager statsMgr = null;
     private Map<String, Set<NodeConnector>> containerToNc;
+    private Map<String, Set<Node>> containerToNode;
     private Map<String, Set<NodeTable>> containerToNt;
+    private ConcurrentMap<String, IReadFilterInternalListener> readFilterInternalListeners;
 
     public void setController(IController core) {
         this.controller = core;
@@ -73,6 +79,39 @@ public class ReadServiceFilter implements IPluginReadServiceFilter,
         }
     }
 
+    public void setReadFilterInternalListener(Map<?, ?> props, IReadFilterInternalListener s) {
+        if (props == null) {
+            logger.error("Failed setting Read Filter Listener, property map is null.");
+            return;
+        }
+        String containerName = (String) props.get("containerName");
+        if (containerName == null) {
+            logger.error("Failed setting Read Filter Listener, container name not supplied.");
+            return;
+        }
+        if ((this.readFilterInternalListeners != null) && !this.readFilterInternalListeners.containsValue(s)) {
+            this.readFilterInternalListeners.put(containerName, s);
+            logger.trace("Added Read Filter Listener for container {}", containerName);
+        }
+    }
+
+    public void unsetReadFilterInternalListener(Map<?, ?> props, IReadFilterInternalListener s) {
+        if (props == null) {
+            logger.error("Failed unsetting Read Filter Listener, property map is null.");
+            return;
+        }
+        String containerName = (String) props.get("containerName");
+        if (containerName == null) {
+            logger.error("Failed unsetting Read Filter Listener, containerName not supplied");
+            return;
+        }
+        if ((this.readFilterInternalListeners != null) && this.readFilterInternalListeners.get(containerName) != null
+                && this.readFilterInternalListeners.get(containerName).equals(s)) {
+            this.readFilterInternalListeners.remove(containerName);
+            logger.trace("Removed Read Filter Listener for container {}", containerName);
+        }
+    }
+
     /**
      * Function called by the dependency manager when all the required
      * dependencies are satisfied
@@ -81,6 +120,8 @@ public class ReadServiceFilter implements IPluginReadServiceFilter,
     void init() {
         containerToNc = new HashMap<String, Set<NodeConnector>>();
         containerToNt = new HashMap<String, Set<NodeTable>>();
+        containerToNode = new HashMap<String, Set<Node>>();
+        readFilterInternalListeners = new ConcurrentHashMap<String, IReadFilterInternalListener>();
     }
 
     /**
@@ -131,9 +172,20 @@ public class ReadServiceFilter implements IPluginReadServiceFilter,
 
         long sid = (Long) node.getID();
         OFMatch ofMatch = new FlowConverter(flow).getOFMatch();
-        List<OFStatistics> ofList = (cached == true) ? statsMgr
-                .getOFFlowStatistics(sid, ofMatch) : statsMgr.queryStatistics(
-                sid, OFStatisticsType.FLOW, ofMatch);
+        List<OFStatistics> ofList;
+        if (cached == true){
+            ofList = statsMgr.getOFFlowStatistics(sid, ofMatch, flow.getPriority());
+        } else {
+            ofList = statsMgr.queryStatistics(sid, OFStatisticsType.FLOW, ofMatch);
+            for (OFStatistics ofStat : ofList) {
+                if (((OFFlowStatisticsReply)ofStat).getPriority() == flow.getPriority()){
+                    ofList = new ArrayList<OFStatistics>(1);
+                    ofList.add(ofStat);
+                    break;
+                }
+            }
+        }
+
 
         /*
          * Convert and filter the statistics per container
@@ -212,7 +264,7 @@ public class ReadServiceFilter implements IPluginReadServiceFilter,
     }
 
     /**
-     * Filters a list of FlowOnNode elements based on the container
+     * Filters a list of OFStatistics elements based on the container
      *
      * @param container
      * @param nodeId
@@ -333,8 +385,7 @@ public class ReadServiceFilter implements IPluginReadServiceFilter,
      * @param flow
      * @return
      */
-    private boolean flowVlanBelongsToContainer(String container, Node node,
-            Flow flow) {
+    private boolean flowVlanBelongsToContainer(String container, Node node, Flow flow) {
         return true; // Always true for now
     }
 
@@ -363,7 +414,7 @@ public class ReadServiceFilter implements IPluginReadServiceFilter,
         // If an outgoing port is specified, it must belong to this container
         for (Action action : flow.getActions()) {
             if (action.getType() == ActionType.OUTPUT) {
-                NodeConnector outPort = (NodeConnector) ((Output) action)
+                NodeConnector outPort = ((Output) action)
                         .getPort();
                 if (!containerOwnsNodeConnector(container, outPort)) {
                     return false;
@@ -374,49 +425,69 @@ public class ReadServiceFilter implements IPluginReadServiceFilter,
     }
 
     @Override
-    public void containerFlowUpdated(String containerName,
-            ContainerFlow previousFlow, ContainerFlow currentFlow, UpdateType t) {
-
+    public void containerFlowUpdated(String containerName, ContainerFlow previousFlow,
+            ContainerFlow currentFlow, UpdateType t) {
+        // TODO
     }
 
     @Override
     public void nodeConnectorUpdated(String containerName, NodeConnector p,
             UpdateType type) {
-        Set<NodeConnector> target = null;
 
         switch (type) {
         case ADDED:
             if (!containerToNc.containsKey(containerName)) {
-                containerToNc.put(containerName, new HashSet<NodeConnector>());
+                containerToNc.put(containerName, new ConcurrentSkipListSet<NodeConnector>());
             }
             containerToNc.get(containerName).add(p);
-            break;
-        case CHANGED:
+            if (!containerToNode.containsKey(containerName)) {
+                containerToNode.put(containerName, new HashSet<Node>());
+            }
+            containerToNode.get(containerName).add(p.getNode());
             break;
         case REMOVED:
-            target = containerToNc.get(containerName);
-            if (target != null) {
-                target.remove(p);
+            Set<NodeConnector> ncSet = containerToNc.get(containerName);
+            if (ncSet != null) {
+                //remove this nc from container map
+                ncSet.remove(p);
+
+                //check if there are still ports of this node in this container
+                //and if not, remove its mapping
+                boolean nodeInContainer = false;
+                Node node = p.getNode();
+                for (NodeConnector nodeConnector : ncSet) {
+                    if (nodeConnector.getNode().equals(node)){
+                        nodeInContainer = true;
+                        break;
+                    }
+                }
+                if (! nodeInContainer) {
+                    Set<Node> nodeSet = containerToNode.get(containerName);
+                    if (nodeSet != null) {
+                        nodeSet.remove(node);
+                    }
+                }
+
             }
             break;
+        case CHANGED:
         default:
         }
     }
 
     @Override
-    public void tagUpdated(String containerName, Node n, short oldTag,
-            short newTag, UpdateType t) {
+    public void tagUpdated(String containerName, Node n, short oldTag, short newTag, UpdateType t) {
         // Not interested in this event
     }
 
     @Override
     public void containerModeUpdated(UpdateType t) {
-        // do nothing
+        // Not interested in this event
     }
 
     @Override
-    public NodeConnectorStatistics readNodeConnector(String containerName,
-            NodeConnector connector, boolean cached) {
+    public NodeConnectorStatistics readNodeConnector(
+            String containerName, NodeConnector connector, boolean cached) {
         if (!containerOwnsNodeConnector(containerName, connector)) {
             return null;
         }
@@ -470,30 +541,93 @@ public class ReadServiceFilter implements IPluginReadServiceFilter,
         Node node = table.getNode();
         long sid = (Long) node.getID();
         Byte tableId = (Byte) table.getID();
-        List<OFStatistics> ofList = (cached == true) ? statsMgr
-                .getOFTableStatistics(sid, tableId) : statsMgr.queryStatistics(
-                        sid, OFStatisticsType.TABLE, tableId);
+        List<OFStatistics> ofList = (cached == true) ? statsMgr.getOFTableStatistics(sid, tableId) :
+            statsMgr.queryStatistics(sid, OFStatisticsType.TABLE, tableId);
 
-                List<NodeTableStatistics> ntStatistics = new TableStatisticsConverter(
-                        sid, ofList).getNodeTableStatsList();
+        List<NodeTableStatistics> ntStatistics =
+                new TableStatisticsConverter(sid, ofList).getNodeTableStatsList();
 
-                return (ntStatistics.isEmpty()) ? new NodeTableStatistics()
-                : ntStatistics.get(0);
+        return (ntStatistics.isEmpty()) ? new NodeTableStatistics() : ntStatistics.get(0);
     }
 
     @Override
-    public List<NodeTableStatistics> readAllNodeTable(String containerName,
-            Node node, boolean cached) {
+    public List<NodeTableStatistics> readAllNodeTable(String containerName, Node node, boolean cached) {
         long sid = (Long) node.getID();
-        List<OFStatistics> ofList = (cached == true) ? statsMgr
-                .getOFTableStatistics(sid) : statsMgr.queryStatistics(sid,
-                        OFStatisticsType.FLOW, null);
+        List<OFStatistics> ofList = (cached == true) ?
+                statsMgr.getOFTableStatistics(sid) : statsMgr.queryStatistics(sid, OFStatisticsType.FLOW, null);
 
-                List<OFStatistics> filteredList = filterTableListPerContainer(
-                        containerName, sid, ofList);
+        List<OFStatistics> filteredList = filterTableListPerContainer(containerName, sid, ofList);
 
-                return new TableStatisticsConverter(sid, filteredList)
-                .getNodeTableStatsList();
+        return new TableStatisticsConverter(sid, filteredList).getNodeTableStatsList();
     }
 
+    @Override
+    public void descriptionStatisticsRefreshed(Long switchId, List<OFStatistics> description) {
+        String container;
+        Node node = NodeCreator.createOFNode(switchId);
+        NodeDescription nodeDescription = new DescStatisticsConverter(description).getHwDescription();
+        for (Map.Entry<String, IReadFilterInternalListener> l : readFilterInternalListeners.entrySet()) {
+            container = l.getKey();
+            if (container == GlobalConstants.DEFAULT.toString()
+                    || (containerToNode.containsKey(container) && containerToNode.get(container).contains(node))) {
+                l.getValue().nodeDescriptionStatisticsUpdated(node, nodeDescription);
+            }
+        }
+    }
+
+    @Override
+    public void flowStatisticsRefreshed(Long switchId, List<OFStatistics> flows) {
+        String container;
+        Node node = NodeCreator.createOFNode(switchId);
+        for (Map.Entry<String, IReadFilterInternalListener> l : readFilterInternalListeners.entrySet()) {
+            container = l.getKey();
+
+            // Convert and filter the statistics per container
+            List<FlowOnNode> flowOnNodeList = new FlowStatisticsConverter(flows).getFlowOnNodeList(node);
+            flowOnNodeList = filterFlowListPerContainer(container, node, flowOnNodeList);
+
+            // notify listeners
+            if (!flowOnNodeList.isEmpty()) {
+                l.getValue().nodeFlowStatisticsUpdated(node, flowOnNodeList);
+            }
+        }
+    }
+
+    @Override
+    public void portStatisticsRefreshed(Long switchId, List<OFStatistics> ports) {
+        String container;
+        Node node = NodeCreator.createOFNode(switchId);
+        for (Map.Entry<String, IReadFilterInternalListener> l : readFilterInternalListeners.entrySet()) {
+            container = l.getKey();
+
+            // Convert and filter the statistics per container
+            List<OFStatistics> filteredPorts = filterPortListPerContainer(container, switchId, ports);
+            List<NodeConnectorStatistics> ncStatsList = new PortStatisticsConverter(switchId, filteredPorts)
+                    .getNodeConnectorStatsList();
+
+            // notify listeners
+            if (!ncStatsList.isEmpty()) {
+                l.getValue().nodeConnectorStatisticsUpdated(node, ncStatsList);
+            }
+        }
+    }
+
+    @Override
+    public void tableStatisticsRefreshed(Long switchId, List<OFStatistics> tables) {
+        String container;
+        Node node = NodeCreator.createOFNode(switchId);
+        for (Map.Entry<String, IReadFilterInternalListener> l : readFilterInternalListeners.entrySet()) {
+            container = l.getKey();
+
+            // Convert and filter the statistics per container
+            List<OFStatistics> filteredList = filterTableListPerContainer(container, switchId, tables);
+            List<NodeTableStatistics> tableStatsList = new TableStatisticsConverter(switchId, filteredList)
+                    .getNodeTableStatsList();
+
+            // notify listeners
+            if (!tableStatsList.isEmpty()) {
+                l.getValue().nodeTableStatisticsUpdated(node, tableStatsList);
+            }
+        }
+    }
 }
index 4f7bf4889fd6b6d1c59a07e9d06f384a3ae3f54b..8d2ca8704a6b46aee8a6f05771d4392445777241 100644 (file)
@@ -21,8 +21,12 @@ import org.openflow.protocol.OFError.OFHelloFailedCode;
 import org.openflow.protocol.OFError.OFPortModFailedCode;
 import org.openflow.protocol.OFError.OFQueueOpFailedCode;
 
-public abstract class Utils {
-    public static String getOFErrorString(OFError error) {
+public final class Utils {
+
+    private Utils() { //prevent instantiation
+        throw new AssertionError();
+    }
+    static String getOFErrorString(OFError error) {
         // Handle VENDOR extension errors here
         if (error.getErrorType() == V6Error.NICIRA_VENDOR_ERRORTYPE) {
             V6Error er = new V6Error(error);
index b580021b690f996a35b396423bc337aed8752752..82c0e593b3fc00ae31c7939b296ffb9acb01bdf2 100644 (file)
@@ -3,14 +3,8 @@ package org.opendaylight.controller.protocol_plugins.stub.internal;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Dictionary;
 import java.util.List;
 
-import org.apache.felix.dm.Component;
-//import org.opendaylight.controller.protocol_plugin_stubs.IPluginReadServiceFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.opendaylight.controller.sal.action.Action;
 import org.opendaylight.controller.sal.action.Controller;
 import org.opendaylight.controller.sal.action.Drop;
@@ -45,6 +39,8 @@ import org.opendaylight.controller.sal.reader.IPluginInReadService;
 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
 import org.opendaylight.controller.sal.reader.NodeDescription;
 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * Stub Implementation for IPluginInReadService used by SAL
  *
@@ -104,7 +100,7 @@ public class ReadService implements IPluginInReadService {
 
         ArrayList<FlowOnNode> list = new ArrayList<FlowOnNode>();
         ArrayList<Action> actionList = new ArrayList<Action>();
-        actionList.add(new Drop());
+        actionList.add(new Drop()); //IT assumes this is first element
         actionList.add(new Loopback());
         actionList.add(new Flood());
         actionList.add(new FloodAll());
@@ -139,6 +135,7 @@ public class ReadService implements IPluginInReadService {
         actionList.add(new SetTpSrc(4201));
         actionList.add(new SetTpDst(8080));
 
+        short priority = 3500; //IT assumes this value
         for (Action a : actionList) {
             Flow flow = new Flow();
             Match match = new Match();
@@ -152,7 +149,7 @@ public class ReadService implements IPluginInReadService {
             List<Action> actions = new ArrayList<Action>();
             actions.add(a);
             flow.setActions(actions);
-            flow.setPriority((short) 3500);
+            flow.setPriority(priority++);
             flow.setIdleTimeout((short) 1000);
             flow.setHardTimeout((short) 2000);
             flow.setId(12345);
index 6466177ecdce792d245d59b467f4af980e768662..163d04b3ffc253ed028d25b8a8d70686c82b325e 100644 (file)
@@ -9,6 +9,8 @@
 
 package org.opendaylight.controller.sal.reader;
 
+import java.io.Serializable;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
@@ -23,7 +25,9 @@ import org.opendaylight.controller.sal.flowprogrammer.Flow;
 
 @XmlRootElement (name="FlowStat")
 @XmlAccessorType(XmlAccessType.NONE)
-public class FlowOnNode {
+public class FlowOnNode implements Serializable{
+    private static final long serialVersionUID = 1L;
+
     @XmlElement
     private Flow flow; // Flow descriptor
     @XmlElement
diff --git a/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/reader/IPluginOutReadService.java b/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/reader/IPluginOutReadService.java
new file mode 100644 (file)
index 0000000..3c1b6f2
--- /dev/null
@@ -0,0 +1,50 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.reader;
+
+import java.util.List;
+
+import org.opendaylight.controller.sal.core.Node;
+
+/**
+ * @file   IPluginOutReadService.java
+ *
+ * @brief  Hardware statistics updates service to be offered by protocol plugins
+ */
+public interface IPluginOutReadService {
+
+    /**
+     * Notifies the hardware view of all the flow installed on the specified network node
+     * @param node
+     * @return
+     */
+    public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList);
+
+    /**
+     * Notifies the hardware view of the specified network node connector
+     * @param node
+     * @return
+     */
+    public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList);
+
+    /**
+     * Notifies all the table statistics for a node
+     * @param node
+     * @return
+     */
+    public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList);
+    /**
+     * Notifies the hardware view of node description changes
+     * @param node
+     * @return
+     */
+    public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription );
+
+}
diff --git a/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/reader/IReadServiceListener.java b/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/reader/IReadServiceListener.java
new file mode 100644 (file)
index 0000000..2cf237b
--- /dev/null
@@ -0,0 +1,11 @@
+package org.opendaylight.controller.sal.reader;
+
+
+/**
+ * @file   IReadServiceListener.java
+ *
+ * @brief  SAL service to be consumed by functional modules that are interested in reader updates
+ */
+public interface IReadServiceListener extends IPluginOutReadService {
+
+}
index 49018d83edde915d6334fa39c5ad9a5e97ae872f..848b19ee0f0d98070c9571b96a4434741aaef735 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
@@ -9,6 +8,8 @@
 
 package org.opendaylight.controller.sal.reader;
 
+import java.io.Serializable;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
@@ -18,46 +19,47 @@ import org.opendaylight.controller.sal.core.NodeConnector;
 
 /**
  * Represents the statistics for the node conenctor
- *
- *
- *
  */
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.NONE)
-public class NodeConnectorStatistics {
-        @XmlElement
+public class NodeConnectorStatistics implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @XmlElement
     private NodeConnector nodeConnector;
-        @XmlElement
+    @XmlElement
     private long receivePackets;
-        @XmlElement
+    @XmlElement
     private long transmitPackets;
-        @XmlElement
+    @XmlElement
     private long receiveBytes;
-        @XmlElement
+    @XmlElement
     private long transmitBytes;
-        @XmlElement
+    @XmlElement
     private long receiveDrops;
-        @XmlElement
+    @XmlElement
     private long transmitDrops;
-        @XmlElement
+    @XmlElement
     private long receiveErrors;
-        @XmlElement
+    @XmlElement
     private long transmitErrors;
-        @XmlElement
+    @XmlElement
     private long receiveFrameError;
-        @XmlElement
+    @XmlElement
     private long receiveOverRunError;
-        @XmlElement
+    @XmlElement
     private long receiveCrcError;
-        @XmlElement
+    @XmlElement
     private long collisionCount;
 
-        //To Satisfy JAXB
-        public NodeConnectorStatistics() {
+    // To Satisfy JAXB
+    public NodeConnectorStatistics() {
+
+    }
 
-        }
     /**
      * Set the node connector
+     *
      * @param port
      */
     public void setNodeConnector(NodeConnector port) {
@@ -66,6 +68,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Returns the node connector
+     *
      * @return
      */
     public NodeConnector getNodeConnector() {
@@ -74,6 +77,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the rx packet count's value
+     *
      * @param count
      */
     public void setReceivePacketCount(long count) {
@@ -82,6 +86,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Returns the rx packet count for the port
+     *
      * @return
      */
     public long getReceivePacketCount() {
@@ -90,6 +95,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the tx packet count's value
+     *
      * @param count
      */
     public void setTransmitPacketCount(long count) {
@@ -98,6 +104,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Returns the tx packet count for the port
+     *
      * @return
      */
     public long getTransmitPacketCount() {
@@ -106,6 +113,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the rx byte count's value
+     *
      * @param count
      */
     public void setReceiveByteCount(long count) {
@@ -114,6 +122,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Return the rx byte count for the port
+     *
      * @return
      */
     public long getReceiveByteCount() {
@@ -122,6 +131,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the tx byte count's value
+     *
      * @param count
      */
     public void setTransmitByteCount(long count) {
@@ -130,6 +140,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Return the tx byte count for the port
+     *
      * @return
      */
     public long getTransmitByteCount() {
@@ -138,6 +149,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the rx drop count's value
+     *
      * @param count
      */
     public void setReceiveDropCount(long count) {
@@ -146,6 +158,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Returns the rx drop count for the port
+     *
      * @return
      */
     public long getReceiveDropCount() {
@@ -154,6 +167,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the tx drop count's value
+     *
      * @param count
      */
     public void setTransmitDropCount(long count) {
@@ -162,6 +176,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Returns the tx drop count for the port
+     *
      * @return
      */
     public long getTransmitDropCount() {
@@ -170,6 +185,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the rx error count's value
+     *
      * @param count
      */
     public void setReceiveErrorCount(long count) {
@@ -178,6 +194,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Return the rx error count for the port
+     *
      * @return
      */
     public long getReceiveErrorCount() {
@@ -186,6 +203,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the tx error count's value
+     *
      * @param count
      */
     public void setTransmitErrorCount(long count) {
@@ -194,6 +212,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Return the tx error count for the port
+     *
      * @return
      */
     public long getTransmitErrorCount() {
@@ -202,6 +221,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the rx frame error value
+     *
      * @param count
      */
     public void setReceiveFrameErrorCount(long count) {
@@ -210,6 +230,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Returns the rx frame error for the port
+     *
      * @return
      */
     public long getReceiveFrameErrorCount() {
@@ -218,6 +239,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the rx overrun error value
+     *
      * @param count
      */
     public void setReceiveOverRunErrorCount(long count) {
@@ -226,6 +248,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Return the rx overrun error for the port
+     *
      * @return
      */
     public long getReceiveOverRunErrorCount() {
@@ -234,6 +257,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the rx CRC Error value
+     *
      * @param count
      */
     public void setReceiveCRCErrorCount(long count) {
@@ -242,6 +266,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Return the rx CRC error for the port
+     *
      * @return
      */
     public long getReceiveCRCErrorCount() {
@@ -250,6 +275,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Set the collisionCount count's value
+     *
      * @param count
      */
     public void setCollisionCount(long count) {
@@ -258,6 +284,7 @@ public class NodeConnectorStatistics {
 
     /**
      * Return the collisionCount count for the port
+     *
      * @return
      */
     public long getCollisionCount() {
@@ -267,16 +294,16 @@ public class NodeConnectorStatistics {
     @Override
     public String toString() {
         return "NodeConnectorStats[portNumber = " + nodeConnector
-                + ", receivePackets = " + receivePackets
-                + ", transmitPackets = " + transmitPackets
-                + ", receiveBytes = " + receiveBytes + ", transmitBytes = "
-                + transmitBytes + ", receiveDrops = " + receiveDrops
-                + ", transmitDrops = " + transmitDrops + ", receiveErrors = "
-                + receiveErrors + ", transmitErrors = " + transmitErrors
-                + ", receiveFrameError = " + receiveFrameError
-                + ", receiveOverRunError = " + receiveOverRunError
-                + ", receiveCrcError = " + receiveCrcError
-                + ", collisionCount = " + collisionCount + "]";
+            + ", receivePackets = " + receivePackets
+            + ", transmitPackets = " + transmitPackets
+            + ", receiveBytes = " + receiveBytes + ", transmitBytes = "
+            + transmitBytes + ", receiveDrops = " + receiveDrops
+            + ", transmitDrops = " + transmitDrops + ", receiveErrors = "
+            + receiveErrors + ", transmitErrors = " + transmitErrors
+            + ", receiveFrameError = " + receiveFrameError
+            + ", receiveOverRunError = " + receiveOverRunError
+            + ", receiveCrcError = " + receiveCrcError
+            + ", collisionCount = " + collisionCount + "]";
     }
 
 }
index 26c8e1dec352673efff83fcc9350bb053567338e..8f99b05a091cdf268619f821d0aaca567a3af82d 100644 (file)
@@ -9,11 +9,15 @@
 
 package org.opendaylight.controller.sal.reader;
 
+import java.io.Serializable;
+
 
 /**
  * Represents the network node description information
  */
-public class NodeDescription {
+public class NodeDescription implements Serializable, Cloneable{
+    private static final long serialVersionUID = 1L;
+
     private String manufacturer;
     private String hardware;
     private String software;
@@ -70,4 +74,15 @@ public class NodeDescription {
                         + hardware + ", software=" + software + ", serialNumber="
                         + serialNumber + ", description=" + description + "]";
     }
+    @Override
+    public NodeDescription clone() {
+        NodeDescription nd = new NodeDescription();
+        nd.setDescription(description);
+        nd.setHardware(hardware);
+        nd.setManufacturer(manufacturer);
+        nd.setSerialNumber(serialNumber);
+        nd.setSoftware(software);
+
+        return nd;
+    }
 }
index 46448bd09832d31493542b17aa9bc2f129820f1b..3b359cb812408d0112b07b5e653b6c3e567a9a5b 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.sal.reader;
 
+import java.io.Serializable;
+
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
@@ -22,7 +24,9 @@ import org.opendaylight.controller.sal.core.NodeTable;
 
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.NONE)
-public class NodeTableStatistics {
+public class NodeTableStatistics implements Serializable {
+    private static final long serialVersionUID = 1L;
+
     @XmlElement
     private NodeTable nodeTable;
     @XmlElement
index c12831dffcc61ae28632daecd43658258ae0bbaf..b6346fb7502054daa171f8ad6e61463faef8c3e5 100644 (file)
@@ -22,8 +22,8 @@ import org.slf4j.LoggerFactory;
  *
  */
 public abstract class NodeCreator {
-    protected static final Logger logger = LoggerFactory
-    .getLogger(NodeCreator.class);
+    protected static final Logger logger = LoggerFactory.getLogger(NodeCreator.class);
+
     public static Node createOFNode(Long switchId) {
         try {
             return new Node(NodeIDType.OPENFLOW, switchId);
index b56a96e50cb8acc844f3419276bc4140a1ca34a0..85d239f4b9e1c52a265e2cb685fcb64925dac3d2 100644 (file)
@@ -23,7 +23,9 @@ import org.opendaylight.controller.sal.packet.IListenDataPacket;
 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
 import org.opendaylight.controller.sal.packet.IPluginOutDataPacketService;
 import org.opendaylight.controller.sal.reader.IPluginInReadService;
+import org.opendaylight.controller.sal.reader.IPluginOutReadService;
 import org.opendaylight.controller.sal.reader.IReadService;
+import org.opendaylight.controller.sal.reader.IReadServiceListener;
 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
 import org.opendaylight.controller.sal.topology.IPluginInTopologyService;
 import org.opendaylight.controller.sal.topology.IPluginOutTopologyService;
@@ -145,14 +147,22 @@ public class Activator extends ComponentActivatorAbstractBase {
         }
 
         if (imp.equals(ReadService.class)) {
-            // It is the provider of IReadService
-            c.setInterface(IReadService.class.getName(), null);
+            // export services
+            c.setInterface(new String[] {
+                    IReadService.class.getName(),IPluginOutReadService.class.getName()}, null);
 
             // It is also the consumer of IPluginInReadService
             c.add(createContainerServiceDependency(containerName)
                     .setService(IPluginInReadService.class)
                     .setCallbacks("setService", "unsetService")
-                    .setRequired(true));
+                    .setRequired(false));
+
+            //consumes plugins' reader updates
+            c.add(createContainerServiceDependency(containerName)
+                    .setService(IReadServiceListener.class)
+                    .setCallbacks("setReaderListener", "unsetReaderListener")
+                    .setRequired(false));
+
         }
 
         /************************/
index edc91173890970b12680fa27628fe378a9ba1490..ed0394d2096aed52f93232def9137c9f4d3daf5f 100644 (file)
@@ -9,12 +9,14 @@
 
 package org.opendaylight.controller.sal.implementation.internal;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.eclipse.osgi.framework.console.CommandInterpreter;
 import org.eclipse.osgi.framework.console.CommandProvider;
@@ -25,15 +27,17 @@ import org.opendaylight.controller.sal.action.Output;
 import org.opendaylight.controller.sal.action.PopVlan;
 import org.opendaylight.controller.sal.core.ConstructionException;
 import org.opendaylight.controller.sal.core.Node;
-import org.opendaylight.controller.sal.core.NodeTable;
-import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Node.NodeIDType;
+import org.opendaylight.controller.sal.core.NodeConnector;
+import org.opendaylight.controller.sal.core.NodeTable;
 import org.opendaylight.controller.sal.flowprogrammer.Flow;
 import org.opendaylight.controller.sal.match.Match;
 import org.opendaylight.controller.sal.match.MatchType;
 import org.opendaylight.controller.sal.reader.FlowOnNode;
 import org.opendaylight.controller.sal.reader.IPluginInReadService;
+import org.opendaylight.controller.sal.reader.IPluginOutReadService;
 import org.opendaylight.controller.sal.reader.IReadService;
+import org.opendaylight.controller.sal.reader.IReadServiceListener;
 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
 import org.opendaylight.controller.sal.reader.NodeDescription;
 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
@@ -49,19 +53,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The SAL Read Service. It dispatches the read request to
- * the proper SDN protocol plugin
- *
- *
- *
+ * The SAL Read Service. Dispatches read requests to the proper SDN protocol
+ * plugin, and notifies any listeners on updates from any plugin readers
  */
-public class ReadService implements IReadService, CommandProvider {
+public class ReadService implements IReadService, CommandProvider, IPluginOutReadService {
 
-    protected static final Logger logger = LoggerFactory
-            .getLogger(ReadService.class);
-    private ConcurrentHashMap<String, IPluginInReadService>
-        pluginReader =
-        new ConcurrentHashMap<String, IPluginInReadService>();
+    protected static final Logger logger = LoggerFactory.getLogger(ReadService.class);
+    private ConcurrentHashMap<String, IPluginInReadService> pluginReader;
+    private Set<IReadServiceListener> readerListeners;
 
     /**
      * Function called by the dependency manager when all the required
@@ -69,6 +68,8 @@ public class ReadService implements IReadService, CommandProvider {
      *
      */
     void init() {
+        pluginReader = new ConcurrentHashMap<String, IPluginInReadService>();
+        readerListeners = new CopyOnWriteArraySet<IReadServiceListener>();
     }
 
     /**
@@ -103,7 +104,7 @@ public class ReadService implements IReadService, CommandProvider {
     }
 
     // Set the reference to the plugin flow Reader service
-    public void setService(Map props, IPluginInReadService s) {
+    public void setService(Map<?, ?> props, IPluginInReadService s) {
         if (this.pluginReader == null) {
             logger.error("pluginReader store null");
             return;
@@ -112,7 +113,7 @@ public class ReadService implements IReadService, CommandProvider {
         logger.trace("Got a service set request {}", s);
         String type = null;
         for (Object e : props.entrySet()) {
-            Map.Entry entry = (Map.Entry) e;
+            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) e;
             logger.trace("Prop key:({}) value:({})", entry.getKey(),
                     entry.getValue());
         }
@@ -130,7 +131,7 @@ public class ReadService implements IReadService, CommandProvider {
         }
     }
 
-    public void unsetService(Map props, IPluginInReadService s) {
+    public void unsetService(Map<?, ?> props, IPluginInReadService s) {
         if (this.pluginReader == null) {
             logger.error("pluginReader store null");
             return;
@@ -139,7 +140,7 @@ public class ReadService implements IReadService, CommandProvider {
         String type = null;
         logger.debug("Received unsetpluginReader request");
         for (Object e : props.entrySet()) {
-            Map.Entry entry = (Map.Entry) e;
+            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) e;
             logger.trace("Prop key:({}) value:({})", entry.getKey(),
                     entry.getValue());
         }
@@ -156,6 +157,15 @@ public class ReadService implements IReadService, CommandProvider {
             logger.debug("Removed the pluginReader for type: {}", type);
         }
     }
+    public void setReaderListener(IReadServiceListener service) {
+        logger.trace("Got a listener set request {}", service);
+        this.readerListeners.add(service);
+    }
+
+    public void unsetReaderListener(IReadServiceListener service) {
+        logger.trace("Got a listener Unset request");
+        this.readerListeners.remove(service);
+    }
 
     @Override
     public FlowOnNode readFlow(Node node, Flow flow) {
@@ -165,7 +175,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readFlow(node, flow, true);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -177,7 +187,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readFlow(node, flow, false);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -189,7 +199,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readAllFlow(node, true);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -201,7 +211,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readAllFlow(node, false);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -213,7 +223,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readDescription(node, true);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -225,7 +235,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readDescription(node, false);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -238,7 +248,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readNodeConnector(connector, true);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -252,7 +262,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readNodeConnector(connector, false);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -264,7 +274,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readAllNodeConnector(node, true);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -276,7 +286,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readAllNodeTable(node, true);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -290,7 +300,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readNodeTable(table, false);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -303,7 +313,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readNodeTable(table, true);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -315,7 +325,7 @@ public class ReadService implements IReadService, CommandProvider {
                         .readAllNodeConnector(node, false);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return null;
     }
 
@@ -328,10 +338,38 @@ public class ReadService implements IReadService, CommandProvider {
                         .getTransmitRate(connector);
             }
         }
-        logger.warn("Plugin unavailable");
+        logger.warn("Plugin {} unavailable", node.getType());
         return 0;
     }
 
+    @Override
+    public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
+        for (IReadServiceListener l : readerListeners){
+            l.nodeFlowStatisticsUpdated(node, flowStatsList);
+        }
+    }
+
+    @Override
+    public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
+        for (IReadServiceListener l : readerListeners){
+            l.nodeConnectorStatisticsUpdated(node, ncStatsList);
+        }
+    }
+
+    @Override
+    public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
+        for (IReadServiceListener l : readerListeners){
+            l.nodeTableStatisticsUpdated(node, tableStatsList);
+        }
+    }
+
+    @Override
+    public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
+        for (IReadServiceListener l : readerListeners){
+            l.descriptionStatisticsUpdated(node, nodeDescription);
+        }
+    }
+
     // ---------------- OSGI TEST CODE ------------------------------//
 
     private void registerWithOSGIConsole() {
@@ -567,5 +605,4 @@ public class ReadService implements IReadService, CommandProvider {
         actions.add(new Controller());
         return new Flow(match, actions);
     }
-
 }
index 2f517488a955ccbd7af16f05249114d9aa8e19fb..d48809664871002d32d11d7afc5ba679c4440987 100644 (file)
@@ -8,15 +8,12 @@
 
 package org.opendaylight.controller.sal.implementation.internal;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.Collections;
 
 import org.opendaylight.controller.sal.core.Edge;
-import org.opendaylight.controller.sal.core.Property;
-import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.topology.IListenTopoUpdates;
 import org.opendaylight.controller.sal.topology.IPluginInTopologyService;
 import org.opendaylight.controller.sal.topology.IPluginOutTopologyService;
index a2ab07ac7b6bc7249f44ca1c66415c731faf738c..8267e4d0299334a534a9844d759392a1e763d15f 100644 (file)
@@ -27,12 +27,15 @@ import org.opendaylight.controller.sal.reader.NodeTableStatistics;
  */
 public interface IStatisticsManager {
     /**
-     * Return all the statistics for all the flows present on the specified node in the current container context.
-     * If the context is the default container, the returned statistics are for all the flows installed on the node,
-     * regardless of the container they belong to
+     * Return all the statistics for all the flows present on the specified node
+     * in the current container context. If the context is the default
+     * container, the returned statistics are for all the flows installed on the
+     * node, regardless of the container they belong to
      *
-     * @param node  the network node
-     * @return the list of flows installed on the network node
+     * @param node
+     *            The network node
+     * @return List of flows installed on the network node. Null if specified
+     *         node is null. Empty list if node/stat is not present.
      */
     List<FlowOnNode> getFlows(Node node);
 
@@ -40,45 +43,50 @@ public interface IStatisticsManager {
      * Returns the statistics for the flows specified in the list
      *
      * @param flows
-     * @return  the list of flows installed on the network node
+     * @return A map of flows per node installed on that node, empty map if
+     *         flows is null/empty.
      */
-    Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(
-            List<FlowEntry> flows);
+    Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(List<FlowEntry> flows);
 
     /**
-     * Returns the number of flows installed on the switch in the current container context
-     * If the context is the default container, the returned value is the number of all the
-     * flows installed on the switch regardless of the container they belong to
+     * Returns the number of flows installed on the switch in the current
+     * container context If the context is the default container, the returned
+     * value is the number of all the flows installed on the switch regardless
+     * of the container they belong to
      *
-     * @param switchId
-     * @return
+     * @param node
+     * @return number of flows on specified node or (-1) if node was not found
      */
     int getFlowsNumber(Node node);
 
     /**
-     * Returns the node description for the specified node retrieved and cached by the
-     * protocol plugin component which collects the node statistics
+     * Returns the node description for the specified node retrieved by the
+     * protocol plugin component and cached by statistics manager.
+     * Null if node not found.
      *
      * @param node
-     * @return
+     * @return node description
      */
     NodeDescription getNodeDescription(Node node);
 
     /**
-     * Returns the statistics for the specified node connector as it was retrieved
-     * and cached by the protocol plugin component which collects the node connector statistics
+     * Returns the statistics for the specified node connector as it was
+     * retrieved by the protocol plugin component and cached by statistics
+     * manager.
      *
      * @param node
-     * @return
+     * @return Node connector statistics or null if requested stats was not
+     *         found.
      */
-    NodeConnectorStatistics getNodeConnectorStatistics(
-            NodeConnector nodeConnector);
+    NodeConnectorStatistics getNodeConnectorStatistics(NodeConnector nodeConnector);
 
     /**
-     * Returns the statistics for all the node connector present on the specified network node
+     * Returns the statistics for all the node connector present on the
+     * specified network node
      *
      * @param node
-     * @return
+     * @return List of node connector statistics. Null if node is null. Empty
+     *         list if node/stats is not present.
      */
     List<NodeConnectorStatistics> getNodeConnectorStatistics(Node node);
 
@@ -86,7 +94,7 @@ public interface IStatisticsManager {
      * Returns the statistics for the specified table of the node
      *
      * @param nodeTable
-     * @return
+     * @return Table statistics. Null if node table is null or stats not found.
      */
     NodeTableStatistics getNodeTableStatistics(NodeTable nodeTable);
 
@@ -94,7 +102,8 @@ public interface IStatisticsManager {
      * Returns the statistics for all the tables of the node
      *
      * @param nodeTable
-     * @return
+     * @return List of table stats. Null if node is null. Empty list if
+     *         node/stats not found.
      */
     List <NodeTableStatistics> getNodeTableStatistics(Node node);
 }
index 98ddfa5e82493c90a8313a0df3ea0acd597f3bab..769749302651e7841322ff4eca29829e38860a23 100644 (file)
         <configuration>
           <instructions>
             <Import-Package>
+              org.opendaylight.controller.clustering.services,
               org.opendaylight.controller.containermanager,
               org.opendaylight.controller.sal.core,
-              org.opendaylight.controller.sal.flowprogrammer, org.slf4j,
+              org.opendaylight.controller.sal.flowprogrammer,
               org.opendaylight.controller.sal.reader,
+              org.opendaylight.controller.sal.utils,
+              org.slf4j,
+              org.opendaylight.controller.sal.inventory,
+              org.opendaylight.controller.sal.match,
+              org.opendaylight.controller.switchmanager,
               org.opendaylight.controller.statisticsmanager,
               org.opendaylight.controller.forwardingrulesmanager,
               org.apache.felix.dm
index faba64a749a0a2ee64437da24212f0598be0e962..f9a07ed0f88e9da5472c92225cf17cdffbcffd6b 100644 (file)
 package org.opendaylight.controller.statisticsmanager.internal;
 
 import org.apache.felix.dm.Component;
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
+import org.opendaylight.controller.sal.core.IContainer;
+import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
 import org.opendaylight.controller.sal.reader.IReadService;
+import org.opendaylight.controller.sal.reader.IReadServiceListener;
 import org.opendaylight.controller.statisticsmanager.IStatisticsManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Activator extends ComponentActivatorAbstractBase {
-    protected static final Logger logger = LoggerFactory
-            .getLogger(Activator.class);
+    protected static final Logger logger = LoggerFactory.getLogger(Activator.class);
 
     /**
-     * Function called when the activator starts just after some
-     * initializations are done by the
-     * ComponentActivatorAbstractBase.
+     * Function called when the activator starts just after some initializations
+     * are done by the ComponentActivatorAbstractBase.
      *
      */
     public void init() {
     }
 
     /**
-     * Function called when the activator stops just before the
-     * cleanup done by ComponentActivatorAbstractBase
+     * Function called when the activator stops just before the cleanup done by
+     * ComponentActivatorAbstractBase
      *
      */
     public void destroy() {
     }
 
     /**
-     * Function that is used to communicate to dependency manager the
-     * list of known implementations for services inside a container
+     * Function that is used to communicate to dependency manager the list of
+     * known implementations for services inside a container
      *
      *
      * @return An array containing all the CLASS objects that will be
-     * instantiated in order to get an fully working implementation
-     * Object
+     *         instantiated in order to get an fully working implementation
+     *         Object
      */
     public Object[] getImplementations() {
         Object[] res = { StatisticsManager.class };
@@ -52,27 +54,35 @@ public class Activator extends ComponentActivatorAbstractBase {
     }
 
     /**
-     * Function that is called when configuration of the dependencies
-     * is required.
+     * Function that is called when configuration of the dependencies is
+     * required.
      *
-     * @param c dependency manager Component object, used for
-     * configuring the dependencies exported and imported
-     * @param imp Implementation class that is being configured,
-     * needed as long as the same routine can configure multiple
-     * implementations
-     * @param containerName The containerName being configured, this allow
-     * also optional per-container different behavior if needed, usually
-     * should not be the case though.
+     * @param c
+     *            dependency manager Component object, used for configuring the
+     *            dependencies exported and imported
+     * @param imp
+     *            Implementation class that is being configured, needed as long
+     *            as the same routine can configure multiple implementations
+     * @param containerName
+     *            The containerName being configured, this allow also optional
+     *            per-container different behavior if needed, usually should not
+     *            be the case though.
      */
     public void configureInstance(Component c, Object imp, String containerName) {
         if (imp.equals(StatisticsManager.class)) {
             // export the service
-            c.setInterface(new String[] { IStatisticsManager.class.getName() },
-                    null);
+            c.setInterface(new String[] {
+                    IStatisticsManager.class.getName(),
+                    IReadServiceListener.class.getName(),
+                    IListenInventoryUpdates.class.getName() }, null);
+
+            c.add(createContainerServiceDependency(containerName).setService(IReadService.class)
+                    .setCallbacks("setReaderService", "unsetReaderService").setRequired(true));
+            c.add(createContainerServiceDependency(containerName).setService(IClusterContainerServices.class)
+                    .setCallbacks("setClusterContainerService", "unsetClusterContainerService").setRequired(true));
+            c.add(createContainerServiceDependency(containerName).setService(IContainer.class)
+                    .setCallbacks("setIContainer", "unsetIContainer").setRequired(true));
 
-            c.add(createContainerServiceDependency(containerName).setService(
-                    IReadService.class).setCallbacks("setReaderService",
-                    "unsetReaderService").setRequired(false));
         }
     }
 }
index 9aef40c4134424a5cc955557c585e301dd279c68..f5c13b61053ee5eaaa7eab5594608cd3da4ce5d7 100644 (file)
 package org.opendaylight.controller.statisticsmanager.internal;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
+import org.opendaylight.controller.clustering.services.CacheConfigException;
+import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.IClusterContainerServices;
+import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
+import org.opendaylight.controller.sal.core.IContainer;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.NodeTable;
+import org.opendaylight.controller.sal.core.Property;
+import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.flowprogrammer.Flow;
+import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
 import org.opendaylight.controller.sal.reader.FlowOnNode;
 import org.opendaylight.controller.sal.reader.IReadService;
+import org.opendaylight.controller.sal.reader.IReadServiceListener;
 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
 import org.opendaylight.controller.sal.reader.NodeDescription;
 import org.opendaylight.controller.sal.reader.NodeTableStatistics;
+import org.opendaylight.controller.sal.utils.ServiceHelper;
 import org.opendaylight.controller.statisticsmanager.IStatisticsManager;
+import org.opendaylight.controller.switchmanager.ISwitchManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The class which implements the methods for retrieving
- * the network nodes statistics.
+ * The class caches latest network nodes statistics as notified by reader
+ * services and provides API to retrieve them.
  */
-public class StatisticsManager implements IStatisticsManager {
-    private static final Logger log = LoggerFactory
-            .getLogger(StatisticsManager.class);
+public class StatisticsManager implements IStatisticsManager, IReadServiceListener, IListenInventoryUpdates {
+    private static final Logger log = LoggerFactory.getLogger(StatisticsManager.class);
+    private IContainer container;
+    private IClusterContainerServices clusterContainerService;
     private IReadService reader;
+    //statistics caches
+    private ConcurrentMap<Node, List<FlowOnNode>> flowStatistics;
+    private ConcurrentMap<Node, List<NodeConnectorStatistics>> nodeConnectorStatistics;
+    private ConcurrentMap<Node, List<NodeTableStatistics>> tableStatistics;
+    private ConcurrentMap<Node, NodeDescription> descriptionStatistics;
 
-    public StatisticsManager() {
+    private void nonClusterObjectCreate() {
+        flowStatistics = new ConcurrentHashMap<Node, List<FlowOnNode>>();
+        nodeConnectorStatistics = new ConcurrentHashMap<Node, List<NodeConnectorStatistics>>();
+        tableStatistics = new ConcurrentHashMap<Node, List<NodeTableStatistics>>();
+        descriptionStatistics = new ConcurrentHashMap<Node, NodeDescription>();
+    }
+
+    @SuppressWarnings("deprecation")
+    private void allocateCaches() {
+        if (clusterContainerService == null) {
+            nonClusterObjectCreate();
+            log.error("Clustering service unavailable. Allocated non-cluster statistics manager cache.");
+            return;
+        }
 
+        try {
+            clusterContainerService.createCache("statisticsmanager.flowStatistics",
+                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            clusterContainerService.createCache("statisticsmanager.nodeConnectorStatistics",
+                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            clusterContainerService.createCache("statisticsmanager.tableStatistics",
+                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+            clusterContainerService.createCache("statisticsmanager.descriptionStatistics",
+                    EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+
+        } catch (CacheConfigException cce) {
+            log.error("Statistics cache configuration invalid - check cache mode");
+        } catch (CacheExistException ce) {
+            log.debug("Skipping statistics cache creation - already present");
+        }
+    }
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    private void retrieveCaches() {
+        ConcurrentMap<?, ?> map;
+
+        if (this.clusterContainerService == null) {
+            log.warn("Can't retrieve statistics manager cache, Clustering service unavailable.");
+            return;
+        }
+
+        log.debug("Statistics Manager - retrieveCaches for Container {}", container);
+
+        map = clusterContainerService.getCache("statisticsmanager.flowStatistics");
+        if (map != null) {
+            this.flowStatistics = (ConcurrentMap<Node, List<FlowOnNode>>) map;
+        } else {
+            log.error("Cache allocation failed for statisticsmanager.flowStatistics in container {}", container.getName());
+        }
+
+        map = clusterContainerService.getCache("statisticsmanager.nodeConnectorStatistics");
+        if (map != null) {
+            this.nodeConnectorStatistics = (ConcurrentMap<Node, List<NodeConnectorStatistics>>) map;
+        } else {
+            log.error("Cache allocation failed for statisticsmanager.nodeConnectorStatistics in container {}", container.getName());
+        }
+
+        map = clusterContainerService.getCache("statisticsmanager.tableStatistics");
+        if (map != null) {
+            this.tableStatistics = (ConcurrentMap<Node, List<NodeTableStatistics>>) map;
+        } else {
+            log.error("Cache allocation failed for statisticsmanager.tableStatistics in container {}", container.getName());
+        }
+
+        map = clusterContainerService.getCache("statisticsmanager.descriptionStatistics");
+        if (map != null) {
+            this.descriptionStatistics = (ConcurrentMap<Node, NodeDescription>) map;
+        } else {
+            log.error("Cache allocation failed for statisticsmanager.descriptionStatistics in container {}", container.getName());
+        }
     }
 
     /**
@@ -48,6 +138,9 @@ public class StatisticsManager implements IStatisticsManager {
      */
     void init() {
         log.debug("INIT called!");
+        allocateCaches();
+        retrieveCaches();
+
     }
 
     /**
@@ -70,6 +163,27 @@ public class StatisticsManager implements IStatisticsManager {
         log.debug("START called!");
     }
 
+    /**
+     * Function called after registering the service in OSGi service registry.
+     */
+    void started(){
+        //retrieve current statistics so we don't have to wait for next refresh
+        ISwitchManager switchManager = (ISwitchManager) ServiceHelper.getInstance(
+                ISwitchManager.class, container.getName(), this);
+        if (reader != null && switchManager != null) {
+            Set<Node> nodeSet = switchManager.getNodes();
+            for (Node node : nodeSet) {
+                flowStatistics.put(node, reader.readAllFlows(node));
+                descriptionStatistics.put(node, reader.readDescription(node));
+                tableStatistics.put(node, reader.readNodeTable(node));
+                nodeConnectorStatistics.put(node, reader.readNodeConnectors(node));
+            }
+
+        } else {
+            log.warn("Failed to retrieve current statistics. Statistics will not be immidiately available!");
+        }
+    }
+
     /**
      * Function called by the dependency manager before the services
      * exported by the component are unregistered, this will be
@@ -80,66 +194,199 @@ public class StatisticsManager implements IStatisticsManager {
         log.debug("STOP called!");
     }
 
+    void setClusterContainerService(IClusterContainerServices s) {
+        log.debug("Cluster Service set for Statistics Mgr");
+        this.clusterContainerService = s;
+    }
+
+    void unsetClusterContainerService(IClusterContainerServices s) {
+        if (this.clusterContainerService == s) {
+            log.debug("Cluster Service removed for Statistics Mgr!");
+            this.clusterContainerService = null;
+        }
+    }
+    void setIContainer(IContainer c){
+        container = c;
+    }
+    public void unsetIContainer(IContainer s) {
+        if (this.container == s) {
+            this.container = null;
+        }
+    }
+
     public void setReaderService(IReadService service) {
         log.debug("Got inventory service set request {}", service);
         this.reader = service;
     }
 
     public void unsetReaderService(IReadService service) {
-        log.debug("Got a service UNset request");
+        log.debug("Got a service UNset request {}", service);
         this.reader = null;
     }
 
     @Override
     public List<FlowOnNode> getFlows(Node node) {
-        return reader.readAllFlows(node);
+        if (node == null) {
+            return null;
+        }
+
+        List<FlowOnNode> flowList = new ArrayList<FlowOnNode>();
+        List<FlowOnNode> cachedList = flowStatistics.get(node);
+        if (cachedList != null){
+            flowList.addAll(cachedList);
+        }
+        return flowList;
     }
 
     @Override
-    public Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(
-            List<FlowEntry> flowList) {
-        Map<Node, List<FlowOnNode>> map = new HashMap<Node, List<FlowOnNode>>();
-        if (flowList != null) {
-            for (FlowEntry entry : flowList) {
-                Node node = entry.getNode();
-                Flow flow = entry.getFlow();
-                List<FlowOnNode> list = (map.containsKey(node)) ? map.get(node)
-                        : new ArrayList<FlowOnNode>();
-                list.add(reader.readFlow(node, flow));
-                map.put(node, list);
+    public Map<Node, List<FlowOnNode>> getFlowStatisticsForFlowList(List<FlowEntry> flowList) {
+        Map<Node, List<FlowOnNode>> statMapOutput = new HashMap<Node, List<FlowOnNode>>();
+
+        if (flowList == null || flowList.isEmpty()){
+            return statMapOutput;
+        }
+
+        Node node;
+        //index FlowEntries' flows by node so we don't traverse entire flow list for each flowEntry
+        Map<Node, Set<Flow>> index = new HashMap<Node, Set<Flow>>();
+        for (FlowEntry flowEntry : flowList) {
+            node = flowEntry.getNode();
+            Set<Flow> set = (index.containsKey(node) ? index.get(node) : new HashSet<Flow>());
+            set.add(flowEntry.getFlow());
+            index.put(node, set);
+        }
+
+        //iterate over flows per indexed node and add to output
+        for (Entry<Node, Set<Flow>> indexEntry : index.entrySet()) {
+            node = indexEntry.getKey();
+            List<FlowOnNode> flowsPerNode = flowStatistics.get(node);
+
+            if (flowsPerNode != null && !flowsPerNode.isEmpty()){
+                List<FlowOnNode> filteredFlows = statMapOutput.containsKey(node) ?
+                        statMapOutput.get(node) : new ArrayList<FlowOnNode>();
+
+                for (FlowOnNode flowOnNode : flowsPerNode) {
+                    if (indexEntry.getValue().contains(flowOnNode.getFlow())) {
+                        filteredFlows.add(flowOnNode);
+                    }
+                }
+                statMapOutput.put(node, filteredFlows);
             }
         }
-        return map;
+        return statMapOutput;
     }
 
     @Override
     public int getFlowsNumber(Node node) {
-        return reader.readAllFlows(node).size();
+        List<FlowOnNode> l;
+        if (node == null || (l = flowStatistics.get(node)) == null){
+            return -1;
+        }
+        return l.size();
     }
 
     @Override
     public NodeDescription getNodeDescription(Node node) {
-        return reader.readDescription(node);
+        if (node == null){
+            return null;
+        }
+        NodeDescription nd = descriptionStatistics.get(node);
+        return nd != null? nd.clone() : null;
     }
 
     @Override
-    public NodeConnectorStatistics getNodeConnectorStatistics(
-            NodeConnector nodeConnector) {
-        return reader.readNodeConnector(nodeConnector);
+    public NodeConnectorStatistics getNodeConnectorStatistics(NodeConnector nodeConnector) {
+        if (nodeConnector == null){
+            return null;
+        }
+
+        List<NodeConnectorStatistics> statList = nodeConnectorStatistics.get(nodeConnector.getNode());
+        if (statList != null){
+            for (NodeConnectorStatistics stat : statList) {
+                if (stat.getNodeConnector().equals(nodeConnector)){
+                    return stat;
+                }
+            }
+        }
+        return null;
     }
 
     @Override
     public List<NodeConnectorStatistics> getNodeConnectorStatistics(Node node) {
-        return reader.readNodeConnectors(node);
+        if (node == null){
+            return null;
+        }
+
+        List<NodeConnectorStatistics> statList = new ArrayList<NodeConnectorStatistics>();
+        List<NodeConnectorStatistics> cachedList = nodeConnectorStatistics.get(node);
+        if (cachedList != null) {
+            statList.addAll(cachedList);
+        }
+        return statList;
     }
 
     @Override
     public NodeTableStatistics getNodeTableStatistics(NodeTable nodeTable) {
-        return reader.readNodeTable(nodeTable);
+        if (nodeTable == null){
+            return null;
+        }
+        List<NodeTableStatistics> statList = tableStatistics.get(nodeTable.getNode());
+        if (statList != null){
+            for (NodeTableStatistics stat : statList) {
+                if (stat.getNodeTable().getID().equals(nodeTable.getID())){
+                    return stat;
+                }
+            }
+        }
+        return null;
     }
 
     @Override
     public List<NodeTableStatistics> getNodeTableStatistics(Node node){
-        return reader.readNodeTable(node);
+        if (node == null){
+            return null;
+        }
+        List<NodeTableStatistics> statList = new ArrayList<NodeTableStatistics>();
+        List<NodeTableStatistics> cachedList = tableStatistics.get(node);
+        if (cachedList != null) {
+            statList.addAll(cachedList);
+        }
+        return statList;
+    }
+
+    @Override
+    public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
+        this.flowStatistics.put(node, flowStatsList);
+    }
+
+    @Override
+    public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
+        this.nodeConnectorStatistics.put(node, ncStatsList);
+    }
+
+    @Override
+    public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
+        this.tableStatistics.put(node, tableStatsList);
+    }
+
+    @Override
+    public void descriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
+        this.descriptionStatistics.put(node, nodeDescription);
+    }
+
+    @Override
+    public void updateNode(Node node, UpdateType type, Set<Property> props) {
+        //if node is removed, remove stats mappings
+        if (type == UpdateType.REMOVED) {
+            flowStatistics.remove(node);
+            nodeConnectorStatistics.remove(node);
+            tableStatistics.remove(node);
+            descriptionStatistics.remove(node);
+        }
+    }
+
+    @Override
+    public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set<Property> props) {
+        // not interested in this update
     }
 }
index 8fcad060b72e818b355447ccf4140df8c17e889d..f6e84ca78b3e0d7e79d90d5cd7a5615a25ffb0c9 100644 (file)
       <artifactId>statisticsmanager.implementation</artifactId>
       <version>0.4.0-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>switchmanager.implementation</artifactId>
+      <version>0.4.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>switchmanager</artifactId>
+      <version>0.4.0-SNAPSHOT</version>
+    </dependency>
+
       <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>configuration</artifactId>
index 64d752dd3168727497f4588ae5921bc2a9effc84..00629a43b11470f66b6b1f2045e4ae3d3aabbf76 100644 (file)
@@ -1,20 +1,25 @@
 package org.opendaylight.controller.statisticsmanager.internal;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.ops4j.pax.exam.CoreOptions.junitBundles;
+import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
+import static org.ops4j.pax.exam.CoreOptions.options;
+import static org.ops4j.pax.exam.CoreOptions.systemPackages;
+import static org.ops4j.pax.exam.CoreOptions.systemProperty;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.osgi.framework.ServiceReference;
-import org.osgi.framework.Bundle;
 import javax.inject.Inject;
 
 import org.junit.Assert;
-import org.junit.Test;
 import org.junit.Before;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.opendaylight.controller.forwardingrulesmanager.FlowEntry;
 import org.opendaylight.controller.sal.action.Action;
@@ -28,16 +33,16 @@ import org.opendaylight.controller.sal.match.MatchType;
 import org.opendaylight.controller.sal.reader.FlowOnNode;
 import org.opendaylight.controller.sal.reader.NodeConnectorStatistics;
 import org.opendaylight.controller.sal.reader.NodeDescription;
-import org.opendaylight.controller.sal.utils.NodeCreator;
-import org.opendaylight.controller.statisticsmanager.*;
-import org.ops4j.pax.exam.junit.PaxExam;
-import org.osgi.framework.BundleContext;
-import static org.junit.Assert.*;
-import org.ops4j.pax.exam.junit.Configuration;
-import static org.ops4j.pax.exam.CoreOptions.*;
-
+import org.opendaylight.controller.statisticsmanager.IStatisticsManager;
 import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.Configuration;
+import org.ops4j.pax.exam.junit.PaxExam;
 import org.ops4j.pax.exam.util.PathUtils;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @RunWith(PaxExam.class)
 public class StatisticsManagerIT {
@@ -84,46 +89,44 @@ public class StatisticsManagerIT {
                         .versionAsInProject(),
                 mavenBundle("ch.qos.logback", "logback-classic")
                         .versionAsInProject(),
-                // List all the bundles on which the test case depends
-                mavenBundle("org.opendaylight.controller", "sal")
-                        .versionAsInProject(),
-                mavenBundle("org.opendaylight.controller", "sal.implementation")
-                        .versionAsInProject(),
-                mavenBundle("org.opendaylight.controller", "statisticsmanager")
-                        .versionAsInProject(),
-                mavenBundle("org.opendaylight.controller",
-                        "statisticsmanager.implementation")
-                        .versionAsInProject(),
-                mavenBundle("org.opendaylight.controller",
-                        "protocol_plugins.stub").versionAsInProject(),
                 // needed by statisticsmanager
                 mavenBundle("org.opendaylight.controller", "containermanager")
-                        .versionAsInProject(),
-                mavenBundle("org.opendaylight.controller",
-                        "containermanager.implementation").versionAsInProject(),
-                mavenBundle("org.opendaylight.controller",
-                        "forwardingrulesmanager").versionAsInProject(),
-
-                mavenBundle("org.opendaylight.controller",
-                        "clustering.services").versionAsInProject(),
+                    .versionAsInProject(),
+                mavenBundle("org.opendaylight.controller", "containermanager.implementation")
+                    .versionAsInProject(),
+                mavenBundle("org.opendaylight.controller", "clustering.services")
+                    .versionAsInProject(),
                 mavenBundle("org.opendaylight.controller", "clustering.stub")
-                        .versionAsInProject(),
-
+                    .versionAsInProject(),
                 // needed by forwardingrulesmanager
-                mavenBundle("org.opendaylight.controller", "switchmanager")
-                        .versionAsInProject(),
                 mavenBundle("org.opendaylight.controller", "configuration")
-                        .versionAsInProject(),
-
-                mavenBundle("org.opendaylight.controller",
-                        "configuration.implementation").versionAsInProject(),
+                    .versionAsInProject(),
+                mavenBundle("org.opendaylight.controller", "configuration.implementation")
+                    .versionAsInProject(),
                 mavenBundle("org.opendaylight.controller", "hosttracker")
-                        .versionAsInProject(),
+                    .versionAsInProject(),
+
+                // List all the bundles on which the test case depends
+                mavenBundle("org.opendaylight.controller", "sal")
+                    .versionAsInProject(),
+                mavenBundle("org.opendaylight.controller", "sal.implementation")
+                    .versionAsInProject(),
+                mavenBundle("org.opendaylight.controller", "protocol_plugins.stub")
+                    .versionAsInProject(),
+                mavenBundle("org.opendaylight.controller", "switchmanager")
+                    .versionAsInProject(),
+                mavenBundle("org.opendaylight.controller", "switchmanager.implementation")
+                    .versionAsInProject(),
+                mavenBundle("org.opendaylight.controller", "statisticsmanager")
+                    .versionAsInProject(),
+                mavenBundle("org.opendaylight.controller", "statisticsmanager.implementation")
+                    .versionAsInProject(),
+                mavenBundle("org.opendaylight.controller", "forwardingrulesmanager")
+                    .versionAsInProject(),
 
                 // needed by hosttracker
                 mavenBundle("org.opendaylight.controller", "topologymanager")
                         .versionAsInProject(),
-
                 mavenBundle("org.jboss.spec.javax.transaction",
                         "jboss-transaction-api_1.1_spec").versionAsInProject(),
                 mavenBundle("org.apache.commons", "commons-lang3")
@@ -223,6 +226,11 @@ public class StatisticsManagerIT {
         List<Action> actions = new ArrayList<Action>();
         actions.add(action);
         flow.setActions(actions);
+        // as in stub
+        flow.setPriority((short) 3500);
+        flow.setIdleTimeout((short) 1000);
+        flow.setHardTimeout((short) 2000);
+        flow.setId(12345);
 
         try {
             Node node = new Node("STUB", 0xCAFE);