X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fconnectionmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconnectionmanager%2Finternal%2FConnectionManager.java;h=ebc56928a23856a158b4247d98c2720af9f98ac6;hp=df5175083b1870ef99b29443d42003eb2bb179c5;hb=f9aae7377704eed8a43c9a984f585165042ce5f7;hpb=3d81902c3f5176bae52be789f57ee461b9d9e158 diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java index df5175083b..ebc56928a2 100644 --- a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java +++ b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -22,6 +21,7 @@ package org.opendaylight.controller.connectionmanager.internal; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -30,19 +30,17 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.eclipse.osgi.framework.console.CommandInterpreter; import org.eclipse.osgi.framework.console.CommandProvider; import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterGlobalServices; import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware; -import org.opendaylight.controller.connectionmanager.ConnectionLocality; import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme; import org.opendaylight.controller.connectionmanager.IConnectionManager; import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme; import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory; import org.opendaylight.controller.sal.connection.ConnectionConstants; +import org.opendaylight.controller.sal.connection.ConnectionLocality; import org.opendaylight.controller.sal.connection.IConnectionListener; import org.opendaylight.controller.sal.connection.IConnectionService; import org.opendaylight.controller.sal.core.Node; @@ -55,12 +53,14 @@ import org.opendaylight.controller.sal.utils.Status; import org.opendaylight.controller.sal.utils.StatusCode; import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class ConnectionManager implements IConnectionManager, IConnectionListener, - ICoordinatorChangeAware, IListenInventoryUpdates, - ICacheUpdateAware>, - CommandProvider { - private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class); +public class ConnectionManager implements IConnectionManager, + IConnectionListener, ICoordinatorChangeAware, IListenInventoryUpdates, + ICacheUpdateAware>, CommandProvider { + private static final Logger logger = LoggerFactory + .getLogger(ConnectionManager.class); private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER; private IClusterGlobalServices clusterServices; private ConcurrentMap schemes; @@ -100,7 +100,8 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } private void getInventories() { - Map> nodeProp = this.inventoryService.getNodeProps(); + Map> nodeProp = this.inventoryService + .getNodeProps(); for (Map.Entry> entry : nodeProp.entrySet()) { Node node = entry.getKey(); logger.debug("getInventories for node:{}", new Object[] { node }); @@ -112,8 +113,10 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene updateNode(node, UpdateType.ADDED, props); } - Map> nodeConnectorProp = this.inventoryService.getNodeConnectorProps(); - for (Map.Entry> entry : nodeConnectorProp.entrySet()) { + Map> nodeConnectorProp = this.inventoryService + .getNodeConnectorProps(); + for (Map.Entry> entry : nodeConnectorProp + .entrySet()) { Map propMap = entry.getValue(); Set props = new HashSet(); for (Property property : propMap.values()) { @@ -124,7 +127,7 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } public void started() { - connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread"); + connectionEventThread.start(); registerWithOSGIConsole(); @@ -134,9 +137,12 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } public void init() { - String schemeStr = System.getProperty("connection.scheme"); + connectionEventThread = new Thread(new EventHandler(), + "ConnectionEvent Thread"); this.connectionEvents = new LinkedBlockingQueue(); schemes = new ConcurrentHashMap(); + + String schemeStr = System.getProperty("connection.scheme"); for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) { AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices); if (schemeImpl != null) { @@ -148,14 +154,15 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } } - public void stop() { + public void stopping() { connectionEventThread.interrupt(); Set localNodes = getLocalNodes(); if (localNodes != null) { AbstractScheme scheme = schemes.get(activeScheme); for (Node localNode : localNodes) { connectionService.disconnect(localNode); - if (scheme != null) scheme.removeNode(localNode); + if (scheme != null) + scheme.removeNode(localNode); } } } @@ -168,28 +175,32 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene @Override public Set getNodes(InetAddress controller) { AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return null; + if (scheme == null) + return null; return scheme.getNodes(controller); } @Override public Set getLocalNodes() { AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return null; + if (scheme == null) + return null; return scheme.getNodes(); } @Override public boolean isLocal(Node node) { AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return false; + if (scheme == null) + return false; return scheme.isLocal(node); } @Override public ConnectionLocality getLocalityStatus(Node node) { AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return ConnectionLocality.NOT_CONNECTED; + if (scheme == null) + return ConnectionLocality.NOT_CONNECTED; return scheme.getLocalityStatus(node); } @@ -197,7 +208,8 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene public void updateNode(Node node, UpdateType type, Set props) { logger.debug("updateNode: {} type {} props {}", node, type, props); AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return; + if (scheme == null) + return; switch (type) { case ADDED: scheme.addNode(node); @@ -206,22 +218,24 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene scheme.removeNode(node); break; default: - break; + break; } } @Override public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set props) { - logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, type, props); + logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, + type, props); AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return; + if (scheme == null) + return; switch (type) { case ADDED: scheme.addNode(nodeConnector.getNode()); break; default: - break; + break; } } @@ -231,38 +245,62 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } @Override - public Node connect(String connectionIdentifier, Map params) { - if (connectionService == null) return null; - return connectionService.connect(connectionIdentifier, params); + public Node connect(String connectionIdentifier, + Map params) { + if (connectionService == null) + return null; + Node node = connectionService.connect(connectionIdentifier, params); + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme != null && node != null) + scheme.addNode(node); + return node; } @Override - public Node connect(String type, String connectionIdentifier, Map params) { - if (connectionService == null) return null; - return connectionService.connect(type, connectionIdentifier, params); + public Node connect(String type, String connectionIdentifier, + Map params) { + if (connectionService == null) + return null; + Node node = connectionService.connect(connectionIdentifier, params); + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme != null && node != null) + scheme.addNode(node); + return node; } @Override - public Status disconnect (Node node) { - if (connectionService == null) return new Status(StatusCode.NOSERVICE); - return connectionService.disconnect(node); + public Status disconnect(Node node) { + if (node == null) + return new Status(StatusCode.BADREQUEST); + if (connectionService == null) + return new Status(StatusCode.NOSERVICE); + Status status = connectionService.disconnect(node); + if (status.isSuccess()) { + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme != null) + scheme.removeNode(node); + } + return status; } @Override public void entryCreated(Node key, String cacheName, boolean originLocal) { - if (originLocal) return; + if (originLocal) + return; } /* - * Clustering Services' doesnt provide the existing states in the cache update callbacks. - * Hence, using a scratch local cache to maintain the existing state. - * + * Clustering Services doesn't provide the existing states in the cache + * update callbacks. Hence, using a scratch local cache to maintain the + * existing state. */ private ConcurrentMap> existingConnections = new ConcurrentHashMap>(); @Override - public void entryUpdated(Node node, Set newControllers, String cacheName, boolean originLocal) { - if (originLocal) return; + public void entryUpdated(Node node, Set newControllers, + String cacheName, boolean originLocal) { + if (originLocal) + return; Set existingControllers = existingConnections.get(node); if (existingControllers != null) { logger.debug("Processing Update for : {} NewControllers : {} existingControllers : {}", node, @@ -282,8 +320,9 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene @Override public void entryDeleted(Node key, String cacheName, boolean originLocal) { - if (originLocal) return; - logger.debug("Deleted : {} cache : {}", key, cacheName); + if (originLocal) + return; + logger.debug("Deleted entry {} from cache : {}", key, cacheName); notifyNodeDisconnectedEvent(key); } @@ -293,22 +332,27 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene this.connectionEvents.put(event); } } catch (InterruptedException e) { - logger.debug("enqueueConnectionEvent caught Interrupt Exception for event {}", event); + logger.debug( + "enqueueConnectionEvent caught Interrupt Exception for event {}", + event); } } private void notifyClusterViewChanged() { - ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null); + ConnectionMgmtEvent event = new ConnectionMgmtEvent( + ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null); enqueueConnectionEvent(event); } private void notifyNodeDisconnectedEvent(Node node) { - ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node); + ConnectionMgmtEvent event = new ConnectionMgmtEvent( + ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node); enqueueConnectionEvent(event); } /* - * this thread monitors the connectionEvent queue for new incoming events from + * this thread monitors the connectionEvent queue for new incoming events + * from */ private class EventHandler implements Runnable { @Override @@ -320,17 +364,19 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene ConnectionMgmtEventType eType = ev.getEvent(); switch (eType) { case NODE_DISCONNECTED_FROM_MASTER: - Node node = (Node)ev.getData(); + Node node = (Node) ev.getData(); connectionService.notifyNodeDisconnectFromMaster(node); break; case CLUSTER_VIEW_CHANGED: AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return; + if (scheme == null) + return; scheme.handleClusterViewChanged(); connectionService.notifyClusterViewChanged(); break; default: - logger.error("Unknown Connection event {}", eType.ordinal()); + logger.error("Unknown Connection event {}", + eType.ordinal()); } } catch (InterruptedException e) { connectionEvents.clear(); @@ -347,7 +393,7 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene null); } - public void _scheme (CommandInterpreter ci) { + public void _scheme(CommandInterpreter ci) { String schemeStr = ci.nextArgument(); if (schemeStr == null) { ci.println("Please enter valid Scheme name"); @@ -362,7 +408,7 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene activeScheme = scheme; } - public void _printNodes (CommandInterpreter ci) { + public void _printNodes(CommandInterpreter ci) { String controller = ci.nextArgument(); if (controller == null) { ci.println("Nodes connected to this controller : "); @@ -375,14 +421,14 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } try { InetAddress address = InetAddress.getByName(controller); - ci.println("Nodes connected to controller "+controller); + ci.println("Nodes connected to controller " + controller); if (this.getNodes(address) == null) { ci.println("None"); } else { ci.println(this.getNodes(address).toString()); } } catch (UnknownHostException e) { - logger.error("An error occured",e); + logger.error("An error occured", e); } } @@ -394,4 +440,12 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene help.append("\t printNodes [] - Print connected nodes\n"); return help.toString(); } + + @Override + public Set getControllers(Node node) { + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme == null) + return Collections.emptySet(); + return scheme.getControllers(node); + } }