import java.util.Map.Entry;
import java.util.Set;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.ovsdb.southbound.ovsdb.transact.TransactCommandAggregator;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
-public class OvsdbDataChangeListener implements DataChangeListener, AutoCloseable {
+public class OvsdbDataChangeListener implements ClusteredDataChangeListener, AutoCloseable {
private ListenerRegistration<DataChangeListener> registration;
private OvsdbConnectionManager cm;
}
@Override
- public void close() throws Exception {
+ public void close() {
registration.close();
}
// Finally disconnect if we need to
disconnect(changes);
- init(changes);
+ // init(changes);
LOG.trace("onDataChanged: exit");
}
for (Entry<InstanceIdentifier<?>, DataObject> created : changes.getCreatedData().entrySet()) {
// TODO validate we have the correct kind of InstanceIdentifier
if (created.getValue() instanceof OvsdbNodeAugmentation) {
- try {
- cm.connect((InstanceIdentifier<Node>) created.getKey(),
- (OvsdbNodeAugmentation) created.getValue());
- } catch (UnknownHostException e) {
- LOG.warn("Failed to connect to ovsdbNode", e);
+ OvsdbNodeAugmentation ovsdbNode = (OvsdbNodeAugmentation)created.getValue();
+ ConnectionInfo key = ovsdbNode.getConnectionInfo();
+ InstanceIdentifier<Node> iid = cm.getInstanceIdentifier(key);
+ if ( iid != null) {
+ LOG.warn("Connection to device {} already exists. Plugin does not allow multiple connections "
+ + "to same device, hence dropping the request {}", key, ovsdbNode);
+ } else {
+ try {
+ cm.connect((InstanceIdentifier<Node>) created.getKey(),
+ (OvsdbNodeAugmentation) created.getValue());
+ } catch (UnknownHostException e) {
+ LOG.warn("Failed to connect to ovsdbNode", e);
+ }
}
}
}
}
-
+/*
private void init(
AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
for (Entry<InstanceIdentifier<?>, DataObject> created : changes.getCreatedData().entrySet()) {
}
}
-
+*/
public Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> connectionInstancesFromChanges(
AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> result =
- new HashMap<InstanceIdentifier<Node>,OvsdbConnectionInstance>();
+ new HashMap<>();
result.putAll(connectionInstancesFromMap(changes.getCreatedData()));
result.putAll(connectionInstancesFromMap(changes.getUpdatedData()));
result.putAll(connectionInstancesFromMap(
DataObject> map) {
Preconditions.checkNotNull(map);
Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> result =
- new HashMap<InstanceIdentifier<Node>,OvsdbConnectionInstance>();
+ new HashMap<>();
for ( Entry<InstanceIdentifier<?>, DataObject> created : map.entrySet()) {
if (created.getValue() instanceof Node) {
OvsdbConnectionInstance client = null;
}
if (client != null) {
LOG.debug("Found client for {}", created.getValue());
- result.put((InstanceIdentifier<Node>) created.getKey(), client);
+ /*
+ * As of now data change sets are processed by single thread, so we can assume that device will
+ * be connected and ownership will be decided before sending any instructions down to the device.
+ * Note:Processing order in onDataChange() method should not change. If processing is changed to
+ * use multiple thread, we might need to take care of corner cases, where ownership is not decided
+ * but transaction are ready to go to switch. In that scenario, either we need to queue those task
+ * till ownership is decided for that specific device.
+ * Given that each DataChangeNotification is notified through separate thread, so we are already
+ * multi threaded and i don't see any need to further parallelism per DataChange
+ * notifications processing.
+ */
+ if ( cm.getHasDeviceOwnership(client.getMDConnectionInfo())) {
+ LOG.debug("*this* instance of southbound plugin is an "
+ + "owner of the device {}",created.getValue());
+ result.put((InstanceIdentifier<Node>) created.getKey(), client);
+ } else {
+ LOG.debug("*this* instance of southbound plugin is not an "
+ + "owner of the device {}",created.getValue());
+ }
} else {
LOG.debug("Did not find client for {}",created.getValue());
}