-
/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConnectionManager implements IConnectionManager, IConnectionListener,
- ICoordinatorChangeAware, IListenInventoryUpdates,
- ICacheUpdateAware<Node, Set<InetAddress>>,
- CommandProvider {
- private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
+public class ConnectionManager implements IConnectionManager,
+ IConnectionListener, ICoordinatorChangeAware, IListenInventoryUpdates,
+ ICacheUpdateAware<Node, Set<InetAddress>>, CommandProvider {
+ private static final Logger logger = LoggerFactory
+ .getLogger(ConnectionManager.class);
private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER;
private IClusterGlobalServices clusterServices;
private ConcurrentMap<ConnectionMgmtScheme, AbstractScheme> schemes;
}
private void getInventories() {
- Map<Node, Map<String, Property>> nodeProp = this.inventoryService.getNodeProps();
+ Map<Node, Map<String, Property>> nodeProp = this.inventoryService
+ .getNodeProps();
for (Map.Entry<Node, Map<String, Property>> entry : nodeProp.entrySet()) {
Node node = entry.getKey();
logger.debug("getInventories for node:{}", new Object[] { node });
updateNode(node, UpdateType.ADDED, props);
}
- Map<NodeConnector, Map<String, Property>> nodeConnectorProp = this.inventoryService.getNodeConnectorProps();
- for (Map.Entry<NodeConnector, Map<String, Property>> entry : nodeConnectorProp.entrySet()) {
+ Map<NodeConnector, Map<String, Property>> nodeConnectorProp = this.inventoryService
+ .getNodeConnectorProps();
+ for (Map.Entry<NodeConnector, Map<String, Property>> entry : nodeConnectorProp
+ .entrySet()) {
Map<String, Property> propMap = entry.getValue();
Set<Property> props = new HashSet<Property>();
for (Property property : propMap.values()) {
}
}
-
- public void started() {
- String schemeStr = System.getProperty("connection.scheme");
- for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
- AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices);
- if (schemeImpl != null) {
- schemes.put(scheme, schemeImpl);
- if (scheme.name().equalsIgnoreCase(schemeStr)) {
- activeScheme = scheme;
- }
- }
- }
+ public void started() {
+ String schemeStr = System.getProperty("connection.scheme");
+ for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
+ AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme,
+ clusterServices);
+ if (schemeImpl != null) {
+ schemes.put(scheme, schemeImpl);
+ if (scheme.name().equalsIgnoreCase(schemeStr)) {
+ activeScheme = scheme;
+ }
+ }
+ }
connectionEventThread.start();
}
public void init() {
- connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread");
+ connectionEventThread = new Thread(new EventHandler(),
+ "ConnectionEvent Thread");
this.connectionEvents = new LinkedBlockingQueue<ConnectionMgmtEvent>();
schemes = new ConcurrentHashMap<ConnectionMgmtScheme, AbstractScheme>();
}
AbstractScheme scheme = schemes.get(activeScheme);
for (Node localNode : localNodes) {
connectionService.disconnect(localNode);
- if (scheme != null) scheme.removeNode(localNode);
+ if (scheme != null)
+ scheme.removeNode(localNode);
}
}
}
@Override
public Set<Node> getNodes(InetAddress controller) {
AbstractScheme scheme = schemes.get(activeScheme);
- if (scheme == null) return null;
+ if (scheme == null)
+ return null;
return scheme.getNodes(controller);
}
@Override
public Set<Node> getLocalNodes() {
AbstractScheme scheme = schemes.get(activeScheme);
- if (scheme == null) return null;
+ if (scheme == null)
+ return null;
return scheme.getNodes();
}
@Override
public boolean isLocal(Node node) {
AbstractScheme scheme = schemes.get(activeScheme);
- if (scheme == null) return false;
+ if (scheme == null)
+ return false;
return scheme.isLocal(node);
}
@Override
public ConnectionLocality getLocalityStatus(Node node) {
AbstractScheme scheme = schemes.get(activeScheme);
- if (scheme == null) return ConnectionLocality.NOT_CONNECTED;
+ if (scheme == null)
+ return ConnectionLocality.NOT_CONNECTED;
return scheme.getLocalityStatus(node);
}
public void updateNode(Node node, UpdateType type, Set<Property> props) {
logger.debug("updateNode: {} type {} props {}", node, type, props);
AbstractScheme scheme = schemes.get(activeScheme);
- if (scheme == null) return;
+ if (scheme == null)
+ return;
switch (type) {
case ADDED:
scheme.addNode(node);
scheme.removeNode(node);
break;
default:
- break;
+ break;
}
}
@Override
public void updateNodeConnector(NodeConnector nodeConnector,
UpdateType type, Set<Property> props) {
- logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, type, props);
+ logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector,
+ type, props);
AbstractScheme scheme = schemes.get(activeScheme);
- if (scheme == null) return;
+ if (scheme == null)
+ return;
switch (type) {
case ADDED:
scheme.addNode(nodeConnector.getNode());
break;
default:
- break;
+ break;
}
}
}
@Override
- public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
- if (connectionService == null) return null;
+ public Node connect(String connectionIdentifier,
+ Map<ConnectionConstants, String> params) {
+ if (connectionService == null)
+ return null;
Node node = connectionService.connect(connectionIdentifier, params);
AbstractScheme scheme = schemes.get(activeScheme);
- if (scheme != null && node != null) scheme.addNode(node);
+ if (scheme != null && node != null)
+ scheme.addNode(node);
return node;
}
@Override
- public Node connect(String type, String connectionIdentifier, Map<ConnectionConstants, String> params) {
- if (connectionService == null) return null;
+ public Node connect(String type, String connectionIdentifier,
+ Map<ConnectionConstants, String> params) {
+ if (connectionService == null)
+ return null;
Node node = connectionService.connect(connectionIdentifier, params);
AbstractScheme scheme = schemes.get(activeScheme);
- if (scheme != null && node != null) scheme.addNode(node);
+ if (scheme != null && node != null)
+ scheme.addNode(node);
return node;
}
@Override
- public Status disconnect (Node node) {
- if (node == null) return new Status(StatusCode.BADREQUEST);
- if (connectionService == null) return new Status(StatusCode.NOSERVICE);
+ public Status disconnect(Node node) {
+ if (node == null)
+ return new Status(StatusCode.BADREQUEST);
+ if (connectionService == null)
+ return new Status(StatusCode.NOSERVICE);
Status status = connectionService.disconnect(node);
if (status.isSuccess()) {
AbstractScheme scheme = schemes.get(activeScheme);
- if (scheme != null) scheme.removeNode(node);
+ if (scheme != null)
+ scheme.removeNode(node);
}
return status;
}
@Override
public void entryCreated(Node key, String cacheName, boolean originLocal) {
- if (originLocal) return;
+ if (originLocal)
+ return;
}
/*
- * Clustering Services' doesnt provide the existing states in the cache update callbacks.
- * Hence, using a scratch local cache to maintain the existing state.
- *
+ * Clustering Services' doesnt provide the existing states in the cache
+ * update callbacks. Hence, using a scratch local cache to maintain the
+ * existing state.
*/
private ConcurrentMap<Node, Set<InetAddress>> existingConnections = new ConcurrentHashMap<Node, Set<InetAddress>>();
@Override
- public void entryUpdated(Node node, Set<InetAddress> newControllers, String cacheName, boolean originLocal) {
- if (originLocal) return;
+ public void entryUpdated(Node node, Set<InetAddress> newControllers,
+ String cacheName, boolean originLocal) {
+ if (originLocal)
+ return;
Set<InetAddress> existingControllers = existingConnections.get(node);
if (existingControllers != null) {
- logger.debug("Processing Update for : {} NewControllers : {} existingControllers : {}", node,
- newControllers.toString(), existingControllers.toString());
+ logger.debug(
+ "Processing Update for : {} NewControllers : {} existingControllers : {}",
+ node, newControllers.toString(),
+ existingControllers.toString());
if (newControllers.size() < existingControllers.size()) {
- Set<InetAddress> removed = new HashSet<InetAddress>(existingControllers);
+ Set<InetAddress> removed = new HashSet<InetAddress>(
+ existingControllers);
if (removed.removeAll(newControllers)) {
logger.debug("notifyNodeDisconnectFromMaster({})", node);
notifyNodeDisconnectedEvent(node);
}
}
} else {
- logger.debug("Ignoring the Update for : {} NewControllers : {}", node, newControllers.toString());
+ logger.debug("Ignoring the Update for : {} NewControllers : {}",
+ node, newControllers.toString());
}
existingConnections.put(node, newControllers);
}
@Override
public void entryDeleted(Node key, String cacheName, boolean originLocal) {
- if (originLocal) return;
+ if (originLocal)
+ return;
logger.debug("Deleted : {} cache : {}", key, cacheName);
notifyNodeDisconnectedEvent(key);
}
this.connectionEvents.put(event);
}
} catch (InterruptedException e) {
- logger.debug("enqueueConnectionEvent caught Interrupt Exception for event {}", event);
+ logger.debug(
+ "enqueueConnectionEvent caught Interrupt Exception for event {}",
+ event);
}
}
private void notifyClusterViewChanged() {
- ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null);
+ ConnectionMgmtEvent event = new ConnectionMgmtEvent(
+ ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null);
enqueueConnectionEvent(event);
}
private void notifyNodeDisconnectedEvent(Node node) {
- ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node);
+ ConnectionMgmtEvent event = new ConnectionMgmtEvent(
+ ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node);
enqueueConnectionEvent(event);
}
/*
- * this thread monitors the connectionEvent queue for new incoming events from
+ * this thread monitors the connectionEvent queue for new incoming events
+ * from
*/
private class EventHandler implements Runnable {
@Override
ConnectionMgmtEventType eType = ev.getEvent();
switch (eType) {
case NODE_DISCONNECTED_FROM_MASTER:
- Node node = (Node)ev.getData();
+ Node node = (Node) ev.getData();
connectionService.notifyNodeDisconnectFromMaster(node);
break;
case CLUSTER_VIEW_CHANGED:
AbstractScheme scheme = schemes.get(activeScheme);
- if (scheme == null) return;
+ if (scheme == null)
+ return;
scheme.handleClusterViewChanged();
connectionService.notifyClusterViewChanged();
break;
default:
- logger.error("Unknown Connection event {}", eType.ordinal());
+ logger.error("Unknown Connection event {}",
+ eType.ordinal());
}
} catch (InterruptedException e) {
connectionEvents.clear();
null);
}
- public void _scheme (CommandInterpreter ci) {
+ public void _scheme(CommandInterpreter ci) {
String schemeStr = ci.nextArgument();
if (schemeStr == null) {
ci.println("Please enter valid Scheme name");
activeScheme = scheme;
}
- public void _printNodes (CommandInterpreter ci) {
+ public void _printNodes(CommandInterpreter ci) {
String controller = ci.nextArgument();
if (controller == null) {
ci.println("Nodes connected to this controller : ");
}
try {
InetAddress address = InetAddress.getByName(controller);
- ci.println("Nodes connected to controller "+controller);
+ ci.println("Nodes connected to controller " + controller);
if (this.getNodes(address) == null) {
ci.println("None");
} else {
ci.println(this.getNodes(address).toString());
}
} catch (UnknownHostException e) {
- logger.error("An error occured",e);
+ logger.error("An error occured", e);
}
}
help.append("\t printNodes [<controller>] - Print connected nodes\n");
return help.toString();
}
+
+ @Override
+ public Set<InetAddress> getControllers(Node node) {
+ AbstractScheme scheme = schemes.get(activeScheme);
+ if (scheme == null)
+ return Collections.emptySet();
+ return scheme.getControllers(node);
+ }
}