Merge "Fix clustering versions"
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / internal / ConnectionManager.java
index c4b7d4fe6cf52b9b517d75c53d596778700528d8..ebc56928a23856a158b4247d98c2720af9f98ac6 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
@@ -22,6 +21,7 @@ package org.opendaylight.controller.connectionmanager.internal;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -30,9 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.eclipse.osgi.framework.console.CommandInterpreter;
 import org.eclipse.osgi.framework.console.CommandProvider;
 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
@@ -43,29 +40,34 @@ import org.opendaylight.controller.connectionmanager.IConnectionManager;
 import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme;
 import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory;
 import org.opendaylight.controller.sal.connection.ConnectionConstants;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
 import org.opendaylight.controller.sal.connection.IConnectionListener;
 import org.opendaylight.controller.sal.connection.IConnectionService;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.NodeConnector;
 import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.core.UpdateType;
+import org.opendaylight.controller.sal.inventory.IInventoryService;
 import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.FrameworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class ConnectionManager implements IConnectionManager, IConnectionListener,
-                                          ICoordinatorChangeAware, IListenInventoryUpdates,
-                                          ICacheUpdateAware<Node, Set<InetAddress>>,
-                                          CommandProvider {
-    private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
+public class ConnectionManager implements IConnectionManager,
+        IConnectionListener, ICoordinatorChangeAware, IListenInventoryUpdates,
+        ICacheUpdateAware<Node, Set<InetAddress>>, CommandProvider {
+    private static final Logger logger = LoggerFactory
+            .getLogger(ConnectionManager.class);
     private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER;
     private IClusterGlobalServices clusterServices;
     private ConcurrentMap<ConnectionMgmtScheme, AbstractScheme> schemes;
     private IConnectionService connectionService;
     private Thread connectionEventThread;
     private BlockingQueue<ConnectionMgmtEvent> connectionEvents;
+    private IInventoryService inventoryService;
 
     public void setClusterServices(IClusterGlobalServices i) {
         this.clusterServices = i;
@@ -87,31 +89,80 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
         }
     }
 
+    public void setInventoryService(IInventoryService service) {
+        logger.trace("Got inventory service set request {}", service);
+        this.inventoryService = service;
+    }
+
+    public void unsetInventoryService(IInventoryService service) {
+        logger.trace("Got a service UNset request");
+        this.inventoryService = null;
+    }
+
+    private void getInventories() {
+        Map<Node, Map<String, Property>> nodeProp = this.inventoryService
+                .getNodeProps();
+        for (Map.Entry<Node, Map<String, Property>> entry : nodeProp.entrySet()) {
+            Node node = entry.getKey();
+            logger.debug("getInventories for node:{}", new Object[] { node });
+            Map<String, Property> propMap = entry.getValue();
+            Set<Property> props = new HashSet<Property>();
+            for (Property property : propMap.values()) {
+                props.add(property);
+            }
+            updateNode(node, UpdateType.ADDED, props);
+        }
+
+        Map<NodeConnector, Map<String, Property>> nodeConnectorProp = this.inventoryService
+                .getNodeConnectorProps();
+        for (Map.Entry<NodeConnector, Map<String, Property>> entry : nodeConnectorProp
+                .entrySet()) {
+            Map<String, Property> propMap = entry.getValue();
+            Set<Property> props = new HashSet<Property>();
+            for (Property property : propMap.values()) {
+                props.add(property);
+            }
+            updateNodeConnector(entry.getKey(), UpdateType.ADDED, props);
+        }
+    }
+
     public void started() {
-        connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread");
+
         connectionEventThread.start();
 
         registerWithOSGIConsole();
         notifyClusterViewChanged();
+        // Should pull the Inventory updates in case we missed it
+        getInventories();
     }
 
     public void init() {
+        connectionEventThread = new Thread(new EventHandler(),
+                "ConnectionEvent Thread");
         this.connectionEvents = new LinkedBlockingQueue<ConnectionMgmtEvent>();
         schemes = new ConcurrentHashMap<ConnectionMgmtScheme, AbstractScheme>();
+
+        String schemeStr = System.getProperty("connection.scheme");
         for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
             AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices);
-            if (schemeImpl != null) schemes.put(scheme, schemeImpl);
+            if (schemeImpl != null) {
+                schemes.put(scheme, schemeImpl);
+                if (scheme.name().equalsIgnoreCase(schemeStr)) {
+                    activeScheme = scheme;
+                }
+            }
         }
     }
 
