2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.southbound;
11 import java.net.UnknownHostException;
12 import java.util.HashMap;
13 import java.util.List;
15 import java.util.Map.Entry;
18 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
19 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
20 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.ovsdb.lib.OvsdbClient;
25 import org.opendaylight.ovsdb.southbound.ovsdb.transact.BridgeOperationalState;
26 import org.opendaylight.ovsdb.southbound.ovsdb.transact.DataChangesManagedByOvsdbNodeEvent;
27 import org.opendaylight.ovsdb.southbound.ovsdb.transact.TransactCommandAggregator;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo;
31 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
32 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
35 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
36 import org.opendaylight.yangtools.concepts.ListenerRegistration;
37 import org.opendaylight.yangtools.yang.binding.DataObject;
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 import com.google.common.base.Preconditions;
43 import com.google.common.base.Predicates;
44 import com.google.common.collect.Maps;
/**
 * Data change listener for OVSDB topology nodes.
 *
 * <p>The constructor registers this object on the CONFIGURATION datastore with SUBTREE scope.
 * Each change event is then processed by private stages that connect, update connections,
 * push data to the device and disconnect, all delegating to the {@code OvsdbConnectionManager}.
 */
46 public class OvsdbDataChangeListener implements ClusteredDataChangeListener, AutoCloseable {
// Registration handle for this listener.
// NOTE(review): the statement that assigns it is not visible in this extract.
48 private ListenerRegistration<DataChangeListener> registration;
// Connection manager used to look up, initialise, open and close OVSDB connections.
49 private OvsdbConnectionManager cm;
// Data broker; handed to BridgeOperationalState when a device transaction is built.
50 private DataBroker db;
51 private static final Logger LOG = LoggerFactory.getLogger(OvsdbDataChangeListener.class);
/**
 * Registers this listener for changes in the CONFIGURATION datastore with SUBTREE scope,
 * on a path built from the topology keyed by {@code SouthboundConstants.OVSDB_TOPOLOGY_ID}.
 *
 * @param db data broker on which the listener is registered
 * @param cm OVSDB connection manager
 */
53 OvsdbDataChangeListener(DataBroker db, OvsdbConnectionManager cm) {
54 LOG.info("Registering OvsdbNodeDataChangeListener");
// NOTE(review): the lines numbered 55-56 are absent from this extract; presumably they
// store cm and db in the fields of the same name. Confirm against the full file.
57 InstanceIdentifier<Node> path = InstanceIdentifier
58 .create(NetworkTopology.class)
59 .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
// NOTE(review): the lines numbered 60-61 are absent; the declared type InstanceIdentifier<Node>
// suggests the path is narrowed to Node there, and the registration returned below is
// presumably kept in the registration field. Confirm against the full file.
62 db.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, this, DataChangeScope.SUBTREE);
/**
 * Close hook; the enclosing class declares that it implements AutoCloseable.
 * NOTE(review): the method body is absent from this extract; presumably it closes the
 * listener registration. Confirm against the full file.
 */
67 public void close() throws Exception {
/**
 * Entry point for data change notifications. Traces the event on entry and exit and runs
 * the processing stages named by the inline comments, in that order.
 *
 * NOTE(review): only the updateConnections(changes) call is visible in this extract. The
 * calls belonging to the other stages sit on lines that were dropped; presumably they are
 * the private connect, updateData and disconnect helpers of this class. Confirm against
 * the full file before reordering anything here.
 *
 * @param changes the created, updated, original and removed data of one change set
 */
72 public void onDataChanged(
73 AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
74 LOG.trace("onDataChanged: {}", changes);
75 // Connect first if we have to:
78 // Second update connections if we have to
79 updateConnections(changes);
81 // Then handle updates to the actual data
84 // Finally disconnect if we need to
89 LOG.trace("onDataChanged: exit");
/**
 * Sends the change set to every connection instance selected by
 * connectionInstancesFromChanges(changes).
 *
 * <p>For each selected connection a TransactCommandAggregator is built from a
 * BridgeOperationalState (data broker plus the change event) and a
 * DataChangesManagedByOvsdbNodeEvent scoped by the connection's instance identifier,
 * and handed to that connection's transact method.
 *
 * @param changes the data change event to push to the devices
 */
92 private void updateData(
93 AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
// Only the map values (the connection instances) are used; the keys are ignored here.
94 for (Entry<InstanceIdentifier<Node>, OvsdbConnectionInstance> connectionInstanceEntry :
95 connectionInstancesFromChanges(changes).entrySet()) {
96 OvsdbConnectionInstance connectionInstance = connectionInstanceEntry.getValue();
97 connectionInstance.transact(new TransactCommandAggregator(
98 new BridgeOperationalState(db, changes),
99 new DataChangesManagedByOvsdbNodeEvent(
100 connectionInstance.getInstanceIdentifier(),
// NOTE(review): the rest of this argument list (numbered 101 onward) is absent from this
// extract; presumably the change event is the remaining argument. Confirm against the full file.
105 private void disconnect(
106 AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
107 Map<InstanceIdentifier<?>, DataObject> originalDataObject = changes.getOriginalData();
108 Set<InstanceIdentifier<?>> iiD = changes.getRemovedPaths();
109 for (InstanceIdentifier instanceIdentifier : iiD) {
110 if (originalDataObject.get(instanceIdentifier) instanceof OvsdbNodeAugmentation) {
112 cm.disconnect((OvsdbNodeAugmentation) originalDataObject.get(instanceIdentifier));
113 } catch (UnknownHostException e) {
114 LOG.warn("Failed to disconnect ovsdbNode", e);
120 private void updateConnections(
121 AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
122 for (Entry<InstanceIdentifier<?>, DataObject> updated : changes.getUpdatedData().entrySet()) {
123 if (updated.getValue() instanceof OvsdbNodeAugmentation) {
124 OvsdbNodeAugmentation value = (OvsdbNodeAugmentation) updated.getValue();
125 OvsdbClient client = cm.getClient(value.getConnectionInfo());
126 if (client == null) {
127 for (Entry<InstanceIdentifier<?>, DataObject> original : changes.getOriginalData().entrySet()) {
128 if (original.getValue() instanceof OvsdbNodeAugmentation) {
130 cm.disconnect((OvsdbNodeAugmentation) original.getValue());
131 cm.connect((InstanceIdentifier<Node>) original.getKey(),value);
132 } catch (UnknownHostException e) {
133 LOG.warn("Failed to disconnect to ovsdbNode", e);
/**
 * Handles newly created OVSDB node augmentations in the change set.
 *
 * <p>For each created entry whose value is an OvsdbNodeAugmentation, the connection manager
 * is asked for the instance identifier registered under the node's connection info. The
 * method either logs that the request is dropped because a connection to the device
 * already exists, or asks the connection manager to connect using the created entry's key
 * and value. An UnknownHostException from connect is logged as a warning.
 *
 * @param changes the data change event supplying the created data
 */
142 private void connect(
143 AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
144 for (Entry<InstanceIdentifier<?>, DataObject> created : changes.getCreatedData().entrySet()) {
145 // TODO validate we have the correct kind of InstanceIdentifier
146 if (created.getValue() instanceof OvsdbNodeAugmentation) {
147 OvsdbNodeAugmentation ovsdbNode = (OvsdbNodeAugmentation)created.getValue();
148 ConnectionInfo key = ovsdbNode.getConnectionInfo();
149 InstanceIdentifier<Node> iid = cm.getInstanceIdentifier(key);
// NOTE(review): the branch condition (numbered 150) is absent from this extract; the warning
// below suggests it tests whether iid is non-null. Confirm against the full file.
151 LOG.warn("Connection to device {} already exists. Plugin does not allow multiple connections "
152 + "to same device, hence dropping the request {}", key, ovsdbNode);
// NOTE(review): the lines numbered 153-154 are absent; presumably they open the alternative
// branch and the try block whose catch clause follows.
155 cm.connect((InstanceIdentifier<Node>) created.getKey(),
156 (OvsdbNodeAugmentation) created.getValue());
157 } catch (UnknownHostException e) {
158 LOG.warn("Failed to connect to ovsdbNode", e);
166 AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
// Passes the connection info of every created OvsdbNodeAugmentation to cm.init(...);
// created entries holding any other kind of data are skipped.
// NOTE(review): the declaration line of this method is absent from this extract, so its
// name and modifiers cannot be confirmed here.
167 for (Entry<InstanceIdentifier<?>, DataObject> created : changes.getCreatedData().entrySet()) {
168 if (created.getValue() instanceof OvsdbNodeAugmentation) {
169 OvsdbNodeAugmentation ovsdbNode = (OvsdbNodeAugmentation)created.getValue();
170 cm.init(ovsdbNode.getConnectionInfo());
/**
 * Collects the connection instances affected by a change set.
 *
 * <p>Merges the results of connectionInstancesFromMap for the created data, the updated
 * data, and the original data restricted to the removed paths. Because the results are
 * merged with putAll in that order, a later source replaces an earlier one for the same key.
 *
 * @param changes the data change event to inspect
 * @return connection instances keyed by node instance identifier
 */
176 public Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> connectionInstancesFromChanges(
177 AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
// NOTE(review): the initialiser of result (numbered 179) and the method tail are absent from
// this extract; presumably a new HashMap is created and result is returned. Confirm.
178 Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> result =
180 result.putAll(connectionInstancesFromMap(changes.getCreatedData()));
181 result.putAll(connectionInstancesFromMap(changes.getUpdatedData()));
// Removed entries no longer exist in the new data, so their pre-change values are used.
182 result.putAll(connectionInstancesFromMap(
183 Maps.filterKeys(changes.getOriginalData(), Predicates.in(changes.getRemovedPaths()))));
/**
 * Resolves a connection instance for each Node value in the given map and keeps the ones
 * whose device this plugin instance owns.
 *
 * <p>A client is looked up from the node's OvsdbBridgeAugmentation, from the connection
 * info of its OvsdbNodeAugmentation, or from an instance identifier built from the node id
 * when the node has termination points. Entries whose value is not a Node are ignored.
 * The map argument is checked for null on entry.
 *
 * NOTE(review): the end of the parameter list, the lines between the three lookups, the
 * comment delimiters and the method tail are absent from this extract. The lookups are
 * presumably a fallback chain and result is presumably returned. Confirm against the full file.
 */
187 public Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> connectionInstancesFromMap(Map<InstanceIdentifier<?>,
189 Preconditions.checkNotNull(map);
190 Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> result =
192 for ( Entry<InstanceIdentifier<?>, DataObject> created : map.entrySet()) {
193 if (created.getValue() instanceof Node) {
194 OvsdbConnectionInstance client = null;
195 LOG.debug("Received request for {}",created.getValue());
// Lookup by the bridge augmentation of the node.
196 OvsdbBridgeAugmentation bridge =
197 ((Node)created.getValue()).getAugmentation(OvsdbBridgeAugmentation.class);
198 if (bridge != null) {
199 client = cm.getConnectionInstance(bridge);
// Lookup by the connection info of the OVSDB node augmentation.
201 OvsdbNodeAugmentation ovsNode =
202 ((Node)created.getValue()).getAugmentation(OvsdbNodeAugmentation.class);
203 if (ovsNode != null && ovsNode.getConnectionInfo() != null) {
204 client = cm.getConnectionInstance(ovsNode.getConnectionInfo());
// Lookup by an instance identifier derived from the node id.
206 List<TerminationPoint> terminationPoint = ((Node)created.getValue()).getTerminationPoint();
// NOTE(review): isEmpty() is called without a null check. If getTerminationPoint() can
// return null for a node without termination points, this throws NullPointerException.
// Verify the generated binding's contract and guard if needed.
207 if (!terminationPoint.isEmpty()) {
208 InstanceIdentifier<Node> nodeIid = SouthboundMapper.
209 createInstanceIdentifier(((Node)created.getValue()).getNodeId());
210 client = cm.getConnectionInstance(nodeIid);
214 if (client != null) {
215 LOG.debug("Found client for {}", created.getValue());
217 * As of now data change sets are processed by single thread, so we can assume that device will
218 * be connected and ownership will be decided before sending any instructions down to the device.
219 * Note:Processing order in onDataChange() method should not change. If processing is changed to
220 * use multiple thread, we might need to take care of corner cases, where ownership is not decided
221 * but transaction are ready to go to switch. In that scenario, either we need to queue those task
222 * till ownership is decided for that specific device.
223 * Given that each DataChangeNotification is notified through separate thread, so we are already
224 * multi threaded and i don't see any need to further parallelism per DataChange
225 * notifications processing.
// Only a client whose device is owned by this instance is added to the result.
227 if ( cm.getHasDeviceOwnership(client.getMDConnectionInfo())) {
228 LOG.debug("*this* instance of southbound plugin is an "
229 + "owner of the device {}",created.getValue());
230 result.put((InstanceIdentifier<Node>) created.getKey(), client);
232 LOG.debug("*this* instance of southbound plugin is not an "
233 + "owner of the device {}",created.getValue());
236 LOG.debug("Did not find client for {}",created.getValue());