Handle EOS Timeout for the Hwvtep Node addition.
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / HwvtepConnectionManager.java
1 /*
2  * Copyright (c) 2015, 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.hwvtepsouthbound;
10
11 import static org.opendaylight.ovsdb.lib.operations.Operations.op;
12
13 import com.google.common.base.Optional;
14 import com.google.common.base.Preconditions;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.net.ConnectException;
20 import java.net.InetAddress;
21 import java.net.UnknownHostException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
35 import org.opendaylight.mdsal.eos.binding.api.Entity;
36 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipCandidateRegistration;
37 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange;
38 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener;
39 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListenerRegistration;
40 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
41 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
42 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
43 import org.opendaylight.ovsdb.hwvtepsouthbound.events.ClientConnected;
44 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationManager;
45 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationTask;
46 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.configuration.HwvtepReconciliationTask;
47 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection.ConnectionReconciliationTask;
48 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue;
49 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.HwvtepGlobalRemoveCommand;
50 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvoker;
51 import org.opendaylight.ovsdb.lib.OvsdbClient;
52 import org.opendaylight.ovsdb.lib.OvsdbConnection;
53 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
54 import org.opendaylight.ovsdb.lib.operations.Operation;
55 import org.opendaylight.ovsdb.lib.operations.OperationResult;
56 import org.opendaylight.ovsdb.lib.operations.Select;
57 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
58 import org.opendaylight.ovsdb.lib.schema.typed.TypedDatabaseSchema;
59 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
60 import org.opendaylight.ovsdb.schema.hardwarevtep.Global;
61 import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionHistory;
62 import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionType;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalSwitchAttributes;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.ConnectionInfo;
67 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
68 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoCloseable {
73     private final Map<ConnectionInfo, HwvtepConnectionInstance> clients = new ConcurrentHashMap<>();
74     private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionManager.class);
75     private static final String ENTITY_TYPE = "hwvtep";
76     private static final int DB_FETCH_TIMEOUT = 1000;
77     private static final int TRANSACTION_HISTORY_CAPACITY = 10000;
78     private static final int TRANSACTION_HISTORY_WATERMARK = 7500;
79
80     private final DataBroker db;
81     private final TransactionInvoker txInvoker;
82     private final Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers = new ConcurrentHashMap<>();
83     private final Map<Entity, HwvtepConnectionInstance> entityConnectionMap = new ConcurrentHashMap<>();
84     private final EntityOwnershipService entityOwnershipService;
85     private final HwvtepDeviceEntityOwnershipListener hwvtepDeviceEntityOwnershipListener;
86     private final ReconciliationManager reconciliationManager;
87     private final Map<InstanceIdentifier<Node>, HwvtepConnectionInstance> nodeIidVsConnectionInstance =
88             new ConcurrentHashMap<>();
89     private final HwvtepOperGlobalListener hwvtepOperGlobalListener;
90     private final Map<InstanceIdentifier<Node>, TransactionHistory> controllerTxHistory = new ConcurrentHashMap<>();
91     private final Map<InstanceIdentifier<Node>, TransactionHistory> deviceUpdateHistory = new ConcurrentHashMap<>();
92     private final OvsdbConnection ovsdbConnectionService;
93
94     public HwvtepConnectionManager(final DataBroker db, final TransactionInvoker txInvoker,
95                     final EntityOwnershipService entityOwnershipService, final OvsdbConnection ovsdbConnectionService) {
96         this.db = db;
97         this.txInvoker = txInvoker;
98         this.entityOwnershipService = entityOwnershipService;
99         this.hwvtepDeviceEntityOwnershipListener = new HwvtepDeviceEntityOwnershipListener(this,entityOwnershipService);
100         this.reconciliationManager = new ReconciliationManager(db);
101         this.hwvtepOperGlobalListener = new HwvtepOperGlobalListener(db, this);
102         this.ovsdbConnectionService = ovsdbConnectionService;
103     }
104
105     @Override
106     public void close() throws Exception {
107         if (hwvtepDeviceEntityOwnershipListener != null) {
108             hwvtepDeviceEntityOwnershipListener.close();
109         }
110
111         for (HwvtepConnectionInstance client: clients.values()) {
112             client.disconnect();
113         }
114         DependencyQueue.close();
115     }
116
117     @Override
118     public void connected(final OvsdbClient externalClient) {
119         HwvtepConnectionInstance hwClient = null;
120         try {
121             List<String> databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS);
122             if (databases != null && !databases.isEmpty() && databases.contains(HwvtepSchemaConstants.HARDWARE_VTEP)) {
123                 LOG.info("Hwvtep Library connected {} from {}:{} to {}:{}",
124                         externalClient.getConnectionInfo().getType(),
125                         externalClient.getConnectionInfo().getRemoteAddress(),
126                         externalClient.getConnectionInfo().getRemotePort(),
127                         externalClient.getConnectionInfo().getLocalAddress(),
128                         externalClient.getConnectionInfo().getLocalPort());
129                 hwClient = connectedButCallBacksNotRegistered(externalClient);
130                 registerEntityForOwnership(hwClient);
131                 HwvtepOperGlobalListener.runAfterTimeoutIfNodeNotCreated(hwClient.getInstanceIdentifier(), () -> {
132                     externalClient.disconnect();
133                     disconnected(externalClient);
134                 });
135             }
136         } catch (InterruptedException | ExecutionException | TimeoutException e) {
137             LOG.warn("Unable to fetch Database list from device {}. Disconnecting from the device.",
138                     externalClient.getConnectionInfo().getRemoteAddress(), e);
139             externalClient.disconnect();
140         }
141     }
142
143     @Override
144     @SuppressWarnings("checkstyle:IllegalCatch")
145     public void disconnected(final OvsdbClient client) {
146         HwvtepConnectionInstance hwvtepConnectionInstance = null;
147         try {
148             LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store",
149                     client.getConnectionInfo().getType(),
150                     client.getConnectionInfo().getRemoteAddress(),
151                     client.getConnectionInfo().getRemotePort(),
152                     client.getConnectionInfo().getLocalAddress(),
153                     client.getConnectionInfo().getLocalPort());
154             ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(client);
155             hwvtepConnectionInstance = getConnectionInstance(key);
156             if (hwvtepConnectionInstance != null) {
157                 if (hwvtepConnectionInstance.getInstanceIdentifier() != null) {
158                     deviceUpdateHistory.get(hwvtepConnectionInstance.getInstanceIdentifier()).addToHistory(
159                             TransactionType.DELETE, new ClientConnected(client.getConnectionInfo().getRemotePort()));
160                 }
161
162
163                 // Unregister Entity ownership as soon as possible ,so this instance should
164                 // not be used as a candidate in Entity election (given that this instance is
165                 // about to disconnect as well), if current owner get disconnected from
166                 // HWVTEP device.
167                 if (hwvtepConnectionInstance.getHasDeviceOwnership()) {
168                     unregisterEntityForOwnership(hwvtepConnectionInstance);
169                     txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null));
170                 } else {
171                     unregisterEntityForOwnership(hwvtepConnectionInstance);
172                     //Do not delete if client disconnected from follower HwvtepGlobalRemoveCommand
173                 }
174
175                 removeConnectionInstance(key);
176
177                 //Controller initiated connection can be terminated from switch side.
178                 //So cleanup the instance identifier cache.
179                 removeInstanceIdentifier(key);
180                 removeConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier());
181                 retryConnection(hwvtepConnectionInstance.getInstanceIdentifier(),
182                         hwvtepConnectionInstance.getHwvtepGlobalAugmentation(),
183                         ConnectionReconciliationTriggers.ON_DISCONNECT);
184             } else {
185                 LOG.warn("HWVTEP disconnected event did not find connection instance for {}", key);
186             }
187             LOG.trace("HwvtepConnectionManager exit disconnected client: {}", client);
188         } catch (Exception e) {
189             LOG.error("Failed to execute disconnected ",e);
190         }
191     }
192
193     public OvsdbClient connect(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepGlobal)
194             throws UnknownHostException, ConnectException {
195         LOG.info("Connecting to {}", HwvtepSouthboundUtil.connectionInfoToString(hwvtepGlobal.getConnectionInfo()));
196         InetAddress ip = HwvtepSouthboundMapper.createInetAddress(hwvtepGlobal.getConnectionInfo().getRemoteIp());
197         OvsdbClient client = ovsdbConnectionService
198                         .connect(ip, hwvtepGlobal.getConnectionInfo().getRemotePort().getValue().toJava());
199         if (client != null) {
200             putInstanceIdentifier(hwvtepGlobal.getConnectionInfo(), iid.firstIdentifierOf(Node.class));
201             HwvtepConnectionInstance hwvtepConnectionInstance = connectedButCallBacksNotRegistered(client);
202             hwvtepConnectionInstance.setHwvtepGlobalAugmentation(hwvtepGlobal);
203             hwvtepConnectionInstance.setInstanceIdentifier(iid.firstIdentifierOf(Node.class));
204
205             // Register Cluster Ownership for ConnectionInfo
206             registerEntityForOwnership(hwvtepConnectionInstance);
207         } else {
208             LOG.warn("Failed to connect to OVSDB node: {}", hwvtepGlobal.getConnectionInfo());
209         }
210         return client;
211     }
212
213     public void disconnect(final HwvtepGlobalAugmentation ovsdbNode) throws UnknownHostException {
214         LOG.info("Diconnecting from {}", HwvtepSouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo()));
215         HwvtepConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo());
216         if (client != null) {
217             client.disconnect();
218             // Unregister Cluster Ownership for ConnectionInfo
219             unregisterEntityForOwnership(client);
220             removeInstanceIdentifier(ovsdbNode.getConnectionInfo());
221         }
222     }
223
224     public HwvtepConnectionInstance connectedButCallBacksNotRegistered(final OvsdbClient externalClient) {
225         LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
226                 externalClient.getConnectionInfo().getRemotePort());
227         ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(externalClient);
228         HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstance(key);
229
230         // Check if existing hwvtepConnectionInstance for the OvsdbClient present.
231         // In such cases, we will see if the hwvtepConnectionInstance has same externalClient.
232         if (hwvtepConnectionInstance != null) {
233             if (hwvtepConnectionInstance.hasOvsdbClient(externalClient)) {
234                 LOG.warn("HWVTEP Connection Instance {} already exists for client {}", key, externalClient);
235                 return hwvtepConnectionInstance;
236             }
237             LOG.warn("HWVTEP Connection Instance {} being replaced with client {}", key, externalClient);
238             hwvtepConnectionInstance.disconnect();
239
240             // Unregister Cluster Ownership for ConnectionInfo
241             // Because the hwvtepConnectionInstance is about to be completely replaced!
242             unregisterEntityForOwnership(hwvtepConnectionInstance);
243
244             removeConnectionInstance(key);
245         }
246
247         hwvtepConnectionInstance = new HwvtepConnectionInstance(this, key,
248                 externalClient, getInstanceIdentifier(key), txInvoker, db);
249         hwvtepConnectionInstance.createTransactInvokers();
250         return hwvtepConnectionInstance;
251     }
252
253     private void putConnectionInstance(final ConnectionInfo key,final HwvtepConnectionInstance instance) {
254         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
255         clients.put(connectionInfo, instance);
256         LOG.info("Clients after put: {}", clients);
257     }
258
259     void putConnectionInstance(final InstanceIdentifier<Node> nodeIid,
260                                        final HwvtepConnectionInstance connectionInstance) {
261         nodeIidVsConnectionInstance.put(nodeIid, connectionInstance);
262     }
263
264     public HwvtepConnectionInstance getConnectionInstance(final ConnectionInfo key) {
265         if (key == null) {
266             return null;
267         }
268         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
269         return clients.get(connectionInfo);
270     }
271
272     public HwvtepConnectionInstance getConnectionInstance(final Node node) {
273         Preconditions.checkNotNull(node);
274         HwvtepGlobalAugmentation hwvtepGlobal = node.augmentation(HwvtepGlobalAugmentation.class);
275         PhysicalSwitchAugmentation switchNode = node.augmentation(PhysicalSwitchAugmentation.class);
276         if (hwvtepGlobal != null) {
277             if (hwvtepGlobal.getConnectionInfo() != null) {
278                 return getConnectionInstance(hwvtepGlobal.getConnectionInfo());
279             } else {
280                 //TODO: Case of user configured connection for now
281                 //TODO: We could get it from Managers also.
282                 return null;
283             }
284         }
285         else if (switchNode != null) {
286             return getConnectionInstance(switchNode);
287         } else {
288             LOG.warn("This is not a node that gives any hint how to find its OVSDB Manager: {}",node);
289             return null;
290         }
291     }
292
293     public HwvtepConnectionInstance getConnectionInstance(final HwvtepPhysicalSwitchAttributes node) {
294         Optional<HwvtepGlobalAugmentation> optional = HwvtepSouthboundUtil.getManagingNode(db, node);
295         if (optional.isPresent()) {
296             return getConnectionInstance(optional.get().getConnectionInfo());
297         } else {
298             return null;
299         }
300     }
301
302     public HwvtepConnectionInstance getConnectionInstanceFromNodeIid(final InstanceIdentifier<Node> nodeIid) {
303         HwvtepConnectionInstance hwvtepConnectionInstance = nodeIidVsConnectionInstance.get(nodeIid);
304         if (hwvtepConnectionInstance != null) {
305             return hwvtepConnectionInstance;
306         }
307         InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtil.getGlobalNodeIid(nodeIid);
308         if (globalNodeIid != null) {
309             return nodeIidVsConnectionInstance.get(globalNodeIid);
310         }
311         return null;
312     }
313
314     public void stopConfigurationReconciliation(final InstanceIdentifier<Node> nodeIid) {
315         final ReconciliationTask task = new HwvtepReconciliationTask(
316                 reconciliationManager, HwvtepConnectionManager.this, nodeIid, null, null, db);
317
318         reconciliationManager.dequeue(task);
319     }
320
321     public void reconcileConfigurations(final HwvtepConnectionInstance client, final Node psNode) {
322         final InstanceIdentifier<Node> nodeIid = client.getInstanceIdentifier();
323         final ReconciliationTask task = new HwvtepReconciliationTask(
324                 reconciliationManager, HwvtepConnectionManager.this, nodeIid, psNode, client, db);
325
326         reconciliationManager.enqueue(task);
327     }
328
329     private void removeConnectionInstance(final ConnectionInfo key) {
330         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
331         clients.remove(connectionInfo);
332         LOG.info("Clients after remove: {}", clients);
333     }
334
335     private void removeConnectionInstance(final InstanceIdentifier<Node> nodeIid) {
336         if (nodeIid != null) {
337             nodeIidVsConnectionInstance.remove(nodeIid);
338         }
339     }
340
341     private void putInstanceIdentifier(final ConnectionInfo key,final InstanceIdentifier<Node> iid) {
342         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
343         instanceIdentifiers.put(connectionInfo, iid);
344     }
345
346     public InstanceIdentifier<Node> getInstanceIdentifier(final ConnectionInfo key) {
347         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
348         return instanceIdentifiers.get(connectionInfo);
349     }
350
351     private void removeInstanceIdentifier(final ConnectionInfo key) {
352         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
353         instanceIdentifiers.remove(connectionInfo);
354     }
355
356     public OvsdbClient getClient(final ConnectionInfo connectionInfo) {
357         return getConnectionInstance(connectionInfo).getOvsdbClient();
358     }
359
360     @SuppressWarnings("checkstyle:IllegalCatch")
361     private void registerCallbacks(HwvtepConnectionInstance hwvtepConnectionInstance) {
362         LOG.info("HWVTEP entity {} is owned by this controller registering callbacks",
363                 hwvtepConnectionInstance.getConnectionInfo());
364         try {
365             hwvtepOperGlobalListener.runAfterNodeDeleted(
366                 hwvtepConnectionInstance.getInstanceIdentifier(), () -> {
367                     cleanupOperationalNode(hwvtepConnectionInstance.getInstanceIdentifier());
368                     hwvtepConnectionInstance.registerCallbacks();
369                     return null;
370                 });
371         } catch (Exception e) {
372             LOG.error("Failed to register callbacks for HWVTEP entity {} ",
373                     hwvtepConnectionInstance.getConnectionInfo(), e);
374         }
375     }
376
377
378     private void registerEntityForOwnership(HwvtepConnectionInstance hwvtepConnectionInstance) {
379
380         Entity candidateEntity = getEntityFromConnectionInstance(hwvtepConnectionInstance);
381         if (entityConnectionMap.get(candidateEntity) != null) {
382             InstanceIdentifier<Node> iid = hwvtepConnectionInstance.getInstanceIdentifier();
383             disconnected(entityConnectionMap.get(candidateEntity).getOvsdbClient());
384             hwvtepConnectionInstance.setInstanceIdentifier(iid);
385             putConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier(), hwvtepConnectionInstance);
386         }
387         entityConnectionMap.put(candidateEntity, hwvtepConnectionInstance);
388         hwvtepConnectionInstance.setConnectedEntity(candidateEntity);
389
390         try {
391             EntityOwnershipCandidateRegistration registration =
392                     entityOwnershipService.registerCandidate(candidateEntity);
393             hwvtepConnectionInstance.setDeviceOwnershipCandidateRegistration(registration);
394             LOG.info("HWVTEP entity {} is registered for ownership.", candidateEntity);
395         } catch (CandidateAlreadyRegisteredException e) {
396             LOG.warn("OVSDB entity {} was already registered for ownership", candidateEntity, e);
397         }
398         handleOwnershipState(candidateEntity, hwvtepConnectionInstance);
399     }
400
401     private void handleOwnershipState(final Entity candidateEntity,
402             final HwvtepConnectionInstance hwvtepConnectionInstance) {
403         //If entity already has owner, it won't get notification from EntityOwnershipService
404         //so cache the connection instances.
405         java.util.Optional<EntityOwnershipState> ownershipStateOpt =
406                 entityOwnershipService.getOwnershipState(candidateEntity);
407         if (ownershipStateOpt.isPresent()) {
408             EntityOwnershipState ownershipState = ownershipStateOpt.get();
409             putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
410             if (ownershipState != EntityOwnershipState.NO_OWNER) {
411                 hwvtepConnectionInstance.setHasDeviceOwnership(ownershipState == EntityOwnershipState.IS_OWNER);
412                 if (ownershipState != EntityOwnershipState.IS_OWNER) {
413                     LOG.info("HWVTEP entity {} is already owned by other southbound plugin "
414                                     + "instance, so *this* instance is NOT an OWNER of the device",
415                             hwvtepConnectionInstance.getConnectionInfo());
416                 } else {
417                     registerCallbacks(hwvtepConnectionInstance);
418                 }
419             }
420         }
421     }
422
423     private static Global getHwvtepGlobalTableEntry(final HwvtepConnectionInstance connectionInstance) {
424         final TypedDatabaseSchema dbSchema;
425         try {
426             dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
427         } catch (InterruptedException | ExecutionException e) {
428             LOG.warn("Not able to fetch schema for database {} from device {}",
429                     HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e);
430             return null;
431         }
432
433         GenericTableSchema hwvtepSchema = dbSchema.getTableSchema(Global.class);
434         Select<GenericTableSchema> selectOperation = op.select(hwvtepSchema);
435         selectOperation.setColumns(hwvtepSchema.getColumnList());
436
437         ArrayList<Operation> operations = new ArrayList<>(2);
438         operations.add(selectOperation);
439         operations.add(op.comment("Fetching hardware_vtep table rows"));
440
441         final List<OperationResult> results;
442         try {
443             results = connectionInstance.transact(dbSchema, operations).get();
444         } catch (InterruptedException | ExecutionException e) {
445             LOG.warn("Not able to fetch hardware_vtep table row from device {}", connectionInstance.getConnectionInfo(),
446                 e);
447             return null;
448         }
449
450         final Global globalRow;
451         if (results != null) {
452             OperationResult selectResult = results.get(0);
453             globalRow = TyperUtils.getTypedRowWrapper(dbSchema,Global.class,selectResult.getRows().get(0));
454         } else {
455             globalRow = null;
456         }
457         LOG.trace("Fetched global {} from hardware_vtep schema", globalRow);
458         return globalRow;
459     }
460
461     private Entity getEntityFromConnectionInstance(@NonNull final HwvtepConnectionInstance hwvtepConnectionInstance) {
462         InstanceIdentifier<Node> iid = hwvtepConnectionInstance.getInstanceIdentifier();
463         if (iid == null) {
464             //TODO: Is Global the right one?
465             Global hwvtepGlobalRow = getHwvtepGlobalTableEntry(hwvtepConnectionInstance);
466             iid = HwvtepSouthboundMapper.getInstanceIdentifier(hwvtepGlobalRow);
467             /* Let's set the iid now */
468             hwvtepConnectionInstance.setInstanceIdentifier(iid);
469             LOG.info("InstanceIdentifier {} generated for device "
470                     + "connection {}",iid, hwvtepConnectionInstance.getConnectionInfo());
471             controllerTxHistory.putIfAbsent(iid,
472                     new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK));
473             deviceUpdateHistory.putIfAbsent(iid,
474                     new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK));
475             TransactionHistory controllerLog = controllerTxHistory.get(iid);
476             TransactionHistory deviceLog = deviceUpdateHistory.get(iid);
477             int port = hwvtepConnectionInstance.getOvsdbClient().getConnectionInfo().getRemotePort();
478             deviceLog.addToHistory(TransactionType.ADD, new ClientConnected(port));
479             hwvtepConnectionInstance.setControllerTxHistory(controllerLog);
480             hwvtepConnectionInstance.setDeviceUpdateHistory(deviceLog);
481         }
482         Entity deviceEntity = new Entity(ENTITY_TYPE, iid);
483         LOG.debug("Entity {} created for device connection {}",
484                 deviceEntity, hwvtepConnectionInstance.getConnectionInfo());
485         return deviceEntity;
486     }
487
488     private void unregisterEntityForOwnership(final HwvtepConnectionInstance hwvtepConnectionInstance) {
489         hwvtepConnectionInstance.closeDeviceOwnershipCandidateRegistration();
490         entityConnectionMap.remove(hwvtepConnectionInstance.getConnectedEntity());
491     }
492
493     public void reconcileConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode) {
494         this.retryConnection(iid, hwvtepNode,
495                 ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE);
496     }
497
498     public void stopConnectionReconciliationIfActive(final InstanceIdentifier<?> iid,
499             final HwvtepGlobalAugmentation hwvtepNode) {
500         final ReconciliationTask task = new ConnectionReconciliationTask(
501                 reconciliationManager,
502                 this,
503                 iid,
504                 hwvtepNode);
505         reconciliationManager.dequeue(task);
506     }
507
508     private void retryConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode,
509                                  ConnectionReconciliationTriggers trigger) {
510         if (hwvtepNode == null) {
511             //switch initiated connection
512             return;
513         }
514         final ReconciliationTask task = new ConnectionReconciliationTask(
515                 reconciliationManager,
516                 this,
517                 iid,
518                 hwvtepNode);
519
520         if (reconciliationManager.isEnqueued(task)) {
521             return;
522         }
523
524         switch (trigger) {
525             case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE:
526                 reconciliationManager.enqueueForRetry(task);
527                 break;
528             case ON_DISCONNECT: {
529                 ReadOnlyTransaction tx = db.newReadOnlyTransaction();
530                 CheckedFuture<Optional<Node>, ReadFailedException> readNodeFuture =
531                         tx.read(LogicalDatastoreType.CONFIGURATION, iid);
532
533                 Futures.addCallback(readNodeFuture, new FutureCallback<Optional<Node>>() {
534                     @Override
535                     public void onSuccess(final Optional<Node> node) {
536                         if (node.isPresent()) {
537                             HwvtepGlobalAugmentation augmentation = node.get()
538                                     .augmentation(HwvtepGlobalAugmentation.class);
539                             if (augmentation == null || augmentation.getConnectionInfo() == null) {
540                                 return;
541                             }
542                             LOG.info(
543                                 "Disconnected/Failed connection {} was controller initiated, attempting reconnection",
544                                 hwvtepNode.getConnectionInfo());
545                             reconciliationManager.enqueue(task);
546
547                         } else {
548                             LOG.debug("Connection {} was switch initiated, no reconciliation is required",
549                                 iid.firstKeyOf(Node.class).getNodeId());
550                         }
551                     }
552
553                     @Override
554                     public void onFailure(final Throwable ex) {
555                         LOG.warn("Read Config/DS for Node failed! {}", iid, ex);
556                     }
557                 }, MoreExecutors.directExecutor());
558                 break;
559             }
560             default:
561                 break;
562         }
563     }
564
565     public void handleOwnershipChanged(final EntityOwnershipChange ownershipChange) {
566         HwvtepConnectionInstance hwvtepConnectionInstance =
567                 getConnectionInstanceFromEntity(ownershipChange.getEntity());
568         LOG.info("handleOwnershipChanged: {} event received for device {}",
569                 ownershipChange, hwvtepConnectionInstance != null ? hwvtepConnectionInstance.getConnectionInfo()
570                         : "THAT'S NOT REGISTERED BY THIS SOUTHBOUND PLUGIN INSTANCE");
571
572         if (hwvtepConnectionInstance == null) {
573             if (ownershipChange.getState().isOwner()) {
574                 LOG.warn("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
575             } else {
576                 // EntityOwnershipService sends notification to all the nodes, irrespective of whether
577                 // that instance registered for the device ownership or not. It is to make sure that
578                 // If all the controller instance that was connected to the device are down, so the
579                 // running instance can clear up the operational data store even though it was not
580                 // connected to the device.
581                 LOG.debug("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
582             }
583
584             // If entity has no owner, clean up the operational data store (it's possible because owner controller
585             // might went down abruptly and didn't get a chance to clean up the operational data store.
586             if (!ownershipChange.getState().hasOwner()) {
587                 LOG.debug("{} has no owner, cleaning up the operational data store", ownershipChange.getEntity());
588                 // Below code might look weird but it's required. We want to give first opportunity to the
589                 // previous owner of the device to clean up the operational data store if there is no owner now.
590                 // That way we will avoid lot of nasty md-sal exceptions because of concurrent delete.
591                 InstanceIdentifier<Node> nodeIid =
592                         (InstanceIdentifier<Node>) ownershipChange.getEntity().getIdentifier();
593                 hwvtepOperGlobalListener.scheduleOldConnectionNodeDelete(nodeIid);
594                 /*
595                 Assuming node1 was the owner earlier.
596                 If the owner relinquished he would have cleaned it already in which case the above would be a no op
597                 If the owner crashed then the above would clean the node after the scheduled delay
598                 The live nodes (two and three) will try to cleanup but that is ok one of them ends up cleaning.
599                 But if the southbound connects again that connection can itself trigger the pending cleanup and
600                 the above op would become noop again.
601
602                 In summary
603                 In The following cases it would be a noop
604                 1) The southbound connects again within the scheduled cleanup delay.
605                 2) The owner node1 which is not crashed cleaned the node properly.
606
607                 In the following case both node2 and node3 will try to clean it (one of them will succeed ).
608                  1) node1 which was the owner crashed
609                  */
610             }
611             return;
612         }
613         //Connection detail need to be cached, irrespective of ownership result.
614         putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
615
616         if (ownershipChange.getState().isOwner() == hwvtepConnectionInstance.getHasDeviceOwnership()) {
617             LOG.debug("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}",
618                     hwvtepConnectionInstance.getConnectionInfo(), hwvtepConnectionInstance.getHasDeviceOwnership());
619             return;
620         }
621
622         hwvtepConnectionInstance.setHasDeviceOwnership(ownershipChange.getState().isOwner());
623         // You were not an owner, but now you are
624         if (ownershipChange.getState().isOwner()) {
625             LOG.info("handleOwnershipChanged: *this* southbound plugin instance is owner of device {}",
626                     hwvtepConnectionInstance.getConnectionInfo());
627
628             //*this* instance of southbound plugin is owner of the device,
629             //so register for monitor callbacks
630             registerCallbacks(hwvtepConnectionInstance);
631
632         } else {
633             //You were owner of the device, but now you are not. With the current ownership
634             //grant mechanism, this scenario should not occur. Because this scenario will occur
635             //when this controller went down or switch flap the connection, but in both the case
636             //it will go through the re-registration process. We need to implement this condition
637             //when clustering service implement a ownership grant strategy which can revoke the
638             //device ownership for load balancing the devices across the instances.
639             //Once this condition occur, we should unregister the callback.
640             LOG.error("handleOwnershipChanged: *this* southbound plugin instance is no longer the owner of device {}",
641                     hwvtepConnectionInstance.getNodeId().getValue());
642         }
643     }
644
645     private HwvtepConnectionInstance getConnectionInstanceFromEntity(final Entity entity) {
646         return entityConnectionMap.get(entity);
647     }
648
649     public Map<InstanceIdentifier<Node>, TransactionHistory> getControllerTxHistory() {
650         return controllerTxHistory;
651     }
652
653     public Map<InstanceIdentifier<Node>, TransactionHistory> getDeviceUpdateHistory() {
654         return deviceUpdateHistory;
655     }
656
657     private static class HwvtepDeviceEntityOwnershipListener implements EntityOwnershipListener {
658         private final HwvtepConnectionManager hcm;
659         private final EntityOwnershipListenerRegistration listenerRegistration;
660
661         HwvtepDeviceEntityOwnershipListener(final HwvtepConnectionManager hcm,
662                 final EntityOwnershipService entityOwnershipService) {
663             this.hcm = hcm;
664             listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this);
665         }
666
667         public void close() {
668             listenerRegistration.close();
669         }
670
671         @Override
672         public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
673             hcm.handleOwnershipChanged(ownershipChange);
674         }
675     }
676
677     private enum ConnectionReconciliationTriggers {
678         /*
679         Reconciliation trigger for scenario where controller's attempt
680         to connect to switch fails on config data store notification
681         */
682         ON_CONTROLLER_INITIATED_CONNECTION_FAILURE,
683
684         /*
685         Reconciliation trigger for the scenario where controller
686         initiated connection disconnects.
687         */
688         ON_DISCONNECT
689     }
690
691     public Map<InstanceIdentifier<Node>, HwvtepConnectionInstance> getAllConnectedInstances() {
692         return Collections.unmodifiableMap(nodeIidVsConnectionInstance);
693     }
694
695     public void cleanupOperationalNode(InstanceIdentifier<Node> nodeIid) {
696         txInvoker.invoke(new HwvtepGlobalRemoveCommand(nodeIid));
697     }
698 }