-    public void stop() {
+    public void stopping() {
         connectionEventThread.interrupt();
         Set<Node> localNodes = getLocalNodes();
         if (localNodes != null) {
             AbstractScheme scheme = schemes.get(activeScheme);
             for (Node localNode : localNodes) {
                 connectionService.disconnect(localNode);
-                if (scheme != null) scheme.removeNode(localNode);
+                if (scheme != null)
+                    scheme.removeNode(localNode);
             }
         }
     }
@@ -124,29 +175,41 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
     @Override
     public Set<Node> getNodes(InetAddress controller) {
         AbstractScheme scheme = schemes.get(activeScheme);
-        if (scheme == null) return null;
+        if (scheme == null)
+            return null;
         return scheme.getNodes(controller);
     }
 
     @Override
     public Set<Node> getLocalNodes() {
         AbstractScheme scheme = schemes.get(activeScheme);
-        if (scheme == null) return null;
+        if (scheme == null)
+            return null;
         return scheme.getNodes();
     }
 
     @Override
     public boolean isLocal(Node node) {
         AbstractScheme scheme = schemes.get(activeScheme);
-        if (scheme == null) return false;
+        if (scheme == null)
+            return false;
         return scheme.isLocal(node);
     }
 
+    @Override
+    public ConnectionLocality getLocalityStatus(Node node) {
+        AbstractScheme scheme = schemes.get(activeScheme);
+        if (scheme == null)
+            return ConnectionLocality.NOT_CONNECTED;
+        return scheme.getLocalityStatus(node);
+    }
+
     @Override
     public void updateNode(Node node, UpdateType type, Set<Property> props) {
         logger.debug("updateNode: {} type {} props {}", node, type, props);
         AbstractScheme scheme = schemes.get(activeScheme);
-        if (scheme == null) return;
+        if (scheme == null)
+            return;
         switch (type) {
         case ADDED:
             scheme.addNode(node);
@@ -155,67 +218,89 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
             scheme.removeNode(node);
             break;
         default:
-                break;
+            break;
         }
     }
 
     @Override
     public void updateNodeConnector(NodeConnector nodeConnector,
             UpdateType type, Set<Property> props) {
-        logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, type, props);
+        logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector,
+                type, props);
         AbstractScheme scheme = schemes.get(activeScheme);
-        if (scheme == null) return;
+        if (scheme == null)
+            return;
         switch (type) {
         case ADDED:
             scheme.addNode(nodeConnector.getNode());
             break;
         default:
-                break;
+            break;
         }
     }
 
     @Override
     public void coordinatorChanged() {
-        AbstractScheme scheme = schemes.get(activeScheme);
-        if (scheme == null) return;
-        scheme.handleClusterViewChanged();
         notifyClusterViewChanged();
     }
 
     @Override
-    public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
-        if (connectionService == null) return null;
-        return connectionService.connect(connectionIdentifier, params);
+    public Node connect(String connectionIdentifier,
+            Map<ConnectionConstants, String> params) {
+        if (connectionService == null)
+            return null;
+        Node node = connectionService.connect(connectionIdentifier, params);
+        AbstractScheme scheme = schemes.get(activeScheme);
+        if (scheme != null && node != null)
+            scheme.addNode(node);
+        return node;
     }
 
     @Override
