Merge "Fixed a typo on a comment in IPv4.java."
authorAlessandro Boch <aboch@cisco.com>
Wed, 31 Jul 2013 00:34:08 +0000 (00:34 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 31 Jul 2013 00:34:08 +0000 (00:34 +0000)
41 files changed:
opendaylight/connectionmanager/api/pom.xml [new file with mode: 0644]
opendaylight/connectionmanager/api/src/main/java/org/opendaylight/controller/connectionmanager/ConnectionMgmtScheme.java [new file with mode: 0644]
opendaylight/connectionmanager/api/src/main/java/org/opendaylight/controller/connectionmanager/IConnectionManager.java [new file with mode: 0644]
opendaylight/connectionmanager/implementation/pom.xml [new file with mode: 0644]
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/Activator.java [new file with mode: 0644]
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java [new file with mode: 0644]
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionMgmtEvent.java [new file with mode: 0644]
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionMgmtEventType.java [new file with mode: 0644]
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AbstractScheme.java [new file with mode: 0644]
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AnyControllerScheme.java [new file with mode: 0644]
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/ControllerConfig.java [new file with mode: 0644]
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/LoadBalancedScheme.java [new file with mode: 0644]
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/RoundRobinScheme.java [new file with mode: 0644]
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/SchemeFactory.java [new file with mode: 0644]
opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/SingleControllerScheme.java [new file with mode: 0644]
opendaylight/distribution/opendaylight/pom.xml
opendaylight/protocol_plugins/openflow/pom.xml
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/Controller.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/Activator.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DataPacketMuxDemux.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DataPacketServices.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/DiscoveryService.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/FlowProgrammerNotifier.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/FlowProgrammerService.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/InventoryService.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/InventoryServiceShim.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/OFStatisticsManager.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/ReadService.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/ReadServiceFilter.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServiceShim.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServices.java
opendaylight/sal/connection/api/pom.xml [new file with mode: 0644]
opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/ConnectionConstants.java [new file with mode: 0644]
opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IConnectionListener.java [new file with mode: 0644]
opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IConnectionService.java [new file with mode: 0644]
opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IPluginInConnectionService.java [new file with mode: 0644]
opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IPluginOutConnectionService.java [new file with mode: 0644]
opendaylight/sal/connection/implementation/pom.xml [new file with mode: 0644]
opendaylight/sal/connection/implementation/src/main/java/org/opendaylight/controller/sal/connection/implementation/internal/Activator.java [new file with mode: 0644]
opendaylight/sal/connection/implementation/src/main/java/org/opendaylight/controller/sal/connection/implementation/internal/ConnectionService.java [new file with mode: 0644]
opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/Activator.java

diff --git a/opendaylight/connectionmanager/api/pom.xml b/opendaylight/connectionmanager/api/pom.xml
new file mode 100644 (file)
index 0000000..b27209d
--- /dev/null
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>commons.opendaylight</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../../commons/opendaylight</relativePath>
+  </parent>
+
+  <artifactId>connectionmanager</artifactId>
+  <version>0.1.0-SNAPSHOT</version>
+  <packaging>bundle</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <version>2.3.6</version>
+        <extensions>true</extensions>
+        <configuration>
+          <instructions>
+            <Import-Package>
+              org.opendaylight.controller.sal.core,
+              org.opendaylight.controller.sal.utils,
+              org.opendaylight.controller.sal.connection
+            </Import-Package>
+            <Export-Package>
+              org.opendaylight.controller.connectionmanager
+            </Export-Package>
+          </instructions>
+          <manifestLocation>${project.basedir}/META-INF</manifestLocation>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal.connection</artifactId>
+      <version>0.1.0-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/opendaylight/connectionmanager/api/src/main/java/org/opendaylight/controller/connectionmanager/ConnectionMgmtScheme.java b/opendaylight/connectionmanager/api/src/main/java/org/opendaylight/controller/connectionmanager/ConnectionMgmtScheme.java
new file mode 100644 (file)
index 0000000..49e77b2
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.connectionmanager;
+
+/**
+ * Enumeration that represents the Connectivity Scheme / Algorithm for South-Bound nodes
+ * towards an Active-Active Clustered Controllers.
+ */
+public enum ConnectionMgmtScheme {
+    /**
+     * All the nodes are connected with a Single Controller.
+     * The SingleControllerScheme algorithm will determine that one controller to which all
+     * the nodes are connected with.
+     * This is like Active-Standby model from a South-Bound perspective.
+     */
+    SINGLE_CONTROLLER("All nodes connected with a Single Controller"),
+
+    /**
+     * Any node can be connected with any controller. But with just 1 master controller.
+     */
+    ANY_CONTROLLER_ONE_MASTER("Nodes can to connect with any controller in the cluster"),
+
+    /**
+     * Simple Round Robin Scheme that will let the nodes connect with each controller in
+     * Active-Active cluster in a round robin fashion.
+     */
+    ROUND_ROBIN("Each node is connected with individual Controller in Round-Robin fashion"),
+
+    /**
+     * Complex Load Balancing scheme that will let the nodes connect with controller based
+     * on the resource usage in each of the controllers in a cluster.
+     */
+    LOAD_BALANCED("Connect nodes to controllers based on the Controller Load"),
+
+    /**
+     * Container based scheme will let the nodes connect with controller based
+     * on the container configuration.
+     */
+    CONTAINER_BASED("Connect nodes to controllers based on Container they belong to");
+
+    private ConnectionMgmtScheme(String name) {
+        this.name = name;
+    }
+
+    private String name;
+
+    public String toString() {
+        return name;
+    }
+
+    public static ConnectionMgmtScheme fromString(String pName) {
+        for(ConnectionMgmtScheme p:ConnectionMgmtScheme.values()) {
+            if (p.toString().equals(pName)) {
+                return p;
+            }
+        }
+        return null;
+    }
+}
diff --git a/opendaylight/connectionmanager/api/src/main/java/org/opendaylight/controller/connectionmanager/IConnectionManager.java b/opendaylight/connectionmanager/api/src/main/java/org/opendaylight/controller/connectionmanager/IConnectionManager.java
new file mode 100644 (file)
index 0000000..12b1969
--- /dev/null
@@ -0,0 +1,94 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.connectionmanager;
+
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Set;
+
+import org.opendaylight.controller.sal.connection.ConnectionConstants;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.utils.Status;
+
+/**
+ * Connection Manager provides south-bound connectivity services.
+ * The APIs are currently focused towards Active-Active Clustering support
+ * wherein the node can connect to any of the Active Controller in the Cluster.
+ * This component can also host the necessary logic for south-bound connectivity
+ * when partial cluster is identified during Partition scenarios.
+ *
+ * This (and its corresponding implementation) component can also be enhanced further
+ * for more fancy algorithms/criteria for connection acceptance.
+ */
+
+public interface IConnectionManager {
+    /**
+     * This method returns Connectivity Algorithm (Scheme) that is currently being used.
+     *
+     * @return ConnectionMgmtScheme Enum that represents the active scheme.
+     */
+    public ConnectionMgmtScheme getActiveScheme();
+
+    /**
+     * Method that will retrieve and return a Set of Nodes that is currently connected to the given controller.
+     *
+     * @param controller InetAddress of the Controller that is currently connected to a set of Nodes.
+     *
+     * @return Set<Node> Set of Nodes connected to a controller.
+     */
+    public Set<Node> getNodes(InetAddress controller);
+
+    /**
+     * Method that will retrieve and return a Set of Nodes that is currently connected to
+     * the controller on which this method is executed.
+     *
+     * @return Set<Node> Set of Nodes connected to this controller.
+     */
+    public Set<Node> getLocalNodes();
+
+    /**
+     * Method to test if a node is local to a controller.
+     *
+     * @return true if node is local to this controller. false otherwise.
+     */
+    public boolean isLocal(Node node);
+
+    /**
+     * Disconnect a Node from the controller.
+     *
+     * @return Status of the disconnect Operation.
+     */
+    public Status disconnect(Node node);
+
+    /**
+     * Connect to a node
+     *
+     * @param connectionIdentifier identifier with which the application would refer to a given connection.
+     * @param params Connection Params in Map format. This is entirely handled by the south-bound
+     * plugins and is an opaque value for SAL or Connection Manager. Typical values keyed inside
+     * this params are Management IP-Address, Username, Password, Security Keys, etc...
+     *
+     *  @return Node Node connected to.
+     */
+    public Node connect (String connectionIdentifier, Map<ConnectionConstants, String> params);
+
+    /**
+     * Connect to a node
+     *
+     * @param type Type of the node representing NodeIDType.
+     * @param connectionIdentifier identifier with which the application would refer to a given connection.
+     * @param params Connection Params in Map format. This is entirely handled by the south-bound
+     * plugins and is an opaque value for SAL or Connection Manager. Typical values keyed inside
+     * this params are Management IP-Address, Username, Password, Security Keys, etc...
+     *
+     *  @return Status of the Connect Operation.
+     */
+    public Node connect(String type, String connectionIdentifier, Map<ConnectionConstants, String> params);
+}
diff --git a/opendaylight/connectionmanager/implementation/pom.xml b/opendaylight/connectionmanager/implementation/pom.xml
new file mode 100644 (file)
index 0000000..2c92cf2
--- /dev/null
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>commons.opendaylight</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../../commons/opendaylight</relativePath>
+  </parent>
+
+  <artifactId>connectionmanager.implementation</artifactId>
+  <version>0.1.0-SNAPSHOT</version>
+  <packaging>bundle</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.felix</groupId>
+        <artifactId>maven-bundle-plugin</artifactId>
+        <version>2.3.6</version>
+        <extensions>true</extensions>
+        <configuration>
+          <instructions>
+            <Import-Package>
+              org.opendaylight.controller.connectionmanager,
+              org.opendaylight.controller.clustering.services,
+              org.opendaylight.controller.sal.utils,
+              org.opendaylight.controller.sal.core,
+              org.opendaylight.controller.sal.connection,
+              org.opendaylight.controller.sal.inventory,
+              org.eclipse.osgi.framework.console,
+              org.osgi.framework,
+              org.slf4j,
+              org.apache.felix.dm
+            </Import-Package>
+            <Export-Package>
+            </Export-Package>
+            <Bundle-Activator>
+              org.opendaylight.controller.connectionmanager.internal.Activator
+            </Bundle-Activator>
+          </instructions>
+          <manifestLocation>${project.basedir}/META-INF</manifestLocation>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>clustering.services</artifactId>
+      <version>0.4.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>connectionmanager</artifactId>
+      <version>0.1.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal.connection</artifactId>
+      <version>0.1.0-SNAPSHOT</version>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/Activator.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/Activator.java
new file mode 100644 (file)
index 0000000..5ebbfe2
--- /dev/null
@@ -0,0 +1,102 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.connectionmanager.internal;
+
+import java.util.Dictionary;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Set;
+
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
+import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware;
+import org.opendaylight.controller.connectionmanager.IConnectionManager;
+import org.opendaylight.controller.sal.connection.IConnectionListener;
+import org.opendaylight.controller.sal.connection.IConnectionService;
+import org.apache.felix.dm.Component;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
+import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
+
+public class Activator extends ComponentActivatorAbstractBase {
+    protected static final Logger logger = LoggerFactory
+            .getLogger(Activator.class);
+
+    /**
+     * Function called when the activator starts just after some
+     * initializations are done by the
+     * ComponentActivatorAbstractBase.
+     *
+     */
+    public void init() {
+    }
+
+    /**
+     * Function called when the activator stops just before the
+     * cleanup done by ComponentActivatorAbstractBase
+     *
+     */
+    public void destroy() {
+    }
+
+    /**
+     * Method which tells how many Global implementations are
+     * supported by the bundle. This way we can tune the number of
+     * components created. This components will be created ONLY at the
+     * time of bundle startup and will be destroyed only at time of
+     * bundle destruction, this is the major difference with the
+     * implementation retrieved via getImplementations where all of
+     * them are assumed to be in a container!
+     *
+     *
+     * @return The list of implementations the bundle will support,
+     * in Global version
+     */
+    protected Object[] getGlobalImplementations() {
+        Object[] res = { ConnectionManager.class };
+        return res;
+    }
+
+    /**
+     * Configure the dependency for a given instance Global
+     *
+     * @param c Component assigned for this instance, this will be
+     * what will be used for configuration
+     * @param imp implementation to be configured
+     * @param containerName container on which the configuration happens
+     */
+    protected void configureGlobalInstance(Component c, Object imp) {
+        if (imp.equals(ConnectionManager.class)) {
+            Dictionary<String, Object> props = new Hashtable<String, Object>();
+            Set<String> propSet = new HashSet<String>();
+            propSet.add("connectionmanager.nodeconnections");
+            props.put("cachenames", propSet);
+            props.put("scope", "Global");
+
+            // export the service
+            c.setInterface(new String[] { IConnectionManager.class.getName(),
+                                          IConnectionListener.class.getName(),
+                                          ICoordinatorChangeAware.class.getName(),
+                                          IListenInventoryUpdates.class.getName(),
+                                          ICacheUpdateAware.class.getName()},
+                                          props);
+
+            c.add(createServiceDependency()
+                    .setService(IClusterGlobalServices.class)
+                    .setCallbacks("setClusterServices", "unsetClusterServices")
+                    .setRequired(true));
+
+            c.add(createServiceDependency().setService(IConnectionService.class)
+                    .setCallbacks("setConnectionService", "unsetConnectionService")
+                    .setRequired(true));
+        }
+    }
+}
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java
new file mode 100644 (file)
index 0000000..c4b7d4f
--- /dev/null
@@ -0,0 +1,342 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+/**
+ * Connection Manager provides south-bound connectivity services.
+ * The APIs are currently focused towards Active-Active Clustering support
+ * wherein the node can connect to any of the Active Controller in the Cluster.
+ * This component can also host the necessary logic for south-bound connectivity
+ * when partial cluster is identified during Partition scenarios.
+ *
+ * But this (and its corresponding implementation) component can also be used for
+ * basic connectivity mechansims for various south-bound plugins.
+ */
+
+package org.opendaylight.controller.connectionmanager.internal;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.eclipse.osgi.framework.console.CommandInterpreter;
+import org.eclipse.osgi.framework.console.CommandProvider;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
+import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware;
+import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
+import org.opendaylight.controller.connectionmanager.IConnectionManager;
+import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme;
+import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory;
+import org.opendaylight.controller.sal.connection.ConnectionConstants;
+import org.opendaylight.controller.sal.connection.IConnectionListener;
+import org.opendaylight.controller.sal.connection.IConnectionService;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.core.NodeConnector;
+import org.opendaylight.controller.sal.core.Property;
+import org.opendaylight.controller.sal.core.UpdateType;
+import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
+import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+
+public class ConnectionManager implements IConnectionManager, IConnectionListener,
+                                          ICoordinatorChangeAware, IListenInventoryUpdates,
+                                          ICacheUpdateAware<Node, Set<InetAddress>>,
+                                          CommandProvider {
+    private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
+    private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER;
+    private IClusterGlobalServices clusterServices;
+    private ConcurrentMap<ConnectionMgmtScheme, AbstractScheme> schemes;
+    private IConnectionService connectionService;
+    private Thread connectionEventThread;
+    private BlockingQueue<ConnectionMgmtEvent> connectionEvents;
+
+    public void setClusterServices(IClusterGlobalServices i) {
+        this.clusterServices = i;
+    }
+
+    public void unsetClusterServices(IClusterGlobalServices i) {
+        if (this.clusterServices == i) {
+            this.clusterServices = null;
+        }
+    }
+
+    public void setConnectionService(IConnectionService i) {
+        this.connectionService = i;
+    }
+
+    public void unsetConnectionService(IConnectionService i) {
+        if (this.connectionService == i) {
+            this.connectionService = null;
+        }
+    }
+
+    public void started() {
+        connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread");
+        connectionEventThread.start();
+
+        registerWithOSGIConsole();
+        notifyClusterViewChanged();
+    }
+
+    public void init() {
+        this.connectionEvents = new LinkedBlockingQueue<ConnectionMgmtEvent>();
+        schemes = new ConcurrentHashMap<ConnectionMgmtScheme, AbstractScheme>();
+        for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
+            AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices);
+            if (schemeImpl != null) schemes.put(scheme, schemeImpl);
+        }
+    }
+
+    public void stop() {
+        connectionEventThread.interrupt();
+        Set<Node> localNodes = getLocalNodes();
+        if (localNodes != null) {
+            AbstractScheme scheme = schemes.get(activeScheme);
+            for (Node localNode : localNodes) {
+                connectionService.disconnect(localNode);
+                if (scheme != null) scheme.removeNode(localNode);
+            }
+        }
+    }
+
+    @Override
+    public ConnectionMgmtScheme getActiveScheme() {
+        return activeScheme;
+    }
+
+    @Override
+    public Set<Node> getNodes(InetAddress controller) {
+        AbstractScheme scheme = schemes.get(activeScheme);
+        if (scheme == null) return null;
+        return scheme.getNodes(controller);
+    }
+
+    @Override
+    public Set<Node> getLocalNodes() {
+        AbstractScheme scheme = schemes.get(activeScheme);
+        if (scheme == null) return null;
+        return scheme.getNodes();
+    }
+
+    @Override
+    public boolean isLocal(Node node) {
+        AbstractScheme scheme = schemes.get(activeScheme);
+        if (scheme == null) return false;
+        return scheme.isLocal(node);
+    }
+
+    @Override
+    public void updateNode(Node node, UpdateType type, Set<Property> props) {
+        logger.debug("updateNode: {} type {} props {}", node, type, props);
+        AbstractScheme scheme = schemes.get(activeScheme);
+        if (scheme == null) return;
+        switch (type) {
+        case ADDED:
+            scheme.addNode(node);
+            break;
+        case REMOVED:
+            scheme.removeNode(node);
+            break;
+        default:
+                break;
+        }
+    }
+
+    @Override
+    public void updateNodeConnector(NodeConnector nodeConnector,
+            UpdateType type, Set<Property> props) {
+        logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, type, props);
+        AbstractScheme scheme = schemes.get(activeScheme);
+        if (scheme == null) return;
+        switch (type) {
+        case ADDED:
+            scheme.addNode(nodeConnector.getNode());
+            break;
+        default:
+                break;
+        }
+    }
+
+    @Override
+    public void coordinatorChanged() {
+        AbstractScheme scheme = schemes.get(activeScheme);
+        if (scheme == null) return;
+        scheme.handleClusterViewChanged();
+        notifyClusterViewChanged();
+    }
+
+    @Override
+    public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
+        if (connectionService == null) return null;
+        return connectionService.connect(connectionIdentifier, params);
+    }
+
+    @Override
+    public Node connect(String type, String connectionIdentifier, Map<ConnectionConstants, String> params) {
+        if (connectionService == null) return null;
+        return connectionService.connect(type, connectionIdentifier, params);
+    }
+
+    @Override
+    public Status disconnect (Node node) {
+        if (connectionService == null) return new Status(StatusCode.NOSERVICE);
+        return connectionService.disconnect(node);
+    }
+
+    @Override
+    public void entryCreated(Node key, String cacheName, boolean originLocal) {
+        AbstractScheme scheme = schemes.get(activeScheme);
+        logger.debug("Created : {} cache : {} existingValue : {}", key, cacheName, scheme.getNodeConnections().get(key));
+    }
+
+    /*
+     * Clustering Services' doesnt provide the existing states in the cache update callbacks.
+     * Hence, using a scratch local cache to maintain the existing state.
+     *
+     */
+    private ConcurrentMap<Node, Set<InetAddress>> existingConnections = new ConcurrentHashMap<Node, Set<InetAddress>>();
+
+    @Override
+    public void entryUpdated(Node node, Set<InetAddress> newControllers, String cacheName, boolean originLocal) {
+        if (originLocal) return;
+        Set<InetAddress> existingControllers = existingConnections.get(node);
+        if (existingControllers != null) {
+            logger.debug("Processing Update for : {} NewControllers : {} existingControllers : {}", node,
+                    newControllers.toString(), existingControllers.toString());
+            if (newControllers.size() < existingControllers.size()) {
+                Set<InetAddress> removed = new HashSet<InetAddress>(existingControllers);
+                if (removed.removeAll(newControllers)) {
+                    logger.debug("notifyNodeDisconnectFromMaster({})", node);
+                    notifyNodeDisconnectedEvent(node);
+                }
+            }
+        } else {
+            logger.debug("Ignoring the Update for : {} NewControllers : {}", node, newControllers.toString());
+        }
+        existingConnections.put(node, newControllers);
+    }
+
+    @Override
+    public void entryDeleted(Node key, String cacheName, boolean originLocal) {
+        if (originLocal) return;
+        logger.debug("Deleted : {} cache : {}", key, cacheName);
+        notifyNodeDisconnectedEvent(key);
+    }
+
+    private void enqueueConnectionEvent(ConnectionMgmtEvent event) {
+        try {
+            if (!connectionEvents.contains(event)) {
+                this.connectionEvents.put(event);
+            }
+        } catch (InterruptedException e) {
+            logger.debug("enqueueConnectionEvent caught Interrupt Exception for event {}", event);
+        }
+    }
+
+    private void notifyClusterViewChanged() {
+        ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null);
+        enqueueConnectionEvent(event);
+    }
+
+    private void notifyNodeDisconnectedEvent(Node node) {
+        ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node);
+        enqueueConnectionEvent(event);
+    }
+
+    /*
+     * this thread monitors the connectionEvent queue for new incoming events from
+     */
+    private class EventHandler implements Runnable {
+        @Override
+        public void run() {
+
+            while (true) {
+                try {
+                    ConnectionMgmtEvent ev = connectionEvents.take();
+                    ConnectionMgmtEventType eType = ev.getEvent();
+                    switch (eType) {
+                    case NODE_DISCONNECTED_FROM_MASTER:
+                        Node node = (Node)ev.getData();
+                        connectionService.notifyNodeDisconnectFromMaster(node);
+                        break;
+                    case CLUSTER_VIEW_CHANGED:
+                        connectionService.notifyClusterViewChanged();
+                        break;
+                    default:
+                        logger.error("Unknown Connection event {}", eType.ordinal());
+                    }
+                } catch (InterruptedException e) {
+                    connectionEvents.clear();
+                    return;
+                }
+            }
+        }
+    }
+
+    private void registerWithOSGIConsole() {
+        BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
+                .getBundleContext();
+        bundleContext.registerService(CommandProvider.class.getName(), this,
+                null);
+    }
+
+    public void _scheme (CommandInterpreter ci) {
+        String schemeStr = ci.nextArgument();
+        if (schemeStr == null) {
+            ci.println("Please enter valid Scheme name");
+            ci.println("Current Scheme : " + activeScheme.name());
+            return;
+        }
+        ConnectionMgmtScheme scheme = ConnectionMgmtScheme.valueOf(schemeStr);
+        if (scheme == null) {
+            ci.println("Please enter a valid Scheme name");
+            return;
+        }
+        activeScheme = scheme;
+    }
+
+    public void _printNodes (CommandInterpreter ci) {
+        String controller = ci.nextArgument();
+        if (controller == null) {
+            ci.println("Nodes connected to this controller : ");
+            if (this.getLocalNodes() == null) ci.println("None");
+            else ci.println(this.getLocalNodes().toString());
+            return;
+        }
+        try {
+            InetAddress address = InetAddress.getByName(controller);
+            ci.println("Nodes connected to controller "+controller);
+            if (this.getNodes(address) == null) ci.println("None");
+            else ci.println(this.getNodes(address).toString());
+            return;
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public String getHelp() {
+        StringBuffer help = new StringBuffer();
+        help.append("---Connection Manager---\n");
+        help.append("\t scheme [<name>]                      - Print / Set scheme\n");
+        help.append("\t printNodes [<controller>]            - Print connected nodes\n");
+        return help.toString();
+    }
+}
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionMgmtEvent.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionMgmtEvent.java
new file mode 100644 (file)
index 0000000..f07672d
--- /dev/null
@@ -0,0 +1,47 @@
+
+package org.opendaylight.controller.connectionmanager.internal;
+
+public class ConnectionMgmtEvent {
+    ConnectionMgmtEventType event;
+    Object data;
+    public ConnectionMgmtEvent(ConnectionMgmtEventType event, Object data) {
+        this.event = event;
+        this.data = data;
+    }
+    public ConnectionMgmtEventType getEvent() {
+        return event;
+    }
+    public Object getData() {
+        return data;
+    }
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((data == null) ? 0 : data.hashCode());
+        result = prime * result + ((event == null) ? 0 : event.hashCode());
+        return result;
+    }
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        ConnectionMgmtEvent other = (ConnectionMgmtEvent) obj;
+        if (data == null) {
+            if (other.data != null)
+                return false;
+        } else if (!data.equals(other.data))
+            return false;
+        if (event != other.event)
+            return false;
+        return true;
+    }
+    @Override
+    public String toString() {
+        return "ConnectionMgmtEvent [event=" + event + ", data=" + data + "]";
+    }
+}
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionMgmtEventType.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionMgmtEventType.java
new file mode 100644 (file)
index 0000000..98399b0
--- /dev/null
@@ -0,0 +1,25 @@
+package org.opendaylight.controller.connectionmanager.internal;
+
+public enum ConnectionMgmtEventType {
+    NODE_DISCONNECTED_FROM_MASTER("Node is disconnected from master"),
+    CLUSTER_VIEW_CHANGED("Cluster Composition changed");
+
+    private ConnectionMgmtEventType(String name) {
+        this.name = name;
+    }
+
+    private String name;
+
+    public String toString() {
+        return name;
+    }
+
+    public static ConnectionMgmtEventType fromString(String pName) {
+        for(ConnectionMgmtEventType p:ConnectionMgmtEventType.values()) {
+            if (p.toString().equals(pName)) {
+                return p;
+            }
+        }
+        return null;
+    }
+}
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AbstractScheme.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AbstractScheme.java
new file mode 100644 (file)
index 0000000..a490916
--- /dev/null
@@ -0,0 +1,289 @@
+package org.opendaylight.controller.connectionmanager.scheme;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.opendaylight.controller.clustering.services.CacheConfigException;
+import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
+import org.opendaylight.controller.clustering.services.IClusterServices;
+import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractScheme {
+    private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
+    protected IClusterGlobalServices clusterServices = null;
+    /*
+     * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
+     * But, such a data-structure results in some complex event processing during the Cluster operations
+     * to sync up the Connection states.
+     *
+     * A data-structure with Node as the key and set of controllers provides a good balance
+     * between the needed functionality and simpler clustering implementation for Connection Manager.
+     */
+    protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
+    protected abstract boolean isConnectionAllowedInternal(Node node);
+    private String name;
+
+    protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
+        this.clusterServices = clusterServices;
+        if (type != null) name = type.name();
+        else name = "UNKNOWN";
+        if (clusterServices != null) {
+            allocateCaches();
+            retrieveCaches();
+        }
+    }
+
+    protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
+        ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
+        for (Node node : nodeConnections.keySet()) {
+            Set<InetAddress> controllers = nodeConnections.get(node);
+            if (controllers == null) continue;
+            for (InetAddress controller : controllers) {
+                Set<Node> nodes = controllerNodesMap.get(controller);
+                if (nodes == null) {
+                    nodes = new HashSet<Node>();
+                }
+                nodes.add(node);
+                controllerNodesMap.put(controller, nodes);
+            }
+        }
+        return controllerNodesMap;
+    }
+
+    public boolean isConnectionAllowed (Node node) {
+        if (clusterServices == null || nodeConnections == null) {
+            return false;
+        }
+
+        return isConnectionAllowedInternal(node);
+    }
+
+    @SuppressWarnings("deprecation")
+    public void handleClusterViewChanged() {
+        List<InetAddress> controllers = clusterServices.getClusteredControllers();
+        ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
+        List<InetAddress> toRemove = new ArrayList<InetAddress>();
+        for (InetAddress c : controllerNodesMap.keySet()) {
+            if (!controllers.contains(c)) {
+                toRemove.add(c);
+            }
+        }
+
+        for (InetAddress c : toRemove) {
+            log.debug("Removing Controller : {} from the Connections table", c);
+            for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
+                Node node = nodeIterator.next();
+                Set <InetAddress> oldControllers = nodeConnections.get(node);
+                Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
+                if (newControllers.remove(c)) {
+                    boolean replaced = false;
+                    try {
+                        replaced = nodeConnections.replace(node, oldControllers, newControllers);
+                    } catch (Exception e) {
+                        log.debug("Replace exception : ", e);
+                        replaced = false;
+                    }
+                    if (!replaced) {
+                        try {
+                            Thread.sleep(10);
+                        } catch (InterruptedException e) {
+                        }
+                        handleClusterViewChanged();
+                    }
+                }
+            }
+        }
+    }
+
+    public Set<Node> getNodes(InetAddress controller) {
+        if (nodeConnections == null) return null;
+        ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
+        return controllerNodesMap.get(controller);
+    }
+
+    @SuppressWarnings("deprecation")
+    public Set<Node> getNodes() {
+        return getNodes(clusterServices.getMyAddress());
+    }
+
+    public Set<InetAddress> getControllers(Node node) {
+        if (nodeConnections != null) return nodeConnections.get(node);
+        return null;
+    }
+
+    public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
+        return nodeConnections;
+    }
+
+    @SuppressWarnings("deprecation")
+    public boolean isLocal(Node node) {
+        if (nodeConnections == null) return false;
+        InetAddress myController = clusterServices.getMyAddress();
+        Set<InetAddress> controllers = nodeConnections.get(node);
+        return (controllers != null && controllers.contains(myController));
+    }
+
+    @SuppressWarnings("deprecation")
+    public Status removeNode (Node node) {
+        return removeNodeFromController(node, clusterServices.getMyAddress());
+    }
+
+    protected Status removeNodeFromController (Node node, InetAddress controller) {
+        if (node == null || controller == null) {
+            return new Status(StatusCode.BADREQUEST);
+        }
+
+        if (clusterServices == null || nodeConnections == null) {
+            return new Status(StatusCode.SUCCESS);
+        }
+
+        Set<InetAddress> oldControllers = nodeConnections.get(node);
+
+        if (oldControllers != null && oldControllers.contains(controller)) {
+            Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
+            if (newControllers.remove(controller)) {
+                if (newControllers.size() > 0) {
+                    boolean replaced = false;
+                    try {
+                    replaced = nodeConnections.replace(node, oldControllers, newControllers);
+                    } catch (Exception e) {
+                        log.debug("Replace exception : ", e);
+                        replaced = false;
+                    }
+                    if (!replaced) {
+                        try {
+                            Thread.sleep(10);
+                        } catch (InterruptedException e) {
+                        }
+                        return removeNodeFromController(node, controller);
+                    }
+                } else {
+                    nodeConnections.remove(node);
+                }
+            }
+        }
+        return new Status(StatusCode.SUCCESS);
+
+    }
+
+    /*
+     * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
+     * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
+     */
+    private Status putNodeToController (Node node, InetAddress controller) {
+        if (clusterServices == null || nodeConnections == null) {
+            return new Status(StatusCode.SUCCESS);
+        }
+        log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
+
+        Set <InetAddress> oldControllers = nodeConnections.get(node);
+        Set <InetAddress> newControllers = null;
+        if (oldControllers == null) {
+            newControllers = new HashSet<InetAddress>();
+        } else {
+            if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
+                /*
+                 * In certain race conditions, the putIfAbsent fails to be atomic.
+                 * This check is added to identify such cases and report an warning
+                 * for debugging.
+                 */
+                log.warn("States Exists for {} : {}", node, oldControllers.toString());
+            }
+            newControllers = new HashSet<InetAddress>(oldControllers);
+        }
+        newControllers.add(controller);
+
+        if (nodeConnections.putIfAbsent(node, newControllers) != null) {
+            log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
+            /*
+             * This check is needed again to take care of the case where some schemes
+             * would not allow nodes to be connected to multiple controllers.
+             * Hence, if putIfAbsent fails, that means, some other controller is competing
+             * with this controller to take hold of a Node.
+             */
+            if (isConnectionAllowed(node)) {
+                log.debug("Trying to replace old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
+                        controller.getHostAddress(), node.toString());
+                if (!nodeConnections.replace(node, oldControllers, newControllers)) {
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) {
+                    }
+                    log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
+                            controller.getHostAddress(), node.toString());
+                    return putNodeToController(node, controller);
+                } else {
+                    log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
+                            controller.getHostAddress(), node.toString());
+                }
+            } else {
+                return new Status(StatusCode.CONFLICT);
+            }
+        } else {
+            log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
+        }
+        return new Status(StatusCode.SUCCESS);
+    }
+
+    public Status addNode (Node node, InetAddress controller) {
+        if (node == null || controller == null) {
+            return new Status(StatusCode.BADREQUEST);
+        }
+        if (isLocal(node)) return new Status(StatusCode.SUCCESS);
+        if (isConnectionAllowed(node)) {
+            return putNodeToController(node, controller);
+        } else {
+            return new Status(StatusCode.NOTALLOWED);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    public Status addNode (Node node) {
+        return addNode(node, clusterServices.getMyAddress());
+    }
+
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    private void retrieveCaches() {
+        if (this.clusterServices == null) {
+            log.error("un-initialized clusterServices, can't retrieve cache");
+            return;
+        }
+
+        nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
+
+        if (nodeConnections == null) {
+            log.error("\nFailed to get caches");
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void allocateCaches() {
+        if (this.clusterServices == null) {
+            log.error("un-initialized clusterServices, can't create cache");
+            return;
+        }
+
+        try {
+            clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
+        } catch (CacheExistException cee) {
+            log.error("\nCache already exists - destroy and recreate if needed");
+        } catch (CacheConfigException cce) {
+            log.error("\nCache configuration invalid - check cache mode");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AnyControllerScheme.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/AnyControllerScheme.java
new file mode 100644 (file)
index 0000000..02282c3
--- /dev/null
@@ -0,0 +1,34 @@
+package org.opendaylight.controller.connectionmanager.scheme;
+
+import java.net.InetAddress;
+import java.util.Set;
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
+import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
+import org.opendaylight.controller.sal.core.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AnyControllerScheme extends AbstractScheme {
+    private static final Logger logger = LoggerFactory.getLogger(AnyControllerScheme.class);
+    private static AbstractScheme myScheme= null;
+
+    protected AnyControllerScheme(IClusterGlobalServices clusterServices) {
+        super(clusterServices, ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER);
+    }
+
+    public static AbstractScheme getScheme(IClusterGlobalServices clusterServices) {
+        if (myScheme == null) {
+            myScheme = new AnyControllerScheme(clusterServices);
+        }
+        return myScheme;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean isConnectionAllowedInternal(Node node) {
+        if (nodeConnections == null) return true;
+        Set <InetAddress> controllers = nodeConnections.get(node);
+        if (controllers == null || controllers.size() == 0) return true;
+        return (controllers.size() == 1 && controllers.contains(clusterServices.getMyAddress()));
+    }
+}
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/ControllerConfig.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/ControllerConfig.java
new file mode 100644 (file)
index 0000000..4db80a5
--- /dev/null
@@ -0,0 +1,62 @@
+package org.opendaylight.controller.connectionmanager.scheme;
+
+import java.net.InetAddress;
+
+/**
+ * Configuration object that can be used to prioritize or add weight to a given controller.
+ * This can be potentially used by the Connection management scheme algorithms.
+ *
+ * This is currently not used.
+ *
+ */
+public class ControllerConfig {
+    private InetAddress controllerId;
+    private int priority;
+    private int weight;
+
+    public ControllerConfig(InetAddress controllerId, int priority, int weight) {
+        this.controllerId = controllerId;
+        this.priority = priority;
+        this.weight = weight;
+    }
+
+    public InetAddress getControllerId() {
+        return controllerId;
+    }
+    public int getPriority() {
+        return priority;
+    }
+    public int getWeight() {
+        return weight;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result
+                + ((controllerId == null) ? 0 : controllerId.hashCode());
+        return result;
+    }
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        ControllerConfig other = (ControllerConfig) obj;
+        if (controllerId == null) {
+            if (other.controllerId != null)
+                return false;
+        } else if (!controllerId.equals(other.controllerId))
+            return false;
+        return true;
+    }
+    @Override
+    public String toString() {
+        return "ControllerConfig [controllerId=" + controllerId + ", priority="
+                + priority + ", weight=" + weight + "]";
+    }
+}
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/LoadBalancedScheme.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/LoadBalancedScheme.java
new file mode 100644 (file)
index 0000000..a80494b
--- /dev/null
@@ -0,0 +1,30 @@
+package org.opendaylight.controller.connectionmanager.scheme;
+
+
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
+import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
+import org.opendaylight.controller.sal.core.Node;
+
+/**
+ * Load Balancing scheme will let the nodes connect with controller based
+ * on the resource usage in each of the controllers in a cluster.
+ *
+ * Incomplete and Currently not used.
+ */
+
+class LoadBalancedScheme extends AbstractScheme {
+
+    protected LoadBalancedScheme(IClusterGlobalServices clusterServices) {
+        super(clusterServices, ConnectionMgmtScheme.LOAD_BALANCED);
+    }
+
+    public static AbstractScheme getScheme(IClusterGlobalServices clusterServices) {
+        return null;
+    }
+
+    @Override
+    public boolean isConnectionAllowedInternal(Node node) {
+        return false;
+    }
+
+}
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/RoundRobinScheme.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/RoundRobinScheme.java
new file mode 100644 (file)
index 0000000..4a498dc
--- /dev/null
@@ -0,0 +1,23 @@
+package org.opendaylight.controller.connectionmanager.scheme;
+
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
+import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
+import org.opendaylight.controller.sal.core.Node;
+
+class RoundRobinScheme extends AbstractScheme {
+    protected RoundRobinScheme(IClusterGlobalServices clusterServices) {
+        super(clusterServices, ConnectionMgmtScheme.ROUND_ROBIN);
+        // TODO Auto-generated constructor stub
+    }
+
+    public static AbstractScheme getScheme(IClusterGlobalServices clusterServices) {
+        return null;
+    }
+
+    @Override
+    public boolean isConnectionAllowedInternal(Node node) {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+}
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/SchemeFactory.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/SchemeFactory.java
new file mode 100644 (file)
index 0000000..7e13745
--- /dev/null
@@ -0,0 +1,19 @@
+package org.opendaylight.controller.connectionmanager.scheme;
+
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
+import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
+
+public class SchemeFactory {
+    public static AbstractScheme getScheme(ConnectionMgmtScheme scheme, IClusterGlobalServices clusterServices) {
+        if (scheme == ConnectionMgmtScheme.SINGLE_CONTROLLER) {
+            return SingleControllerScheme.getScheme(clusterServices);
+        } else if (scheme == ConnectionMgmtScheme.ROUND_ROBIN) {
+            return RoundRobinScheme.getScheme(clusterServices);
+        } else if (scheme == ConnectionMgmtScheme.LOAD_BALANCED) {
+            return LoadBalancedScheme.getScheme(clusterServices);
+        } else if (scheme == ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER) {
+            return AnyControllerScheme.getScheme(clusterServices);
+        }
+        return null;
+    }
+}
diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/SingleControllerScheme.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/scheme/SingleControllerScheme.java
new file mode 100644 (file)
index 0000000..d80911a
--- /dev/null
@@ -0,0 +1,35 @@
+package org.opendaylight.controller.connectionmanager.scheme;
+
+import java.net.InetAddress;
+import java.util.Set;
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
+import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
+import org.opendaylight.controller.sal.core.Node;
+
+class SingleControllerScheme extends AbstractScheme {
+
+    private static AbstractScheme myScheme= null;
+
+    protected SingleControllerScheme(IClusterGlobalServices clusterServices) {
+        super(clusterServices, ConnectionMgmtScheme.SINGLE_CONTROLLER);
+    }
+
+    public static AbstractScheme getScheme(IClusterGlobalServices clusterServices) {
+        if (myScheme == null) {
+            myScheme = new SingleControllerScheme(clusterServices);
+        }
+        return myScheme;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public boolean isConnectionAllowedInternal(Node node) {
+        if (nodeConnections == null) return true;
+        for (Node existingNode : nodeConnections.keySet()) {
+            Set<InetAddress> controllers = nodeConnections.get(existingNode);
+            if (controllers == null || controllers.size() == 0) continue;
+            if (!controllers.contains(clusterServices.getMyAddress())) return false;
+        }
+        return true;
+    }
+}
index 50b34faac847f8a084e44b42cee41d93c2cc3a81..e6871afa772860e05ba22db915005823af7d6243 100644 (file)
     <module>../../topologymanager</module>
     <module>../../usermanager/api</module>
     <module>../../usermanager/implementation</module>
+    <module>../../connectionmanager/api</module>
+    <module>../../connectionmanager/implementation</module>
     <module>../../security</module>
 
-
     <module>../../../third-party/openflowj</module>
     <module>../../../third-party/net.sf.jung2</module>
     <module>../../../third-party/jersey-servlet</module>
     <module>../../sal/api</module>
     <module>../../sal/implementation</module>
 
+    <!-- SAL Extension bundles -->
+    <module>../../sal/connection/api</module>
+    <module>../../sal/connection/implementation</module>
+
     <!--  Web bundles -->
     <module>../../web/root</module>
     <module>../../web/flows</module>
index 9d84d45967a44114a960dfc4ab3cd56f671e2dfd..373d67dab15976a117d15a989d716948cf42621b 100644 (file)
@@ -32,6 +32,7 @@
               org.opendaylight.controller.sal.inventory,
               org.opendaylight.controller.sal.match,
               org.opendaylight.controller.sal.utils,
+              org.opendaylight.controller.sal.connection,
               org.apache.commons.lang3.builder,
               org.apache.commons.lang3.tuple,
               org.apache.felix.dm,
       <artifactId>sal</artifactId>
       <version>0.5.0-SNAPSHOT</version>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal.connection</artifactId>
+      <version>0.1.0-SNAPSHOT</version>
+    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller.thirdparty</groupId>
       <artifactId>org.openflow.openflowj</artifactId>
index 40f594bd2a0e12bfe13215fb50672909fe5de856..c7c6c8924d29807da61aa5896c4b2b1e1b79cc59 100644 (file)
@@ -30,6 +30,12 @@ import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener;
+import org.opendaylight.controller.sal.connection.ConnectionConstants;
+import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
 import org.openflow.protocol.OFMessage;
 import org.openflow.protocol.OFType;
 import org.openflow.util.HexString;
@@ -38,7 +44,7 @@ import org.osgi.framework.FrameworkUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Controller implements IController, CommandProvider {
+public class Controller implements IController, CommandProvider, IPluginInConnectionService {
     private static final Logger logger = LoggerFactory
             .getLogger(Controller.class);
     private ControllerIO controllerIO;
@@ -221,8 +227,7 @@ public class Controller implements IController, CommandProvider {
             // create new switch
             int i = this.switchInstanceNumber.addAndGet(1);
             String instanceName = "SwitchHandler-" + i;
-            SwitchHandler switchHandler = new SwitchHandler(this, sc,
-                    instanceName);
+            SwitchHandler switchHandler = new SwitchHandler(this, sc, instanceName);
             switchHandler.start();
             if (sc.isConnected()) {
                 logger.info("Switch:{} is connected to the Controller",
@@ -375,4 +380,34 @@ public class Controller implements IController, CommandProvider {
         help.append("\t controllerShowConnConfig\n");
         return help.toString();
     }
+
+    @Override
+    public Status disconnect(Node node) {
+        ISwitch sw = getSwitch((Long) node.getID());
+        if (sw != null) disconnectSwitch(sw);
+        return new Status(StatusCode.SUCCESS);
+    }
+
+    @Override
+    public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
+        return null;
+    }
+
+    /**
+     * View Change notification
+     */
+    public void notifyClusterViewChanged() {
+        for (ISwitch sw : switches.values()) {
+            notifySwitchAdded(sw);
+        }
+    }
+
+    /**
+     * Node Disconnected from the node's master controller.
+     */
+    @Override
+    public void notifyNodeDisconnectFromMaster(Node node) {
+        ISwitch sw = switches.get((Long)node.getID());
+        if (sw != null) notifySwitchAdded(sw);
+    }
 }
index 6c09abbdc7cd851df41e9e72b2bba987cd6171e9..16393fa21f0518fed7691f0960fbd53c02b54ce0 100644 (file)
@@ -28,6 +28,8 @@ import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShim
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.internal.Controller;
+import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
 import org.opendaylight.controller.sal.core.IContainerListener;
 import org.opendaylight.controller.sal.core.Node;
@@ -162,6 +164,11 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setCallbacks("setPluginOutDataPacketService",
                             "unsetPluginOutDataPacketService")
                     .setRequired(false));
+            c.add(createServiceDependency()
+                    .setService(IPluginOutConnectionService.class)
+                    .setCallbacks("setIPluginOutConnectionService",
+                            "unsetIPluginOutConnectionService")
+                    .setRequired(false));
         }
 
         if (imp.equals(ReadService.class)) {
@@ -178,11 +185,18 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setService(IReadServiceFilter.class)
                     .setCallbacks("setService", "unsetService")
                     .setRequired(true));
+
             c.add(createContainerServiceDependency(containerName)
                     .setService(IPluginOutReadService.class)
                     .setCallbacks("setPluginOutReadServices",
                             "unsetPluginOutReadServices")
                     .setRequired(false));
+
+            c.add(createServiceDependency()
+                    .setService(IPluginOutConnectionService.class)
+                    .setCallbacks("setIPluginOutConnectionService",
+                            "unsetIPluginOutConnectionService")
+                    .setRequired(false));
         }
 
         if (imp.equals(FlowProgrammerNotifier.class)) {
@@ -198,6 +212,11 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setCallbacks("setPluginOutFlowProgrammerService",
                             "unsetPluginOutFlowProgrammerService")
                     .setRequired(true));
+            c.add(createServiceDependency()
+                    .setService(IPluginOutConnectionService.class)
+                    .setCallbacks("setIPluginOutConnectionService",
+                            "unsetIPluginOutConnectionService")
+                    .setRequired(false));
         }
     }
 
@@ -213,7 +232,7 @@ public class Activator extends ComponentActivatorAbstractBase {
     public Object[] getGlobalImplementations() {
         Object[] res = { Controller.class, OFStatisticsManager.class,
                 FlowProgrammerService.class, ReadServiceFilter.class,
-                DiscoveryService.class, DataPacketMuxDemux.class,
+                DiscoveryService.class, DataPacketMuxDemux.class, InventoryService.class,
                 InventoryServiceShim.class, TopologyServiceShim.class };
         return res;
     }
@@ -235,7 +254,10 @@ public class Activator extends ComponentActivatorAbstractBase {
             logger.debug("Activator configureGlobalInstance( ) is called");
             Dictionary<String, Object> props = new Hashtable<String, Object>();
             props.put("name", "Controller");
-            c.setInterface(IController.class.getName(), props);
+            props.put(GlobalConstants.PROTOCOLPLUGINTYPE.toString(), Node.NodeIDType.OPENFLOW);
+            c.setInterface(new String[] { IController.class.getName(),
+                                          IPluginInConnectionService.class.getName()},
+                                          props);
         }
 
         if (imp.equals(FlowProgrammerService.class)) {
@@ -263,6 +285,11 @@ public class Activator extends ComponentActivatorAbstractBase {
                             "unsetsetFlowProgrammerNotifier")
                     .setRequired(false));
 
+            c.add(createServiceDependency()
+                    .setService(IPluginOutConnectionService.class)
+                    .setCallbacks("setIPluginOutConnectionService",
+                            "unsetIPluginOutConnectionService")
+                    .setRequired(false));
         }
 
         if (imp.equals(ReadServiceFilter.class)) {
@@ -285,7 +312,6 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setCallbacks("setReadFilterInternalListener",
                             "unsetReadFilterInternalListener")
                     .setRequired(false));
-
         }
 
         if (imp.equals(OFStatisticsManager.class)) {
@@ -327,6 +353,11 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setService(IDiscoveryListener.class)
                     .setCallbacks("setDiscoveryListener",
                             "unsetDiscoveryListener").setRequired(true));
+            c.add(createServiceDependency()
+                    .setService(IPluginOutConnectionService.class)
+                    .setCallbacks("setIPluginOutConnectionService",
+                            "unsetIPluginOutConnectionService")
+                    .setRequired(false));
         }
 
         // DataPacket mux/demux services, which is teh actual engine
@@ -350,6 +381,34 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setService(IDataPacketListen.class)
                     .setCallbacks("setIDataPacketListen",
                             "unsetIDataPacketListen").setRequired(false));
+            c.add(createServiceDependency()
+                    .setService(IPluginOutConnectionService.class)
+                    .setCallbacks("setIPluginOutConnectionService",
+                            "unsetIPluginOutConnectionService")
+                    .setRequired(false));
+        }
+
+        if (imp.equals(InventoryService.class)) {
+            // export the service
+            Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put("scope", "Global");
+
+            c.setInterface(
+                    new String[] { IPluginInInventoryService.class.getName(),
+                            IInventoryShimInternalListener.class.getName(),
+                            IInventoryProvider.class.getName() }, props);
+
+            // Now lets add a service dependency to make sure the
+            // provider of service exists
+            c.add(createServiceDependency()
+                    .setService(IController.class, "(name=Controller)")
+                    .setCallbacks("setController", "unsetController")
+                    .setRequired(true));
+            c.add(createServiceDependency()
+                    .setService(IPluginOutInventoryService.class, "(scope=Global)")
+                    .setCallbacks("setPluginOutInventoryServices",
+                            "unsetPluginOutInventoryServices")
+                    .setRequired(false));
         }
 
         if (imp.equals(InventoryServiceShim.class)) {
@@ -361,15 +420,25 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setCallbacks("setController", "unsetController")
                     .setRequired(true));
             c.add(createServiceDependency()
-                    .setService(IInventoryShimInternalListener.class)
+                    .setService(IInventoryShimInternalListener.class, "(!(scope=Global))")
                     .setCallbacks("setInventoryShimInternalListener",
                             "unsetInventoryShimInternalListener")
                     .setRequired(true));
+            c.add(createServiceDependency()
+                    .setService(IInventoryShimInternalListener.class, "(scope=Global)")
+                    .setCallbacks("setInventoryShimGlobalInternalListener",
+                            "unsetInventoryShimGlobalInternalListener")
+                    .setRequired(true));
             c.add(createServiceDependency()
                     .setService(IInventoryShimExternalListener.class)
                     .setCallbacks("setInventoryShimExternalListener",
                             "unsetInventoryShimExternalListener")
                     .setRequired(false));
+            c.add(createServiceDependency()
+                    .setService(IPluginOutConnectionService.class)
+                    .setCallbacks("setIPluginOutConnectionService",
+                            "unsetIPluginOutConnectionService")
+                    .setRequired(false));
         }
 
         if (imp.equals(TopologyServiceShim.class)) {
index 2d8cdd206a7e2c651151c6515e003ba8e862c8fa..7a01d3bd19bce2060a6749bb017e08026a84aaea 100644 (file)
@@ -32,6 +32,7 @@ import org.openflow.protocol.action.OFActionOutput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.ConstructionException;
 import org.opendaylight.controller.sal.core.ContainerFlow;
 import org.opendaylight.controller.sal.core.IContainerListener;
@@ -60,6 +61,7 @@ public class DataPacketMuxDemux implements IContainerListener,
     private ConcurrentMap<String, List<ContainerFlow>> container2FlowSpecs = new ConcurrentHashMap<String, List<ContainerFlow>>();
     // Track local data packet listener
     private List<IDataPacketListen> iDataPacketListen = new CopyOnWriteArrayList<IDataPacketListen>();
+    private IPluginOutConnectionService connectionOutService;
 
     void setIDataPacketListen(IDataPacketListen s) {
         if (this.iDataPacketListen != null) {
@@ -128,6 +130,16 @@ public class DataPacketMuxDemux implements IContainerListener,
         }
     }
 
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionOutService == s) {
+            connectionOutService = null;
+        }
+    }
+
     /**
      * Function called by the dependency manager when all the required
      * dependencies are satisfied
@@ -164,9 +176,21 @@ public class DataPacketMuxDemux implements IContainerListener,
                     new Object[] { sw, msg, this.pluginOutDataPacketServices });
             return;
         }
+
+        Long ofSwitchID = Long.valueOf(sw.getId());
+        try {
+            Node n = new Node(Node.NodeIDType.OPENFLOW, ofSwitchID);
+            if (!connectionOutService.isLocal(n)) {
+                logger.debug("Connection service refused DataPacketMuxDemux receive {} {}", sw, msg);
+                return;
+            }
+        }
+        catch (Exception e) {
+            return;
+        }
+
         if (msg instanceof OFPacketIn) {
             OFPacketIn ofPacket = (OFPacketIn) msg;
-            Long ofSwitchID = Long.valueOf(sw.getId());
             Short ofPortID = Short.valueOf(ofPacket.getInPort());
 
             try {
@@ -258,6 +282,12 @@ public class DataPacketMuxDemux implements IContainerListener,
             return;
         }
 
+        if (!connectionOutService.isLocal(outPort.getNode())) {
+            logger.debug("data packets will not be sent to {} in a non-master controller", outPort.toString());
+            return;
+        }
+
+
         if (!outPort.getType().equals(
                 NodeConnector.NodeConnectorIDType.OPENFLOW)) {
             // The output Port is not of type OpenFlow
index adb9d20ef7d4338a287a6e721bbb23e9f034fca5..4c65645bfed8f98c53bc019e8351e7c2b09cec01 100644 (file)
@@ -12,6 +12,8 @@ package org.opendaylight.controller.protocol_plugin.openflow.internal;
 import org.opendaylight.controller.protocol_plugin.openflow.IDataPacketMux;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
+import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.packet.IPluginInDataPacketService;
 import org.opendaylight.controller.sal.packet.RawPacket;
 
@@ -19,6 +21,7 @@ public class DataPacketServices implements IPluginInDataPacketService {
     protected static final Logger logger = LoggerFactory
             .getLogger(DataPacketServices.class);
     private IDataPacketMux iDataPacketMux = null;
+    private IPluginOutConnectionService connectionOutService;
 
     void setIDataPacketMux(IDataPacketMux s) {
         this.iDataPacketMux = s;
@@ -30,8 +33,23 @@ public class DataPacketServices implements IPluginInDataPacketService {
         }
     }
 
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionOutService == s) {
+            connectionOutService = null;
+        }
+    }
+
     @Override
     public void transmitDataPacket(RawPacket outPkt) {
-        this.iDataPacketMux.transmitDataPacket(outPkt);
+        NodeConnector nc = outPkt.getOutgoingNodeConnector();
+        if (connectionOutService != null && connectionOutService.isLocal(nc.getNode())) {
+            this.iDataPacketMux.transmitDataPacket(outPkt);
+        } else {
+            logger.debug("{} is dropped in the controller "+outPkt);
+        }
     }
 }
index ee712030e95573a807239dc6283f4291bb5f01d0..bb303e3651de7e3a95659641cf001da4b0153bb4 100644 (file)
@@ -39,6 +39,7 @@ import org.osgi.framework.FrameworkUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.Config;
 import org.opendaylight.controller.sal.core.ConstructionException;
 import org.opendaylight.controller.sal.core.Edge;
@@ -60,6 +61,8 @@ import org.opendaylight.controller.sal.utils.HexEncode;
 import org.opendaylight.controller.sal.utils.NetUtils;
 import org.opendaylight.controller.sal.utils.NodeConnectorCreator;
 import org.opendaylight.controller.sal.utils.NodeCreator;
+import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
 
 /**
  * The class describes neighbor discovery service for an OpenFlow network.
@@ -126,6 +129,7 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
     private volatile Boolean shuttingDown = false;
 
     private LLDPTLV chassisIdTlv, portIdTlv, ttlTlv, customTlv;
+    private IPluginOutConnectionService connectionOutService;
 
     class DiscoveryTransmit implements Runnable {
         private final BlockingQueue<NodeConnector> transmitQ;
@@ -256,6 +260,11 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
             return;
         }
 
+        if (!connectionOutService.isLocal(nodeConnector.getNode())) {
+            logger.debug("Discoery packets will not be sent to {} in a non-master controller", nodeConnector.toString());
+            return;
+        }
+
         if (outPkt == null) {
             logger.debug("Can not send discovery packet out since outPkt is null");
             return;
@@ -301,12 +310,18 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
             return PacketResult.IGNORED;
         }
 
-        if (((Short) inPkt.getIncomingNodeConnector().getID()).equals(NodeConnector.SPECIALNODECONNECTORID)) {
+        NodeConnector nodeConnector = inPkt.getIncomingNodeConnector();
+        if (((Short) nodeConnector.getID()).equals(NodeConnector.SPECIALNODECONNECTORID)) {
             logger.trace("Ignoring ethernet packet received on special port: "
                     + inPkt.getIncomingNodeConnector().toString());
             return PacketResult.IGNORED;
         }
 
+        if (!connectionOutService.isLocal(nodeConnector.getNode())) {
+            logger.debug("Discoery packets will not be processed from {} in a non-master controller", nodeConnector.toString());
+            return PacketResult.IGNORED;
+        }
+
         Ethernet ethPkt = new Ethernet();
         try {
             ethPkt.deserialize(data, 0, data.length * NetUtils.NumBitsInAByte);
@@ -659,6 +674,9 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
                 // Allow one more retry
                 readyListLo.add(nodeConnector);
                 elapsedTime.remove(nodeConnector);
+                if (connectionOutService.isLocal(nodeConnector.getNode())) {
+                    transmitQ.add(nodeConnector);
+                }
             }
         }
     }
@@ -692,10 +710,12 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
     private void doDiscovery() {
         if (++discoveryTimerTickCount <= discoveryBatchPauseTicks) {
             for (NodeConnector nodeConnector : getWorkingSet()) {
-                transmitQ.add(nodeConnector);
-                // Move to staging area after it's served
-                if (!stagingList.contains(nodeConnector)) {
-                    stagingList.add(nodeConnector);
+                if (connectionOutService.isLocal(nodeConnector.getNode())) {
+                    transmitQ.add(nodeConnector);
+                    // Move to staging area after it's served
+                    if (!stagingList.contains(nodeConnector)) {
+                        stagingList.add(nodeConnector);
+                    }
                 }
             }
         } else if (discoveryTimerTickCount >= discoveryBatchRestartTicks) {
@@ -1402,6 +1422,16 @@ public class DiscoveryService implements IInventoryShimExternalListener, IDataPa
         }
     }
 
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionOutService == s) {
+            connectionOutService = null;
+        }
+    }
+
     private void initDiscoveryPacket() {
         // Create LLDP ChassisID TLV
         chassisIdTlv = new LLDPTLV();
index 2a62d6c34a08a19c4cccb898acdf90945ada255b..dfa2026cddc041ea08b040c33b50ca31de0cc8ca 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.protocol_plugin.openflow.internal;
 
 import org.apache.felix.dm.Component;
 import org.opendaylight.controller.protocol_plugin.openflow.IFlowProgrammerNotifier;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.flowprogrammer.Flow;
 import org.opendaylight.controller.sal.flowprogrammer.IPluginOutFlowProgrammerService;
@@ -24,6 +25,7 @@ public class FlowProgrammerNotifier implements IFlowProgrammerNotifier {
     protected static final Logger logger = LoggerFactory
             .getLogger(FlowProgrammerNotifier.class);
     private IPluginOutFlowProgrammerService salNotifier;
+    private IPluginOutConnectionService connectionOutService;
 
     public FlowProgrammerNotifier() {
         salNotifier = null;
@@ -76,6 +78,11 @@ public class FlowProgrammerNotifier implements IFlowProgrammerNotifier {
 
     @Override
     public void flowRemoved(Node node, Flow flow) {
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("flow removed will not be notified in a non-master controller for node "+node);
+            return;
+        }
+
         if (salNotifier != null) {
             salNotifier.flowRemoved(node, flow);
         } else {
@@ -85,6 +92,11 @@ public class FlowProgrammerNotifier implements IFlowProgrammerNotifier {
 
     @Override
     public void flowErrorReported(Node node, long rid, Object err) {
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("flow error will not be notified in a non-master controller for node "+node);
+            return;
+        }
+
         if (salNotifier != null) {
             salNotifier.flowErrorReported(node, rid, err);
         } else {
@@ -92,4 +104,14 @@ public class FlowProgrammerNotifier implements IFlowProgrammerNotifier {
         }
     }
 
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionOutService == s) {
+            connectionOutService = null;
+        }
+    }
+
 }
index f58acf62edb0962703b9e4971e582bc412314c47..c55a88cd3f0224cc587bc01e632e742405cc34f1 100644 (file)
@@ -23,6 +23,7 @@ import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimExtern
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.ContainerFlow;
 import org.opendaylight.controller.sal.core.IContainerListener;
 import org.opendaylight.controller.sal.core.Node;
@@ -65,6 +66,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
     private Map<String, Set<NodeConnector>> containerToNc;
     private ConcurrentMap<Long, Map<Integer, Long>> xid2rid;
     private int barrierMessagePriorCount = getBarrierMessagePriorCount();
+    private IPluginOutConnectionService connectionOutService;
 
     public FlowProgrammerService() {
         controller = null;
@@ -83,6 +85,16 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
         }
     }
 
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionOutService == s) {
+            connectionOutService = null;
+        }
+    }
+
     public void setFlowProgrammerNotifier(Map<String, ?> props,
             IFlowProgrammerNotifier s) {
         if (props == null || props.get("containerName") == null) {
@@ -146,32 +158,62 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
 
     @Override
     public Status addFlow(Node node, Flow flow) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Add flow will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return addFlowInternal(node, flow, 0);
     }
 
     @Override
     public Status modifyFlow(Node node, Flow oldFlow, Flow newFlow) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Modify flow will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return modifyFlowInternal(node, oldFlow, newFlow, 0);
     }
 
     @Override
     public Status removeFlow(Node node, Flow flow) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Remove flow will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return removeFlowInternal(node, flow, 0);
     }
 
     @Override
     public Status addFlowAsync(Node node, Flow flow, long rid) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Add flow Async will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return addFlowInternal(node, flow, rid);
     }
 
     @Override
     public Status modifyFlowAsync(Node node, Flow oldFlow, Flow newFlow,
             long rid) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Modify flow async will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return modifyFlowInternal(node, oldFlow, newFlow, rid);
     }
 
     @Override
     public Status removeFlowAsync(Node node, Flow flow, long rid) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Remove flow async will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return removeFlowInternal(node, flow, rid);
     }
 
@@ -324,6 +366,11 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
 
     @Override
     public Status removeAllFlows(Node node) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Remove all flows will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         return new Status(StatusCode.SUCCESS);
     }
 
@@ -446,6 +493,11 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
 
     @Override
     public Status syncSendBarrierMessage(Node node) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("Sync Send Barrier will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
             return new Status(StatusCode.NOTACCEPTABLE,
                     "The node does not support Barrier message.");
@@ -469,6 +521,11 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
 
     @Override
     public Status asyncSendBarrierMessage(Node node) {
+        if (!connectionOutService.isLocal(node)) {
+            log.debug("ASync Send Barrier will not be processed in a non-master controller for node " + node);
+            return new Status(StatusCode.NOTALLOWED, "This is not the master controller for " + node);
+        }
+
         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
             return new Status(StatusCode.NOTACCEPTABLE,
                     "The node does not support Barrier message.");
index 6e6cb00f899d13a94188f7c76c439a68b331f573..9fded15f42383a1edceb436f5c8b010909a6e03b 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.controller.protocol_plugin.openflow.IInventoryProvider;
 import org.opendaylight.controller.protocol_plugin.openflow.IInventoryShimInternalListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Property;
@@ -73,8 +74,10 @@ public class InventoryService implements IInventoryShimInternalListener,
         Dictionary props = c.getServiceProperties();
         if (props != null) {
             containerName = (String) props.get("containerName");
-            isDefaultContainer = containerName.equals(GlobalConstants.DEFAULT
-                    .toString());
+            if (containerName != null) {
+                isDefaultContainer = containerName.equals(GlobalConstants.DEFAULT
+                        .toString());
+            }
         }
 
         nodeProps = new ConcurrentHashMap<Node, Map<String, Property>>();
@@ -201,13 +204,6 @@ public class InventoryService implements IInventoryShimInternalListener,
             return;
         }
 
-        Set<Node> nodeSet = nodeProps.keySet();
-        if (((props == null) || props.isEmpty()) && (nodeSet != null)
-                && nodeSet.contains(node)) {
-            // node already added
-            return;
-        }
-
         logger.trace("addNode: {} added, props: {} for container {}",
                 new Object[] { node, props, containerName });
 
@@ -297,5 +293,4 @@ public class InventoryService implements IInventoryShimInternalListener,
             break;
         }
     }
-
 }
index f0b8735f83f960b0626fff87c20304ce40a03cb9..241fa92deb267414704bb8f2dca5095cf8078d05 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.Actions;
 import org.opendaylight.controller.sal.core.Buffers;
 import org.opendaylight.controller.sal.core.Capabilities;
@@ -63,11 +64,13 @@ public class InventoryServiceShim implements IContainerListener,
             .getLogger(InventoryServiceShim.class);
     private IController controller = null;
     private final ConcurrentMap<String, IInventoryShimInternalListener> inventoryShimInternalListeners = new ConcurrentHashMap<String, IInventoryShimInternalListener>();
+    private final Set<IInventoryShimInternalListener> globalInventoryShimInternalListeners = new HashSet<IInventoryShimInternalListener>();
     private final List<IInventoryShimExternalListener> inventoryShimExternalListeners = new CopyOnWriteArrayList<IInventoryShimExternalListener>();
     private final ConcurrentMap<NodeConnector, Set<String>> nodeConnectorContainerMap = new ConcurrentHashMap<NodeConnector, Set<String>>();
     private final ConcurrentMap<Node, Set<String>> nodeContainerMap = new ConcurrentHashMap<Node, Set<String>>();
     private final ConcurrentMap<NodeConnector, Set<Property>> nodeConnectorProps = new ConcurrentHashMap<NodeConnector, Set<Property>>();
     private final ConcurrentMap<Node, Set<Property>> nodeProps = new ConcurrentHashMap<Node, Set<Property>>();
+    private IPluginOutConnectionService connectionOutService;
 
     void setController(IController s) {
         this.controller = s;
@@ -79,6 +82,20 @@ public class InventoryServiceShim implements IContainerListener,
         }
     }
 
+    void setInventoryShimGlobalInternalListener(Map<?, ?> props,
+            IInventoryShimInternalListener s) {
+        if ((this.globalInventoryShimInternalListeners != null)) {
+            this.globalInventoryShimInternalListeners.add(s);
+        }
+    }
+
+    void unsetInventoryShimGlobalInternalListener(Map<?, ?> props,
+            IInventoryShimInternalListener s) {
+        if ((this.globalInventoryShimInternalListeners != null)) {
+            this.globalInventoryShimInternalListeners.remove(s);
+        }
+    }
+
     void setInventoryShimInternalListener(Map<?, ?> props,
             IInventoryShimInternalListener s) {
         if (props == null) {
@@ -107,7 +124,7 @@ public class InventoryServiceShim implements IContainerListener,
         }
         String containerName = (String) props.get("containerName");
         if (containerName == null) {
-            logger.error("unsetInventoryShimInternalListener containerName not supplied");
+            logger.error("setInventoryShimInternalListener containerName not supplied");
             return;
         }
         if ((this.inventoryShimInternalListeners != null)
@@ -136,6 +153,16 @@ public class InventoryServiceShim implements IContainerListener,
         }
     }
 
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionOutService == s) {
+            connectionOutService = null;
+        }
+    }
+
     /**
      * Function called by the dependency manager when all the required
      * dependencies are satisfied
@@ -167,6 +194,7 @@ public class InventoryServiceShim implements IContainerListener,
         this.inventoryShimInternalListeners.clear();
         this.nodeConnectorContainerMap.clear();
         this.nodeContainerMap.clear();
+        this.globalInventoryShimInternalListeners.clear();
         this.controller = null;
     }
 
@@ -353,6 +381,19 @@ public class InventoryServiceShim implements IContainerListener,
      * Notify all internal and external listeners
      */
     private void notifyInventoryShimListener(NodeConnector nodeConnector, UpdateType type, Set<Property> props) {
+        notifyGlobalInventoryShimInternalListener(nodeConnector, type, props);
+        /*
+         * isLocal is intentionally moved after the GlobalInventory listener call.
+         * The above notification to GlobalInventory will make sure that the connectionOutService be ready
+         * to reply to isLocal query.
+         */
+        if (!connectionOutService.isLocal(nodeConnector.getNode())) {
+            logger.debug("Connection service dropped the inventory notification for {} {}", nodeConnector.toString(), type);
+            return;
+        } else {
+            logger.debug("Connection service accepted the inventory notification for {} {}", nodeConnector.toString(), type);
+        }
+
         // notify other containers
         Set<String> containers = (nodeConnectorContainerMap.get(nodeConnector) == null) ? new HashSet<String>()
                 : new HashSet<String>(nodeConnectorContainerMap.get(nodeConnector));
@@ -369,7 +410,19 @@ public class InventoryServiceShim implements IContainerListener,
      * Notify all internal and external listeners
      */
     private void notifyInventoryShimListener(Node node, UpdateType type, Set<Property> props) {
-        // Now notify other containers
+        notifyGlobalInventoryShimInternalListener(node, type, props);
+        /*
+         * isLocal is intentionally moved after the GlobalInventory listener call.
+         * The above notification to GlobalInventory will make sure that the connectionOutService be ready
+         * to reply to isLocal query.
+         */
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("Connection service dropped the inventory notification for {} {}", node.toString(), type);
+            return;
+        } else {
+            logger.debug("Connection service accepted the inventory notification for {} {}", node.toString(), type);
+        }
+    // Now notify other containers
         Set<String> containers = (nodeContainerMap.get(node) == null) ? new HashSet<String>() : new HashSet<String>(
                 nodeContainerMap.get(node));
         containers.add(GlobalConstants.DEFAULT.toString());
@@ -381,6 +434,24 @@ public class InventoryServiceShim implements IContainerListener,
         notifyInventoryShimExternalListener(node, type, props);
     }
 
