4a07e20f7df7890ef255c838340864277baf7028
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / HwvtepConnectionManager.java
1 /*
2  * Copyright (c) 2015, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.hwvtepsouthbound;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
18 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
19 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
20 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
21 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
22 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
24 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
25 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationManager;
29 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationTask;
30 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.configuration.HwvtepReconciliationTask;
31 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection.ConnectionReconciliationTask;
32 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue;
33 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.HwvtepGlobalRemoveCommand;
34 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvoker;
35 import org.opendaylight.ovsdb.lib.OvsdbClient;
36 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
37 import org.opendaylight.ovsdb.lib.impl.OvsdbConnectionService;
38 import org.opendaylight.ovsdb.lib.operations.Operation;
39 import org.opendaylight.ovsdb.lib.operations.OperationResult;
40 import org.opendaylight.ovsdb.lib.operations.Select;
41 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
42 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
43 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
44 import org.opendaylight.ovsdb.schema.hardwarevtep.Global;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalSwitchAttributes;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.ConnectionInfo;
49 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 import javax.annotation.Nonnull;
56 import javax.annotation.Nullable;
57 import java.net.ConnectException;
58 import java.net.InetAddress;
59 import java.net.UnknownHostException;
60 import java.util.ArrayList;
61 import java.util.List;
62 import java.util.Map;
63 import java.util.Objects;
64 import java.util.concurrent.ConcurrentHashMap;
65 import java.util.concurrent.ExecutionException;
66 import java.util.concurrent.TimeUnit;
67 import java.util.concurrent.TimeoutException;
68
69 import static org.opendaylight.ovsdb.lib.operations.Operations.op;
70
71 public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoCloseable{
72     private Map<ConnectionInfo, HwvtepConnectionInstance> clients = new ConcurrentHashMap<>();
73     private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionManager.class);
74     private static final String ENTITY_TYPE = "hwvtep";
75     private static final int DB_FETCH_TIMEOUT = 1000;
76
77     private DataBroker db;
78     private TransactionInvoker txInvoker;
79     private Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers = new ConcurrentHashMap<>();
80     private Map<Entity, HwvtepConnectionInstance> entityConnectionMap = new ConcurrentHashMap<>();
81     private EntityOwnershipService entityOwnershipService;
82     private HwvtepDeviceEntityOwnershipListener hwvtepDeviceEntityOwnershipListener;
83     private final ReconciliationManager reconciliationManager;
84     private final Map<InstanceIdentifier<Node>, HwvtepConnectionInstance> nodeIidVsConnectionInstance =
85             new ConcurrentHashMap<>();
86     private HwvtepOperGlobalListener hwvtepOperGlobalListener;
87
88     public HwvtepConnectionManager(DataBroker db, TransactionInvoker txInvoker,
89                     EntityOwnershipService entityOwnershipService) {
90         this.db = db;
91         this.txInvoker = txInvoker;
92         this.entityOwnershipService = entityOwnershipService;
93         this.hwvtepDeviceEntityOwnershipListener = new HwvtepDeviceEntityOwnershipListener(this,entityOwnershipService);
94         this.reconciliationManager = new ReconciliationManager(db);
95         this.hwvtepOperGlobalListener = new HwvtepOperGlobalListener(db, this);
96     }
97
98     @Override
99     public void close() throws Exception {
100         if (hwvtepDeviceEntityOwnershipListener != null) {
101             hwvtepDeviceEntityOwnershipListener.close();
102         }
103
104         for (HwvtepConnectionInstance client: clients.values()) {
105             client.disconnect();
106         }
107         DependencyQueue.close();
108     }
109
110     @Override
111     public void connected(@Nonnull final OvsdbClient externalClient) {
112         LOG.info("Library connected {} from {}:{} to {}:{}",
113                 externalClient.getConnectionInfo().getType(),
114                 externalClient.getConnectionInfo().getRemoteAddress(),
115                 externalClient.getConnectionInfo().getRemotePort(),
116                 externalClient.getConnectionInfo().getLocalAddress(),
117                 externalClient.getConnectionInfo().getLocalPort());
118         List<String> databases = new ArrayList<>();
119         try {
120             databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS);
121             if(databases.contains(HwvtepSchemaConstants.HARDWARE_VTEP)) {
122                 HwvtepConnectionInstance hwClient = connectedButCallBacksNotRegistered(externalClient);
123                 registerEntityForOwnership(hwClient);
124             }
125         } catch (InterruptedException | ExecutionException | TimeoutException e) {
126             LOG.warn("Unable to fetch Database list from device {}. Disconnecting from the device.",
127                     externalClient.getConnectionInfo().getRemoteAddress(), e);
128             externalClient.disconnect();
129         }
130     }
131
132     @Override
133     public void disconnected(OvsdbClient client) {
134         LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store",
135                 client.getConnectionInfo().getType(),
136                 client.getConnectionInfo().getRemoteAddress(),
137                 client.getConnectionInfo().getRemotePort(),
138                 client.getConnectionInfo().getLocalAddress(),
139                 client.getConnectionInfo().getLocalPort());
140         ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(client);
141         HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstance(key);
142         if (hwvtepConnectionInstance != null) {
143             // Unregister Entity ownership as soon as possible ,so this instance should
144             // not be used as a candidate in Entity election (given that this instance is
145             // about to disconnect as well), if current owner get disconnected from
146             // HWVTEP device.
147             if (hwvtepConnectionInstance.getHasDeviceOwnership()) {
148                 unregisterEntityForOwnership(hwvtepConnectionInstance);
149                 txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null));
150             } else {
151                 unregisterEntityForOwnership(hwvtepConnectionInstance);
152                 //Do not delete if client disconnected from follower HwvtepGlobalRemoveCommand
153             }
154
155             removeConnectionInstance(key);
156
157             //Controller initiated connection can be terminated from switch side.
158             //So cleanup the instance identifier cache.
159             removeInstanceIdentifier(key);
160             removeConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier());
161             retryConnection(hwvtepConnectionInstance.getInstanceIdentifier(),
162                     hwvtepConnectionInstance.getHwvtepGlobalAugmentation(),
163                     ConnectionReconciliationTriggers.ON_DISCONNECT);
164         } else {
165             LOG.warn("HWVTEP disconnected event did not find connection instance for {}", key);
166         }
167         LOG.trace("HwvtepConnectionManager exit disconnected client: {}", client);
168     }
169
170     public OvsdbClient connect(InstanceIdentifier<Node> iid,
171                                HwvtepGlobalAugmentation hwvtepGlobal) throws UnknownHostException, ConnectException {
172         LOG.info("Connecting to {}", HwvtepSouthboundUtil.connectionInfoToString(hwvtepGlobal.getConnectionInfo()));
173         InetAddress ip = HwvtepSouthboundMapper.createInetAddress(hwvtepGlobal.getConnectionInfo().getRemoteIp());
174         OvsdbClient client = OvsdbConnectionService.getService()
175                         .connect(ip, hwvtepGlobal.getConnectionInfo().getRemotePort().getValue());
176         if(client != null) {
177             putInstanceIdentifier(hwvtepGlobal.getConnectionInfo(), iid.firstIdentifierOf(Node.class));
178             HwvtepConnectionInstance hwvtepConnectionInstance = connectedButCallBacksNotRegistered(client);
179             hwvtepConnectionInstance.setHwvtepGlobalAugmentation(hwvtepGlobal);
180             hwvtepConnectionInstance.setInstanceIdentifier(iid.firstIdentifierOf(Node.class));
181
182             // Register Cluster Ownership for ConnectionInfo
183             registerEntityForOwnership(hwvtepConnectionInstance);
184         } else {
185             LOG.warn("Failed to connect to OVSDB node: {}", hwvtepGlobal.getConnectionInfo());
186         }
187         return client;
188     }
189     public void disconnect(HwvtepGlobalAugmentation ovsdbNode) throws UnknownHostException {
190         LOG.info("Diconnecting from {}", HwvtepSouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo()));
191         HwvtepConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo());
192         if (client != null) {
193             client.disconnect();
194             // Unregister Cluster Ownership for ConnectionInfo
195             unregisterEntityForOwnership(client);
196             removeInstanceIdentifier(ovsdbNode.getConnectionInfo());
197         }
198     }
199
200     public HwvtepConnectionInstance connectedButCallBacksNotRegistered(final OvsdbClient externalClient) {
201         LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
202                 externalClient.getConnectionInfo().getRemotePort());
203         ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(externalClient);
204         HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstance(key);
205
206         // Check if existing hwvtepConnectionInstance for the OvsdbClient present.
207         // In such cases, we will see if the hwvtepConnectionInstance has same externalClient.
208         if (hwvtepConnectionInstance != null) {
209             if (hwvtepConnectionInstance.hasOvsdbClient(externalClient)) {
210                 LOG.warn("HWVTEP Connection Instance {} already exists for client {}", key, externalClient);
211                 return hwvtepConnectionInstance;
212             }
213             LOG.warn("HWVTEP Connection Instance {} being replaced with client {}", key, externalClient);
214             hwvtepConnectionInstance.disconnect();
215
216             // Unregister Cluster Ownership for ConnectionInfo
217             // Because the hwvtepConnectionInstance is about to be completely replaced!
218             unregisterEntityForOwnership(hwvtepConnectionInstance);
219
220             removeConnectionInstance(key);
221         }
222
223         hwvtepConnectionInstance = new HwvtepConnectionInstance(this, key, externalClient, getInstanceIdentifier(key),
224                 txInvoker, db);
225         hwvtepConnectionInstance.createTransactInvokers();
226         return hwvtepConnectionInstance;
227     }
228
229     private void putConnectionInstance(ConnectionInfo key,HwvtepConnectionInstance instance) {
230         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
231         clients.put(connectionInfo, instance);
232         LOG.info("Clients after put: {}", clients);
233     }
234
235     public HwvtepConnectionInstance getConnectionInstance(final ConnectionInfo key) {
236         if (key == null) {
237             return null;
238         }
239         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
240         return clients.get(connectionInfo);
241     }
242
243     public HwvtepConnectionInstance getConnectionInstanceFromNodeIid(final InstanceIdentifier<Node> nodeIid) {
244         HwvtepConnectionInstance hwvtepConnectionInstance = nodeIidVsConnectionInstance.get(nodeIid);
245         if (hwvtepConnectionInstance != null) {
246             return hwvtepConnectionInstance;
247         }
248         InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtil.getGlobalNodeIid(nodeIid);
249         if (globalNodeIid != null) {
250             return nodeIidVsConnectionInstance.get(globalNodeIid);
251         }
252         return null;
253     }
254
255     public HwvtepConnectionInstance getConnectionInstance(Node node) {
256         Preconditions.checkNotNull(node);
257         HwvtepGlobalAugmentation hwvtepGlobal = node.getAugmentation(HwvtepGlobalAugmentation.class);
258         PhysicalSwitchAugmentation pSwitchNode = node.getAugmentation(PhysicalSwitchAugmentation.class);
259         if (hwvtepGlobal != null) {
260             if(hwvtepGlobal.getConnectionInfo() != null) {
261                 return getConnectionInstance(hwvtepGlobal.getConnectionInfo());
262             } else {
263                 //TODO: Case of user configured connection for now
264                 //TODO: We could get it from Managers also.
265                 return null;
266             }
267         }
268         else if(pSwitchNode != null){
269             return getConnectionInstance(pSwitchNode);
270         } else {
271             LOG.warn("This is not a node that gives any hint how to find its OVSDB Manager: {}",node);
272             return null;
273         }
274     }
275
276     public HwvtepConnectionInstance getConnectionInstance(HwvtepPhysicalSwitchAttributes pNode) {
277         Optional<HwvtepGlobalAugmentation> optional = HwvtepSouthboundUtil.getManagingNode(db, pNode);
278         if(optional.isPresent()) {
279             return getConnectionInstance(optional.get().getConnectionInfo());
280         } else {
281             return null;
282         }
283     }
284
285     public void stopConfigurationReconciliation(final InstanceIdentifier<Node> nodeIid) {
286         final ReconciliationTask task = new HwvtepReconciliationTask(
287                 reconciliationManager, HwvtepConnectionManager.this, nodeIid, null, null, db);
288
289         reconciliationManager.dequeue(task);
290     }
291
292     public void reconcileConfigurations(final HwvtepConnectionInstance client, Node psNode) {
293         final InstanceIdentifier<Node> nodeIid = client.getInstanceIdentifier();
294         final ReconciliationTask task = new HwvtepReconciliationTask(
295                 reconciliationManager, HwvtepConnectionManager.this, nodeIid, psNode, client, db);
296
297         reconciliationManager.enqueue(task);
298     }
299
300     private void removeConnectionInstance(ConnectionInfo key) {
301         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
302         clients.remove(connectionInfo);
303         LOG.info("Clients after remove: {}", clients);
304     }
305
306     private void putInstanceIdentifier(ConnectionInfo key,InstanceIdentifier<Node> iid) {
307         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
308         instanceIdentifiers.put(connectionInfo, iid);
309     }
310
311     public InstanceIdentifier<Node> getInstanceIdentifier(ConnectionInfo key) {
312         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
313         return instanceIdentifiers.get(connectionInfo);
314     }
315
316     private void removeInstanceIdentifier(ConnectionInfo key) {
317         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
318         instanceIdentifiers.remove(connectionInfo);
319     }
320
321     public OvsdbClient getClient(ConnectionInfo connectionInfo) {
322         return getConnectionInstance(connectionInfo).getOvsdbClient();
323     }
324
325     private void registerEntityForOwnership(HwvtepConnectionInstance hwvtepConnectionInstance) {
326
327         Entity candidateEntity = getEntityFromConnectionInstance(hwvtepConnectionInstance);
328         if (entityConnectionMap.get(candidateEntity) != null) {
329             disconnected(entityConnectionMap.get(candidateEntity).getOvsdbClient());
330             putConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier(), hwvtepConnectionInstance);
331         }
332         entityConnectionMap.put(candidateEntity, hwvtepConnectionInstance);
333         hwvtepConnectionInstance.setConnectedEntity(candidateEntity);
334
335         try {
336             EntityOwnershipCandidateRegistration registration =
337                     entityOwnershipService.registerCandidate(candidateEntity);
338             hwvtepConnectionInstance.setDeviceOwnershipCandidateRegistration(registration);
339             LOG.info("HWVTEP entity {} is registered for ownership.", candidateEntity);
340
341             //If entity already has owner, it won't get notification from EntityOwnershipService
342             //so cache the connection instances.
343             handleOwnershipState(candidateEntity, hwvtepConnectionInstance);
344         } catch (CandidateAlreadyRegisteredException e) {
345             LOG.warn("OVSDB entity {} was already registered for ownership", candidateEntity, e);
346         }
347
348     }
349
350     private void handleOwnershipState(Entity candidateEntity, HwvtepConnectionInstance hwvtepConnectionInstance) {
351         //If entity already has owner, it won't get notification from EntityOwnershipService
352         //so cache the connection instances.
353         Optional<EntityOwnershipState> ownershipStateOpt =
354                 entityOwnershipService.getOwnershipState(candidateEntity);
355         if (ownershipStateOpt.isPresent()) {
356             EntityOwnershipState ownershipState = ownershipStateOpt.get();
357             putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
358             if (ownershipState.hasOwner()) {
359                 hwvtepConnectionInstance.setHasDeviceOwnership(ownershipState.isOwner());
360                 if (!ownershipState.isOwner()) {
361                     LOG.info("HWVTEP entity {} is already owned by other southbound plugin "
362                                     + "instance, so *this* instance is NOT an OWNER of the device",
363                             hwvtepConnectionInstance.getConnectionInfo());
364                 } else {
365                     afterTakingOwnership(hwvtepConnectionInstance);
366                 }
367             }
368         }
369     }
370
371     private void afterTakingOwnership(HwvtepConnectionInstance hwvtepConnectionInstance) {
372         txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null));
373         putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
374         hwvtepConnectionInstance.setHasDeviceOwnership(true);
375         hwvtepConnectionInstance.registerCallbacks();
376     }
377
378     private Global getHwvtepGlobalTableEntry(HwvtepConnectionInstance connectionInstance) {
379         DatabaseSchema dbSchema = null;
380         Global globalRow = null;
381
382         try {
383             dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
384         } catch (InterruptedException | ExecutionException e) {
385             LOG.warn("Not able to fetch schema for database {} from device {}",
386                     HwvtepSchemaConstants.HARDWARE_VTEP,connectionInstance.getConnectionInfo(),e);
387         }
388
389         if (dbSchema != null) {
390             GenericTableSchema hwvtepSchema = TyperUtils.getTableSchema(dbSchema, Global.class);
391
392             List<String> hwvtepTableColumn = new ArrayList<>();
393             hwvtepTableColumn.addAll(hwvtepSchema.getColumns());
394             Select<GenericTableSchema> selectOperation = op.select(hwvtepSchema);
395             selectOperation.setColumns(hwvtepTableColumn);
396
397             ArrayList<Operation> operations = new ArrayList<>();
398             operations.add(selectOperation);
399             operations.add(op.comment("Fetching hardware_vtep table rows"));
400
401             try {
402                 List<OperationResult> results = connectionInstance.transact(dbSchema, operations).get();
403                 if (results != null ) {
404                     OperationResult selectResult = results.get(0);
405                     globalRow = TyperUtils.getTypedRowWrapper(
406                             dbSchema,Global.class,selectResult.getRows().get(0));
407                 }
408             } catch (InterruptedException | ExecutionException e) {
409                 LOG.warn("Not able to fetch hardware_vtep table row from device {}",
410                         connectionInstance.getConnectionInfo(),e);
411             }
412         }
413         LOG.trace("Fetched global {} from hardware_vtep schema",globalRow);
414         return globalRow;
415     }
416
417     private Entity getEntityFromConnectionInstance(@Nonnull HwvtepConnectionInstance hwvtepConnectionInstance) {
418         InstanceIdentifier<Node> iid = hwvtepConnectionInstance.getInstanceIdentifier();
419         if ( iid == null ) {
420             //TODO: Is Global the right one?
421             Global hwvtepGlobalRow = getHwvtepGlobalTableEntry(hwvtepConnectionInstance);
422             iid = HwvtepSouthboundMapper.getInstanceIdentifier(hwvtepGlobalRow);
423             /* Let's set the iid now */
424             hwvtepConnectionInstance.setInstanceIdentifier(iid);
425             LOG.info("InstanceIdentifier {} generated for device "
426                     + "connection {}",iid, hwvtepConnectionInstance.getConnectionInfo());
427
428         }
429         YangInstanceIdentifier entityId =
430                 HwvtepSouthboundUtil.getInstanceIdentifierCodec().getYangInstanceIdentifier(iid);
431         Entity deviceEntity = new Entity(ENTITY_TYPE, entityId);
432         LOG.debug("Entity {} created for device connection {}",
433                 deviceEntity, hwvtepConnectionInstance.getConnectionInfo());
434         return deviceEntity;
435     }
436     private void unregisterEntityForOwnership(HwvtepConnectionInstance hwvtepConnectionInstance) {
437         hwvtepConnectionInstance.closeDeviceOwnershipCandidateRegistration();
438         entityConnectionMap.remove(hwvtepConnectionInstance.getConnectedEntity());
439     }
440
441     public void reconcileConnection(InstanceIdentifier<Node> iid, HwvtepGlobalAugmentation hwvtepNode) {
442         this.retryConnection(iid, hwvtepNode,
443                 ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE);
444         }
445
446     public void stopConnectionReconciliationIfActive(InstanceIdentifier<?> iid, HwvtepGlobalAugmentation hwvtepNode) {
447         final ReconciliationTask task = new ConnectionReconciliationTask(
448                 reconciliationManager,
449                 this,
450                 iid,
451                 hwvtepNode);
452         reconciliationManager.dequeue(task);
453     }
454
455     private void retryConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode,
456                                  ConnectionReconciliationTriggers trigger) {
457         final ReconciliationTask task = new ConnectionReconciliationTask(
458                 reconciliationManager,
459                 this,
460                 iid,
461                 hwvtepNode);
462
463         if(reconciliationManager.isEnqueued(task)){
464             return;
465         }
466         switch(trigger){
467             case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE:
468                 reconciliationManager.enqueueForRetry(task);
469                 break;
470             case ON_DISCONNECT:
471             {
472                 ReadOnlyTransaction tx = db.newReadOnlyTransaction();
473                 CheckedFuture<Optional<Node>, ReadFailedException> readNodeFuture =
474                         tx.read(LogicalDatastoreType.CONFIGURATION, iid);
475
476                 final HwvtepConnectionManager connectionManager = this;
477                 Futures.addCallback(readNodeFuture, new FutureCallback<Optional<Node>>() {
478                     @Override
479                     public void onSuccess(@Nullable Optional<Node> node) {
480                         if (node.isPresent()) {
481                             HwvtepGlobalAugmentation augmentation = node.get()
482                                     .getAugmentation(HwvtepGlobalAugmentation.class);
483                             if (augmentation == null || augmentation.getConnectionInfo() == null) {
484                                 return;
485                             }
486                             LOG.info("Disconnected/Failed connection {} was controller initiated, attempting " +
487                                     "reconnection", hwvtepNode.getConnectionInfo());
488                             reconciliationManager.enqueue(task);
489
490                         } else {
491                             LOG.debug("Connection {} was switch initiated, no reconciliation is required"
492                                     , iid.firstKeyOf(Node.class).getNodeId());
493                         }
494                     }
495
496                     @Override
497                     public void onFailure(Throwable t) {
498                         LOG.warn("Read Config/DS for Node failed! {}", iid, t);
499                     }
500                 });
501                 break;
502             }
503             default:
504                 break;
505         }
506     }
507
508     public void handleOwnershipChanged(EntityOwnershipChange ownershipChange) {
509         HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstanceFromEntity(ownershipChange.getEntity());
510         LOG.info("handleOwnershipChanged: {} event received for device {}",
511                 ownershipChange, hwvtepConnectionInstance != null ? hwvtepConnectionInstance.getConnectionInfo()
512                         : "THAT'S NOT REGISTERED BY THIS SOUTHBOUND PLUGIN INSTANCE");
513
514         if (hwvtepConnectionInstance == null) {
515             if (ownershipChange.isOwner()) {
516                 LOG.warn("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
517             } else {
518                 // EntityOwnershipService sends notification to all the nodes, irrespective of whether
519                 // that instance registered for the device ownership or not. It is to make sure that
520                 // If all the controller instance that was connected to the device are down, so the
521                 // running instance can clear up the operational data store even though it was not
522                 // connected to the device.
523                 LOG.debug("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
524             }
525
526             // If entity has no owner, clean up the operational data store (it's possible because owner controller
527             // might went down abruptly and didn't get a chance to clean up the operational data store.
528             if (!ownershipChange.hasOwner()) {
529                 LOG.debug("{} has no owner, cleaning up the operational data store", ownershipChange.getEntity());
530                 // If first cleanEntityOperationalData() was called, this call will be no-op.
531                 cleanEntityOperationalData(ownershipChange.getEntity());
532             }
533             return;
534         }
535         //Connection detail need to be cached, irrespective of ownership result.
536         putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
537
538         if (ownershipChange.isOwner() == hwvtepConnectionInstance.getHasDeviceOwnership()) {
539             LOG.debug("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}",
540                     hwvtepConnectionInstance.getConnectionInfo(), hwvtepConnectionInstance.getHasDeviceOwnership());
541             return;
542         }
543
544         hwvtepConnectionInstance.setHasDeviceOwnership(ownershipChange.isOwner());
545         // You were not an owner, but now you are
546         if (ownershipChange.isOwner()) {
547             LOG.info("handleOwnershipChanged: *this* southbound plugin instance is owner of device {}",
548                     hwvtepConnectionInstance.getConnectionInfo());
549
550             //*this* instance of southbound plugin is owner of the device,
551             //so register for monitor callbacks
552             afterTakingOwnership(hwvtepConnectionInstance);
553
554         } else {
555             //You were owner of the device, but now you are not. With the current ownership
556             //grant mechanism, this scenario should not occur. Because this scenario will occur
557             //when this controller went down or switch flap the connection, but in both the case
558             //it will go through the re-registration process. We need to implement this condition
559             //when clustering service implement a ownership grant strategy which can revoke the
560             //device ownership for load balancing the devices across the instances.
561             //Once this condition occur, we should unregister the callback.
562             LOG.error("handleOwnershipChanged: *this* southbound plugin instance is no longer the owner of device {}",
563                     hwvtepConnectionInstance.getNodeId().getValue());
564         }
565     }
566
567     private void cleanEntityOperationalData(Entity entity) {
568         @SuppressWarnings("unchecked") final InstanceIdentifier<Node> nodeIid =
569                 (InstanceIdentifier<Node>) HwvtepSouthboundUtil
570                         .getInstanceIdentifierCodec().bindingDeserializer(entity.getId());
571         txInvoker.invoke(new HwvtepGlobalRemoveCommand(nodeIid));
572     }
573
574     private HwvtepConnectionInstance getConnectionInstanceFromEntity(Entity entity) {
575         return entityConnectionMap.get(entity);
576     }
577
578     void putConnectionInstance(final InstanceIdentifier<Node> nodeIid,
579                                final HwvtepConnectionInstance connectionInstance) {
580         nodeIidVsConnectionInstance.put(nodeIid, connectionInstance);
581     }
582
583     private void removeConnectionInstance(final InstanceIdentifier<Node> nodeIid) {
584          if (nodeIid != null) {
585              nodeIidVsConnectionInstance.remove(nodeIid);
586          }
587     }
588
589     private class HwvtepDeviceEntityOwnershipListener implements EntityOwnershipListener {
590         private HwvtepConnectionManager hcm;
591         private EntityOwnershipListenerRegistration listenerRegistration;
592
593         HwvtepDeviceEntityOwnershipListener(HwvtepConnectionManager hcm, EntityOwnershipService entityOwnershipService) {
594             this.hcm = hcm;
595             listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this);
596         }
597         public void close() {
598             listenerRegistration.close();
599         }
600         @Override
601         public void ownershipChanged(EntityOwnershipChange ownershipChange) {
602             hcm.handleOwnershipChanged(ownershipChange);
603         }
604     }
605
606     private enum ConnectionReconciliationTriggers {
607         /*
608         Reconciliation trigger for scenario where controller's attempt
609         to connect to switch fails on config data store notification
610         */
611         ON_CONTROLLER_INITIATED_CONNECTION_FAILURE,
612
613         /*
614         Reconciliation trigger for the scenario where controller
615         initiated connection disconnects.
616         */
617         ON_DISCONNECT
618     }
619 }