Migrate TyperUtils.getTableSchema() users
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / OvsdbConnectionManager.java
1 /*
2  * Copyright © 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.southbound;
9
10 import static org.opendaylight.ovsdb.lib.operations.Operations.op;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Optional;
14 import com.google.common.base.Preconditions;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.net.ConnectException;
21 import java.net.InetAddress;
22 import java.net.UnknownHostException;
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
35 import org.opendaylight.mdsal.eos.binding.api.Entity;
36 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipCandidateRegistration;
37 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange;
38 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener;
39 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListenerRegistration;
40 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
41 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
42 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
43 import org.opendaylight.ovsdb.lib.OvsdbClient;
44 import org.opendaylight.ovsdb.lib.OvsdbConnection;
45 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
46 import org.opendaylight.ovsdb.lib.operations.Operation;
47 import org.opendaylight.ovsdb.lib.operations.OperationResult;
48 import org.opendaylight.ovsdb.lib.operations.Select;
49 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
50 import org.opendaylight.ovsdb.lib.schema.typed.TypedDatabaseSchema;
51 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
52 import org.opendaylight.ovsdb.schema.openvswitch.OpenVSwitch;
53 import org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationManager;
54 import org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationTask;
55 import org.opendaylight.ovsdb.southbound.reconciliation.configuration.BridgeConfigReconciliationTask;
56 import org.opendaylight.ovsdb.southbound.reconciliation.connection.ConnectionReconciliationTask;
57 import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbNodeRemoveCommand;
58 import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAttributes;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ManagedNodeEntry;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69 public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoCloseable {
70     private final Map<ConnectionInfo, OvsdbConnectionInstance> clients =
71             new ConcurrentHashMap<>();
72     private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionManager.class);
73     private static final String ENTITY_TYPE = "ovsdb";
74     private static final int DB_FETCH_TIMEOUT = 1000;
75
76     private final DataBroker db;
77     private final TransactionInvoker txInvoker;
78     private final Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers =
79             new ConcurrentHashMap<>();
80     private final Map<InstanceIdentifier<Node>, OvsdbConnectionInstance> nodeIdVsConnectionInstance =
81             new ConcurrentHashMap<>();
82     private final Map<Entity, OvsdbConnectionInstance> entityConnectionMap =
83             new ConcurrentHashMap<>();
84     private final EntityOwnershipService entityOwnershipService;
85     private final OvsdbDeviceEntityOwnershipListener ovsdbDeviceEntityOwnershipListener;
86     private final OvsdbConnection ovsdbConnection;
87     private final ReconciliationManager reconciliationManager;
88     private final InstanceIdentifierCodec instanceIdentifierCodec;
89
90     public OvsdbConnectionManager(final DataBroker db,final TransactionInvoker txInvoker,
91                                   final EntityOwnershipService entityOwnershipService,
92                                   final OvsdbConnection ovsdbConnection,
93                                   final InstanceIdentifierCodec instanceIdentifierCodec) {
94         this.db = db;
95         this.txInvoker = txInvoker;
96         this.entityOwnershipService = entityOwnershipService;
97         this.ovsdbDeviceEntityOwnershipListener = new OvsdbDeviceEntityOwnershipListener(this, entityOwnershipService);
98         this.ovsdbConnection = ovsdbConnection;
99         this.reconciliationManager = new ReconciliationManager(db, instanceIdentifierCodec);
100         this.instanceIdentifierCodec = instanceIdentifierCodec;
101     }
102
103     @Override
104     public void connected(final OvsdbClient externalClient) {
105         LOG.info("Library connected {} from {}:{} to {}:{}",
106                 externalClient.getConnectionInfo().getType(),
107                 externalClient.getConnectionInfo().getRemoteAddress(),
108                 externalClient.getConnectionInfo().getRemotePort(),
109                 externalClient.getConnectionInfo().getLocalAddress(),
110                 externalClient.getConnectionInfo().getLocalPort());
111         try {
112             List<String> databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS);
113             if (databases.contains(SouthboundConstants.OPEN_V_SWITCH)) {
114                 OvsdbConnectionInstance client = connectedButCallBacksNotRegistered(externalClient);
115                 // Register Cluster Ownership for ConnectionInfo
116                 registerEntityForOwnership(client);
117             }
118         } catch (InterruptedException | ExecutionException | TimeoutException e) {
119             LOG.warn("Unable to fetch Database list from device {}. Disconnecting from the device.",
120                     externalClient.getConnectionInfo().getRemoteAddress(), e);
121             externalClient.disconnect();
122         }
123
124     }
125
126     public OvsdbConnectionInstance connectedButCallBacksNotRegistered(final OvsdbClient externalClient) {
127         LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
128                 externalClient.getConnectionInfo().getRemotePort());
129         ConnectionInfo key = SouthboundMapper.createConnectionInfo(externalClient);
130         OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(key);
131
132         // Check if existing ovsdbConnectionInstance for the OvsdbClient present.
133         // In such cases, we will see if the ovsdbConnectionInstance has same externalClient.
134         if (ovsdbConnectionInstance != null) {
135             if (ovsdbConnectionInstance.hasOvsdbClient(externalClient)) {
136                 LOG.warn("OVSDB Connection Instance {} already exists for client {}", key, externalClient);
137                 return ovsdbConnectionInstance;
138             }
139             LOG.warn("OVSDB Connection Instance {} being replaced with client {}", key, externalClient);
140
141             // Unregister Cluster Ownership for ConnectionInfo
142             // Because the ovsdbConnectionInstance is about to be completely replaced!
143             unregisterEntityForOwnership(ovsdbConnectionInstance);
144
145             ovsdbConnectionInstance.disconnect();
146
147             removeConnectionInstance(key);
148
149             stopBridgeConfigReconciliationIfActive(ovsdbConnectionInstance.getInstanceIdentifier());
150         }
151
152         ovsdbConnectionInstance = new OvsdbConnectionInstance(key, externalClient, txInvoker,
153                 getInstanceIdentifier(key));
154         ovsdbConnectionInstance.createTransactInvokers();
155         return ovsdbConnectionInstance;
156     }
157
158     @Override
159     public void disconnected(final OvsdbClient client) {
160         LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store",
161                 client.getConnectionInfo().getType(),
162                 client.getConnectionInfo().getRemoteAddress(),
163                 client.getConnectionInfo().getRemotePort(),
164                 client.getConnectionInfo().getLocalAddress(),
165                 client.getConnectionInfo().getLocalPort());
166         ConnectionInfo key = SouthboundMapper.createConnectionInfo(client);
167         OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(key);
168         if (ovsdbConnectionInstance != null) {
169             // Unregister Entity ownership as soon as possible ,so this instance should
170             // not be used as a candidate in Entity election (given that this instance is
171             // about to disconnect as well), if current owner get disconnected from
172             // OVSDB device.
173             if (ovsdbConnectionInstance.getHasDeviceOwnership()) {
174                 LOG.info("Library disconnected {} this controller instance has ownership", key);
175                 deleteOperNodeAndReleaseOwnership(ovsdbConnectionInstance);
176             } else {
177                 LOG.info("Library disconnected {} this controller does not have ownership", key);
178                 unregisterEntityForOwnership(ovsdbConnectionInstance);
179             }
180             removeConnectionInstance(key);
181
182             //Controller initiated connection can be terminated from switch side.
183             //So cleanup the instance identifier cache.
184             removeInstanceIdentifier(key);
185             nodeIdVsConnectionInstance.remove(ovsdbConnectionInstance.getInstanceIdentifier(),
186                     ovsdbConnectionInstance);
187             stopBridgeConfigReconciliationIfActive(ovsdbConnectionInstance.getInstanceIdentifier());
188             retryConnection(ovsdbConnectionInstance.getInstanceIdentifier(),
189                     ovsdbConnectionInstance.getOvsdbNodeAugmentation(),
190                     ConnectionReconciliationTriggers.ON_DISCONNECT);
191         } else {
192             LOG.warn("disconnected : Connection instance not found for OVSDB Node {} ", key);
193         }
194         LOG.trace("OvsdbConnectionManager: exit disconnected client: {}", client);
195     }
196
197     private void deleteOperNodeAndReleaseOwnership(final OvsdbConnectionInstance ovsdbConnectionInstance) {
198         ovsdbConnectionInstance.setHasDeviceOwnership(false);
199         final InstanceIdentifier nodeIid = ovsdbConnectionInstance.getInstanceIdentifier();
200         //remove the node from oper only if it has ownership
201         txInvoker.invoke(new OvsdbNodeRemoveCommand(ovsdbConnectionInstance, null, null) {
202
203             @Override
204             public void onSuccess() {
205                 super.onSuccess();
206                 LOG.debug("Successfully removed node {} from oper", nodeIid);
207                 //Giveup the ownership only after cleanup is done
208                 unregisterEntityForOwnership(ovsdbConnectionInstance);
209             }
210
211             @Override
212             public void onFailure(final Throwable throwable) {
213                 LOG.debug("Failed to remove node {} from oper", nodeIid);
214                 super.onFailure(throwable);
215                 unregisterEntityForOwnership(ovsdbConnectionInstance);
216             }
217         });
218     }
219
220     public OvsdbClient connect(final InstanceIdentifier<Node> iid,
221             final OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException, ConnectException {
222         LOG.info("Connecting to {}", SouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo()));
223
224         // TODO handle case where we already have a connection
225         // TODO use transaction chains to handle ordering issues between disconnected
226         // TODO and connected when writing to the operational store
227         InetAddress ip = SouthboundMapper.createInetAddress(ovsdbNode.getConnectionInfo().getRemoteIp());
228         OvsdbClient client = ovsdbConnection.connect(ip,
229                 ovsdbNode.getConnectionInfo().getRemotePort().getValue().toJava());
230         // For connections from the controller to the ovs instance, the library doesn't call
231         // this method for us
232         if (client != null) {
233             putInstanceIdentifier(ovsdbNode.getConnectionInfo(), iid.firstIdentifierOf(Node.class));
234             OvsdbConnectionInstance ovsdbConnectionInstance = connectedButCallBacksNotRegistered(client);
235             ovsdbConnectionInstance.setOvsdbNodeAugmentation(ovsdbNode);
236
237             // Register Cluster Ownership for ConnectionInfo
238             registerEntityForOwnership(ovsdbConnectionInstance);
239         } else {
240             LOG.warn("Failed to connect to OVSDB Node {}", ovsdbNode.getConnectionInfo());
241         }
242         return client;
243     }
244
245     public void disconnect(final OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException {
246         LOG.info("Disconnecting from {}", SouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo()));
247         OvsdbConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo());
248         if (client != null) {
249             // Unregister Cluster Onwership for ConnectionInfo
250             deleteOperNodeAndReleaseOwnership(client);
251
252             client.disconnect();
253
254             removeInstanceIdentifier(ovsdbNode.getConnectionInfo());
255
256             stopBridgeConfigReconciliationIfActive(client.getInstanceIdentifier());
257         } else {
258             LOG.debug("disconnect : connection instance not found for {}",ovsdbNode.getConnectionInfo());
259         }
260     }
261
262 /*    public void init(ConnectionInfo key) {
263         OvsdbConnectionInstance client = getConnectionInstance(key);
264
265         // TODO (FF): make sure that this cluster instance is the 'entity owner' fo the given OvsdbConnectionInstance ?
266
267         if (client != null) {
268
269              *  Note: registerCallbacks() is idemPotent... so if you call it repeatedly all is safe,
270              *  it only registersCallbacks on the *first* call.
271
272             client.registerCallbacks();
273         }
274     }
275 */
276     @Override
277     public void close() {
278         if (ovsdbDeviceEntityOwnershipListener != null) {
279             ovsdbDeviceEntityOwnershipListener.close();
280         }
281
282         for (OvsdbConnectionInstance client: clients.values()) {
283             client.disconnect();
284         }
285     }
286
287     @VisibleForTesting
288     void putConnectionInstance(final ConnectionInfo key,final OvsdbConnectionInstance instance) {
289         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
290         clients.put(connectionInfo, instance);
291     }
292
293     private void removeConnectionInstance(final ConnectionInfo key) {
294         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
295         clients.remove(connectionInfo);
296     }
297
298     @VisibleForTesting
299     void putInstanceIdentifier(final ConnectionInfo key, final InstanceIdentifier<Node> iid) {
300         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
301         instanceIdentifiers.put(connectionInfo, iid);
302     }
303
304     private void removeInstanceIdentifier(final ConnectionInfo key) {
305         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
306         instanceIdentifiers.remove(connectionInfo);
307     }
308
309     public InstanceIdentifier<Node> getInstanceIdentifier(final ConnectionInfo key) {
310         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
311         return instanceIdentifiers.get(connectionInfo);
312     }
313
314     public OvsdbConnectionInstance getConnectionInstance(final ConnectionInfo key) {
315         ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
316         return clients.get(connectionInfo);
317     }
318
319     public OvsdbConnectionInstance getConnectionInstance(final OvsdbBridgeAttributes mn) {
320         Optional<OvsdbNodeAugmentation> optional = SouthboundUtil.getManagingNode(db, mn);
321         if (optional.isPresent()) {
322             return getConnectionInstance(optional.get().getConnectionInfo());
323         } else {
324             return null;
325         }
326     }
327
328     public OvsdbConnectionInstance getConnectionInstance(final Node node) {
329         Preconditions.checkNotNull(node);
330         OvsdbNodeAugmentation ovsdbNode = node.augmentation(OvsdbNodeAugmentation.class);
331         OvsdbBridgeAugmentation ovsdbManagedNode = node.augmentation(OvsdbBridgeAugmentation.class);
332         if (ovsdbNode != null) {
333             return getConnectionInstance(ovsdbNode.getConnectionInfo());
334         } else if (ovsdbManagedNode != null) {
335             return getConnectionInstance(ovsdbManagedNode);
336         } else {
337             LOG.warn("This is not a node that gives any hint how to find its OVSDB Manager: {}",node);
338             return null;
339         }
340     }
341
342     public OvsdbConnectionInstance getConnectionInstance(final InstanceIdentifier<Node> nodePath) {
343         if (nodeIdVsConnectionInstance.get(nodePath) != null) {
344             return nodeIdVsConnectionInstance.get(nodePath);
345         }
346         try {
347             ReadOnlyTransaction transaction = db.newReadOnlyTransaction();
348             CheckedFuture<Optional<Node>, ReadFailedException> nodeFuture = transaction.read(
349                     LogicalDatastoreType.OPERATIONAL, nodePath);
350             transaction.close();
351             Optional<Node> optional = nodeFuture.get();
352             if (optional.isPresent()) {
353                 return this.getConnectionInstance(optional.get());
354             } else {
355                 LOG.debug("Node was not found on the path in the operational DS: {}", nodePath);
356                 return null;
357             }
358         } catch (InterruptedException | ExecutionException e) {
359             LOG.warn("Failed to get Ovsdb Node {}",nodePath, e);
360             return null;
361         }
362     }
363
364     public OvsdbClient getClient(final ConnectionInfo connectionInfo) {
365         OvsdbConnectionInstance connectionInstance = getConnectionInstance(connectionInfo);
366         if (connectionInstance != null) {
367             return connectionInstance.getOvsdbClient();
368         }
369         return null;
370     }
371
372     public OvsdbClient getClient(final OvsdbBridgeAttributes mn) {
373         return getConnectionInstance(mn).getOvsdbClient();
374     }
375
376     public OvsdbClient getClient(final Node node) {
377         return getConnectionInstance(node).getOvsdbClient();
378     }
379
380     public Boolean getHasDeviceOwnership(final ConnectionInfo connectionInfo) {
381         OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(connectionInfo);
382         if (ovsdbConnectionInstance == null) {
383             return Boolean.FALSE;
384         }
385         return ovsdbConnectionInstance.getHasDeviceOwnership();
386     }
387
388     public void reconcileConnection(final InstanceIdentifier<Node> iid, final OvsdbNodeAugmentation ovsdbNode) {
389         this.retryConnection(iid, ovsdbNode,
390                 ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE);
391
392     }
393
394     public void stopConnectionReconciliationIfActive(final InstanceIdentifier<?> iid,
395             final OvsdbNodeAugmentation ovsdbNode) {
396         final ReconciliationTask task = new ConnectionReconciliationTask(
397                 reconciliationManager,
398                 this,
399                 iid,
400                 ovsdbNode);
401         reconciliationManager.dequeue(task);
402     }
403
404     public void stopBridgeConfigReconciliationIfActive(final InstanceIdentifier<?> iid) {
405         final ReconciliationTask task =
406                 new BridgeConfigReconciliationTask(reconciliationManager, this, iid, null, instanceIdentifierCodec);
407         reconciliationManager.dequeue(task);
408         reconciliationManager.cancelTerminationPointReconciliation();
409     }
410
411     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
412             justification = "https://github.com/spotbugs/spotbugs/issues/811")
413     private void handleOwnershipChanged(final EntityOwnershipChange ownershipChange) {
414         OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstanceFromEntity(ownershipChange.getEntity());
415         LOG.debug("handleOwnershipChanged: {} event received for device {}",
416                 ownershipChange, ovsdbConnectionInstance != null ? ovsdbConnectionInstance.getConnectionInfo()
417                         : "that's currently NOT registered by *this* southbound plugin instance");
418
419         if (ovsdbConnectionInstance == null) {
420             if (ownershipChange.getState().isOwner()) {
421                 LOG.warn("handleOwnershipChanged: *this* instance is elected as an owner of the device {} but it "
422                         + "is NOT registered for ownership", ownershipChange.getEntity());
423             } else {
424                 // EntityOwnershipService sends notification to all the nodes, irrespective of whether
425                 // that instance registered for the device ownership or not. It is to make sure that
426                 // If all the controller instance that was connected to the device are down, so the
427                 // running instance can clear up the operational data store even though it was not
428                 // connected to the device.
429                 LOG.debug("handleOwnershipChanged: No connection instance found for {}", ownershipChange.getEntity());
430             }
431
432             // If entity has no owner, clean up the operational data store (it's possible because owner controller
433             // might went down abruptly and didn't get a chance to clean up the operational data store.
434             if (!ownershipChange.getState().hasOwner()) {
435                 LOG.info("{} has no owner, cleaning up the operational data store", ownershipChange.getEntity());
436                 cleanEntityOperationalData(ownershipChange.getEntity());
437             }
438             return;
439         }
440         //Connection detail need to be cached, irrespective of ownership result.
441         putConnectionInstance(ovsdbConnectionInstance.getMDConnectionInfo(),ovsdbConnectionInstance);
442
443         if (ownershipChange.getState().isOwner() == ovsdbConnectionInstance.getHasDeviceOwnership()) {
444             LOG.info("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}",
445                     ovsdbConnectionInstance.getConnectionInfo(), ovsdbConnectionInstance.getHasDeviceOwnership()
446                             ? SouthboundConstants.OwnershipStates.OWNER.getState()
447                             : SouthboundConstants.OwnershipStates.NONOWNER.getState());
448             return;
449         }
450
451         ovsdbConnectionInstance.setHasDeviceOwnership(ownershipChange.getState().isOwner());
452         // You were not an owner, but now you are
453         if (ownershipChange.getState().isOwner()) {
454             LOG.info("handleOwnershipChanged: *this* southbound plugin instance is an OWNER of the device {}",
455                     ovsdbConnectionInstance.getConnectionInfo());
456
457             //*this* instance of southbound plugin is owner of the device,
458             //so register for monitor callbacks
459             ovsdbConnectionInstance.registerCallbacks(instanceIdentifierCodec);
460
461             reconcileBridgeConfigurations(ovsdbConnectionInstance);
462         } else {
463             //You were owner of the device, but now you are not. With the current ownership
464             //grant mechanism, this scenario should not occur. Because this scenario will occur
465             //when this controller went down or switch flap the connection, but in both the case
466             //it will go through the re-registration process. We need to implement this condition
467             //when clustering service implement a ownership grant strategy which can revoke the
468             //device ownership for load balancing the devices across the instances.
469             //Once this condition occur, we should unregister the callback.
470             LOG.error("handleOwnershipChanged: *this* southbound plugin instance is no longer the owner of device {}."
471                     + "This should NOT happen.",
472                     ovsdbConnectionInstance.getNodeId().getValue());
473         }
474     }
475
476     private void cleanEntityOperationalData(final Entity entity) {
477
478         //Do explicit cleanup rather than using OvsdbNodeRemoveCommand, because there
479         // are chances that other controller instance went down abruptly and it does
480         // not clear manager entry, which OvsdbNodeRemoveCommand look for before cleanup.
481
482         @SuppressWarnings("unchecked")
483         final InstanceIdentifier<Node> nodeIid = (InstanceIdentifier<Node>) entity.getIdentifier();
484
485         txInvoker.invoke(transaction -> {
486             Optional<Node> ovsdbNodeOpt = SouthboundUtil.readNode(transaction, nodeIid);
487             if (ovsdbNodeOpt.isPresent()) {
488                 Node ovsdbNode = ovsdbNodeOpt.get();
489                 OvsdbNodeAugmentation nodeAugmentation = ovsdbNode.augmentation(OvsdbNodeAugmentation.class);
490                 if (nodeAugmentation != null) {
491                     if (nodeAugmentation.getManagedNodeEntry() != null) {
492                         for (ManagedNodeEntry managedNode : nodeAugmentation.getManagedNodeEntry()) {
493                             transaction.delete(
494                                     LogicalDatastoreType.OPERATIONAL, managedNode.getBridgeRef().getValue());
495                         }
496                     } else {
497                         LOG.debug("{} had no managed nodes", ovsdbNode.getNodeId().getValue());
498                     }
499                 }
500                 transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeIid);
501             }
502         });
503
504     }
505
506     private static OpenVSwitch getOpenVswitchTableEntry(final OvsdbConnectionInstance connectionInstance) {
507         final TypedDatabaseSchema dbSchema;
508         try {
509             dbSchema = connectionInstance.getSchema(OvsdbSchemaContants.DATABASE_NAME).get();
510         } catch (InterruptedException | ExecutionException e) {
511             LOG.warn("Not able to fetch schema for database {} from device {}",
512                     OvsdbSchemaContants.DATABASE_NAME,connectionInstance.getConnectionInfo(),e);
513             return null;
514         }
515
516         final GenericTableSchema openVSwitchSchema = dbSchema.getTableSchema(OpenVSwitch.class);
517         final Select<GenericTableSchema> selectOperation = op.select(openVSwitchSchema);
518         selectOperation.setColumns(new ArrayList<>(openVSwitchSchema.getColumns()));
519
520         List<Operation> operations = new ArrayList<>();
521         operations.add(selectOperation);
522         operations.add(op.comment("Fetching Open_VSwitch table rows"));
523         final List<OperationResult> results;
524         try {
525             results = connectionInstance.transact(dbSchema, operations).get();
526         } catch (InterruptedException | ExecutionException e) {
527             LOG.warn("Not able to fetch OpenVswitch table row from device {}", connectionInstance.getConnectionInfo(),
528                 e);
529             return null;
530         }
531
532         return results == null || results.isEmpty() ? null
533                 : TyperUtils.getTypedRowWrapper(dbSchema, OpenVSwitch.class, results.get(0).getRows().get(0));
534     }
535
536     private Entity getEntityFromConnectionInstance(@NonNull final OvsdbConnectionInstance ovsdbConnectionInstance) {
537         InstanceIdentifier<Node> iid = ovsdbConnectionInstance.getInstanceIdentifier();
538         if (iid == null) {
539             /* Switch initiated connection won't have iid, till it gets OpenVSwitch
540              * table update but update callback is always registered after ownership
541              * is granted. So we are explicitly fetch the row here to get the iid.
542              */
543             OpenVSwitch openvswitchRow = getOpenVswitchTableEntry(ovsdbConnectionInstance);
544             iid = SouthboundMapper.getInstanceIdentifier(instanceIdentifierCodec, openvswitchRow);
545             LOG.info("InstanceIdentifier {} generated for device "
546                     + "connection {}",iid,ovsdbConnectionInstance.getConnectionInfo());
547             ovsdbConnectionInstance.setInstanceIdentifier(iid);
548         }
549         Entity deviceEntity = new Entity(ENTITY_TYPE, iid);
550         LOG.debug("Entity {} created for device connection {}",
551                 deviceEntity, ovsdbConnectionInstance.getConnectionInfo());
552         return deviceEntity;
553     }
554
555     private OvsdbConnectionInstance getConnectionInstanceFromEntity(final Entity entity) {
556         return entityConnectionMap.get(entity);
557     }
558
559     private void registerEntityForOwnership(final OvsdbConnectionInstance ovsdbConnectionInstance) {
560         putConnectionInstance(ovsdbConnectionInstance.getMDConnectionInfo(), ovsdbConnectionInstance);
561
562         Entity candidateEntity = getEntityFromConnectionInstance(ovsdbConnectionInstance);
563         if (entityConnectionMap.containsKey(candidateEntity)) {
564             LOG.error("Old connection still hanging for {}", candidateEntity);
565             disconnected(ovsdbConnectionInstance.getOvsdbClient());
566             //TODO do cleanup for old connection or stale check
567         }
568         nodeIdVsConnectionInstance.put((InstanceIdentifier<Node>) candidateEntity.getIdentifier(),
569                 ovsdbConnectionInstance);
570         entityConnectionMap.put(candidateEntity, ovsdbConnectionInstance);
571         ovsdbConnectionInstance.setConnectedEntity(candidateEntity);
572         try {
573             EntityOwnershipCandidateRegistration registration =
574                     entityOwnershipService.registerCandidate(candidateEntity);
575             ovsdbConnectionInstance.setDeviceOwnershipCandidateRegistration(registration);
576             LOG.info("OVSDB entity {} is registered for ownership.", candidateEntity);
577
578         } catch (CandidateAlreadyRegisteredException e) {
579             LOG.warn("OVSDB entity {} was already registered for ownership", candidateEntity, e);
580         }
581         //If entity already has owner, it won't get notification from EntityOwnershipService
582         java.util.Optional<EntityOwnershipState> ownershipStateOpt =
583                 entityOwnershipService.getOwnershipState(candidateEntity);
584         if (ownershipStateOpt.isPresent()) {
585             EntityOwnershipState ownershipState = ownershipStateOpt.get();
586             if (ownershipState == EntityOwnershipState.OWNED_BY_OTHER) {
587                 ovsdbConnectionInstance.setHasDeviceOwnership(false);
588             } else if (ownershipState == EntityOwnershipState.IS_OWNER) {
589                 ovsdbConnectionInstance.setHasDeviceOwnership(true);
590                 ovsdbConnectionInstance.registerCallbacks(instanceIdentifierCodec);
591             }
592         }
593     }
594
595     private void unregisterEntityForOwnership(final OvsdbConnectionInstance ovsdbConnectionInstance) {
596         ovsdbConnectionInstance.closeDeviceOwnershipCandidateRegistration();
597         entityConnectionMap.remove(ovsdbConnectionInstance.getConnectedEntity(), ovsdbConnectionInstance);
598     }
599
600     private void retryConnection(final InstanceIdentifier<Node> iid, final OvsdbNodeAugmentation ovsdbNode,
601                                  final ConnectionReconciliationTriggers trigger) {
602         final ReconciliationTask task = new ConnectionReconciliationTask(
603                 reconciliationManager,
604                 this,
605                 iid,
606                 ovsdbNode);
607
608         if (reconciliationManager.isEnqueued(task)) {
609             return;
610         }
611         switch (trigger) {
612             case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE:
613                 reconciliationManager.enqueueForRetry(task);
614                 break;
615             case ON_DISCONNECT: {
616                 CheckedFuture<Optional<Node>, ReadFailedException> readNodeFuture;
617                 try (ReadOnlyTransaction tx = db.newReadOnlyTransaction()) {
618                     readNodeFuture = tx.read(LogicalDatastoreType.CONFIGURATION, iid);
619                 }
620                 Futures.addCallback(readNodeFuture, new FutureCallback<Optional<Node>>() {
621                     @Override
622                     public void onSuccess(final Optional<Node> node) {
623                         if (node.isPresent()) {
624                             LOG.info("Disconnected/Failed connection {} was controller initiated, attempting "
625                                     + "reconnection", ovsdbNode.getConnectionInfo());
626                             reconciliationManager.enqueue(task);
627
628                         } else {
629                             LOG.debug("Connection {} was switch initiated, no reconciliation is required",
630                                     iid.firstKeyOf(Node.class).getNodeId());
631                         }
632                     }
633
634                     @Override
635                     public void onFailure(final Throwable throwable) {
636                         LOG.warn("Read Config/DS for Node failed! {}", iid, throwable);
637                     }
638                 }, MoreExecutors.directExecutor());
639                 break;
640             }
641             default:
642                 break;
643         }
644     }
645
646     private void reconcileBridgeConfigurations(final OvsdbConnectionInstance client) {
647         final InstanceIdentifier<Node> nodeIid = client.getInstanceIdentifier();
648         final ReconciliationTask task = new BridgeConfigReconciliationTask(
649                 reconciliationManager, OvsdbConnectionManager.this, nodeIid, client, instanceIdentifierCodec);
650
651         reconciliationManager.enqueue(task);
652     }
653
654     private static class OvsdbDeviceEntityOwnershipListener implements EntityOwnershipListener {
655         private final OvsdbConnectionManager cm;
656         private final EntityOwnershipListenerRegistration listenerRegistration;
657
658         OvsdbDeviceEntityOwnershipListener(final OvsdbConnectionManager cm,
659                 final EntityOwnershipService entityOwnershipService) {
660             this.cm = cm;
661             listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this);
662         }
663
664         public void close() {
665             listenerRegistration.close();
666         }
667
668         @Override
669         public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
670             cm.handleOwnershipChanged(ownershipChange);
671         }
672     }
673
674     private enum ConnectionReconciliationTriggers {
675         /*
676         Reconciliation trigger for scenario where controller's attempt
677         to connect to switch fails on config data store notification
678         */
679         ON_CONTROLLER_INITIATED_CONNECTION_FAILURE,
680
681         /*
682         Reconciliation trigger for the scenario where controller
683         initiated connection disconnects.
684         */
685         ON_DISCONNECT
686     }
687 }