+    private void notifyGlobalInventoryShimInternalListener(Node node, UpdateType type, Set<Property> props) {
+        for (IInventoryShimInternalListener globalListener : globalInventoryShimInternalListeners) {
+            globalListener.updateNode(node, type, props);
+            logger.trace(
+                    "notifyGlobalInventoryShimInternalListener {} type {}",
+                    new Object[] { node, type });
+        }
+    }
+
+    private void notifyGlobalInventoryShimInternalListener(NodeConnector nodeConnector, UpdateType type, Set<Property> props) {
+        for (IInventoryShimInternalListener globalListener : globalInventoryShimInternalListeners) {
+            globalListener.updateNodeConnector(nodeConnector, type, props);
+            logger.trace(
+                    "notifyGlobalInventoryShimInternalListener {} type {}",
+                    new Object[] { nodeConnector, type });
+        }
+    }
+
     private void notifyInventoryShimInternalListener(String container,
             Node node, UpdateType type, Set<Property> props) {
         IInventoryShimInternalListener inventoryShimInternalListener = inventoryShimInternalListeners
index bae49d511a2f853f61d428f0751ea7b231539831..e3a1ffec96cc28d26b0dcc247763aeef1c06184d 100644 (file)
@@ -36,6 +36,7 @@ import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6Match;
 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsReply;
 import org.opendaylight.controller.protocol_plugin.openflow.vendorextension.v6extension.V6StatsRequest;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Property;
@@ -169,6 +170,18 @@ IInventoryShimExternalListener, CommandProvider {
         }
         return statsQueueSize;
     }
