import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.eclipse.osgi.framework.console.CommandInterpreter;
import org.eclipse.osgi.framework.console.CommandProvider;
import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme;
import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory;
import org.opendaylight.controller.sal.connection.ConnectionConstants;
+import org.opendaylight.controller.sal.connection.ConnectionLocality;
import org.opendaylight.controller.sal.connection.IConnectionListener;
import org.opendaylight.controller.sal.connection.IConnectionService;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.core.Property;
import org.opendaylight.controller.sal.core.UpdateType;
+import org.opendaylight.controller.sal.inventory.IInventoryService;
import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ConnectionManager implements IConnectionManager, IConnectionListener,
ICoordinatorChangeAware, IListenInventoryUpdates,
private IConnectionService connectionService;
private Thread connectionEventThread;
private BlockingQueue<ConnectionMgmtEvent> connectionEvents;
+ private IInventoryService inventoryService;
public void setClusterServices(IClusterGlobalServices i) {
this.clusterServices = i;
}
}
- public void started() {
- connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread");
+ public void setInventoryService(IInventoryService service) {
+ logger.trace("Got inventory service set request {}", service);
+ this.inventoryService = service;
+ }
+
+ public void unsetInventoryService(IInventoryService service) {
+ logger.trace("Got a service UNset request");
+ this.inventoryService = null;
+ }
+
+ private void getInventories() {
+ Map<Node, Map<String, Property>> nodeProp = this.inventoryService.getNodeProps();
+ for (Map.Entry<Node, Map<String, Property>> entry : nodeProp.entrySet()) {
+ Node node = entry.getKey();
+ logger.debug("getInventories for node:{}", new Object[] { node });
+ Map<String, Property> propMap = entry.getValue();
+ Set<Property> props = new HashSet<Property>();
+ for (Property property : propMap.values()) {
+ props.add(property);
+ }
+ updateNode(node, UpdateType.ADDED, props);
+ }
+
+ Map<NodeConnector, Map<String, Property>> nodeConnectorProp = this.inventoryService.getNodeConnectorProps();
+ for (Map.Entry<NodeConnector, Map<String, Property>> entry : nodeConnectorProp.entrySet()) {
+ Map<String, Property> propMap = entry.getValue();
+ Set<Property> props = new HashSet<Property>();
+ for (Property property : propMap.values()) {
+ props.add(property);
+ }
+ updateNodeConnector(entry.getKey(), UpdateType.ADDED, props);
+ }
+ }
+
+
+ public void started() {
+ String schemeStr = System.getProperty("connection.scheme");
+ for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
+ AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices);
+ if (schemeImpl != null) {
+ schemes.put(scheme, schemeImpl);
+ if (scheme.name().equalsIgnoreCase(schemeStr)) {
+ activeScheme = scheme;
+ }
+ }
+ }
+
connectionEventThread.start();
registerWithOSGIConsole();
notifyClusterViewChanged();
+ // Should pull the Inventory updates in case we missed it
+ getInventories();
}
public void init() {
+ connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread");
this.connectionEvents = new LinkedBlockingQueue<ConnectionMgmtEvent>();
schemes = new ConcurrentHashMap<ConnectionMgmtScheme, AbstractScheme>();
- for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
- AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices);
- if (schemeImpl != null) schemes.put(scheme, schemeImpl);
- }
}
- public void stop() {
+ public void stopping() {
connectionEventThread.interrupt();
Set<Node> localNodes = getLocalNodes();
if (localNodes != null) {
return scheme.isLocal(node);
}
+ @Override
+ public ConnectionLocality getLocalityStatus(Node node) {
+ AbstractScheme scheme = schemes.get(activeScheme);
+ if (scheme == null) return ConnectionLocality.NOT_CONNECTED;
+ return scheme.getLocalityStatus(node);
+ }
+
@Override
public void updateNode(Node node, UpdateType type, Set<Property> props) {
logger.debug("updateNode: {} type {} props {}", node, type, props);
String controller = ci.nextArgument();
if (controller == null) {
ci.println("Nodes connected to this controller : ");
- if (this.getLocalNodes() == null) ci.println("None");
- else ci.println(this.getLocalNodes().toString());
+ if (this.getLocalNodes() == null) {
+ ci.println("None");
+ } else {
+ ci.println(this.getLocalNodes().toString());
+ }
return;
}
try {
InetAddress address = InetAddress.getByName(controller);
ci.println("Nodes connected to controller "+controller);
- if (this.getNodes(address) == null) ci.println("None");
- else ci.println(this.getNodes(address).toString());
- return;
+ if (this.getNodes(address) == null) {
+ ci.println("None");
+ } else {
+ ci.println(this.getNodes(address).toString());
+ }
} catch (UnknownHostException e) {
- e.printStackTrace();
+ logger.error("An error occured",e);
}
}