Merge "Controller initiated connections always connect to one manager listening in...
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / OvsdbDataChangeListener.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.southbound;
10
11 import java.net.UnknownHostException;
12 import java.util.HashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Map.Entry;
16 import java.util.Set;
17
18 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
19 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
21 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
22 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
23 import org.opendaylight.ovsdb.lib.OvsdbClient;
24 import org.opendaylight.ovsdb.southbound.ovsdb.transact.BridgeOperationalState;
25 import org.opendaylight.ovsdb.southbound.ovsdb.transact.DataChangesManagedByOvsdbNodeEvent;
26 import org.opendaylight.ovsdb.southbound.ovsdb.transact.TransactCommandAggregator;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo;
30 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
31 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
32 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
33 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
34 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
35 import org.opendaylight.yangtools.concepts.ListenerRegistration;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import com.google.common.base.Preconditions;
42 import com.google.common.base.Predicates;
43 import com.google.common.collect.Maps;
44
45 public class OvsdbDataChangeListener implements DataChangeListener, AutoCloseable {
46
47     private ListenerRegistration<DataChangeListener> registration;
48     private OvsdbConnectionManager cm;
49     private DataBroker db;
50     private static final Logger LOG = LoggerFactory.getLogger(OvsdbDataChangeListener.class);
51
52     OvsdbDataChangeListener(DataBroker db, OvsdbConnectionManager cm) {
53         LOG.info("Registering OvsdbNodeDataChangeListener");
54         this.cm = cm;
55         this.db = db;
56         InstanceIdentifier<Node> path = InstanceIdentifier
57                 .create(NetworkTopology.class)
58                 .child(Topology.class, new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
59                 .child(Node.class);
60         registration =
61                 db.registerDataChangeListener(LogicalDatastoreType.CONFIGURATION, path, this, DataChangeScope.SUBTREE);
62
63     }
64
65     @Override
66     public void close() throws Exception {
67         registration.close();
68     }
69
70     @Override
71     public void onDataChanged(
72             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
73         LOG.trace("onDataChanged: {}", changes);
74         // Connect first if we have to:
75         connect(changes);
76
77         // Second update connections if we have to
78         updateConnections(changes);
79
80         // Then handle updates to the actual data
81         updateData(changes);
82
83         // Finally disconnect if we need to
84         disconnect(changes);
85
86         init(changes);
87
88         LOG.trace("onDataChanged: exit");
89     }
90
91     private void updateData(
92             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
93         for (Entry<InstanceIdentifier<Node>, OvsdbConnectionInstance> connectionInstanceEntry :
94                 connectionInstancesFromChanges(changes).entrySet()) {
95             OvsdbConnectionInstance connectionInstance = connectionInstanceEntry.getValue();
96             connectionInstance.transact(new TransactCommandAggregator(
97                     new BridgeOperationalState(db, changes),
98                     new DataChangesManagedByOvsdbNodeEvent(
99                             connectionInstance.getInstanceIdentifier(),
100                             changes)));
101         }
102     }
103
104     private void disconnect(
105             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
106         Map<InstanceIdentifier<?>, DataObject> originalDataObject = changes.getOriginalData();
107         Set<InstanceIdentifier<?>> iiD = changes.getRemovedPaths();
108         for (InstanceIdentifier instanceIdentifier : iiD) {
109             if (originalDataObject.get(instanceIdentifier) instanceof OvsdbNodeAugmentation) {
110                 try {
111                     cm.disconnect((OvsdbNodeAugmentation) originalDataObject.get(instanceIdentifier));
112                 } catch (UnknownHostException e) {
113                     LOG.warn("Failed to disconnect ovsdbNode", e);
114                 }
115             }
116         }
117     }
118
119     private void updateConnections(
120             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
121         for (Entry<InstanceIdentifier<?>, DataObject> updated : changes.getUpdatedData().entrySet()) {
122             if (updated.getValue() instanceof OvsdbNodeAugmentation) {
123                 OvsdbNodeAugmentation value = (OvsdbNodeAugmentation) updated.getValue();
124                 OvsdbClient client = cm.getClient(value.getConnectionInfo());
125                 if (client == null) {
126                     for (Entry<InstanceIdentifier<?>, DataObject> original : changes.getOriginalData().entrySet()) {
127                         if (original.getValue() instanceof OvsdbNodeAugmentation) {
128                             try {
129                                 cm.disconnect((OvsdbNodeAugmentation) original.getValue());
130                                 cm.connect((InstanceIdentifier<Node>) original.getKey(),value);
131                             } catch (UnknownHostException e) {
132                                 LOG.warn("Failed to disconnect to ovsdbNode", e);
133                             }
134                         }
135                     }
136                 }
137             }
138         }
139     }
140
141     private void connect(
142             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
143         for (Entry<InstanceIdentifier<?>, DataObject> created : changes.getCreatedData().entrySet()) {
144             // TODO validate we have the correct kind of InstanceIdentifier
145             if (created.getValue() instanceof OvsdbNodeAugmentation) {
146                 OvsdbNodeAugmentation ovsdbNode = (OvsdbNodeAugmentation)created.getValue();
147                 ConnectionInfo key = ovsdbNode.getConnectionInfo();
148                 InstanceIdentifier<Node> iid = cm.getInstanceIdentifier(key);
149                 if ( iid != null) {
150                     LOG.warn("Connection to device {} already exists. Plugin does not allow multiple connections "
151                               + "to same device, hence dropping the request {}", key, ovsdbNode);
152                 } else {
153                     try {
154                         cm.connect((InstanceIdentifier<Node>) created.getKey(),
155                                 (OvsdbNodeAugmentation) created.getValue());
156                     } catch (UnknownHostException e) {
157                         LOG.warn("Failed to connect to ovsdbNode", e);
158                     }
159                 }
160             }
161         }
162     }
163
164     private void init(
165             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
166         for (Entry<InstanceIdentifier<?>, DataObject> created : changes.getCreatedData().entrySet()) {
167             if (created.getValue() instanceof OvsdbNodeAugmentation) {
168                 OvsdbNodeAugmentation ovsdbNode = (OvsdbNodeAugmentation)created.getValue();
169                 cm.init(ovsdbNode.getConnectionInfo());
170             }
171         }
172
173     }
174
175     public Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> connectionInstancesFromChanges(
176             AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes) {
177         Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> result =
178                 new HashMap<InstanceIdentifier<Node>,OvsdbConnectionInstance>();
179         result.putAll(connectionInstancesFromMap(changes.getCreatedData()));
180         result.putAll(connectionInstancesFromMap(changes.getUpdatedData()));
181         result.putAll(connectionInstancesFromMap(
182                 Maps.filterKeys(changes.getOriginalData(), Predicates.in(changes.getRemovedPaths()))));
183         return result;
184     }
185
186     public Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> connectionInstancesFromMap(Map<InstanceIdentifier<?>,
187             DataObject> map) {
188         Preconditions.checkNotNull(map);
189         Map<InstanceIdentifier<Node>,OvsdbConnectionInstance> result =
190                 new HashMap<InstanceIdentifier<Node>,OvsdbConnectionInstance>();
191         for ( Entry<InstanceIdentifier<?>, DataObject> created : map.entrySet()) {
192             if (created.getValue() instanceof Node) {
193                 OvsdbConnectionInstance client = null;
194                 LOG.debug("Received request for {}",created.getValue());
195                 OvsdbBridgeAugmentation bridge =
196                         ((Node)created.getValue()).getAugmentation(OvsdbBridgeAugmentation.class);
197                 if (bridge != null) {
198                     client = cm.getConnectionInstance(bridge);
199                 } else {
200                     OvsdbNodeAugmentation ovsNode =
201                             ((Node)created.getValue()).getAugmentation(OvsdbNodeAugmentation.class);
202                     if (ovsNode != null && ovsNode.getConnectionInfo() != null) {
203                         client = cm.getConnectionInstance(ovsNode.getConnectionInfo());
204                     } else {
205                         List<TerminationPoint> terminationPoint = ((Node)created.getValue()).getTerminationPoint();
206                         if (!terminationPoint.isEmpty()) {
207                             InstanceIdentifier<Node> nodeIid = SouthboundMapper.
208                                     createInstanceIdentifier(((Node)created.getValue()).getNodeId());
209                             client = cm.getConnectionInstance(nodeIid);
210                         }
211                     }
212                 }
213                 if (client != null) {
214                     LOG.debug("Found client for {}", created.getValue());
215                     result.put((InstanceIdentifier<Node>) created.getKey(), client);
216                 } else {
217                     LOG.debug("Did not find client for {}",created.getValue());
218                 }
219             }
220         }
221         return result;
222     }
223
224 }