+
+    IPluginOutConnectionService connectionPluginOutService;
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionPluginOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionPluginOutService == s) {
+            connectionPluginOutService = null;
+        }
+    }
+
     /**
      * Function called by the dependency manager when all the required
      * dependencies are satisfied
index 585e4f32c46d83b1edbe758ad2b635655639dcc8..84f7dd9282ca89cc67eb160a6465712c2b489d8b 100644 (file)
@@ -17,6 +17,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import org.apache.felix.dm.Component;
 import org.opendaylight.controller.protocol_plugin.openflow.IReadFilterInternalListener;
 import org.opendaylight.controller.protocol_plugin.openflow.IReadServiceFilter;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.Node.NodeIDType;
 import org.opendaylight.controller.sal.core.NodeConnector;
@@ -40,6 +41,7 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
     private IReadServiceFilter filter;
     private Set<IPluginOutReadService> pluginOutReadServices;
     private String containerName;
+    private IPluginOutConnectionService connectionOutService;
 
     /**
      * Function called by the dependency manager when all the required
@@ -110,6 +112,10 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
             return null;
         }
 
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("This Controller is not the master for the node : " + node);
+            return null;
+        }
         return filter.readFlow(containerName, node, flow, cached);
     }
 
@@ -120,6 +126,11 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
             return null;
         }
 
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("This Controller is not the master for the node : " + node);
+            return null;
+        }
+
         return filter.readAllFlow(containerName, node, cached);
     }
 
@@ -130,6 +141,11 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
             return null;
         }
 
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("This Controller is not the master for the node : " + node);
+            return null;
+        }
+
         return filter.readDescription(node, cached);
     }
 
@@ -141,6 +157,12 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
             logger.error("Invalid node type");
             return null;
         }
+
+        if (!connectionOutService.isLocal(connector.getNode())) {
+            logger.debug("This Controller is not the master for connector : "+connector);
+            return null;
+        }
+
         return filter.readNodeConnector(containerName, connector, cached);
     }
 
@@ -152,6 +174,11 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
             return null;
         }
 
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("This Controller is not the master for node : " + node);
+            return null;
+        }
+
         return filter.readAllNodeConnector(containerName, node, cached);
     }
 
@@ -162,6 +189,12 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
             logger.error("Invalid node type");
             return 0;
         }
+
+        if (!connectionOutService.isLocal(connector.getNode())) {
+            logger.debug("This Controller is not the master for connector : "+connector);
+            return 0;
+        }
+
         return filter.getTransmitRate(containerName, connector);
     }
 
@@ -172,6 +205,12 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
             logger.error("Invalid node type");
             return null;
         }
+
+        if (!connectionOutService.isLocal(table.getNode())) {
+            logger.debug("This Controller is not the master for connector : "+table);
+            return null;
+        }
+
         return filter.readNodeTable(containerName, table, cached);
     }
 
@@ -182,11 +221,20 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
             return null;
         }
 
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("This Controller is not the master for node : " + node);
+            return null;
+        }
+
         return filter.readAllNodeTable(containerName, node, cached);
     }
 
     @Override
     public void nodeFlowStatisticsUpdated(Node node, List<FlowOnNode> flowStatsList) {
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("This Controller is not the master for node : " + node);
+            return;
+        }
         for (IPluginOutReadService service : pluginOutReadServices) {
             service.nodeFlowStatisticsUpdated(node, flowStatsList);
         }
@@ -194,6 +242,10 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
 
     @Override
     public void nodeConnectorStatisticsUpdated(Node node, List<NodeConnectorStatistics> ncStatsList) {
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("This Controller is not the master for node : " + node);
+            return;
+        }
         for (IPluginOutReadService service : pluginOutReadServices) {
             service.nodeConnectorStatisticsUpdated(node, ncStatsList);
         }
@@ -201,6 +253,10 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
 
     @Override
     public void nodeTableStatisticsUpdated(Node node, List<NodeTableStatistics> tableStatsList) {
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("This Controller is not the master for node : " + node);
+            return;
+        }
         for (IPluginOutReadService service : pluginOutReadServices) {
             service.nodeTableStatisticsUpdated(node, tableStatsList);
         }
@@ -208,8 +264,22 @@ public class ReadService implements IPluginInReadService, IReadFilterInternalLis
 
     @Override
     public void nodeDescriptionStatisticsUpdated(Node node, NodeDescription nodeDescription) {
+        if (!connectionOutService.isLocal(node)) {
+            logger.debug("This Controller is not the master for node : " + node);
+            return;
+        }
         for (IPluginOutReadService service : pluginOutReadServices) {
             service.descriptionStatisticsUpdated(node, nodeDescription);
         }
     }
+
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionOutService == s) {
+            connectionOutService = null;
+        }
+    }
 }
index 7f9a13e92ac3be846342a7615e37ec658d8b9fa0..1ab89b34cb7c875b65e1dee5be409ebbbc5e1378 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.protocol_plugin.openflow.internal;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -27,6 +26,7 @@ import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
 import org.opendaylight.controller.sal.action.Action;
 import org.opendaylight.controller.sal.action.ActionType;
 import org.opendaylight.controller.sal.action.Output;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.ContainerFlow;
 import org.opendaylight.controller.sal.core.IContainerListener;
 import org.opendaylight.controller.sal.core.Node;
@@ -55,18 +55,16 @@ import org.slf4j.LoggerFactory;
 /**
  * Read Service shim layer which is in charge of filtering the flow statistics
  * based on container. It is a Global instance.
- *
- *
- *
  */
 public class ReadServiceFilter implements IReadServiceFilter, IContainerListener, IOFStatisticsListener {
     private static final Logger logger = LoggerFactory
             .getLogger(ReadServiceFilter.class);
     private IController controller = null;
     private IOFStatisticsManager statsMgr = null;
-    private Map<String, Set<NodeConnector>> containerToNc;
-    private Map<String, Set<Node>> containerToNode;
-    private Map<String, Set<NodeTable>> containerToNt;
+    private ConcurrentMap<String, Set<NodeConnector>> containerToNc;
+    private ConcurrentMap<String, Set<Node>> containerToNode;
+    private ConcurrentMap<String, Set<NodeTable>> containerToNt;
+    private ConcurrentMap<String, Set<ContainerFlow>> containerFlows;
     private ConcurrentMap<String, IReadFilterInternalListener> readFilterInternalListeners;
 
     public void setController(IController core) {
@@ -118,9 +116,10 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
      *
      */
     void init() {
-        containerToNc = new HashMap<String, Set<NodeConnector>>();
-        containerToNt = new HashMap<String, Set<NodeTable>>();
-        containerToNode = new HashMap<String, Set<Node>>();
+        containerToNc = new ConcurrentHashMap<String, Set<NodeConnector>>();
+        containerToNt = new ConcurrentHashMap<String, Set<NodeTable>>();
+        containerToNode = new ConcurrentHashMap<String, Set<Node>>();
+        containerFlows = new ConcurrentHashMap<String, Set<ContainerFlow>>();
         readFilterInternalListeners = new ConcurrentHashMap<String, IReadFilterInternalListener>();
     }
 
@@ -159,9 +158,19 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
         this.statsMgr = null;
     }
 
+    IPluginOutConnectionService connectionPluginOutService;
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionPluginOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionPluginOutService == s) {
+            connectionPluginOutService = null;
+        }
+    }
+
     @Override
