X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=southbound%2Fsouthbound-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fovsdb%2Fsouthbound%2FOvsdbDataChangeListener.java;h=8b2859317224fd62ce678a1cc3ce305049f4ea8c;hb=582e717190867d42b473896c0544bae580989301;hp=14ecfc775581c6f0aed89354e22e2197be1017df;hpb=c0f755cf5f6c3787a5651863151e2d447bba4af5;p=ovsdb.git diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbDataChangeListener.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbDataChangeListener.java index 14ecfc775..8b2859317 100644 --- a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbDataChangeListener.java +++ b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbDataChangeListener.java @@ -1,11 +1,21 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.ovsdb.southbound; import java.net.UnknownHostException; -import java.util.HashSet; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; @@ -17,10 +27,12 @@ import org.opendaylight.ovsdb.southbound.ovsdb.transact.DataChangesManagedByOvsd import org.opendaylight.ovsdb.southbound.ovsdb.transact.TransactCommandAggregator; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; @@ -31,7 +43,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.Maps; -public class OvsdbDataChangeListener implements DataChangeListener, AutoCloseable { +public class OvsdbDataChangeListener implements ClusteredDataChangeListener, AutoCloseable { private ListenerRegistration registration; private OvsdbConnectionManager cm; @@ -72,15 +84,20 @@ public class OvsdbDataChangeListener implements DataChangeListener, AutoCloseabl // Finally disconnect if we need to disconnect(changes); + // init(changes); + + LOG.trace("onDataChanged: exit"); } private void updateData( AsyncDataChangeEvent, DataObject> changes) { - for (OvsdbConnectionInstance connectionInstance : connectionInstancesFromChanges(changes)) { + for (Entry, OvsdbConnectionInstance> connectionInstanceEntry : + connectionInstancesFromChanges(changes).entrySet()) { + OvsdbConnectionInstance connectionInstance = connectionInstanceEntry.getValue(); connectionInstance.transact(new TransactCommandAggregator( new BridgeOperationalState(db, changes), new DataChangesManagedByOvsdbNodeEvent( - SouthboundMapper.createInstanceIdentifier(connectionInstance.getMDConnectionInfo()), + connectionInstance.getInstanceIdentifier(), changes))); } } @@ -111,7 +128,7 @@ public class OvsdbDataChangeListener implements DataChangeListener, AutoCloseabl if (original.getValue() instanceof OvsdbNodeAugmentation) { try { cm.disconnect((OvsdbNodeAugmentation) original.getValue()); - cm.connect(value); + cm.connect((InstanceIdentifier) original.getKey(),value); } catch (UnknownHostException e) { LOG.warn("Failed to disconnect to ovsdbNode", e); } @@ -127,54 +144,97 @@ public class OvsdbDataChangeListener implements DataChangeListener, AutoCloseabl for (Entry, DataObject> created : changes.getCreatedData().entrySet()) { // TODO validate we have the correct kind of InstanceIdentifier if (created.getValue() instanceof OvsdbNodeAugmentation) { - try { - cm.connect((OvsdbNodeAugmentation) created.getValue()); - } catch (UnknownHostException e) { - LOG.warn("Failed to connect to ovsdbNode", e); + OvsdbNodeAugmentation ovsdbNode = (OvsdbNodeAugmentation)created.getValue(); + ConnectionInfo key = ovsdbNode.getConnectionInfo(); + InstanceIdentifier iid = cm.getInstanceIdentifier(key); + if ( iid != null) { + LOG.warn("Connection to device {} already exists. Plugin does not allow multiple connections " + + "to same device, hence dropping the request {}", key, ovsdbNode); + } else { + try { + cm.connect((InstanceIdentifier) created.getKey(), + (OvsdbNodeAugmentation) created.getValue()); + } catch (UnknownHostException e) { + LOG.warn("Failed to connect to ovsdbNode", e); + } } } } } +/* + private void init( + AsyncDataChangeEvent, DataObject> changes) { + for (Entry, DataObject> created : changes.getCreatedData().entrySet()) { + if (created.getValue() instanceof OvsdbNodeAugmentation) { + OvsdbNodeAugmentation ovsdbNode = (OvsdbNodeAugmentation)created.getValue(); + cm.init(ovsdbNode.getConnectionInfo()); + } + } - public Set connectionInstancesFromChanges( + } +*/ + public Map,OvsdbConnectionInstance> connectionInstancesFromChanges( AsyncDataChangeEvent, DataObject> changes) { - Set result = new HashSet(); - result.addAll(connectionInstancesFromMap(changes.getCreatedData())); - result.addAll(connectionInstancesFromMap(changes.getUpdatedData())); - result.addAll(connectionInstancesFromMap( + Map,OvsdbConnectionInstance> result = + new HashMap<>(); + result.putAll(connectionInstancesFromMap(changes.getCreatedData())); + result.putAll(connectionInstancesFromMap(changes.getUpdatedData())); + result.putAll(connectionInstancesFromMap( Maps.filterKeys(changes.getOriginalData(), Predicates.in(changes.getRemovedPaths())))); return result; } - public Set connectionInstancesFromMap(Map, DataObject> map) { + public Map,OvsdbConnectionInstance> connectionInstancesFromMap(Map, + DataObject> map) { Preconditions.checkNotNull(map); - Set result = new HashSet(); + Map,OvsdbConnectionInstance> result = + new HashMap<>(); for ( Entry, DataObject> created : map.entrySet()) { if (created.getValue() instanceof Node) { + OvsdbConnectionInstance client = null; LOG.debug("Received request for {}",created.getValue()); OvsdbBridgeAugmentation bridge = ((Node)created.getValue()).getAugmentation(OvsdbBridgeAugmentation.class); if (bridge != null) { - OvsdbConnectionInstance client = cm.getConnectionInstance(bridge); - if (client != null) { - LOG.debug("Found client for {}", created.getValue()); - result.add(client); - } else { - LOG.debug("Did not find client for {}",created.getValue()); - } + client = cm.getConnectionInstance(bridge); } else { OvsdbNodeAugmentation ovsNode = ((Node)created.getValue()).getAugmentation(OvsdbNodeAugmentation.class); if (ovsNode != null && ovsNode.getConnectionInfo() != null) { - OvsdbConnectionInstance client = cm.getConnectionInstance(ovsNode.getConnectionInfo()); - if (client != null) { - LOG.debug("Found client for {}", created.getValue()); - result.add(client); - } else { - LOG.debug("Did not find client for {}",created.getValue()); + client = cm.getConnectionInstance(ovsNode.getConnectionInfo()); + } else { + List terminationPoint = ((Node)created.getValue()).getTerminationPoint(); + if (!terminationPoint.isEmpty()) { + InstanceIdentifier nodeIid = SouthboundMapper. + createInstanceIdentifier(((Node)created.getValue()).getNodeId()); + client = cm.getConnectionInstance(nodeIid); } } } + if (client != null) { + LOG.debug("Found client for {}", created.getValue()); + /* + * As of now data change sets are processed by single thread, so we can assume that device will + * be connected and ownership will be decided before sending any instructions down to the device. + * Note:Processing order in onDataChange() method should not change. If processing is changed to + * use multiple thread, we might need to take care of corner cases, where ownership is not decided + * but transaction are ready to go to switch. In that scenario, either we need to queue those task + * till ownership is decided for that specific device. + * Given that each DataChangeNotification is notified through separate thread, so we are already + * multi threaded and i don't see any need to further parallelism per DataChange + * notifications processing. + */ + if ( cm.getHasDeviceOwnership(client.getMDConnectionInfo())) { + LOG.debug("*this* instance of southbound plugin is an " + + "owner of the device {}",created.getValue()); + result.put((InstanceIdentifier) created.getKey(), client); + } else { + LOG.debug("*this* instance of southbound plugin is not an " + + "owner of the device {}",created.getValue()); + } + } else { + LOG.debug("Did not find client for {}",created.getValue()); + } } } return result;