-    public Node connect(String type, String connectionIdentifier, Map<ConnectionConstants, String> params) {
-        if (connectionService == null) return null;
-        return connectionService.connect(type, connectionIdentifier, params);
+    public Node connect(String type, String connectionIdentifier,
+            Map<ConnectionConstants, String> params) {
+        if (connectionService == null)
+            return null;
+        Node node = connectionService.connect(connectionIdentifier, params);
+        AbstractScheme scheme = schemes.get(activeScheme);
+        if (scheme != null && node != null)
+            scheme.addNode(node);
+        return node;
     }
 
     @Override
-    public Status disconnect (Node node) {
-        if (connectionService == null) return new Status(StatusCode.NOSERVICE);
-        return connectionService.disconnect(node);
+    public Status disconnect(Node node) {
+        if (node == null)
+            return new Status(StatusCode.BADREQUEST);
+        if (connectionService == null)
+            return new Status(StatusCode.NOSERVICE);
+        Status status = connectionService.disconnect(node);
+        if (status.isSuccess()) {
+            AbstractScheme scheme = schemes.get(activeScheme);
+            if (scheme != null)
+                scheme.removeNode(node);
+        }
+        return status;
     }
 
     @Override
     public void entryCreated(Node key, String cacheName, boolean originLocal) {
-        AbstractScheme scheme = schemes.get(activeScheme);
-        logger.debug("Created : {} cache : {} existingValue : {}", key, cacheName, scheme.getNodeConnections().get(key));
+        if (originLocal)
+            return;
     }
 
     /*
-     * Clustering Services' doesnt provide the existing states in the cache update callbacks.
-     * Hence, using a scratch local cache to maintain the existing state.
-     *
+     * Clustering Services doesn't provide the existing states in the cache
+     * update callbacks. Hence, using a scratch local cache to maintain the
+     * existing state.
      */
     private ConcurrentMap<Node, Set<InetAddress>> existingConnections = new ConcurrentHashMap<Node, Set<InetAddress>>();
 
     @Override
-    public void entryUpdated(Node node, Set<InetAddress> newControllers, String cacheName, boolean originLocal) {
-        if (originLocal) return;
+    public void entryUpdated(Node node, Set<InetAddress> newControllers,
+            String cacheName, boolean originLocal) {
+        if (originLocal)
+            return;
         Set<InetAddress> existingControllers = existingConnections.get(node);
         if (existingControllers != null) {
             logger.debug("Processing Update for : {} NewControllers : {} existingControllers : {}", node,
@@ -235,8 +320,9 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
 
     @Override
     public void entryDeleted(Node key, String cacheName, boolean originLocal) {
-        if (originLocal) return;
-        logger.debug("Deleted : {} cache : {}", key, cacheName);
+        if (originLocal)
+            return;
+        logger.debug("Deleted entry {} from cache : {}", key, cacheName);
         notifyNodeDisconnectedEvent(key);
     }
 
@@ -246,22 +332,27 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
                 this.connectionEvents.put(event);
             }
         } catch (InterruptedException e) {
-            logger.debug("enqueueConnectionEvent caught Interrupt Exception for event {}", event);
+            logger.debug(
+                    "enqueueConnectionEvent caught Interrupt Exception for event {}",
+                    event);
         }
     }
 
     private void notifyClusterViewChanged() {
-        ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null);
+        ConnectionMgmtEvent event = new ConnectionMgmtEvent(
+                ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null);
         enqueueConnectionEvent(event);
     }
 
     private void notifyNodeDisconnectedEvent(Node node) {
-        ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node);
+        ConnectionMgmtEvent event = new ConnectionMgmtEvent(
+                ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node);
         enqueueConnectionEvent(event);
     }
 
     /*
-     * this thread monitors the connectionEvent queue for new incoming events from
+     * this thread monitors the connectionEvent queue for new incoming events
+     * from
      */
     private class EventHandler implements Runnable {
         @Override
@@ -273,14 +364,19 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
                     ConnectionMgmtEventType eType = ev.getEvent();
                     switch (eType) {
                     case NODE_DISCONNECTED_FROM_MASTER:
-                        Node node = (Node)ev.getData();
+                        Node node = (Node) ev.getData();
                         connectionService.notifyNodeDisconnectFromMaster(node);
                         break;
                     case CLUSTER_VIEW_CHANGED:
+                        AbstractScheme scheme = schemes.get(activeScheme);
+                        if (scheme == null)
+                            return;
+                        scheme.handleClusterViewChanged();
                         connectionService.notifyClusterViewChanged();
                         break;
                     default:
-                        logger.error("Unknown Connection event {}", eType.ordinal());
+                        logger.error("Unknown Connection event {}",
+                                eType.ordinal());
                     }
                 } catch (InterruptedException e) {
                     connectionEvents.clear();
@@ -297,7 +393,7 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
                 null);
     }
 
-    public void _scheme (CommandInterpreter ci) {
+    public void _scheme(CommandInterpreter ci) {
         String schemeStr = ci.nextArgument();
         if (schemeStr == null) {
             ci.println("Please enter valid Scheme name");
@@ -312,22 +408,27 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
         activeScheme = scheme;
     }
 
-    public void _printNodes (CommandInterpreter ci) {
+    public void _printNodes(CommandInterpreter ci) {
         String controller = ci.nextArgument();
         if (controller == null) {
             ci.println("Nodes connected to this controller : ");
-            if (this.getLocalNodes() == null) ci.println("None");
-            else ci.println(this.getLocalNodes().toString());
+            if (this.getLocalNodes() == null) {
+                ci.println("None");
+            } else {
+                ci.println(this.getLocalNodes().toString());
+            }
             return;
         }
         try {
             InetAddress address = InetAddress.getByName(controller);
-            ci.println("Nodes connected to controller "+controller);
-            if (this.getNodes(address) == null) ci.println("None");
-            else ci.println(this.getNodes(address).toString());
-            return;
+            ci.println("Nodes connected to controller " + controller);
+            if (this.getNodes(address) == null) {
+                ci.println("None");
+            } else {
+                ci.println(this.getNodes(address).toString());
+            }
         } catch (UnknownHostException e) {
-            e.printStackTrace();
+            logger.error("An error occured", e);
         }
     }
 
@@ -339,4 +440,12 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene
         help.append("\t printNodes [<controller>]            - Print connected nodes\n");
         return help.toString();
     }
+
+    @Override
+    public Set<InetAddress> getControllers(Node node) {
+        AbstractScheme scheme = schemes.get(activeScheme);
+        if (scheme == null)
+            return Collections.emptySet();
+        return scheme.getControllers(node);
+    }
 }