Merge changes from topic 'move-karaf-parent'
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / OvsdbDataTreeChangeListener.java
1 /*
2  * Copyright © 2016 Red Hat, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.southbound;
10
11 import java.net.ConnectException;
12 import java.net.UnknownHostException;
13 import java.util.Collection;
14 import java.util.HashMap;
15 import java.util.Map;
16 import java.util.Map.Entry;
17 import javax.annotation.Nonnull;
18 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
19 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
20 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
21 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
22 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
24 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
25 import org.opendaylight.ovsdb.lib.OvsdbClient;
26 import org.opendaylight.ovsdb.southbound.ovsdb.transact.BridgeOperationalState;
27 import org.opendaylight.ovsdb.southbound.ovsdb.transact.TransactCommandAggregator;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo;
31 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
32 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
35 import org.opendaylight.yangtools.concepts.ListenerRegistration;
36 import org.opendaylight.yangtools.yang.binding.Augmentation;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Data-tree change listener for OVSDB.
43  */
44 public class OvsdbDataTreeChangeListener implements ClusteredDataTreeChangeListener<Node>, AutoCloseable {
45
46     /** Our registration. */
47     private final ListenerRegistration<DataTreeChangeListener<Node>> registration;
48
49     /** The connection manager. */
50     private final OvsdbConnectionManager cm;
51
52     /** The data broker. */
53     private final DataBroker db;
54
55     /** Logger. */
56     private static final Logger LOG = LoggerFactory.getLogger(OvsdbDataTreeChangeListener.class);
57
58     /**
59      * Create an instance and register the listener.
60      *
61      * @param db The data broker.
62      * @param cm The connection manager.
63      */
64     OvsdbDataTreeChangeListener(DataBroker db, OvsdbConnectionManager cm) {
65         this.cm = cm;
66         this.db = db;
67         InstanceIdentifier<Node> path = InstanceIdentifier
68                 .create(NetworkTopology.class)
69                 .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
70                 .child(Node.class);
71         DataTreeIdentifier<Node> dataTreeIdentifier =
72                 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, path);
73         registration = db.registerDataTreeChangeListener(dataTreeIdentifier, this);
74         LOG.info("OVSDB topology listener has been registered.");
75     }
76
77     @Override
78     public void close() {
79         registration.close();
80         LOG.info("OVSDB topology listener has been closed.");
81     }
82
83     @Override
84     public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Node>> changes) {
85         LOG.trace("onDataTreeChanged: {}", changes);
86
87         // Connect first if necessary
88         connect(changes);
89
90         // Update connections if necessary
91         updateConnections(changes);
92
93         // Update the actual data
94         updateData(changes);
95
96         // Disconnect if necessary
97         disconnect(changes);
98
99         LOG.trace("onDataTreeChanged: exit");
100     }
101
102     private void connect(@Nonnull Collection<DataTreeModification<Node>> changes) {
103         for (DataTreeModification<Node> change : changes) {
104             if (change.getRootNode().getModificationType() == DataObjectModification.ModificationType.WRITE || change
105                     .getRootNode().getModificationType() == DataObjectModification.ModificationType.SUBTREE_MODIFIED) {
106                 DataObjectModification<OvsdbNodeAugmentation> ovsdbNodeModification =
107                         change.getRootNode().getModifiedAugmentation(OvsdbNodeAugmentation.class);
108                 if (ovsdbNodeModification != null && ovsdbNodeModification.getDataBefore() == null
109                         && ovsdbNodeModification.getDataAfter() != null
110                         && ovsdbNodeModification.getDataAfter().getConnectionInfo() != null) {
111                     OvsdbNodeAugmentation ovsdbNode = ovsdbNodeModification.getDataAfter();
112                     ConnectionInfo key = ovsdbNode.getConnectionInfo();
113                     InstanceIdentifier<Node> iid = cm.getInstanceIdentifier(key);
114                     if ( iid != null) {
115                         LOG.warn("Connection to device {} already exists. Plugin does not allow multiple connections "
116                                 + "to same device, hence dropping the request {}", key, ovsdbNode);
117                     } else {
118                         try {
119                             InstanceIdentifier<Node> instanceIdentifier = change.getRootPath().getRootIdentifier();
120                             cm.connect(instanceIdentifier, ovsdbNode);
121                             LOG.info("OVSDB node has been connected: {}",ovsdbNode);
122                         } catch (UnknownHostException | ConnectException e) {
123                             LOG.warn("Failed to connect to ovsdbNode", e);
124                         }
125                     }
126                 }
127             }
128         }
129     }
130
131     private void disconnect(@Nonnull Collection<DataTreeModification<Node>> changes) {
132         for (DataTreeModification<Node> change : changes) {
133             if (change.getRootNode().getModificationType() == DataObjectModification.ModificationType.DELETE) {
134                 DataObjectModification<OvsdbNodeAugmentation> ovsdbNodeModification =
135                         change.getRootNode().getModifiedAugmentation(OvsdbNodeAugmentation.class);
136                 if (ovsdbNodeModification != null && ovsdbNodeModification.getDataBefore() != null) {
137                     OvsdbNodeAugmentation ovsdbNode = ovsdbNodeModification.getDataBefore();
138                     ConnectionInfo key = ovsdbNode.getConnectionInfo();
139                     InstanceIdentifier<Node> iid = cm.getInstanceIdentifier(key);
140                     try {
141                         cm.disconnect(ovsdbNode);
142                         LOG.info("OVSDB node has been disconnected:{}", ovsdbNode);
143                         cm.stopConnectionReconciliationIfActive(iid.firstIdentifierOf(Node.class), ovsdbNode);
144                     } catch (UnknownHostException e) {
145                         LOG.warn("Failed to disconnect ovsdbNode", e);
146                     }
147                 }
148             }
149         }
150     }
151
152     private void updateConnections(@Nonnull Collection<DataTreeModification<Node>> changes) {
153         for (DataTreeModification<Node> change : changes) {
154             if (change.getRootNode().getModificationType() == DataObjectModification.ModificationType.WRITE || change
155                     .getRootNode().getModificationType() == DataObjectModification.ModificationType.SUBTREE_MODIFIED) {
156                 DataObjectModification<OvsdbNodeAugmentation> ovsdbNodeModification =
157                         change.getRootNode().getModifiedAugmentation(OvsdbNodeAugmentation.class);
158                 if (ovsdbNodeModification != null && ovsdbNodeModification.getDataBefore() != null
159                         && ovsdbNodeModification.getDataAfter() != null
160                         && ovsdbNodeModification.getDataAfter().getConnectionInfo() != null) {
161                     OvsdbClient client = cm.getClient(ovsdbNodeModification.getDataAfter().getConnectionInfo());
162                     if (client == null) {
163                         if (ovsdbNodeModification.getDataBefore() != null) {
164                             try {
165                                 cm.disconnect(ovsdbNodeModification.getDataBefore());
166                                 cm.connect(change.getRootPath().getRootIdentifier(), ovsdbNodeModification
167                                         .getDataAfter());
168                             } catch (UnknownHostException | ConnectException e) {
169                                 LOG.warn("Error disconnecting from or connecting to ovsdbNode", e);
170                             }
171                         }
172                     }
173                 }
174             }
175         }
176     }
177
178     private void updateData(@Nonnull Collection<DataTreeModification<Node>> changes) {
179         for (Entry<InstanceIdentifier<Node>, OvsdbConnectionInstance> connectionInstanceEntry :
180                 connectionInstancesFromChanges(changes).entrySet()) {
181             OvsdbConnectionInstance connectionInstance = connectionInstanceEntry.getValue();
182             connectionInstance.transact(new TransactCommandAggregator(),
183                     new BridgeOperationalState(db, changes), changes);
184         }
185     }
186
187     private Map<InstanceIdentifier<Node>, OvsdbConnectionInstance> connectionInstancesFromChanges(
188             @Nonnull Collection<DataTreeModification<Node>> changes) {
189         Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> result =
190                 new HashMap<>();
191         for (DataTreeModification<Node> change : changes) {
192             OvsdbConnectionInstance client = null;
193             Node node = change.getRootNode().getDataAfter() != null
194                     ? change.getRootNode().getDataAfter() : change.getRootNode().getDataBefore();
195             if (node != null) {
196                 InstanceIdentifier<Node> nodeIid;
197                 Augmentation nodeAug = node.getAugmentation(OvsdbNodeAugmentation.class) != null
198                         ? node.getAugmentation(OvsdbNodeAugmentation.class)
199                         : node.getAugmentation(OvsdbBridgeAugmentation.class);
200
201                 if (nodeAug instanceof OvsdbNodeAugmentation) {
202                     OvsdbNodeAugmentation ovsdbNode = (OvsdbNodeAugmentation) nodeAug;
203                     if (ovsdbNode.getConnectionInfo() != null) {
204                         client = cm.getConnectionInstance(ovsdbNode.getConnectionInfo());
205                     } else {
206                         client = cm.getConnectionInstance(SouthboundMapper.createInstanceIdentifier(node.getNodeId()));
207                     }
208                 }
209                 if (nodeAug instanceof OvsdbBridgeAugmentation) {
210                     OvsdbBridgeAugmentation bridgeAugmentation = (OvsdbBridgeAugmentation)nodeAug;
211                     if (bridgeAugmentation.getManagedBy() != null) {
212                         nodeIid = (InstanceIdentifier<Node>) bridgeAugmentation.getManagedBy().getValue();
213                         client = cm.getConnectionInstance(nodeIid);
214                     }
215                 }
216                 if (client == null) {
217                     //Try getting from change root identifier
218                     client = cm.getConnectionInstance(change.getRootPath().getRootIdentifier());
219                 }
220             } else {
221                 LOG.warn("Following change don't have after/before data {}", change);
222             }
223             if (client != null) {
224                 LOG.debug("Found client for {}", node);
225                     /*
226                      * As of now data change sets are processed by single thread, so we can assume that device will
227                      * be connected and ownership will be decided before sending any instructions down to the device.
228                      * Note:Processing order in onDataChange() method should not change. If processing is changed to
229                      * use multiple thread, we might need to take care of corner cases, where ownership is not decided
230                      * but transaction are ready to go to switch. In that scenario, either we need to queue those task
231                      * till ownership is decided for that specific device.
232                      * Given that each DataChangeNotification is notified through separate thread, so we are already
233                      * multi threaded and i don't see any need to further parallelism per DataChange
234                      * notifications processing.
235                      */
236                 if ( cm.getHasDeviceOwnership(client.getMDConnectionInfo())) {
237                     LOG.debug("*This* instance of southbound plugin is an owner of the device {}", node);
238                     result.put(change.getRootPath().getRootIdentifier(), client);
239                 } else {
240                     LOG.debug("*This* instance of southbound plugin is *not* an owner of the device {}", node);
241                 }
242             } else {
243                 LOG.debug("Did not find client for {}", node);
244             }
245         }
246         return result;
247     }
248 }