X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fconnectionmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconnectionmanager%2Finternal%2FConnectionManager.java;h=ebc56928a23856a158b4247d98c2720af9f98ac6;hb=d9de6c2ddfb30eb2eee782c229f6e03cef352bbd;hp=7097958126aaec02cf680265bad8322b6526ae5e;hpb=626522b7589a90fd9c47c84414707cbd10afae38;p=controller.git diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java index 7097958126..ebc56928a2 100644 --- a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java +++ b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -22,6 +21,7 @@ package org.opendaylight.controller.connectionmanager.internal; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -30,8 +30,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.eclipse.osgi.framework.console.CommandInterpreter; import org.eclipse.osgi.framework.console.CommandProvider; import org.opendaylight.controller.clustering.services.ICacheUpdateAware; @@ -42,6 +40,7 @@ import org.opendaylight.controller.connectionmanager.IConnectionManager; import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme; import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory; import org.opendaylight.controller.sal.connection.ConnectionConstants; +import org.opendaylight.controller.sal.connection.ConnectionLocality; import org.opendaylight.controller.sal.connection.IConnectionListener; import org.opendaylight.controller.sal.connection.IConnectionService; import org.opendaylight.controller.sal.core.Node; @@ -54,12 +53,14 @@ import org.opendaylight.controller.sal.utils.Status; import org.opendaylight.controller.sal.utils.StatusCode; import org.osgi.framework.BundleContext; import org.osgi.framework.FrameworkUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class ConnectionManager implements IConnectionManager, IConnectionListener, - ICoordinatorChangeAware, IListenInventoryUpdates, - ICacheUpdateAware>, - CommandProvider { - private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class); +public class ConnectionManager implements IConnectionManager, + IConnectionListener, ICoordinatorChangeAware, IListenInventoryUpdates, + ICacheUpdateAware>, CommandProvider { + private static final Logger logger = LoggerFactory + .getLogger(ConnectionManager.class); private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER; private IClusterGlobalServices clusterServices; private ConcurrentMap schemes; @@ -99,7 +100,8 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } private void getInventories() { - Map> nodeProp = this.inventoryService.getNodeProps(); + Map> nodeProp = this.inventoryService + .getNodeProps(); for (Map.Entry> entry : nodeProp.entrySet()) { Node node = entry.getKey(); logger.debug("getInventories for node:{}", new Object[] { node }); @@ -111,8 +113,10 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene updateNode(node, UpdateType.ADDED, props); } - Map> nodeConnectorProp = this.inventoryService.getNodeConnectorProps(); - for (Map.Entry> entry : nodeConnectorProp.entrySet()) { + Map> nodeConnectorProp = this.inventoryService + .getNodeConnectorProps(); + for (Map.Entry> entry : nodeConnectorProp + .entrySet()) { Map propMap = entry.getValue(); Set props = new HashSet(); for (Property property : propMap.values()) { @@ -123,7 +127,7 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } public void started() { - connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread"); + connectionEventThread.start(); registerWithOSGIConsole(); @@ -133,22 +137,32 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } public void init() { + connectionEventThread = new Thread(new EventHandler(), + "ConnectionEvent Thread"); this.connectionEvents = new LinkedBlockingQueue(); schemes = new ConcurrentHashMap(); + + String schemeStr = System.getProperty("connection.scheme"); for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) { AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices); - if (schemeImpl != null) schemes.put(scheme, schemeImpl); + if (schemeImpl != null) { + schemes.put(scheme, schemeImpl); + if (scheme.name().equalsIgnoreCase(schemeStr)) { + activeScheme = scheme; + } + } } } - public void stop() { + public void stopping() { connectionEventThread.interrupt(); Set localNodes = getLocalNodes(); if (localNodes != null) { AbstractScheme scheme = schemes.get(activeScheme); for (Node localNode : localNodes) { connectionService.disconnect(localNode); - if (scheme != null) scheme.removeNode(localNode); + if (scheme != null) + scheme.removeNode(localNode); } } } @@ -161,29 +175,41 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene @Override public Set getNodes(InetAddress controller) { AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return null; + if (scheme == null) + return null; return scheme.getNodes(controller); } @Override public Set getLocalNodes() { AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return null; + if (scheme == null) + return null; return scheme.getNodes(); } @Override public boolean isLocal(Node node) { AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return false; + if (scheme == null) + return false; return scheme.isLocal(node); } + @Override + public ConnectionLocality getLocalityStatus(Node node) { + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme == null) + return ConnectionLocality.NOT_CONNECTED; + return scheme.getLocalityStatus(node); + } + @Override public void updateNode(Node node, UpdateType type, Set props) { logger.debug("updateNode: {} type {} props {}", node, type, props); AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return; + if (scheme == null) + return; switch (type) { case ADDED: scheme.addNode(node); @@ -192,22 +218,24 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene scheme.removeNode(node); break; default: - break; + break; } } @Override public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set props) { - logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, type, props); + logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, + type, props); AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return; + if (scheme == null) + return; switch (type) { case ADDED: scheme.addNode(nodeConnector.getNode()); break; default: - break; + break; } } @@ -217,38 +245,62 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } @Override - public Node connect(String connectionIdentifier, Map params) { - if (connectionService == null) return null; - return connectionService.connect(connectionIdentifier, params); + public Node connect(String connectionIdentifier, + Map params) { + if (connectionService == null) + return null; + Node node = connectionService.connect(connectionIdentifier, params); + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme != null && node != null) + scheme.addNode(node); + return node; } @Override - public Node connect(String type, String connectionIdentifier, Map params) { - if (connectionService == null) return null; - return connectionService.connect(type, connectionIdentifier, params); + public Node connect(String type, String connectionIdentifier, + Map params) { + if (connectionService == null) + return null; + Node node = connectionService.connect(connectionIdentifier, params); + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme != null && node != null) + scheme.addNode(node); + return node; } @Override - public Status disconnect (Node node) { - if (connectionService == null) return new Status(StatusCode.NOSERVICE); - return connectionService.disconnect(node); + public Status disconnect(Node node) { + if (node == null) + return new Status(StatusCode.BADREQUEST); + if (connectionService == null) + return new Status(StatusCode.NOSERVICE); + Status status = connectionService.disconnect(node); + if (status.isSuccess()) { + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme != null) + scheme.removeNode(node); + } + return status; } @Override public void entryCreated(Node key, String cacheName, boolean originLocal) { - if (originLocal) return; + if (originLocal) + return; } /* - * Clustering Services' doesnt provide the existing states in the cache update callbacks. - * Hence, using a scratch local cache to maintain the existing state. - * + * Clustering Services doesn't provide the existing states in the cache + * update callbacks. Hence, using a scratch local cache to maintain the + * existing state. */ private ConcurrentMap> existingConnections = new ConcurrentHashMap>(); @Override - public void entryUpdated(Node node, Set newControllers, String cacheName, boolean originLocal) { - if (originLocal) return; + public void entryUpdated(Node node, Set newControllers, + String cacheName, boolean originLocal) { + if (originLocal) + return; Set existingControllers = existingConnections.get(node); if (existingControllers != null) { logger.debug("Processing Update for : {} NewControllers : {} existingControllers : {}", node, @@ -268,8 +320,9 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene @Override public void entryDeleted(Node key, String cacheName, boolean originLocal) { - if (originLocal) return; - logger.debug("Deleted : {} cache : {}", key, cacheName); + if (originLocal) + return; + logger.debug("Deleted entry {} from cache : {}", key, cacheName); notifyNodeDisconnectedEvent(key); } @@ -279,22 +332,27 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene this.connectionEvents.put(event); } } catch (InterruptedException e) { - logger.debug("enqueueConnectionEvent caught Interrupt Exception for event {}", event); + logger.debug( + "enqueueConnectionEvent caught Interrupt Exception for event {}", + event); } } private void notifyClusterViewChanged() { - ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null); + ConnectionMgmtEvent event = new ConnectionMgmtEvent( + ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null); enqueueConnectionEvent(event); } private void notifyNodeDisconnectedEvent(Node node) { - ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node); + ConnectionMgmtEvent event = new ConnectionMgmtEvent( + ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node); enqueueConnectionEvent(event); } /* - * this thread monitors the connectionEvent queue for new incoming events from + * this thread monitors the connectionEvent queue for new incoming events + * from */ private class EventHandler implements Runnable { @Override @@ -306,17 +364,19 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene ConnectionMgmtEventType eType = ev.getEvent(); switch (eType) { case NODE_DISCONNECTED_FROM_MASTER: - Node node = (Node)ev.getData(); + Node node = (Node) ev.getData(); connectionService.notifyNodeDisconnectFromMaster(node); break; case CLUSTER_VIEW_CHANGED: AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return; + if (scheme == null) + return; scheme.handleClusterViewChanged(); connectionService.notifyClusterViewChanged(); break; default: - logger.error("Unknown Connection event {}", eType.ordinal()); + logger.error("Unknown Connection event {}", + eType.ordinal()); } } catch (InterruptedException e) { connectionEvents.clear(); @@ -333,7 +393,7 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene null); } - public void _scheme (CommandInterpreter ci) { + public void _scheme(CommandInterpreter ci) { String schemeStr = ci.nextArgument(); if (schemeStr == null) { ci.println("Please enter valid Scheme name"); @@ -348,7 +408,7 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene activeScheme = scheme; } - public void _printNodes (CommandInterpreter ci) { + public void _printNodes(CommandInterpreter ci) { String controller = ci.nextArgument(); if (controller == null) { ci.println("Nodes connected to this controller : "); @@ -361,14 +421,14 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } try { InetAddress address = InetAddress.getByName(controller); - ci.println("Nodes connected to controller "+controller); + ci.println("Nodes connected to controller " + controller); if (this.getNodes(address) == null) { ci.println("None"); } else { ci.println(this.getNodes(address).toString()); } } catch (UnknownHostException e) { - logger.error("An error occured",e); + logger.error("An error occured", e); } } @@ -380,4 +440,12 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene help.append("\t printNodes [] - Print connected nodes\n"); return help.toString(); } + + @Override + public Set getControllers(Node node) { + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme == null) + return Collections.emptySet(); + return scheme.getControllers(node); + } }