54ff1e5a21b811fb66a797c1a00f21dc90100fd5
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / OvsdbConnectionManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.southbound;
9
10 import static org.opendaylight.ovsdb.lib.operations.Operations.op;
11
12 import java.net.InetAddress;
13 import java.net.UnknownHostException;
14 import java.util.ArrayList;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.ExecutionException;
19
20 import javax.annotation.Nonnull;
21
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
24 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
26 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
27 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
28 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
29 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
30 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
31 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
34 import org.opendaylight.ovsdb.lib.OvsdbClient;
35 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
36 import org.opendaylight.ovsdb.lib.impl.OvsdbConnectionService;
37 import org.opendaylight.ovsdb.lib.operations.Operation;
38 import org.opendaylight.ovsdb.lib.operations.OperationResult;
39 import org.opendaylight.ovsdb.lib.operations.Select;
40 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
41 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
42 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
43 import org.opendaylight.ovsdb.schema.openvswitch.OpenVSwitch;
44 import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbNodeRemoveCommand;
45 import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAttributes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo;
50 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
51 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import com.google.common.base.Optional;
57 import com.google.common.base.Preconditions;
58 import com.google.common.util.concurrent.CheckedFuture;
59
60 public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoCloseable {
61     private Map<ConnectionInfo, OvsdbConnectionInstance> clients =
62             new ConcurrentHashMap<ConnectionInfo,OvsdbConnectionInstance>();
63     private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionManager.class);
64     private static final String ENTITY_TYPE = "ovsdb";
65
66     private DataBroker db;
67     private TransactionInvoker txInvoker;
68     private Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers =
69             new ConcurrentHashMap<ConnectionInfo,InstanceIdentifier<Node>>();
70     private Map<Entity, OvsdbConnectionInstance> entityConnectionMap =
71             new ConcurrentHashMap<>();
72     private EntityOwnershipService entityOwnershipService;
73     private OvsdbDeviceEntityOwnershipListener ovsdbDeviceEntityOwnershipListener;
74
75     public OvsdbConnectionManager(DataBroker db,TransactionInvoker txInvoker,
76                                   EntityOwnershipService entityOwnershipService) {
77         this.db = db;
78         this.txInvoker = txInvoker;
79         this.entityOwnershipService = entityOwnershipService;
80         this.ovsdbDeviceEntityOwnershipListener = new OvsdbDeviceEntityOwnershipListener(this, entityOwnershipService);
81     }
82
83     @Override
84     public void connected(@Nonnull final OvsdbClient externalClient) {
85
86         OvsdbConnectionInstance client = connectedButCallBacksNotRegistered(externalClient);
87
88         // Register Cluster Ownership for ConnectionInfo
89         registerEntityForOwnership(client);
90     }
91
92     public OvsdbConnectionInstance connectedButCallBacksNotRegistered(final OvsdbClient externalClient) {
93         LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
94                 externalClient.getConnectionInfo().getRemotePort());
95         ConnectionInfo key = SouthboundMapper.createConnectionInfo(externalClient);
96         OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(key);
97
98         // Check if existing ovsdbConnectionInstance for the OvsdbClient present.
99         // In such cases, we will see if the ovsdbConnectionInstance has same externalClient.
100         if (ovsdbConnectionInstance != null) {
101             if (ovsdbConnectionInstance.hasOvsdbClient(externalClient)) {
102                 LOG.warn("OVSDB Connection Instance {} already exists for client {}", key, externalClient);
103                 return ovsdbConnectionInstance;
104             }
105             LOG.warn("OVSDB Connection Instance {} being replaced with client {}", key, externalClient);
106             ovsdbConnectionInstance.disconnect();
107
108             // Unregister Cluster Ownership for ConnectionInfo
109             // Because the ovsdbConnectionInstance is about to be completely replaced!
110             unregisterEntityForOwnership(ovsdbConnectionInstance);
111
112             removeConnectionInstance(key);
113         }
114
115         ovsdbConnectionInstance = new OvsdbConnectionInstance(key, externalClient, txInvoker,
116                 getInstanceIdentifier(key));
117         ovsdbConnectionInstance.createTransactInvokers();
118         return ovsdbConnectionInstance;
119     }
120
121     @Override
122     public void disconnected(OvsdbClient client) {
123         LOG.info("OVSDB Disconnected from {}:{}. Cleaning up the operational data store"
124                 ,client.getConnectionInfo().getRemoteAddress(),
125                 client.getConnectionInfo().getRemotePort());
126         ConnectionInfo key = SouthboundMapper.createConnectionInfo(client);
127         OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(key);
128         if (ovsdbConnectionInstance != null) {
129             txInvoker.invoke(new OvsdbNodeRemoveCommand(ovsdbConnectionInstance, null, null));
130             removeConnectionInstance(key);
131
132             // Unregister Cluster Onwership for ConnectionInfo
133             unregisterEntityForOwnership(ovsdbConnectionInstance);
134         } else {
135             LOG.warn("OVSDB disconnected event did not find connection instance for {}", key);
136         }
137         LOG.trace("OvsdbConnectionManager: disconnected exit");
138     }
139
140     public OvsdbClient connect(InstanceIdentifier<Node> iid,
141             OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException {
142         // TODO handle case where we already have a connection
143         // TODO use transaction chains to handle ordering issues between disconnected
144         // TODO and connected when writing to the operational store
145         InetAddress ip = SouthboundMapper.createInetAddress(ovsdbNode.getConnectionInfo().getRemoteIp());
146         OvsdbClient client = OvsdbConnectionService.getService().connect(ip,
147                 ovsdbNode.getConnectionInfo().getRemotePort().getValue());
148         // For connections from the controller to the ovs instance, the library doesn't call
149         // this method for us
150         if (client != null) {
151             putInstanceIdentifier(ovsdbNode.getConnectionInfo(), iid.firstIdentifierOf(Node.class));
152             OvsdbConnectionInstance ovsdbConnectionInstance = connectedButCallBacksNotRegistered(client);
153
154             // Register Cluster Ownership for ConnectionInfo
155             registerEntityForOwnership(ovsdbConnectionInstance);
156         } else {
157             LOG.warn("Failed to connect to Ovsdb Node {}", ovsdbNode.getConnectionInfo());
158         }
159         return client;
160     }
161
162     public void disconnect(OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException {
163         OvsdbConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo());
164         if (client != null) {
165             client.disconnect();
166
167             // Unregister Cluster Onwership for ConnectionInfo
168             unregisterEntityForOwnership(client);
169
170             removeInstanceIdentifier(ovsdbNode.getConnectionInfo());
171         }
172     }
173
174 /*    public void init(ConnectionInfo key) {
175         OvsdbConnectionInstance client = getConnectionInstance(key);
176
177         // TODO (FF): make sure that this cluster instance is the 'entity owner' fo the given OvsdbConnectionInstance ?
178
179         if (client != null) {
180
181              *  Note: registerCallbacks() is idemPotent... so if you call it repeatedly all is safe,
182              *  it only registersCallbacks on the *first* call.
183
184             client.registerCallbacks();
185         }
186     }
187 */
188     @Override
189     public void close() throws Exception {
190         if (ovsdbDeviceEntityOwnershipListener != null) {
191             ovsdbDeviceEntityOwnershipListener.close();
192         }
193
194         for (OvsdbClient client: clients.values()) {
195             client.disconnect();
196         }
197     }
198
199     private void putConnectionInstance(ConnectionInfo key,OvsdbConnectionInstance instance) {
200         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
201         clients.put(connectionInfo, instance);
202     }
203
204     private void removeConnectionInstance(ConnectionInfo key) {
205         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
206         clients.remove(connectionInfo);
207     }
208
209     private void putInstanceIdentifier(ConnectionInfo key,InstanceIdentifier<Node> iid) {
210         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
211         instanceIdentifiers.put(connectionInfo, iid);
212     }
213
214     private void removeInstanceIdentifier(ConnectionInfo key) {
215         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
216         instanceIdentifiers.remove(connectionInfo);
217     }
218
219     public OvsdbConnectionInstance getConnectionInstance(ConnectionInfo key) {
220         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
221         return clients.get(connectionInfo);
222     }
223
224     public InstanceIdentifier<Node> getInstanceIdentifier(ConnectionInfo key) {
225         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
226         InstanceIdentifier<Node> iid = instanceIdentifiers.get(connectionInfo);
227         return iid;
228     }
229
230     public OvsdbConnectionInstance getConnectionInstance(OvsdbBridgeAttributes mn) {
231         Optional<OvsdbNodeAugmentation> optional = SouthboundUtil.getManagingNode(db, mn);
232         if (optional.isPresent()) {
233             return getConnectionInstance(optional.get().getConnectionInfo());
234         } else {
235             return null;
236         }
237     }
238
239     public OvsdbConnectionInstance getConnectionInstance(Node node) {
240         Preconditions.checkNotNull(node);
241         OvsdbNodeAugmentation ovsdbNode = node.getAugmentation(OvsdbNodeAugmentation.class);
242         OvsdbBridgeAugmentation ovsdbManagedNode = node.getAugmentation(OvsdbBridgeAugmentation.class);
243         if (ovsdbNode != null) {
244             return getConnectionInstance(ovsdbNode.getConnectionInfo());
245         } else if (ovsdbManagedNode != null) {
246             return getConnectionInstance(ovsdbManagedNode);
247         } else {
248             LOG.warn("This is not a node that gives any hint how to find its OVSDB Manager: {}",node);
249             return null;
250         }
251     }
252
253     public OvsdbConnectionInstance getConnectionInstance(InstanceIdentifier<Node> nodePath) {
254         try {
255             ReadOnlyTransaction transaction = db.newReadOnlyTransaction();
256             CheckedFuture<Optional<Node>, ReadFailedException> nodeFuture = transaction.read(
257                     LogicalDatastoreType.OPERATIONAL, nodePath);
258             transaction.close();
259             Optional<Node> optional = nodeFuture.get();
260             if (optional != null && optional.isPresent() && optional.get() instanceof Node) {
261                 return this.getConnectionInstance(optional.get());
262             } else {
263                 LOG.warn("Found non-topological node {} on path {}",optional);
264                 return null;
265             }
266         } catch (Exception e) {
267             LOG.warn("Failed to get Ovsdb Node {}",nodePath, e);
268             return null;
269         }
270     }
271
272     public OvsdbClient getClient(ConnectionInfo connectionInfo) {
273         return getConnectionInstance(connectionInfo);
274     }
275
276     public OvsdbClient getClient(OvsdbBridgeAttributes mn) {
277         return getConnectionInstance(mn);
278     }
279
280     public OvsdbClient getClient(Node node) {
281         return getConnectionInstance(node);
282     }
283
284     public Boolean getHasDeviceOwnership(ConnectionInfo connectionInfo) {
285         OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(connectionInfo);
286         if (ovsdbConnectionInstance == null) {
287             return Boolean.FALSE;
288         }
289         return ovsdbConnectionInstance.getHasDeviceOwnership();
290     }
291
292     public void setHasDeviceOwnership(ConnectionInfo connectionInfo, Boolean hasDeviceOwnership) {
293         OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(connectionInfo);
294         if (ovsdbConnectionInstance != null) {
295             ovsdbConnectionInstance.setHasDeviceOwnership(hasDeviceOwnership);
296         }
297     }
298
299     private void handleOwnershipChanged(EntityOwnershipChange ownershipChange) {
300         OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstanceFromEntity(ownershipChange.getEntity());
301         LOG.info("handleOwnershipChanged: {} event received for device {}",
302                 ownershipChange, ovsdbConnectionInstance != null ? ovsdbConnectionInstance.getConnectionInfo()
303                         : "THAT'S NOT REGISTERED BY THIS SOUTHBOUND PLUGIN INSTANCE");
304
305         if (ovsdbConnectionInstance == null) {
306             if (ownershipChange.isOwner()) {
307                 LOG.warn("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
308             } else {
309                 // EntityOwnershipService sends notification to all the nodes, irrespective of whether
310                 // that instance registered for the device ownership or not. It is to make sure that
311                 // If all the controller instance that was connected to the device are down, so the
312                 // running instance can clear up the operational data store even though it was not
313                 // connected to the device.
314                 LOG.debug("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
315             }
316
317             // If entity has no owner, clean up the operational data store (it's possible because owner controller
318             // might went down abruptly and didn't get a chance to clean up the operational data store.
319             if (!ownershipChange.hasOwner()) {
320                 LOG.debug("{} has no onwer, cleaning up the operational data store", ownershipChange.getEntity());
321                 // Below code might look weird but it's required. We want to give first opportunity to the
322                 // previous owner of the device to clean up the operational data store if there is no owner now.
323                 // That way we will avoid lot of nasty md-sal exceptions because of concurrent delete.
324                 if (ownershipChange.wasOwner()) {
325                     cleanEntityOperationalData(ownershipChange.getEntity());
326                 }
327                 // If first cleanEntityOperationalData() was called, this call will be no-op.
328                 cleanEntityOperationalData(ownershipChange.getEntity());
329             }
330             return;
331         }
332         //Connection detail need to be cached, irrespective of ownership result.
333         putConnectionInstance(ovsdbConnectionInstance.getMDConnectionInfo(),ovsdbConnectionInstance);
334
335         if (ownershipChange.isOwner() == ovsdbConnectionInstance.getHasDeviceOwnership()) {
336             LOG.debug("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}",
337                     ovsdbConnectionInstance.getConnectionInfo(), ovsdbConnectionInstance.getHasDeviceOwnership());
338             return;
339         }
340
341         ovsdbConnectionInstance.setHasDeviceOwnership(ownershipChange.isOwner());
342         // You were not an owner, but now you are
343         if (ownershipChange.isOwner()) {
344             LOG.info("handleOwnershipChanged: *this* southbound plugin instance is owner of device {}",
345                     ovsdbConnectionInstance.getConnectionInfo());
346
347             //*this* instance of southbound plugin is owner of the device,
348             //so register for monitor callbacks
349             ovsdbConnectionInstance.registerCallbacks();
350
351         } else {
352             //You were owner of the device, but now you are not. With the current ownership
353             //grant mechanism, this scenario should not occur. Because this scenario will occur
354             //when this controller went down or switch flap the connection, but in both the case
355             //it will go through the re-registration process. We need to implement this condition
356             //when clustering service implement a ownership grant strategy which can revoke the
357             //device ownership for load balancing the devices across the instances.
358             //Once this condition occur, we should unregister the callback.
359             LOG.error("handleOwnershipChanged: *this* southbound plugin instance is no longer the owner of device {}",
360                     ovsdbConnectionInstance.getNodeId().getValue());
361         }
362     }
363
364     private void cleanEntityOperationalData(Entity entity) {
365
366         InstanceIdentifier<Node> nodeIid = (InstanceIdentifier<Node>) SouthboundUtil
367                 .getInstanceIdentifierCodec().bindingDeserializer(entity.getId());
368
369         final ReadWriteTransaction transaction = db.newReadWriteTransaction();
370         Optional<Node> node = SouthboundUtil.readNode(transaction, nodeIid);
371         if (node.isPresent()) {
372             SouthboundUtil.deleteNode(transaction, nodeIid);
373         }
374     }
375
376     private OpenVSwitch getOpenVswitchTableEntry(OvsdbConnectionInstance connectionInstance) {
377         DatabaseSchema dbSchema = null;
378         OpenVSwitch openVSwitchRow = null;
379         try {
380             dbSchema = connectionInstance.getSchema(OvsdbSchemaContants.databaseName).get();
381         } catch (InterruptedException | ExecutionException e) {
382             LOG.warn("Not able to fetch schema for database {} from device {}",
383                     OvsdbSchemaContants.databaseName,connectionInstance.getConnectionInfo(),e);
384         }
385         if (dbSchema != null) {
386             GenericTableSchema openVSwitchSchema = TyperUtils.getTableSchema(dbSchema, OpenVSwitch.class);
387
388             List<String> openVSwitchTableColumn = new ArrayList<String>();
389             openVSwitchTableColumn.addAll(openVSwitchSchema.getColumns());
390             Select<GenericTableSchema> selectOperation = op.select(openVSwitchSchema);
391             selectOperation.setColumns(openVSwitchTableColumn);;
392
393             ArrayList<Operation> operations = new ArrayList<Operation>();
394             operations.add(selectOperation);
395             operations.add(op.comment("Fetching Open_VSwitch table rows"));
396             List<OperationResult> results = null;
397             try {
398                 results = connectionInstance.transact(dbSchema, operations).get();
399                 if (results != null ) {
400                     OperationResult selectResult = results.get(0);
401                     openVSwitchRow = TyperUtils.getTypedRowWrapper(
402                             dbSchema,OpenVSwitch.class,selectResult.getRows().get(0));
403
404                 }
405             } catch (InterruptedException | ExecutionException e) {
406                 LOG.warn("Not able to fetch OpenVswitch table row from device {}",
407                         connectionInstance.getConnectionInfo(),e);
408             }
409         }
410         return openVSwitchRow;
411     }
412     private Entity getEntityFromConnectionInstance(@Nonnull OvsdbConnectionInstance ovsdbConnectionInstance) {
413         YangInstanceIdentifier entityId = null;
414         InstanceIdentifier<Node> iid = ovsdbConnectionInstance.getInstanceIdentifier();;
415         if ( iid == null ) {
416             /* Switch initiated connection won't have iid, till it gets OpenVSwitch
417              * table update but update callback is always registered after ownership
418              * is granted. So we are explicitly fetch the row here to get the iid.
419              */
420             OpenVSwitch openvswitchRow = getOpenVswitchTableEntry(ovsdbConnectionInstance);
421             iid = SouthboundMapper.getInstanceIdentifier(openvswitchRow);
422             LOG.info("InstanceIdentifier {} generated for device "
423                     + "connection {}",iid,ovsdbConnectionInstance.getConnectionInfo());
424
425         }
426         entityId = SouthboundUtil.getInstanceIdentifierCodec().getYangInstanceIdentifier(iid);
427         Entity deviceEntity = new Entity(ENTITY_TYPE, entityId);
428         LOG.debug("Entity {} created for device connection {}",
429                 deviceEntity, ovsdbConnectionInstance.getConnectionInfo());
430         return deviceEntity;
431     }
432
433     private OvsdbConnectionInstance getConnectionInstanceFromEntity(Entity entity) {
434         return entityConnectionMap.get(entity);
435     }
436
437     private void registerEntityForOwnership(OvsdbConnectionInstance ovsdbConnectionInstance) {
438
439         Entity candidateEntity = getEntityFromConnectionInstance(ovsdbConnectionInstance);
440         entityConnectionMap.put(candidateEntity, ovsdbConnectionInstance);
441         ovsdbConnectionInstance.setConnectedEntity(candidateEntity);
442         try {
443             EntityOwnershipCandidateRegistration registration =
444                     entityOwnershipService.registerCandidate(candidateEntity);
445             ovsdbConnectionInstance.setDeviceOwnershipCandidateRegistration(registration);
446             LOG.info("OVSDB entity {} is registred for ownership.", candidateEntity);
447         } catch (CandidateAlreadyRegisteredException e) {
448             LOG.warn("OVSDB entity {} was already registered for {} ownership", candidateEntity, e);
449         }
450
451     }
452
453     private void unregisterEntityForOwnership(OvsdbConnectionInstance ovsdbConnectionInstance) {
454         ovsdbConnectionInstance.closeDeviceOwnershipCandidateRegistration();
455         entityConnectionMap.remove(ovsdbConnectionInstance.getConnectedEntity());
456     }
457
458     private class OvsdbDeviceEntityOwnershipListener implements EntityOwnershipListener {
459         private OvsdbConnectionManager cm;
460         private EntityOwnershipListenerRegistration listenerRegistration;
461
462         OvsdbDeviceEntityOwnershipListener(OvsdbConnectionManager cm, EntityOwnershipService entityOwnershipService) {
463             this.cm = cm;
464             listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this);
465         }
466         public void close() {
467             listenerRegistration.close();
468         }
469         @Override
470         public void ownershipChanged(EntityOwnershipChange ownershipChange) {
471             cm.handleOwnershipChanged(ownershipChange);
472         }
473     }
474 }