X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fconnectionmanager%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconnectionmanager%2Finternal%2FConnectionManager.java;h=773c6cac508b9b8974846d7d1ad38ad80fadc1fb;hb=eb9aaa98f3e850d633bd81598c467f6b5961e13e;hp=2d5f80fb79f99365544680e77df45cb779f897d9;hpb=6b64494fd8e4654a298312afb4b8e6e827b75d5d;p=controller.git diff --git a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java index 2d5f80fb79..773c6cac50 100644 --- a/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java +++ b/opendaylight/connectionmanager/implementation/src/main/java/org/opendaylight/controller/connectionmanager/internal/ConnectionManager.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -22,6 +21,7 @@ package org.opendaylight.controller.connectionmanager.internal; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -35,12 +35,12 @@ import org.eclipse.osgi.framework.console.CommandProvider; import org.opendaylight.controller.clustering.services.ICacheUpdateAware; import org.opendaylight.controller.clustering.services.IClusterGlobalServices; import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware; -import org.opendaylight.controller.connectionmanager.ConnectionLocality; import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme; import org.opendaylight.controller.connectionmanager.IConnectionManager; import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme; import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory; import org.opendaylight.controller.sal.connection.ConnectionConstants; +import org.opendaylight.controller.sal.connection.ConnectionLocality; import org.opendaylight.controller.sal.connection.IConnectionListener; import org.opendaylight.controller.sal.connection.IConnectionService; import org.opendaylight.controller.sal.core.Node; @@ -56,11 +56,11 @@ import org.osgi.framework.FrameworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ConnectionManager implements IConnectionManager, IConnectionListener, - ICoordinatorChangeAware, IListenInventoryUpdates, - ICacheUpdateAware>, - CommandProvider { - private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class); +public class ConnectionManager implements IConnectionManager, + IConnectionListener, ICoordinatorChangeAware, IListenInventoryUpdates, + ICacheUpdateAware>, CommandProvider { + private static final Logger logger = LoggerFactory + .getLogger(ConnectionManager.class); private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER; private IClusterGlobalServices clusterServices; private ConcurrentMap schemes; @@ -100,7 +100,8 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } private void getInventories() { - Map> nodeProp = this.inventoryService.getNodeProps(); + Map> nodeProp = this.inventoryService + .getNodeProps(); for (Map.Entry> entry : nodeProp.entrySet()) { Node node = entry.getKey(); logger.debug("getInventories for node:{}", new Object[] { node }); @@ -112,8 +113,10 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene updateNode(node, UpdateType.ADDED, props); } - Map> nodeConnectorProp = this.inventoryService.getNodeConnectorProps(); - for (Map.Entry> entry : nodeConnectorProp.entrySet()) { + Map> nodeConnectorProp = this.inventoryService + .getNodeConnectorProps(); + for (Map.Entry> entry : nodeConnectorProp + .entrySet()) { Map propMap = entry.getValue(); Set props = new HashSet(); for (Property property : propMap.values()) { @@ -123,18 +126,18 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } } - - public void started() { - String schemeStr = System.getProperty("connection.scheme"); - for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) { - AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices); - if (schemeImpl != null) { - schemes.put(scheme, schemeImpl); - if (scheme.name().equalsIgnoreCase(schemeStr)) { - activeScheme = scheme; - } - } - } + public void started() { + String schemeStr = System.getProperty("connection.scheme"); + for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) { + AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, + clusterServices); + if (schemeImpl != null) { + schemes.put(scheme, schemeImpl); + if (scheme.name().equalsIgnoreCase(schemeStr)) { + activeScheme = scheme; + } + } + } connectionEventThread.start(); @@ -145,7 +148,8 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } public void init() { - connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread"); + connectionEventThread = new Thread(new EventHandler(), + "ConnectionEvent Thread"); this.connectionEvents = new LinkedBlockingQueue(); schemes = new ConcurrentHashMap(); } @@ -157,7 +161,8 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene AbstractScheme scheme = schemes.get(activeScheme); for (Node localNode : localNodes) { connectionService.disconnect(localNode); - if (scheme != null) scheme.removeNode(localNode); + if (scheme != null) + scheme.removeNode(localNode); } } } @@ -170,28 +175,32 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene @Override public Set getNodes(InetAddress controller) { AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return null; + if (scheme == null) + return null; return scheme.getNodes(controller); } @Override public Set getLocalNodes() { AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return null; + if (scheme == null) + return null; return scheme.getNodes(); } @Override public boolean isLocal(Node node) { AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return false; + if (scheme == null) + return false; return scheme.isLocal(node); } @Override public ConnectionLocality getLocalityStatus(Node node) { AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return ConnectionLocality.NOT_CONNECTED; + if (scheme == null) + return ConnectionLocality.NOT_CONNECTED; return scheme.getLocalityStatus(node); } @@ -199,7 +208,8 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene public void updateNode(Node node, UpdateType type, Set props) { logger.debug("updateNode: {} type {} props {}", node, type, props); AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return; + if (scheme == null) + return; switch (type) { case ADDED: scheme.addNode(node); @@ -208,22 +218,24 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene scheme.removeNode(node); break; default: - break; + break; } } @Override public void updateNodeConnector(NodeConnector nodeConnector, UpdateType type, Set props) { - logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, type, props); + logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, + type, props); AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return; + if (scheme == null) + return; switch (type) { case ADDED: scheme.addNode(nodeConnector.getNode()); break; default: - break; + break; } } @@ -233,58 +245,87 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } @Override - public Node connect(String connectionIdentifier, Map params) { - if (connectionService == null) return null; - return connectionService.connect(connectionIdentifier, params); + public Node connect(String connectionIdentifier, + Map params) { + if (connectionService == null) + return null; + Node node = connectionService.connect(connectionIdentifier, params); + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme != null && node != null) + scheme.addNode(node); + return node; } @Override - public Node connect(String type, String connectionIdentifier, Map params) { - if (connectionService == null) return null; - return connectionService.connect(type, connectionIdentifier, params); + public Node connect(String type, String connectionIdentifier, + Map params) { + if (connectionService == null) + return null; + Node node = connectionService.connect(connectionIdentifier, params); + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme != null && node != null) + scheme.addNode(node); + return node; } @Override - public Status disconnect (Node node) { - if (connectionService == null) return new Status(StatusCode.NOSERVICE); - return connectionService.disconnect(node); + public Status disconnect(Node node) { + if (node == null) + return new Status(StatusCode.BADREQUEST); + if (connectionService == null) + return new Status(StatusCode.NOSERVICE); + Status status = connectionService.disconnect(node); + if (status.isSuccess()) { + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme != null) + scheme.removeNode(node); + } + return status; } @Override public void entryCreated(Node key, String cacheName, boolean originLocal) { - if (originLocal) return; + if (originLocal) + return; } /* - * Clustering Services' doesnt provide the existing states in the cache update callbacks. - * Hence, using a scratch local cache to maintain the existing state. - * + * Clustering Services' doesnt provide the existing states in the cache + * update callbacks. Hence, using a scratch local cache to maintain the + * existing state. */ private ConcurrentMap> existingConnections = new ConcurrentHashMap>(); @Override - public void entryUpdated(Node node, Set newControllers, String cacheName, boolean originLocal) { - if (originLocal) return; + public void entryUpdated(Node node, Set newControllers, + String cacheName, boolean originLocal) { + if (originLocal) + return; Set existingControllers = existingConnections.get(node); if (existingControllers != null) { - logger.debug("Processing Update for : {} NewControllers : {} existingControllers : {}", node, - newControllers.toString(), existingControllers.toString()); + logger.debug( + "Processing Update for : {} NewControllers : {} existingControllers : {}", + node, newControllers.toString(), + existingControllers.toString()); if (newControllers.size() < existingControllers.size()) { - Set removed = new HashSet(existingControllers); + Set removed = new HashSet( + existingControllers); if (removed.removeAll(newControllers)) { logger.debug("notifyNodeDisconnectFromMaster({})", node); notifyNodeDisconnectedEvent(node); } } } else { - logger.debug("Ignoring the Update for : {} NewControllers : {}", node, newControllers.toString()); + logger.debug("Ignoring the Update for : {} NewControllers : {}", + node, newControllers.toString()); } existingConnections.put(node, newControllers); } @Override public void entryDeleted(Node key, String cacheName, boolean originLocal) { - if (originLocal) return; + if (originLocal) + return; logger.debug("Deleted : {} cache : {}", key, cacheName); notifyNodeDisconnectedEvent(key); } @@ -295,22 +336,27 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene this.connectionEvents.put(event); } } catch (InterruptedException e) { - logger.debug("enqueueConnectionEvent caught Interrupt Exception for event {}", event); + logger.debug( + "enqueueConnectionEvent caught Interrupt Exception for event {}", + event); } } private void notifyClusterViewChanged() { - ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null); + ConnectionMgmtEvent event = new ConnectionMgmtEvent( + ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null); enqueueConnectionEvent(event); } private void notifyNodeDisconnectedEvent(Node node) { - ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node); + ConnectionMgmtEvent event = new ConnectionMgmtEvent( + ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node); enqueueConnectionEvent(event); } /* - * this thread monitors the connectionEvent queue for new incoming events from + * this thread monitors the connectionEvent queue for new incoming events + * from */ private class EventHandler implements Runnable { @Override @@ -322,17 +368,19 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene ConnectionMgmtEventType eType = ev.getEvent(); switch (eType) { case NODE_DISCONNECTED_FROM_MASTER: - Node node = (Node)ev.getData(); + Node node = (Node) ev.getData(); connectionService.notifyNodeDisconnectFromMaster(node); break; case CLUSTER_VIEW_CHANGED: AbstractScheme scheme = schemes.get(activeScheme); - if (scheme == null) return; + if (scheme == null) + return; scheme.handleClusterViewChanged(); connectionService.notifyClusterViewChanged(); break; default: - logger.error("Unknown Connection event {}", eType.ordinal()); + logger.error("Unknown Connection event {}", + eType.ordinal()); } } catch (InterruptedException e) { connectionEvents.clear(); @@ -349,7 +397,7 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene null); } - public void _scheme (CommandInterpreter ci) { + public void _scheme(CommandInterpreter ci) { String schemeStr = ci.nextArgument(); if (schemeStr == null) { ci.println("Please enter valid Scheme name"); @@ -364,7 +412,7 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene activeScheme = scheme; } - public void _printNodes (CommandInterpreter ci) { + public void _printNodes(CommandInterpreter ci) { String controller = ci.nextArgument(); if (controller == null) { ci.println("Nodes connected to this controller : "); @@ -377,14 +425,14 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene } try { InetAddress address = InetAddress.getByName(controller); - ci.println("Nodes connected to controller "+controller); + ci.println("Nodes connected to controller " + controller); if (this.getNodes(address) == null) { ci.println("None"); } else { ci.println(this.getNodes(address).toString()); } } catch (UnknownHostException e) { - logger.error("An error occured",e); + logger.error("An error occured", e); } } @@ -396,4 +444,12 @@ public class ConnectionManager implements IConnectionManager, IConnectionListene help.append("\t printNodes [] - Print connected nodes\n"); return help.toString(); } + + @Override + public Set getControllers(Node node) { + AbstractScheme scheme = schemes.get(activeScheme); + if (scheme == null) + return Collections.emptySet(); + return scheme.getControllers(node); + } }