X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fconnectionmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconnectionmanager%2Finternal%2FConnectionManager.java;fp=opendaylight%2Fconnectionmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconnectionmanager%2Finternal%2FConnectionManager.java;h=c4b7d4fe6cf52b9b517d75c53d596778700528d8;hp=0000000000000000000000000000000000000000;hb=59fdd1aa84189a4672326f225235f747f6eb9a66;hpb=2f14a41bbd09010114116aef1499641f7251bf83 diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java new file mode 100644 index 0000000000..c4b7d4fe6c --- /dev/null +++ b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java @@ -0,0 +1,342 @@ + +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +/** + * Connection Manager provides south-bound connectivity services. + * The APIs are currently focused towards Active-Active Clustering support + * wherein the node can connect to any of the Active Controller in the Cluster. + * This component can also host the necessary logic for south-bound connectivity + * when partial cluster is identified during Partition scenarios. + * + * But this (and its corresponding implementation) component can also be used for + * basic connectivity mechansims for various south-bound plugins. + */ + +package org.opendaylight.controller.connectionmanager.internal; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.eclipse.osgi.framework.console.CommandInterpreter; +import org.eclipse.osgi.framework.console.CommandProvider; +import org.opendaylight.controller.clustering.services.ICacheUpdateAware; +import org.opendaylight.controller.clustering.services.IClusterGlobalServices; +import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware; +import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme; +import org.opendaylight.controller.connectionmanager.IConnectionManager; +import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme; +import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory; +import org.opendaylight.controller.sal.connection.ConnectionConstants; +import org.opendaylight.controller.sal.connection.IConnectionListener; +import org.opendaylight.controller.sal.connection.IConnectionService; +import org.opendaylight.controller.sal.core.Node; +import org.opendaylight.controller.sal.core.NodeConnector; +import org.opendaylight.controller.sal.core.Property; +import org.opendaylight.controller.sal.core.UpdateType; +import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates; +import org.opendaylight.controller.sal.utils.Status; +import org.opendaylight.controller.sal.utils.StatusCode; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; + +public class ConnectionManager implements IConnectionManager, IConnectionListener, + ICoordinatorChangeAware, IListenInventoryUpdates, + ICacheUpdateAware>, + CommandProvider { + private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class); + private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER; + private IClusterGlobalServices clusterServices; + private ConcurrentMap schemes; + private IConnectionService connectionService; + private Thread connectionEventThread; + private BlockingQueue connectionEvents; + + public void setClusterServices(IClusterGlobalServices i) { + this.clusterServices = i; + } + + public void unsetClusterServices(IClusterGlobalServices i) { + if (this.clusterServices == i) { + this.clusterServices = null; + } + } + + public void setConnectionService(IConnectionService i) { + this.connectionService = i; + } + + public void unsetConnectionService(IConnectionService i) { + if (this.connectionService == i) { + this.connectionService = null; + } + } + + public void started() { + connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread"); + connectionEventThread.start(); + + registerWithOSGIConsole(); + notifyClusterViewChanged(); + } + + public void init() { + this.connectionEvents = new LinkedBlockingQueue(); + schemes = new ConcurrentHashMap(); + for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) { + AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices); + if (schemeImpl != null) schemes.put(scheme, schemeImpl); + } + } + + public void stop() { + connectionEventThread.interrupt(); + Set localNodes = getLocalNodes(); + if (localNodes != null) { + AbstractScheme scheme = schemes.get(activeScheme); + for (Node localNode : localNodes) { + connectionService.disconnect(localNode); + if (scheme != null) scheme.removeNode(localNode); + } + } + } + + @Override + public ConnectionMgmtScheme getActiveScheme() { + return activeScheme; + } + + @Override + public Set getNodes(InetAddress controller) { + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme == null) return null; + return scheme.getNodes(controller); + } + + @Override + public Set getLocalNodes() { + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme == null) return null; + return scheme.getNodes(); + } + + @Override + public boolean isLocal(Node node) { + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme == null) return false; + return scheme.isLocal(node); + } + + @Override + public void updateNode(Node node, UpdateType type, Set props) { + logger.debug("updateNode: {} type {} props {}", node, type, props); + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme == null) return; + switch (type) { + case ADDED: + scheme.addNode(node); + break; + case REMOVED: + scheme.removeNode(node); + break; + default: + break; + } + } + + @Override + public void updateNodeConnector(NodeConnector nodeConnector, + UpdateType type, Set props) { + logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, type, props); + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme == null) return; + switch (type) { + case ADDED: + scheme.addNode(nodeConnector.getNode()); + break; + default: + break; + } + } + + @Override + public void coordinatorChanged() { + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme == null) return; + scheme.handleClusterViewChanged(); + notifyClusterViewChanged(); + } + + @Override + public Node connect(String connectionIdentifier, Map params) { + if (connectionService == null) return null; + return connectionService.connect(connectionIdentifier, params); + } + + @Override + public Node connect(String type, String connectionIdentifier, Map params) { + if (connectionService == null) return null; + return connectionService.connect(type, connectionIdentifier, params); + } + + @Override + public Status disconnect (Node node) { + if (connectionService == null) return new Status(StatusCode.NOSERVICE); + return connectionService.disconnect(node); + } + + @Override + public void entryCreated(Node key, String cacheName, boolean originLocal) { + AbstractScheme scheme = schemes.get(activeScheme); + logger.debug("Created : {} cache : {} existingValue : {}", key, cacheName, scheme.getNodeConnections().get(key)); + } + + /* + * Clustering Services' doesnt provide the existing states in the cache update callbacks. + * Hence, using a scratch local cache to maintain the existing state. + * + */ + private ConcurrentMap> existingConnections = new ConcurrentHashMap>(); + + @Override + public void entryUpdated(Node node, Set newControllers, String cacheName, boolean originLocal) { + if (originLocal) return; + Set existingControllers = existingConnections.get(node); + if (existingControllers != null) { + logger.debug("Processing Update for : {} NewControllers : {} existingControllers : {}", node, + newControllers.toString(), existingControllers.toString()); + if (newControllers.size() < existingControllers.size()) { + Set removed = new HashSet(existingControllers); + if (removed.removeAll(newControllers)) { + logger.debug("notifyNodeDisconnectFromMaster({})", node); + notifyNodeDisconnectedEvent(node); + } + } + } else { + logger.debug("Ignoring the Update for : {} NewControllers : {}", node, newControllers.toString()); + } + existingConnections.put(node, newControllers); + } + + @Override + public void entryDeleted(Node key, String cacheName, boolean originLocal) { + if (originLocal) return; + logger.debug("Deleted : {} cache : {}", key, cacheName); + notifyNodeDisconnectedEvent(key); + } + + private void enqueueConnectionEvent(ConnectionMgmtEvent event) { + try { + if (!connectionEvents.contains(event)) { + this.connectionEvents.put(event); + } + } catch (InterruptedException e) { + logger.debug("enqueueConnectionEvent caught Interrupt Exception for event {}", event); + } + } + + private void notifyClusterViewChanged() { + ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null); + enqueueConnectionEvent(event); + } + + private void notifyNodeDisconnectedEvent(Node node) { + ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node); + enqueueConnectionEvent(event); + } + + /* + * this thread monitors the connectionEvent queue for new incoming events from + */ + private class EventHandler implements Runnable { + @Override + public void run() { + + while (true) { + try { + ConnectionMgmtEvent ev = connectionEvents.take(); + ConnectionMgmtEventType eType = ev.getEvent(); + switch (eType) { + case NODE_DISCONNECTED_FROM_MASTER: + Node node = (Node)ev.getData(); + connectionService.notifyNodeDisconnectFromMaster(node); + break; + case CLUSTER_VIEW_CHANGED: + connectionService.notifyClusterViewChanged(); + break; + default: + logger.error("Unknown Connection event {}", eType.ordinal()); + } + } catch (InterruptedException e) { + connectionEvents.clear(); + return; + } + } + } + } + + private void registerWithOSGIConsole() { + BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()) + .getBundleContext(); + bundleContext.registerService(CommandProvider.class.getName(), this, + null); + } + + public void _scheme (CommandInterpreter ci) { + String schemeStr = ci.nextArgument(); + if (schemeStr == null) { + ci.println("Please enter valid Scheme name"); + ci.println("Current Scheme : " + activeScheme.name()); + return; + } + ConnectionMgmtScheme scheme = ConnectionMgmtScheme.valueOf(schemeStr); + if (scheme == null) { + ci.println("Please enter a valid Scheme name"); + return; + } + activeScheme = scheme; + } + + public void _printNodes (CommandInterpreter ci) { + String controller = ci.nextArgument(); + if (controller == null) { + ci.println("Nodes connected to this controller : "); + if (this.getLocalNodes() == null) ci.println("None"); + else ci.println(this.getLocalNodes().toString()); + return; + } + try { + InetAddress address = InetAddress.getByName(controller); + ci.println("Nodes connected to controller "+controller); + if (this.getNodes(address) == null) ci.println("None"); + else ci.println(this.getNodes(address).toString()); + return; + } catch (UnknownHostException e) { + e.printStackTrace(); + } + } + + @Override + public String getHelp() { + StringBuffer help = new StringBuffer(); + help.append("---Connection Manager---\n"); + help.append("\t scheme [] - Print / Set scheme\n"); + help.append("\t printNodes [] - Print connected nodes\n"); + return help.toString(); + } +}