-    public FlowOnNode readFlow(String container, Node node, Flow flow,
-            boolean cached) {
+    public FlowOnNode readFlow(String container, Node node, Flow flow, boolean cached) {
 
         if (controller == null) {
             // Avoid to provide cached statistics if controller went down.
@@ -186,17 +195,11 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
             }
         }
 
+        // Convert and filter the statistics per container
+        List<FlowOnNode> flowOnNodeList = new FlowStatisticsConverter(ofList).getFlowOnNodeList(node);
+        List<FlowOnNode> filteredList = filterFlowListPerContainer(container, node, flowOnNodeList);
 
-        /*
-         * Convert and filter the statistics per container
-         */
-        List<FlowOnNode> flowOnNodeList = new FlowStatisticsConverter(ofList)
-                .getFlowOnNodeList(node);
-        List<FlowOnNode> filteredList = filterFlowListPerContainer(container,
-                node, flowOnNodeList);
-
-        return (filteredList == null || filteredList.isEmpty()) ? null
-                : filteredList.get(0);
+        return (filteredList == null || filteredList.isEmpty()) ? null : filteredList.get(0);
     }
 
     @Override
@@ -208,13 +211,9 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
                 .getOFFlowStatistics(sid) : statsMgr.queryStatistics(sid,
                 OFStatisticsType.FLOW, null);
 
