Merge "Bug 5174: Support for AutoAttach Table in OVSDB"
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / OvsdbDataChangeListener.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.southbound;
10
11 import java.net.UnknownHostException;
12 import java.util.HashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Map.Entry;
16 import java.util.Set;
17
18 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataChangeListener;
19 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
20 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.ovsdb.lib.OvsdbClient;
25 import org.opendaylight.ovsdb.southbound.ovsdb.transact.BridgeOperationalState;
26 import org.opendaylight.ovsdb.southbound.ovsdb.transact.DataChangesManagedByOvsdbNodeEvent;
27 import org.opendaylight.ovsdb.southbound.ovsdb.transact.TransactCommandAggregator;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo;
31 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
32 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
35 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
36 import org.opendaylight.yangtools.concepts.ListenerRegistration;
37 import org.opendaylight.yangtools.yang.binding.DataObject;
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import com.google.common.base.Preconditions;
43 import com.google.common.base.Predicates;
44 import com.google.common.collect.Maps;
45
46 public class OvsdbDataChangeListener implements ClusteredDataChangeListener, AutoCloseable {
47
48     private ListenerRegistration<DataChangeListener> registration;
49     private OvsdbConnectionManager cm;
50     private DataBroker db;
51     private static final Logger LOG = LoggerFactory.getLogger(OvsdbDataChangeListener.class);
52
53     OvsdbDataChangeListener(DataBroker db, OvsdbConnectionManager cm) {
54         LOG.info("Registering OvsdbNodeDataChangeListener");
55         this.cm = cm;
56         this.db = db;
57         InstanceIdentifier<Node> path = InstanceIdentifier
58                 .create(NetworkTopology.class)
59                 .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
60                 .child(Node.class);
61         registration =
62                 db.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, this, DataChangeScope.SUBTREE);
63
64     }
65
66     @Override
67     public void close() {
68         registration.close();
69     }
70
71     @Override
72     public void onDataChanged(
73             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
74         LOG.trace("onDataChanged: {}", changes);
75         // Connect first if we have to:
76         connect(changes);
77
78         // Second update connections if we have to
79         updateConnections(changes);
80
81         // Then handle updates to the actual data
82         updateData(changes);
83
84         // Finally disconnect if we need to
85         disconnect(changes);
86
87  //       init(changes);
88
89         LOG.trace("onDataChanged: exit");
90     }
91
92     private void updateData(
93             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
94         for (Entry<InstanceIdentifier<Node>, OvsdbConnectionInstance> connectionInstanceEntry :
95                 connectionInstancesFromChanges(changes).entrySet()) {
96             OvsdbConnectionInstance connectionInstance = connectionInstanceEntry.getValue();
97             connectionInstance.transact(new TransactCommandAggregator(
98                     new BridgeOperationalState(db, changes),
99                     new DataChangesManagedByOvsdbNodeEvent(
100                             connectionInstance.getInstanceIdentifier(),
101                             changes)));
102         }
103     }
104
105     private void disconnect(
106             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
107         Map<InstanceIdentifier<?>, DataObject> originalDataObject = changes.getOriginalData();
108         Set<InstanceIdentifier<?>> iiD = changes.getRemovedPaths();
109         for (InstanceIdentifier instanceIdentifier : iiD) {
110             if (originalDataObject.get(instanceIdentifier) instanceof OvsdbNodeAugmentation) {
111                 try {
112                     cm.disconnect((OvsdbNodeAugmentation) originalDataObject.get(instanceIdentifier));
113                 } catch (UnknownHostException e) {
114                     LOG.warn("Failed to disconnect ovsdbNode", e);
115                 }
116             }
117         }
118     }
119
120     private void updateConnections(
121             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
122         for (Entry<InstanceIdentifier<?>, DataObject> updated : changes.getUpdatedData().entrySet()) {
123             if (updated.getValue() instanceof OvsdbNodeAugmentation) {
124                 OvsdbNodeAugmentation value = (OvsdbNodeAugmentation) updated.getValue();
125                 OvsdbClient client = cm.getClient(value.getConnectionInfo());
126                 if (client == null) {
127                     for (Entry<InstanceIdentifier<?>, DataObject> original : changes.getOriginalData().entrySet()) {
128                         if (original.getValue() instanceof OvsdbNodeAugmentation) {
129                             try {
130                                 cm.disconnect((OvsdbNodeAugmentation) original.getValue());
131                                 cm.connect((InstanceIdentifier<Node>) original.getKey(),value);
132                             } catch (UnknownHostException e) {
133                                 LOG.warn("Failed to disconnect to ovsdbNode", e);
134                             }
135                         }
136                     }
137                 }
138             }
139         }
140     }
141
142     private void connect(
143             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
144         for (Entry<InstanceIdentifier<?>, DataObject> created : changes.getCreatedData().entrySet()) {
145             // TODO validate we have the correct kind of InstanceIdentifier
146             if (created.getValue() instanceof OvsdbNodeAugmentation) {
147                 OvsdbNodeAugmentation ovsdbNode = (OvsdbNodeAugmentation)created.getValue();
148                 ConnectionInfo key = ovsdbNode.getConnectionInfo();
149                 InstanceIdentifier<Node> iid = cm.getInstanceIdentifier(key);
150                 if ( iid != null) {
151                     LOG.warn("Connection to device {} already exists. Plugin does not allow multiple connections "
152                               + "to same device, hence dropping the request {}", key, ovsdbNode);
153                 } else {
154                     try {
155                         cm.connect((InstanceIdentifier<Node>) created.getKey(),
156                                 (OvsdbNodeAugmentation) created.getValue());
157                     } catch (UnknownHostException e) {
158                         LOG.warn("Failed to connect to ovsdbNode", e);
159                     }
160                 }
161             }
162         }
163     }
164 /*
165     private void init(
166             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
167         for (Entry<InstanceIdentifier<?>, DataObject> created : changes.getCreatedData().entrySet()) {
168             if (created.getValue() instanceof OvsdbNodeAugmentation) {
169                 OvsdbNodeAugmentation ovsdbNode = (OvsdbNodeAugmentation)created.getValue();
170                 cm.init(ovsdbNode.getConnectionInfo());
171             }
172         }
173
174     }
175 */
176     public Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> connectionInstancesFromChanges(
177             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
178         Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> result =
179                 new HashMap<>();
180         result.putAll(connectionInstancesFromMap(changes.getCreatedData()));
181         result.putAll(connectionInstancesFromMap(changes.getUpdatedData()));
182         result.putAll(connectionInstancesFromMap(
183                 Maps.filterKeys(changes.getOriginalData(), Predicates.in(changes.getRemovedPaths()))));
184         return result;
185     }
186
187     public Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> connectionInstancesFromMap(Map<InstanceIdentifier<?>,
188             DataObject> map) {
189         Preconditions.checkNotNull(map);
190         Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> result =
191                 new HashMap<>();
192         for ( Entry<InstanceIdentifier<?>, DataObject> created : map.entrySet()) {
193             if (created.getValue() instanceof Node) {
194                 OvsdbConnectionInstance client = null;
195                 LOG.debug("Received request for {}",created.getValue());
196                 OvsdbBridgeAugmentation bridge =
197                         ((Node)created.getValue()).getAugmentation(OvsdbBridgeAugmentation.class);
198                 if (bridge != null) {
199                     client = cm.getConnectionInstance(bridge);
200                 } else {
201                     OvsdbNodeAugmentation ovsNode =
202                             ((Node)created.getValue()).getAugmentation(OvsdbNodeAugmentation.class);
203                     if (ovsNode != null && ovsNode.getConnectionInfo() != null) {
204                         client = cm.getConnectionInstance(ovsNode.getConnectionInfo());
205                     } else {
206                         List<TerminationPoint> terminationPoint = ((Node)created.getValue()).getTerminationPoint();
207                         if (!terminationPoint.isEmpty()) {
208                             InstanceIdentifier<Node> nodeIid = SouthboundMapper.
209                                     createInstanceIdentifier(((Node)created.getValue()).getNodeId());
210                             client = cm.getConnectionInstance(nodeIid);
211                         }
212                     }
213                 }
214                 if (client != null) {
215                     LOG.debug("Found client for {}", created.getValue());
216                     /*
217                      * As of now data change sets are processed by single thread, so we can assume that device will
218                      * be connected and ownership will be decided before sending any instructions down to the device.
219                      * Note:Processing order in onDataChange() method should not change. If processing is changed to
220                      * use multiple thread, we might need to take care of corner cases, where ownership is not decided
221                      * but transaction are ready to go to switch. In that scenario, either we need to queue those task
222                      * till ownership is decided for that specific device.
223                      * Given that each DataChangeNotification is notified through separate thread, so we are already
224                      * multi threaded and i don't see any need to further parallelism per DataChange
225                      * notifications processing.
226                      */
227                     if ( cm.getHasDeviceOwnership(client.getMDConnectionInfo())) {
228                         LOG.debug("*this* instance of southbound plugin is an "
229                                 + "owner of the device {}",created.getValue());
230                         result.put((InstanceIdentifier<Node>) created.getKey(), client);
231                     } else {
232                         LOG.debug("*this* instance of southbound plugin is not an "
233                                 + "owner of the device {}",created.getValue());
234                     }
235                 } else {
236                     LOG.debug("Did not find client for {}",created.getValue());
237                 }
238             }
239         }
240         return result;
241     }
242
243 }