-        /*
-         * Convert and filter the statistics per container
-         */
-        List<FlowOnNode> flowOnNodeList = new FlowStatisticsConverter(ofList)
-                .getFlowOnNodeList(node);
-        List<FlowOnNode> filteredList = filterFlowListPerContainer(container,
-                node, flowOnNodeList);
+        // Convert and filter the statistics per container
+        List<FlowOnNode> flowOnNodeList = new FlowStatisticsConverter(ofList).getFlowOnNodeList(node);
+        List<FlowOnNode> filteredList = filterFlowListPerContainer(container, node, flowOnNodeList);
 
         return (filteredList == null) ? null : filteredList;
 
@@ -233,7 +232,7 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
                 .getOFDescStatistics(sid) : statsMgr.queryStatistics(sid,
                         OFStatisticsType.DESC, null);
 
-                return new DescStatisticsConverter(ofList).getHwDescription();
+        return new DescStatisticsConverter(ofList).getHwDescription();
     }
 
     /**
@@ -271,8 +270,7 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
      * @param list
      * @return
      */
-    public List<OFStatistics> filterPortListPerContainer(String container,
-            long switchId, List<OFStatistics> list) {
+    public List<OFStatistics> filterPortListPerContainer(String container, long switchId, List<OFStatistics> list) {
         if (list == null) {
             return null;
         }
@@ -304,8 +302,7 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
 
         for (OFStatistics stat : list) {
             OFTableStatistics target = (OFTableStatistics) stat;
-            NodeTable nt = NodeTableCreator.createOFNodeTable(
-                    target.getTableId(), NodeCreator.createOFNode(switchId));
+            NodeTable nt = NodeTableCreator.createOFNodeTable(target.getTableId(), NodeCreator.createOFNode(switchId));
             if (containerOwnsNodeTable(container, nt)) {
                 newList.add(target);
             }
@@ -328,9 +325,9 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
         if (container.equals(GlobalConstants.DEFAULT.toString())) {
             return true;
         }
-        return (flowPortsBelongToContainer(container, node, flow)
-                && flowVlanBelongsToContainer(container, node, flow) && flowSpecAllowsFlow(
-                        container, flow.getMatch()));
+        return (flowPortsBelongToContainer(container, node, flow) &&
+                flowVlanBelongsToContainer(container, node, flow) &&
+                isFlowAllowedByContainer(container, flow));
     }
 
     /**
@@ -366,14 +363,23 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
     }
 
     /**
-     * Returns whether the container flowspec allows the passed flow
+     * Returns whether the container flows allow the passed flow
      *
      * @param container
      * @param match
      * @return
      */
-    private boolean flowSpecAllowsFlow(String container, Match match) {
-        return true; // Always true for now
+    private boolean isFlowAllowedByContainer(String container, Flow flow) {
+        Set<ContainerFlow> cFlowSet = this.containerFlows.get(container);
+        if (cFlowSet == null || cFlowSet.isEmpty()) {
+            return true;
+        }
+        for (ContainerFlow cFlow : cFlowSet) {
+            if (cFlow.allowsFlow(flow)) {
+                return true;
+            }
+        }
+        return false;
     }
 
     /**
@@ -402,9 +408,7 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
             Flow flow) {
         Match m = flow.getMatch();
         if (m.isPresent(MatchType.IN_PORT)) {
-            NodeConnector inPort = (NodeConnector) m
-                    .getField(MatchType.IN_PORT).getValue();
-
+            NodeConnector inPort = (NodeConnector) m.getField(MatchType.IN_PORT).getValue();
             // If the incoming port is specified, check if it belongs to
             if (!containerOwnsNodeConnector(container, inPort)) {
                 return false;
@@ -414,8 +418,7 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
         // If an outgoing port is specified, it must belong to this container
         for (Action action : flow.getActions()) {
             if (action.getType() == ActionType.OUTPUT) {
-                NodeConnector outPort = ((Output) action)
-                        .getPort();
+                NodeConnector outPort = ((Output) action).getPort();
                 if (!containerOwnsNodeConnector(container, outPort)) {
                     return false;
                 }
@@ -427,12 +430,28 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
     @Override
     public void containerFlowUpdated(String containerName, ContainerFlow previousFlow,
             ContainerFlow currentFlow, UpdateType t) {
-        // TODO
+        Set<ContainerFlow> cFlowSet = containerFlows.get(containerName);
+        switch (t) {
+        case ADDED:
+            if (cFlowSet == null) {
+                cFlowSet = new HashSet<ContainerFlow>();
+                containerFlows.put(containerName, cFlowSet);
+            }
+            cFlowSet.add(currentFlow);
+        case CHANGED:
+            break;
+        case REMOVED:
+            if (cFlowSet != null) {
+                cFlowSet.remove(currentFlow);
+            }
+            break;
+        default:
+            break;
+        }
     }
 
     @Override
-    public void nodeConnectorUpdated(String containerName, NodeConnector p,
-            UpdateType type) {
+    public void nodeConnectorUpdated(String containerName, NodeConnector p, UpdateType type) {
 
         switch (type) {
         case ADDED:
@@ -486,8 +505,7 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
     }
 
     @Override
-    public NodeConnectorStatistics readNodeConnector(
-            String containerName, NodeConnector connector, boolean cached) {
+    public NodeConnectorStatistics readNodeConnector(String containerName, NodeConnector connector, boolean cached) {
         if (!containerOwnsNodeConnector(containerName, connector)) {
             return null;
         }
@@ -498,26 +516,22 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
                 .getOFPortStatistics(sid, portId) : statsMgr.queryStatistics(
                         sid, OFStatisticsType.PORT, portId);
 
-                List<NodeConnectorStatistics> ncStatistics = new PortStatisticsConverter(
-                        sid, ofList).getNodeConnectorStatsList();
-                return (ncStatistics.isEmpty()) ? new NodeConnectorStatistics()
-                : ncStatistics.get(0);
+        List<NodeConnectorStatistics> ncStatistics = new PortStatisticsConverter(sid, ofList)
+                .getNodeConnectorStatsList();
+        return (ncStatistics.isEmpty()) ? new NodeConnectorStatistics() : ncStatistics.get(0);
     }
 
     @Override
-    public List<NodeConnectorStatistics> readAllNodeConnector(
-            String containerName, Node node, boolean cached) {
+    public List<NodeConnectorStatistics> readAllNodeConnector(String containerName, Node node, boolean cached) {
 
         long sid = (Long) node.getID();
         List<OFStatistics> ofList = (cached == true) ? statsMgr
                 .getOFPortStatistics(sid) : statsMgr.queryStatistics(sid,
                         OFStatisticsType.FLOW, null);
 
-                List<OFStatistics> filteredList = filterPortListPerContainer(
-                        containerName, sid, ofList);
+        List<OFStatistics> filteredList = filterPortListPerContainer(containerName, sid, ofList);
 
-                return new PortStatisticsConverter(sid, filteredList)
-                .getNodeConnectorStatsList();
+        return new PortStatisticsConverter(sid, filteredList).getNodeConnectorStatsList();
     }
 
     @Override
@@ -544,8 +558,7 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
         List<OFStatistics> ofList = (cached == true) ? statsMgr.getOFTableStatistics(sid, tableId) :
             statsMgr.queryStatistics(sid, OFStatisticsType.TABLE, tableId);
 
-        List<NodeTableStatistics> ntStatistics =
-                new TableStatisticsConverter(sid, ofList).getNodeTableStatsList();
+        List<NodeTableStatistics> ntStatistics = new TableStatisticsConverter(sid, ofList).getNodeTableStatsList();
 
         return (ntStatistics.isEmpty()) ? new NodeTableStatistics() : ntStatistics.get(0);
     }
@@ -564,13 +577,15 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
     @Override
     public void descriptionStatisticsRefreshed(Long switchId, List<OFStatistics> description) {
         String container;
+        IReadFilterInternalListener listener;
         Node node = NodeCreator.createOFNode(switchId);
         NodeDescription nodeDescription = new DescStatisticsConverter(description).getHwDescription();
         for (Map.Entry<String, IReadFilterInternalListener> l : readFilterInternalListeners.entrySet()) {
             container = l.getKey();
+            listener = l.getValue();
             if (container == GlobalConstants.DEFAULT.toString()
                     || (containerToNode.containsKey(container) && containerToNode.get(container).contains(node))) {
-                l.getValue().nodeDescriptionStatisticsUpdated(node, nodeDescription);
+                listener.nodeDescriptionStatisticsUpdated(node, nodeDescription);
             }
         }
     }
@@ -578,25 +593,29 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
     @Override
     public void flowStatisticsRefreshed(Long switchId, List<OFStatistics> flows) {
         String container;
+        IReadFilterInternalListener listener;
         Node node = NodeCreator.createOFNode(switchId);
         for (Map.Entry<String, IReadFilterInternalListener> l : readFilterInternalListeners.entrySet()) {
             container = l.getKey();
+            listener = l.getValue();
 
             // Convert and filter the statistics per container
             List<FlowOnNode> flowOnNodeList = new FlowStatisticsConverter(flows).getFlowOnNodeList(node);
             flowOnNodeList = filterFlowListPerContainer(container, node, flowOnNodeList);
 
             // notify listeners
-            l.getValue().nodeFlowStatisticsUpdated(node, flowOnNodeList);
+            listener.nodeFlowStatisticsUpdated(node, flowOnNodeList);
         }
     }
 
     @Override
     public void portStatisticsRefreshed(Long switchId, List<OFStatistics> ports) {
         String container;
+        IReadFilterInternalListener listener;
         Node node = NodeCreator.createOFNode(switchId);
         for (Map.Entry<String, IReadFilterInternalListener> l : readFilterInternalListeners.entrySet()) {
             container = l.getKey();
+            listener = l.getValue();
 
             // Convert and filter the statistics per container
             List<OFStatistics> filteredPorts = filterPortListPerContainer(container, switchId, ports);
@@ -604,8 +623,7 @@ public class ReadServiceFilter implements IReadServiceFilter, IContainerListener
                     .getNodeConnectorStatsList();
 
             // notify listeners
-            l.getValue().nodeConnectorStatisticsUpdated(node, ncStatsList);
-
+            listener.nodeConnectorStatisticsUpdated(node, ncStatsList);
         }
     }
 
index 1d7b4263e7b191cc13e83666130464442b66b5f2..a0f48100bdc812e4d1cda33c482bfc51b2f63587 100644 (file)
@@ -35,6 +35,7 @@ import org.osgi.framework.FrameworkUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.Bandwidth;
 import org.opendaylight.controller.sal.core.Config;
 import org.opendaylight.controller.sal.core.ContainerFlow;
@@ -398,6 +399,17 @@ public class TopologyServiceShim implements IDiscoveryListener,
         }
     }
 
+    IPluginOutConnectionService connectionPluginOutService;
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionPluginOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionPluginOutService == s) {
+            connectionPluginOutService = null;
+        }
+    }
+
     private void removeNodeConnector(String container,
             NodeConnector nodeConnector) {
         List<TopoEdgeUpdate> teuList = new ArrayList<TopoEdgeUpdate>();
index 68572fb65086ef4c7901dc4179caf03e57c12233..d77e513f3bd52f880b311a513705f88316a61c5a 100644 (file)
@@ -16,6 +16,7 @@ import org.opendaylight.controller.protocol_plugin.openflow.ITopologyServiceShim
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
 import org.opendaylight.controller.sal.core.Edge;
 import org.opendaylight.controller.sal.topology.IPluginInTopologyService;
 import org.opendaylight.controller.sal.topology.IPluginOutTopologyService;
@@ -27,6 +28,7 @@ public class TopologyServices implements ITopologyServiceShimListener,
             .getLogger(TopologyServices.class);
     private IPluginOutTopologyService salTopoService = null;
     private IRefreshInternalProvider topoRefreshService = null;
+    private IPluginOutConnectionService connectionOutService;
     private String containerName;
 
     /**
@@ -150,4 +152,14 @@ public class TopologyServices implements ITopologyServiceShimListener,
             this.salTopoService.edgeUtilBackToNormal(edge);
         }
     }
+
+    void setIPluginOutConnectionService(IPluginOutConnectionService s) {
+        connectionOutService = s;
+    }
+
+    void unsetIPluginOutConnectionService(IPluginOutConnectionService s) {
+        if (connectionOutService == s) {
+            connectionOutService = null;
+        }
+    }
 }
diff --git a/opendaylight/sal/connection/api/pom.xml b/opendaylight/sal/connection/api/pom.xml
new file mode 100644 (file)
index 0000000..45425c9
--- /dev/null
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>\r
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
+  <modelVersion>4.0.0</modelVersion>\r
+  <parent>\r
+    <groupId>org.opendaylight.controller</groupId>\r
+    <artifactId>commons.opendaylight</artifactId>\r
+    <version>1.4.0-SNAPSHOT</version>\r
+    <relativePath>../../../commons/opendaylight</relativePath>\r
+  </parent>\r
+\r
+  <artifactId>sal.connection</artifactId>\r
+  <version>0.1.0-SNAPSHOT</version>\r
+  <packaging>bundle</packaging>\r
+\r
+  <build>\r
+    <plugins>\r
+      <plugin>\r
+        <groupId>org.apache.felix</groupId>\r
+        <artifactId>maven-bundle-plugin</artifactId>\r
+        <version>2.3.6</version>\r
+        <extensions>true</extensions>\r
+        <configuration>\r
+          <instructions>\r
+            <Import-Package>\r
+              org.slf4j,\r
+              org.osgi.framework,\r
+              org.apache.felix.dm,\r
+              org.apache.commons.lang3.tuple,\r
+              javax.xml.bind.annotation,\r
+              javax.xml.bind.annotation.adapters,\r
+              org.opendaylight.controller.sal.core,\r
+              org.opendaylight.controller.sal.utils\r
+            </Import-Package>\r
+            <Export-Package>\r
+              org.opendaylight.controller.sal.connection\r
+             </Export-Package>\r
+          </instructions>\r
+          <manifestLocation>${project.basedir}/META-INF</manifestLocation>\r
+        </configuration>\r
+      </plugin>\r
+    </plugins>\r
+  </build>\r
+  <dependencies>\r
+    <dependency>\r
+      <groupId>org.opendaylight.controller</groupId>\r
+      <artifactId>sal</artifactId>\r
+      <version>0.5.0-SNAPSHOT</version>\r
+    </dependency>\r
+    <dependency>\r
+      <groupId>junit</groupId>\r
+      <artifactId>junit</artifactId>\r
+      <version>4.8.1</version>\r
+      <scope>test</scope>\r
+    </dependency>\r
+  </dependencies>\r
+</project>\r
diff --git a/opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/ConnectionConstants.java b/opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/ConnectionConstants.java
new file mode 100644 (file)
index 0000000..aaa883b
--- /dev/null
@@ -0,0 +1,33 @@
+
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connection;
+
+/**
+ * ConnectionConstants
+ * Expand this enum as and when needed to support other connection parameters that
+ * might be needed for certain protocol plugins.
+ */
+public enum ConnectionConstants {
+    ADDRESS("address"),
+    PORT("port"),
+    PROTOCOL("protocol"),
+    USERNAME("username"),
+    PASSWORD("password");
+
+    private ConnectionConstants(String name) {
+        this.name = name;
+    }
+
+    private String name;
+
+    public String toString() {
+        return name;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IConnectionListener.java b/opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IConnectionListener.java
new file mode 100644 (file)
index 0000000..7ab3bb2
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connection;
+/**
+ * This interface defines the methods the SAL service which relay the Connection
+ * Notification events to the functional modules
+ */
+public interface IConnectionListener extends IPluginOutConnectionService {
+}
\ No newline at end of file
diff --git a/opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IConnectionService.java b/opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IConnectionService.java
new file mode 100644 (file)
index 0000000..685738c
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connection;
+
+import java.util.Map;
+
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.utils.Status;
+
+/**
+ * Interface that defines the methods available to the functional modules that operate
+ * above SAL for disconnecting or connecting to a particular node.
+ */
+public interface IConnectionService {
+    /**
+     * Connect to a node with a specified node type.
+     *
+     * @param type Type of the node representing NodeIDType.
+     * @param connectionIdentifier Convenient identifier for the applications to make use of
+     * @param params Connection Params in Map format. This is entirely handled by the south-bound
+     * plugins and is an opaque value for SAL. Typical values keyed inside this params are
+     * Management IP-Address, Username, Password, Security Keys, etc...
+     *
+     *  @return Node
+     */
+    public Node connect (String type, String connectionIdentifier, Map<ConnectionConstants, String> params);
+
+
+    /**
+     * Discover the node type and Connect to the first plugin that is able to connect with the specified parameters.
+     *
+     * @param type Type of the node representing NodeIDType.
+     * @param connectionIdentifier Convenient identifier for the applications to make use of
+     * @param params Connection Params in Map format. This is entirely handled by the south-bound
+     * plugins and is an opaque value for SAL. Typical values keyed inside this params are
+     * Management IP-Address, Username, Password, Security Keys, etc...
+     *
+     *  @return Node
+     */
+    public Node connect (String connectionIdentifier, Map<ConnectionConstants, String> params);
+
+    /**
+     * Disconnect a Node that is connected to this Controller.
+     *
+     * @param node
+     * @param flow
+     */
+    public Status disconnect(Node node);
+
+    /**
+     * View Change notification
+     */
+    public void notifyNodeDisconnectFromMaster(Node node);
+
+    /**
+     * View Change notification
+     */
+    public void notifyClusterViewChanged();
+}
\ No newline at end of file
diff --git a/opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IPluginInConnectionService.java b/opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IPluginInConnectionService.java
new file mode 100644 (file)
index 0000000..9bae123
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connection;
+
+import java.util.Map;
+
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.utils.Status;
+
+/**
+ * @file IPluginInConnectionService.java
+ *
+ * @brief Connection interface to be implemented by protocol plugins
+ */
+public interface IPluginInConnectionService {
+    /**
+     * Disconnect a Node that is connected to this Controller.
+     *
+     * @param node
+     * @param flow
+     */
+    public Status disconnect(Node node);
+
+    /**
+     * Connect to a node
+     *
+     * @param connectionIdentifier Convenient identifier for the applications to make use of
+     * @param params Connection Params in Map format. This is entirely handled by the south-bound
+     * plugins and is an opaque value for SAL. Typical values keyed inside this params are
+     * Management IP-Address, Username, Password, Security Keys, etc...
+     *
+     * @return Node
+     */
+    public Node connect (String connectionIdentifier, Map<ConnectionConstants, String> params);
+
+    /**
+     * View Change notification
+     */
+    public void notifyClusterViewChanged();
+
+    /**
+     * Node Disconnected from the node's master controller.
+     */
+    public void notifyNodeDisconnectFromMaster(Node node);
+
+}
\ No newline at end of file
diff --git a/opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IPluginOutConnectionService.java b/opendaylight/sal/connection/api/src/main/java/org/opendaylight/controller/sal/connection/IPluginOutConnectionService.java
new file mode 100644 (file)
index 0000000..b602d61
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connection;
+
+import org.opendaylight.controller.sal.core.Node;
+
+public interface IPluginOutConnectionService {
+    /**
+     * Method to test if a node is local to a controller.
+     *
+     * @return true if node is local to this controller. false otherwise.
+     */
+    public boolean isLocal(Node node);
+}
\ No newline at end of file
diff --git a/opendaylight/sal/connection/implementation/pom.xml b/opendaylight/sal/connection/implementation/pom.xml
new file mode 100644 (file)
index 0000000..2feed0b
--- /dev/null
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>\r
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">\r
+  <modelVersion>4.0.0</modelVersion>\r
+  <parent>\r
+    <groupId>org.opendaylight.controller</groupId>\r
+    <artifactId>commons.opendaylight</artifactId>\r
+    <version>1.4.0-SNAPSHOT</version>\r
+    <relativePath>../../../commons/opendaylight</relativePath>\r
+  </parent>\r
+\r
+  <artifactId>sal.connection.implementation</artifactId>\r
+  <version>0.1.0-SNAPSHOT</version>\r
+  <packaging>bundle</packaging>\r
+\r
+  <build>\r
+    <plugins>\r
+      <plugin>\r
+        <groupId>org.apache.felix</groupId>\r
+        <artifactId>maven-bundle-plugin</artifactId>\r
+        <version>2.3.6</version>\r
+        <extensions>true</extensions>\r
+        <configuration>\r
+          <instructions>\r
+            <Import-Package>\r
+              org.slf4j,\r
+              org.opendaylight.controller.sal.core,\r
+              org.opendaylight.controller.sal.packet,\r
+              org.opendaylight.controller.sal.inventory,\r
+              org.opendaylight.controller.sal.flowprogrammer,\r
+              org.opendaylight.controller.sal.reader,\r
+              org.opendaylight.controller.sal.topology,\r
+              org.opendaylight.controller.sal.action,\r
+              org.opendaylight.controller.sal.match,\r
+              org.opendaylight.controller.sal.utils,\r
+              org.opendaylight.controller.sal.connection,\r
+              org.apache.felix.dm,\r
+              org.eclipse.osgi.framework.console,\r
+              org.osgi.framework\r
+            </Import-Package>\r
+            <Export-Package>\r
+            </Export-Package>\r
+            <Bundle-Activator>\r
+              org.opendaylight.controller.sal.connection.implementation.internal.Activator\r
+            </Bundle-Activator>\r
+          </instructions>\r
+          <manifestLocation>${project.basedir}/META-INF</manifestLocation>\r
+        </configuration>\r
+      </plugin>\r
+    </plugins>\r
+  </build>\r
+  <dependencies>\r
+    <dependency>\r
+      <groupId>org.opendaylight.controller</groupId>\r
+      <artifactId>sal</artifactId>\r
+      <version>0.5.0-SNAPSHOT</version>\r
+    </dependency>\r
+    <dependency>\r
+      <groupId>org.opendaylight.controller</groupId>\r
+      <artifactId>sal.connection</artifactId>\r
+      <version>0.1.0-SNAPSHOT</version>\r
+    </dependency>\r
+  </dependencies>\r
+</project>\r
diff --git a/opendaylight/sal/connection/implementation/src/main/java/org/opendaylight/controller/sal/connection/implementation/internal/Activator.java b/opendaylight/sal/connection/implementation/src/main/java/org/opendaylight/controller/sal/connection/implementation/internal/Activator.java
new file mode 100644 (file)
index 0000000..68cd10d
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connection.implementation.internal;
+
+import org.apache.felix.dm.Component;
+import org.opendaylight.controller.sal.connection.IConnectionListener;
+import org.opendaylight.controller.sal.connection.IConnectionService;
+import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
+import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Activator extends ComponentActivatorAbstractBase {
+    protected static final Logger logger = LoggerFactory
+            .getLogger(Activator.class);
+
+    /**
+     * Function called when the activator starts just after some initializations
+     * are done by the ComponentActivatorAbstractBase.
+     *
+     */
+    @Override
+    public void init() {
+
+    }
+
+    /**
+     * Function called when the activator stops just before the cleanup done by
+     * ComponentActivatorAbstractBase
+     *
+     */
+    @Override
+    public void destroy() {
+
+    }
+
+    /**
+     * Function that is used to communicate to dependency manager the list of
+     * known Global implementations
+     *
+     *
+     * @return An array containing all the CLASS objects that will be
+     *         instantiated in order to get an fully working implementation
+     *         Object
+     */
+    public Object[] getGlobalImplementations() {
+        Object[] res = { ConnectionService.class};
+        return res;
+    }
+
+    /**
+     * Function that is called when configuration of the dependencies is required.
+     *
+     * @param c
+     *            dependency manager Component object, used for configuring the
+     *            dependencies exported and imported
+     * @param imp
+     *            Implementation class that is being configured, needed as long
+     *            as the same routine can configure multiple implementations
+     */
+    public void configureGlobalInstance(Component c, Object imp) {
+        if (imp.equals(ConnectionService.class)) {
+            c.setInterface(
+                    new String[] { IConnectionService.class.getName(),
+                                   IPluginOutConnectionService.class.getName() },
+                                   null);
+
+            c.add(createServiceDependency()
+                    .setService(IPluginInConnectionService.class)
+                    .setCallbacks("setPluginService", "unsetPluginService")
+                    .setRequired(false));
+            c.add(createServiceDependency()
+                    .setService(IConnectionListener.class)
+                    .setCallbacks("setListener", "unsetListener")
+                    .setRequired(false));
+        }
+    }
+}
diff --git a/opendaylight/sal/connection/implementation/src/main/java/org/opendaylight/controller/sal/connection/implementation/internal/ConnectionService.java b/opendaylight/sal/connection/implementation/src/main/java/org/opendaylight/controller/sal/connection/implementation/internal/ConnectionService.java
new file mode 100644 (file)
index 0000000..ad4a5fb
--- /dev/null
@@ -0,0 +1,154 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connection.implementation.internal;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.opendaylight.controller.sal.connection.ConnectionConstants;
+import org.opendaylight.controller.sal.connection.IConnectionListener;
+import org.opendaylight.controller.sal.connection.IConnectionService;
+import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
+import org.opendaylight.controller.sal.connection.IPluginOutConnectionService;
+import org.opendaylight.controller.sal.core.Node;
+import org.opendaylight.controller.sal.utils.GlobalConstants;
+import org.opendaylight.controller.sal.utils.Status;
+import org.opendaylight.controller.sal.utils.StatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionService implements IPluginOutConnectionService, IConnectionService {
+    protected static final Logger logger = LoggerFactory
+            .getLogger(ConnectionService.class);
+    private IConnectionListener connectionListener;
+    private ConcurrentMap<String, IPluginInConnectionService> pluginService =
+            new ConcurrentHashMap<String, IPluginInConnectionService>();
+
+    void setPluginService (Map props, IPluginInConnectionService s) {
+        String type = null;
+        Object value = props.get(GlobalConstants.PROTOCOLPLUGINTYPE.toString());
+        if (value instanceof String) {
+            type = (String) value;
+        }
+        if (type == null) {
+            logger.error("Received a PluginInConnectionService without any "
+                    + "protocolPluginType provided");
+        } else {
+            this.pluginService.put(type, s);
+        }
+    }
+
+    void unsetPluginService(Map props, IPluginInConnectionService s) {
+        String type = null;
+
+        Object value = props.get(GlobalConstants.PROTOCOLPLUGINTYPE.toString());
+        if (value instanceof String) {
+            type = (String) value;
+        }
+        if (type == null) {
+            logger.error("Received a PluginInConnectionService without any "
+                    + "protocolPluginType provided");
+        } else if (this.pluginService.get(type).equals(s)) {
+            this.pluginService.remove(type);
+        }
+    }
+
+    void setListener(IConnectionListener s) {
+        this.connectionListener = s;
+    }
+
+    void unsetListener(IConnectionListener s) {
+        if (this.connectionListener == s) {
+            this.connectionListener = null;
+        }
+    }
+
+    /**
+     * Function called by the dependency manager when all the required
+     * dependencies are satisfied
+     *
+     */
+    void init() {
+    }
+
+    /**
+     * Function called by the dependency manager when at least one dependency
+     * become unsatisfied or when the component is shutting down because for
+     * example bundle is being stopped.
+     *
+     */
+    void destroy() {
+        connectionListener = null;
+        if (this.pluginService != null) {
+            this.pluginService.clear();
+        }
+    }
+
+    /**
+     * Method to test if a node is local to a controller.
+     *
+     * @return true if node is local to this controller. false otherwise.
+     */
+    public boolean isLocal(Node node) {
+        if (this.connectionListener == null) return true;
+        return connectionListener.isLocal(node);
+    }
+
+    @Override
+    public Node connect (String type, String connectionIdentifier, Map<ConnectionConstants, String> params) {
+        IPluginInConnectionService s = pluginService.get(type);
+        if (s != null) return s.connect(connectionIdentifier, params);
+        return null;
+    }
+
+    @Override
+    public Node connect (String connectionIdentifier, Map<ConnectionConstants, String> params) {
+        synchronized (this.pluginService) {
+            for (String pluginType : this.pluginService.keySet()) {
+                IPluginInConnectionService s = pluginService.get(pluginType);
+                Node node = s.connect(connectionIdentifier, params);
+                if (node != null) return node;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Status disconnect(Node node) {
+        IPluginInConnectionService s = pluginService.get(node.getType());
+        if (s != null) return s.disconnect(node);
+        return new Status(StatusCode.NOTFOUND);
+    }
+
+    /**
+     * View Change notification
+     */
+    @Override
+    public void notifyClusterViewChanged() {
+        for (String pluginType : this.pluginService.keySet()) {
+            IPluginInConnectionService s = pluginService.get(pluginType);
+            s.notifyClusterViewChanged();
+        }
+    }
+
+    /**
+     * Node Disconnected from the node's master controller.
+     */
+    @Override
+    public void notifyNodeDisconnectFromMaster(Node node) {
+        for (String pluginType : this.pluginService.keySet()) {
+            IPluginInConnectionService s = pluginService.get(pluginType);
+            s.notifyNodeDisconnectFromMaster(node);
+        }
+    }
+}
\ No newline at end of file
index 85d239f4b9e1c52a265e2cb685fcb64925dac3d2..fc9b9e2df48dae7e1dda300c113c5c1470a23ebf 100644 (file)
@@ -8,6 +8,9 @@
 
 package org.opendaylight.controller.sal.implementation.internal;
 
+import java.util.Dictionary;
+import java.util.Hashtable;
+
 import org.apache.felix.dm.Component;
 import org.opendaylight.controller.sal.core.ComponentActivatorAbstractBase;
 import org.opendaylight.controller.sal.flowprogrammer.IFlowProgrammerListener;
@@ -57,6 +60,52 @@ public class Activator extends ComponentActivatorAbstractBase {
 
     }
 
+    /**
+     * Function that is used to communicate to dependency manager the list of
+     * known Global implementations
+     *
+     *
+     * @return An array containing all the CLASS objects that will be
+     *         instantiated in order to get an fully working implementation
+     *         Object
+     */
+    public Object[] getGlobalImplementations() {
+        Object[] res = { Inventory.class };
+        return res;
+    }
+
+    /**
+     * Function that is called when configuration of the dependencies is required.
+     *
+     * @param c
+     *            dependency manager Component object, used for configuring the
+     *            dependencies exported and imported
+     * @param imp
+     *            Implementation class that is being configured, needed as long
+     *            as the same routine can configure multiple implementations
+     */
+    public void configureGlobalInstance(Component c, Object imp) {
+        if (imp.equals(Inventory.class)) {
+            Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put("scope", "Global");
+            // export the service
+            c.setInterface(
+                    new String[] { IPluginOutInventoryService.class.getName(),
+                            IInventoryService.class.getName() }, props);
+
+            // Now lets add a service dependency to make sure the
+            // provider of service exists
+            c.add(createServiceDependency()
+                    .setService(IListenInventoryUpdates.class, "(scope=Global)")
+                    .setCallbacks("setUpdateService", "unsetUpdateService")
+                    .setRequired(false));
+            c.add(createServiceDependency()
+                    .setService(IPluginInInventoryService.class, "(scope=Global)")
+                    .setCallbacks("setPluginService", "unsetPluginService")
+                    .setRequired(true));
+        }
+    }
+
     /**
      * Function that is used to communicate to dependency manager the list of
      * known implementations for services inside a container