21ead14ef898f912e74d62abc9c916bd2a77c906
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / HwvtepConnectionManager.java
1 /*
2  * Copyright (c) 2015, 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.hwvtepsouthbound;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.ovsdb.lib.operations.Operations.op;
12
13 import com.google.common.collect.Maps;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.net.ConnectException;
19 import java.net.InetAddress;
20 import java.net.UnknownHostException;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.opendaylight.mdsal.binding.api.DataBroker;
32 import org.opendaylight.mdsal.binding.api.ReadTransaction;
33 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
34 import org.opendaylight.mdsal.eos.binding.api.Entity;
35 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipCandidateRegistration;
36 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange;
37 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener;
38 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListenerRegistration;
39 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
40 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
41 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
42 import org.opendaylight.ovsdb.hwvtepsouthbound.events.ClientConnected;
43 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationManager;
44 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationTask;
45 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.configuration.HwvtepReconciliationTask;
46 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection.ConnectionReconciliationTask;
47 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue;
48 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.HwvtepGlobalRemoveCommand;
49 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvoker;
50 import org.opendaylight.ovsdb.lib.OvsdbClient;
51 import org.opendaylight.ovsdb.lib.OvsdbConnection;
52 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
53 import org.opendaylight.ovsdb.lib.operations.Operation;
54 import org.opendaylight.ovsdb.lib.operations.OperationResult;
55 import org.opendaylight.ovsdb.lib.operations.Select;
56 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
57 import org.opendaylight.ovsdb.lib.schema.typed.TypedDatabaseSchema;
58 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
59 import org.opendaylight.ovsdb.schema.hardwarevtep.Global;
60 import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionHistory;
61 import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionType;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalSwitchAttributes;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.ConnectionInfo;
66 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70
71 public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoCloseable {
72     private final Map<ConnectionInfo, HwvtepConnectionInstance> clients = new ConcurrentHashMap<>();
73     private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionManager.class);
74     private static final String ENTITY_TYPE = "hwvtep";
75     private static final int DB_FETCH_TIMEOUT = 1000;
76     private static final int TRANSACTION_HISTORY_CAPACITY = 10000;
77     private static final int TRANSACTION_HISTORY_WATERMARK = 7500;
78
79     private final DataBroker db;
80     private final TransactionInvoker txInvoker;
81     private final Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers = new ConcurrentHashMap<>();
82     private final Map<Entity, HwvtepConnectionInstance> entityConnectionMap = new ConcurrentHashMap<>();
83     private final EntityOwnershipService entityOwnershipService;
84     private final HwvtepDeviceEntityOwnershipListener hwvtepDeviceEntityOwnershipListener;
85     private final ReconciliationManager reconciliationManager;
86     private final Map<InstanceIdentifier<Node>, HwvtepConnectionInstance> nodeIidVsConnectionInstance =
87             new ConcurrentHashMap<>();
88     private final HwvtepOperGlobalListener hwvtepOperGlobalListener;
89     private final Map<InstanceIdentifier<Node>, TransactionHistory> controllerTxHistory = new ConcurrentHashMap<>();
90     private final Map<InstanceIdentifier<Node>, TransactionHistory> deviceUpdateHistory = new ConcurrentHashMap<>();
91     private final OvsdbConnection ovsdbConnectionService;
92     private final Map<OvsdbClient, OvsdbClient> alreadyProcessedClients = new ConcurrentHashMap<>();
93
94     public HwvtepConnectionManager(final DataBroker db, final TransactionInvoker txInvoker,
95                     final EntityOwnershipService entityOwnershipService, final OvsdbConnection ovsdbConnectionService) {
96         this.db = db;
97         this.txInvoker = txInvoker;
98         this.entityOwnershipService = entityOwnershipService;
99         this.hwvtepDeviceEntityOwnershipListener = new HwvtepDeviceEntityOwnershipListener(this,entityOwnershipService);
100         this.reconciliationManager = new ReconciliationManager(db);
101         this.hwvtepOperGlobalListener = new HwvtepOperGlobalListener(db, this);
102         this.ovsdbConnectionService = ovsdbConnectionService;
103     }
104
105     @Override
106     public void close() throws Exception {
107         if (hwvtepDeviceEntityOwnershipListener != null) {
108             hwvtepDeviceEntityOwnershipListener.close();
109         }
110         if (hwvtepOperGlobalListener != null) {
111             hwvtepOperGlobalListener.close();
112         }
113
114         for (HwvtepConnectionInstance client: clients.values()) {
115             client.disconnect();
116         }
117         DependencyQueue.close();
118     }
119
120     @Override
121     public void connected(final OvsdbClient externalClient) {
122         if (alreadyProcessedClients.containsKey(externalClient)) {
123             LOG.info("Hwvtep Library already connected {} from {}:{} to {}:{} to this, hence skipping the processing",
124                     externalClient.getConnectionInfo().getType(),
125                     externalClient.getConnectionInfo().getRemoteAddress(),
126                     externalClient.getConnectionInfo().getRemotePort(),
127                     externalClient.getConnectionInfo().getLocalAddress(),
128                     externalClient.getConnectionInfo().getLocalPort());
129             return;
130         }
131         alreadyProcessedClients.put(externalClient, externalClient);
132         HwvtepConnectionInstance hwClient = null;
133         try {
134             List<String> databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS);
135             if (databases != null && !databases.isEmpty() && databases.contains(HwvtepSchemaConstants.HARDWARE_VTEP)) {
136                 LOG.info("Hwvtep Library connected {} from {}:{} to {}:{}",
137                         externalClient.getConnectionInfo().getType(),
138                         externalClient.getConnectionInfo().getRemoteAddress(),
139                         externalClient.getConnectionInfo().getRemotePort(),
140                         externalClient.getConnectionInfo().getLocalAddress(),
141                         externalClient.getConnectionInfo().getLocalPort());
142                 hwClient = connectedButCallBacksNotRegistered(externalClient);
143                 registerEntityForOwnership(hwClient);
144                 HwvtepOperGlobalListener.runAfterTimeoutIfNodeNotCreated(hwClient.getInstanceIdentifier(), () -> {
145                     externalClient.disconnect();
146                     disconnected(externalClient);
147                 });
148             }
149         } catch (InterruptedException | ExecutionException | TimeoutException e) {
150             LOG.warn("Unable to fetch Database list from device {}. Disconnecting from the device.",
151                     externalClient.getConnectionInfo().getRemoteAddress(), e);
152             externalClient.disconnect();
153         }
154     }
155
156     @Override
157     @SuppressFBWarnings("REC_CATCH_EXCEPTION")
158     @SuppressWarnings("checkstyle:IllegalCatch")
159     public void disconnected(final OvsdbClient client) {
160         alreadyProcessedClients.remove(client);
161         HwvtepConnectionInstance hwvtepConnectionInstance = null;
162         try {
163             LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store",
164                     client.getConnectionInfo().getType(),
165                     client.getConnectionInfo().getRemoteAddress(),
166                     client.getConnectionInfo().getRemotePort(),
167                     client.getConnectionInfo().getLocalAddress(),
168                     client.getConnectionInfo().getLocalPort());
169             ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(client);
170             hwvtepConnectionInstance = getConnectionInstance(key);
171             if (hwvtepConnectionInstance != null) {
172                 if (hwvtepConnectionInstance.getInstanceIdentifier() != null) {
173                     int port = hwvtepConnectionInstance.getOvsdbClient().getConnectionInfo().getRemotePort();
174                     deviceUpdateHistory.get(hwvtepConnectionInstance.getInstanceIdentifier()).addToHistory(
175                             TransactionType.DELETE, new ClientConnected(client.getConnectionInfo().getRemotePort()));
176                     LOG.info("CONTROLLER - {} {}", TransactionType.DELETE, new ClientConnected(port));
177                 }
178
179
180                 // Unregister Entity ownership as soon as possible ,so this instance should
181                 // not be used as a candidate in Entity election (given that this instance is
182                 // about to disconnect as well), if current owner get disconnected from
183                 // HWVTEP device.
184                 if (hwvtepConnectionInstance.getHasDeviceOwnership()) {
185                     unregisterEntityForOwnership(hwvtepConnectionInstance);
186                     LOG.info("Client disconnected from the Leader. Delete the Hvtep Node {} ",
187                         hwvtepConnectionInstance.getInstanceIdentifier());
188                     txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null));
189                 } else {
190                     unregisterEntityForOwnership(hwvtepConnectionInstance);
191                     LOG.info("Client disconnected from the Follower. Not deleteing the Hvtep Node {} ",
192                         hwvtepConnectionInstance.getInstanceIdentifier());
193
194                     //Do not delete if client disconnected from follower HwvtepGlobalRemoveCommand
195                 }
196
197                 removeConnectionInstance(key);
198
199                 //Controller initiated connection can be terminated from switch side.
200                 //So cleanup the instance identifier cache.
201                 removeInstanceIdentifier(key);
202                 removeConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier());
203                 retryConnection(hwvtepConnectionInstance.getInstanceIdentifier(),
204                         hwvtepConnectionInstance.getHwvtepGlobalAugmentation(),
205                         ConnectionReconciliationTriggers.ON_DISCONNECT);
206             } else {
207                 LOG.warn("HWVTEP disconnected event did not find connection instance for {}", key);
208             }
209             LOG.trace("HwvtepConnectionManager exit disconnected client: {}", client);
210         } catch (Exception e) {
211             LOG.error("Failed to execute disconnected ",e);
212         }
213     }
214
215     public OvsdbClient connect(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepGlobal)
216             throws UnknownHostException, ConnectException {
217         LOG.info("Connecting to {}", HwvtepSouthboundUtil.connectionInfoToString(hwvtepGlobal.getConnectionInfo()));
218         InetAddress ip = HwvtepSouthboundMapper.createInetAddress(hwvtepGlobal.getConnectionInfo().getRemoteIp());
219         OvsdbClient client = ovsdbConnectionService
220                         .connect(ip, hwvtepGlobal.getConnectionInfo().getRemotePort().getValue().toJava());
221         if (client != null) {
222             putInstanceIdentifier(hwvtepGlobal.getConnectionInfo(), iid.firstIdentifierOf(Node.class));
223             HwvtepConnectionInstance hwvtepConnectionInstance = connectedButCallBacksNotRegistered(client);
224             hwvtepConnectionInstance.setHwvtepGlobalAugmentation(hwvtepGlobal);
225             hwvtepConnectionInstance.setInstanceIdentifier(iid.firstIdentifierOf(Node.class));
226
227             // Register Cluster Ownership for ConnectionInfo
228             registerEntityForOwnership(hwvtepConnectionInstance);
229         } else {
230             LOG.warn("Failed to connect to OVSDB node: {}", hwvtepGlobal.getConnectionInfo());
231         }
232         return client;
233     }
234
235     public void disconnect(final HwvtepGlobalAugmentation ovsdbNode) throws UnknownHostException {
236         LOG.info("Diconnecting from {}", HwvtepSouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo()));
237         HwvtepConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo());
238         if (client != null) {
239             client.disconnect();
240             // Unregister Cluster Ownership for ConnectionInfo
241             unregisterEntityForOwnership(client);
242             removeInstanceIdentifier(ovsdbNode.getConnectionInfo());
243         }
244     }
245
246     public HwvtepConnectionInstance connectedButCallBacksNotRegistered(final OvsdbClient externalClient) {
247         LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
248                 externalClient.getConnectionInfo().getRemotePort());
249         ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(externalClient);
250         HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstance(key);
251
252         // Check if existing hwvtepConnectionInstance for the OvsdbClient present.
253         // In such cases, we will see if the hwvtepConnectionInstance has same externalClient.
254         if (hwvtepConnectionInstance != null) {
255             if (hwvtepConnectionInstance.hasOvsdbClient(externalClient)) {
256                 LOG.info("HWVTEP Connection Instance {} already exists for client {}", key, externalClient);
257                 return hwvtepConnectionInstance;
258             }
259             LOG.info("HWVTEP Connection Instance {} being replaced with client {}", key, externalClient);
260             hwvtepConnectionInstance.disconnect();
261
262             // Unregister Cluster Ownership for ConnectionInfo
263             // Because the hwvtepConnectionInstance is about to be completely replaced!
264             unregisterEntityForOwnership(hwvtepConnectionInstance);
265
266             removeConnectionInstance(key);
267         }
268
269         hwvtepConnectionInstance = new HwvtepConnectionInstance(this, key,
270                 externalClient, getInstanceIdentifier(key), txInvoker, db);
271         hwvtepConnectionInstance.createTransactInvokers();
272         return hwvtepConnectionInstance;
273     }
274
275     private void putConnectionInstance(final ConnectionInfo key,final HwvtepConnectionInstance instance) {
276         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
277         clients.put(connectionInfo, instance);
278         LOG.info("Clients after put: {}", clients);
279     }
280
281     void putConnectionInstance(final InstanceIdentifier<Node> nodeIid,
282                                        final HwvtepConnectionInstance connectionInstance) {
283         nodeIidVsConnectionInstance.put(nodeIid, connectionInstance);
284     }
285
286     public HwvtepConnectionInstance getConnectionInstance(final ConnectionInfo key) {
287         if (key == null) {
288             return null;
289         }
290         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
291         return clients.get(connectionInfo);
292     }
293
294     public HwvtepConnectionInstance getConnectionInstance(final Node node) {
295         requireNonNull(node);
296         HwvtepGlobalAugmentation hwvtepGlobal = node.augmentation(HwvtepGlobalAugmentation.class);
297         PhysicalSwitchAugmentation switchNode = node.augmentation(PhysicalSwitchAugmentation.class);
298         if (hwvtepGlobal != null) {
299             if (hwvtepGlobal.getConnectionInfo() != null) {
300                 LOG.debug("Get the ConnectionInfo from HwvtepGlobal {}", hwvtepGlobal.getConnectionInfo());
301                 return getConnectionInstance(hwvtepGlobal.getConnectionInfo());
302             } else {
303                 //TODO: Case of user configured connection for now
304                 //TODO: We could get it from Managers also.
305                 return null;
306             }
307         }
308         else if (switchNode != null) {
309             LOG.debug("Get the ConnectionInfo from PhysicalSwitch");
310             return getConnectionInstance(switchNode);
311         } else {
312             LOG.warn("This is not a node that gives any hint how to find its OVSDB Manager: {}",node);
313             return null;
314         }
315     }
316
317     public HwvtepConnectionInstance getConnectionInstance(final HwvtepPhysicalSwitchAttributes node) {
318         Optional<HwvtepGlobalAugmentation> optional = HwvtepSouthboundUtil.getManagingNode(db, node);
319         if (optional.isPresent()) {
320             return getConnectionInstance(optional.get().getConnectionInfo());
321         } else {
322             return null;
323         }
324     }
325
326     public HwvtepConnectionInstance getConnectionInstanceFromNodeIid(final InstanceIdentifier<Node> nodeIid) {
327         HwvtepConnectionInstance hwvtepConnectionInstance = nodeIidVsConnectionInstance.get(nodeIid);
328         if (hwvtepConnectionInstance != null) {
329             return hwvtepConnectionInstance;
330         }
331         InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtil.getGlobalNodeIid(nodeIid);
332         if (globalNodeIid != null) {
333             LOG.debug("Get the ConnectionInfo from HwvtepGlobal : {}", globalNodeIid);
334             return nodeIidVsConnectionInstance.get(globalNodeIid);
335         }
336         return null;
337     }
338
339     public void stopConfigurationReconciliation(final InstanceIdentifier<Node> nodeIid) {
340         final ReconciliationTask task = new HwvtepReconciliationTask(
341                 reconciliationManager, HwvtepConnectionManager.this, nodeIid, null, null, db);
342
343         reconciliationManager.dequeue(task);
344     }
345
346     public void reconcileConfigurations(final HwvtepConnectionInstance client, final Node psNode) {
347         final InstanceIdentifier<Node> nodeIid = client.getInstanceIdentifier();
348         final ReconciliationTask task = new HwvtepReconciliationTask(
349                 reconciliationManager, HwvtepConnectionManager.this, nodeIid, psNode, client, db);
350
351         reconciliationManager.enqueue(task);
352     }
353
354     private void removeConnectionInstance(final ConnectionInfo key) {
355         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
356         clients.remove(connectionInfo);
357         LOG.info("Clients after remove: {}", clients);
358     }
359
360     private void removeConnectionInstance(final InstanceIdentifier<Node> nodeIid) {
361         if (nodeIid != null) {
362             nodeIidVsConnectionInstance.remove(nodeIid);
363         }
364     }
365
366     private void putInstanceIdentifier(final ConnectionInfo key,final InstanceIdentifier<Node> iid) {
367         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
368         instanceIdentifiers.put(connectionInfo, iid);
369     }
370
371     public InstanceIdentifier<Node> getInstanceIdentifier(final ConnectionInfo key) {
372         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
373         return instanceIdentifiers.get(connectionInfo);
374     }
375
376     private void removeInstanceIdentifier(final ConnectionInfo key) {
377         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
378         instanceIdentifiers.remove(connectionInfo);
379     }
380
381     public OvsdbClient getClient(final ConnectionInfo connectionInfo) {
382         return getConnectionInstance(connectionInfo).getOvsdbClient();
383     }
384
385     @SuppressWarnings("checkstyle:IllegalCatch")
386     private void registerCallbacks(final HwvtepConnectionInstance hwvtepConnectionInstance) {
387         LOG.info("HWVTEP entity {} is owned by this controller registering callbacks",
388                 hwvtepConnectionInstance.getConnectionInfo());
389         try {
390             hwvtepOperGlobalListener.runAfterNodeDeleted(
391                 hwvtepConnectionInstance.getInstanceIdentifier(), () -> {
392                     cleanupOperationalNode(hwvtepConnectionInstance.getInstanceIdentifier());
393                     hwvtepConnectionInstance.registerCallbacks();
394                     return null;
395                 });
396         } catch (Exception e) {
397             LOG.error("Failed to register callbacks for HWVTEP entity {} ",
398                     hwvtepConnectionInstance.getConnectionInfo(), e);
399         }
400     }
401
402
403     private void registerEntityForOwnership(final HwvtepConnectionInstance hwvtepConnectionInstance) {
404
405         Entity candidateEntity = getEntityFromConnectionInstance(hwvtepConnectionInstance);
406         if (entityConnectionMap.get(candidateEntity) != null) {
407             InstanceIdentifier<Node> iid = hwvtepConnectionInstance.getInstanceIdentifier();
408             LOG.info("Calling disconnect before processing new connection for {}", candidateEntity);
409             disconnected(entityConnectionMap.get(candidateEntity).getOvsdbClient());
410             hwvtepConnectionInstance.setInstanceIdentifier(iid);
411             putConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier(), hwvtepConnectionInstance);
412         }
413         entityConnectionMap.put(candidateEntity, hwvtepConnectionInstance);
414         hwvtepConnectionInstance.setConnectedEntity(candidateEntity);
415
416         try {
417             EntityOwnershipCandidateRegistration registration =
418                     entityOwnershipService.registerCandidate(candidateEntity);
419             hwvtepConnectionInstance.setDeviceOwnershipCandidateRegistration(registration);
420             LOG.info("HWVTEP entity {} is registered for ownership.", candidateEntity);
421         } catch (CandidateAlreadyRegisteredException e) {
422             LOG.warn("OVSDB entity {} was already registered for ownership", candidateEntity, e);
423         }
424         handleOwnershipState(candidateEntity, hwvtepConnectionInstance);
425     }
426
427     private void handleOwnershipState(final Entity candidateEntity,
428             final HwvtepConnectionInstance hwvtepConnectionInstance) {
429         //If entity already has owner, it won't get notification from EntityOwnershipService
430         //so cache the connection instances.
431         java.util.Optional<EntityOwnershipState> ownershipStateOpt =
432                 entityOwnershipService.getOwnershipState(candidateEntity);
433         if (ownershipStateOpt.isPresent()) {
434             EntityOwnershipState ownershipState = ownershipStateOpt.get();
435             putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
436             if (ownershipState != EntityOwnershipState.NO_OWNER) {
437                 hwvtepConnectionInstance.setHasDeviceOwnership(ownershipState == EntityOwnershipState.IS_OWNER);
438                 if (ownershipState != EntityOwnershipState.IS_OWNER) {
439                     LOG.info("HWVTEP entity {} is already owned by other southbound plugin "
440                                     + "instance, so *this* instance is NOT an OWNER of the device",
441                             hwvtepConnectionInstance.getConnectionInfo());
442                 } else {
443                     registerCallbacks(hwvtepConnectionInstance);
444                 }
445             }
446         }
447     }
448
449     private static Global getHwvtepGlobalTableEntry(final HwvtepConnectionInstance connectionInstance) {
450         final TypedDatabaseSchema dbSchema;
451         try {
452             dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
453         } catch (InterruptedException | ExecutionException e) {
454             LOG.warn("Not able to fetch schema for database {} from device {}",
455                     HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e);
456             return null;
457         }
458
459         GenericTableSchema hwvtepSchema = dbSchema.getTableSchema(Global.class);
460         Select<GenericTableSchema> selectOperation = op.select(hwvtepSchema);
461         selectOperation.setColumns(hwvtepSchema.getColumnList());
462
463         ArrayList<Operation> operations = new ArrayList<>(2);
464         operations.add(selectOperation);
465         operations.add(op.comment("Fetching hardware_vtep table rows"));
466
467         final List<OperationResult> results;
468         try {
469             results = connectionInstance.transact(dbSchema, operations).get();
470         } catch (InterruptedException | ExecutionException e) {
471             LOG.warn("Not able to fetch hardware_vtep table row from device {}", connectionInstance.getConnectionInfo(),
472                 e);
473             return null;
474         }
475
476         final Global globalRow;
477         if (results != null) {
478             OperationResult selectResult = results.get(0);
479             globalRow = TyperUtils.getTypedRowWrapper(dbSchema,Global.class,selectResult.getRows().get(0));
480         } else {
481             globalRow = null;
482         }
483         LOG.info("Fetched global {} from hardware_vtep schema", globalRow);
484         return globalRow;
485     }
486
487     private Entity getEntityFromConnectionInstance(@NonNull final HwvtepConnectionInstance hwvtepConnectionInstance) {
488         InstanceIdentifier<Node> iid = hwvtepConnectionInstance.getInstanceIdentifier();
489         if (iid == null) {
490             //TODO: Is Global the right one?
491             Global hwvtepGlobalRow = getHwvtepGlobalTableEntry(hwvtepConnectionInstance);
492             iid = HwvtepSouthboundMapper.getInstanceIdentifier(hwvtepGlobalRow);
493             /* Let's set the iid now */
494             hwvtepConnectionInstance.setInstanceIdentifier(iid);
495             LOG.trace("InstanceIdentifier {} generated for device "
496                     + "connection {}",iid, hwvtepConnectionInstance.getConnectionInfo());
497             controllerTxHistory.putIfAbsent(iid,
498                     new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK));
499             deviceUpdateHistory.putIfAbsent(iid,
500                     new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK));
501             TransactionHistory controllerLog = controllerTxHistory.get(iid);
502             TransactionHistory deviceLog = deviceUpdateHistory.get(iid);
503             int port = hwvtepConnectionInstance.getOvsdbClient().getConnectionInfo().getRemotePort();
504             deviceLog.addToHistory(TransactionType.ADD, new ClientConnected(port));
505             LOG.info("CONTROLLER - {} {}", TransactionType.ADD, new ClientConnected(port));
506             hwvtepConnectionInstance.setControllerTxHistory(controllerLog);
507             hwvtepConnectionInstance.setDeviceUpdateHistory(deviceLog);
508         }
509         Entity deviceEntity = new Entity(ENTITY_TYPE, iid);
510         LOG.debug("Entity {} created for device connection {}",
511                 deviceEntity, hwvtepConnectionInstance.getConnectionInfo());
512         return deviceEntity;
513     }
514
515     private void unregisterEntityForOwnership(final HwvtepConnectionInstance hwvtepConnectionInstance) {
516         hwvtepConnectionInstance.closeDeviceOwnershipCandidateRegistration();
517         entityConnectionMap.remove(hwvtepConnectionInstance.getConnectedEntity());
518     }
519
520     public void reconcileConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode) {
521         this.retryConnection(iid, hwvtepNode,
522                 ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE);
523     }
524
525     public void stopConnectionReconciliationIfActive(final InstanceIdentifier<?> iid,
526             final HwvtepGlobalAugmentation hwvtepNode) {
527         final ReconciliationTask task = new ConnectionReconciliationTask(
528                 reconciliationManager,
529                 this,
530                 iid,
531                 hwvtepNode);
532         reconciliationManager.dequeue(task);
533     }
534
535     private void retryConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode,
536                                  final ConnectionReconciliationTriggers trigger) {
537         if (hwvtepNode == null) {
538             //switch initiated connection
539             return;
540         }
541         final ReconciliationTask task = new ConnectionReconciliationTask(
542                 reconciliationManager,
543                 this,
544                 iid,
545                 hwvtepNode);
546
547         if (reconciliationManager.isEnqueued(task)) {
548             return;
549         }
550
551         switch (trigger) {
552             case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE:
553                 reconciliationManager.enqueueForRetry(task);
554                 break;
555             case ON_DISCONNECT: {
556                 ReadTransaction tx = db.newReadOnlyTransaction();
557                 FluentFuture<Optional<Node>> readNodeFuture =
558                         tx.read(LogicalDatastoreType.CONFIGURATION, iid);
559
560                 readNodeFuture.addCallback(new FutureCallback<Optional<Node>>() {
561                     @Override
562                     public void onSuccess(final Optional<Node> node) {
563                         if (node.isPresent()) {
564                             HwvtepGlobalAugmentation augmentation = node.get()
565                                     .augmentation(HwvtepGlobalAugmentation.class);
566                             if (augmentation == null || augmentation.getConnectionInfo() == null) {
567                                 return;
568                             }
569                             LOG.info(
570                                 "Disconnected/Failed connection {} was controller initiated, attempting reconnection",
571                                 hwvtepNode.getConnectionInfo());
572                             reconciliationManager.enqueue(task);
573
574                         } else {
575                             LOG.debug("Connection {} was switch initiated, no reconciliation is required",
576                                 iid.firstKeyOf(Node.class).getNodeId());
577                         }
578                     }
579
580                     @Override
581                     public void onFailure(final Throwable ex) {
582                         LOG.warn("Read Config/DS for Node failed! {}", iid, ex);
583                     }
584                 }, MoreExecutors.directExecutor());
585                 break;
586             }
587             default:
588                 break;
589         }
590     }
591
592     public void handleOwnershipChanged(final EntityOwnershipChange ownershipChange) {
593         HwvtepConnectionInstance hwvtepConnectionInstance =
594                 getConnectionInstanceFromEntity(ownershipChange.getEntity());
595         LOG.info("handleOwnershipChanged: {} event received for device {}",
596                 ownershipChange, hwvtepConnectionInstance != null ? hwvtepConnectionInstance.getConnectionInfo()
597                         : "THAT'S NOT REGISTERED BY THIS SOUTHBOUND PLUGIN INSTANCE");
598
599         if (hwvtepConnectionInstance == null) {
600             if (ownershipChange.getState().isOwner()) {
601                 LOG.warn("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
602             } else {
603                 // EntityOwnershipService sends notification to all the nodes, irrespective of whether
604                 // that instance registered for the device ownership or not. It is to make sure that
605                 // If all the controller instance that was connected to the device are down, so the
606                 // running instance can clear up the operational data store even though it was not
607                 // connected to the device.
608                 LOG.debug("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
609             }
610
611             // If entity has no owner, clean up the operational data store (it's possible because owner controller
612             // might went down abruptly and didn't get a chance to clean up the operational data store.
613             if (!ownershipChange.getState().hasOwner()) {
614                 LOG.info("{} has no owner, cleaning up the operational data store", ownershipChange.getEntity());
615                 // Below code might look weird but it's required. We want to give first opportunity to the
616                 // previous owner of the device to clean up the operational data store if there is no owner now.
617                 // That way we will avoid lot of nasty md-sal exceptions because of concurrent delete.
618                 InstanceIdentifier<Node> nodeIid =
619                         (InstanceIdentifier<Node>) ownershipChange.getEntity().getIdentifier();
620                 hwvtepOperGlobalListener.scheduleOldConnectionNodeDelete(nodeIid);
621                 /*
622                 Assuming node1 was the owner earlier.
623                 If the owner relinquished he would have cleaned it already in which case the above would be a no op
624                 If the owner crashed then the above would clean the node after the scheduled delay
625                 The live nodes (two and three) will try to cleanup but that is ok one of them ends up cleaning.
626                 But if the southbound connects again that connection can itself trigger the pending cleanup and
627                 the above op would become noop again.
628
629                 In summary
630                 In The following cases it would be a noop
631                 1) The southbound connects again within the scheduled cleanup delay.
632                 2) The owner node1 which is not crashed cleaned the node properly.
633
634                 In the following case both node2 and node3 will try to clean it (one of them will succeed ).
635                  1) node1 which was the owner crashed
636                  */
637             }
638             return;
639         }
640         //Connection detail need to be cached, irrespective of ownership result.
641         putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
642
643         if (ownershipChange.getState().isOwner() == hwvtepConnectionInstance.getHasDeviceOwnership()) {
644             LOG.debug("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}",
645                     hwvtepConnectionInstance.getConnectionInfo(), hwvtepConnectionInstance.getHasDeviceOwnership());
646             return;
647         }
648
649         hwvtepConnectionInstance.setHasDeviceOwnership(ownershipChange.getState().isOwner());
650         // You were not an owner, but now you are
651         if (ownershipChange.getState().isOwner()) {
652             LOG.info("handleOwnershipChanged: *this* southbound plugin instance is owner of device {}",
653                     hwvtepConnectionInstance.getConnectionInfo());
654
655             //*this* instance of southbound plugin is owner of the device,
656             //so register for monitor callbacks
657             registerCallbacks(hwvtepConnectionInstance);
658
659         } else {
660             //You were owner of the device, but now you are not. With the current ownership
661             //grant mechanism, this scenario should not occur. Because this scenario will occur
662             //when this controller went down or switch flap the connection, but in both the case
663             //it will go through the re-registration process. We need to implement this condition
664             //when clustering service implement a ownership grant strategy which can revoke the
665             //device ownership for load balancing the devices across the instances.
666             //Once this condition occur, we should unregister the callback.
667             LOG.error("handleOwnershipChanged: *this* southbound plugin instance is no longer the owner of device {}",
668                     hwvtepConnectionInstance.getNodeId().getValue());
669         }
670     }
671
672     private HwvtepConnectionInstance getConnectionInstanceFromEntity(final Entity entity) {
673         return entityConnectionMap.get(entity);
674     }
675
676     private static final class HwvtepDeviceEntityOwnershipListener implements EntityOwnershipListener {
677         private final HwvtepConnectionManager hcm;
678         private final EntityOwnershipListenerRegistration listenerRegistration;
679
680         HwvtepDeviceEntityOwnershipListener(final HwvtepConnectionManager hcm,
681                 final EntityOwnershipService entityOwnershipService) {
682             this.hcm = hcm;
683             listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this);
684         }
685
686         public void close() {
687             listenerRegistration.close();
688         }
689
690         @Override
691         public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
692             hcm.handleOwnershipChanged(ownershipChange);
693         }
694     }
695
696     final Map<InstanceIdentifier<Node>, HwvtepDeviceInfo> allConnectedInstances() {
697         return Maps.transformValues(Collections.unmodifiableMap(nodeIidVsConnectionInstance),
698             HwvtepConnectionInstance::getDeviceInfo);
699     }
700
701     final Map<InstanceIdentifier<Node>, TransactionHistory> controllerTxHistory() {
702         return Collections.unmodifiableMap(controllerTxHistory);
703     }
704
705     final Map<InstanceIdentifier<Node>, TransactionHistory> deviceUpdateHistory() {
706         return Collections.unmodifiableMap(deviceUpdateHistory);
707     }
708
709     public void cleanupOperationalNode(final InstanceIdentifier<Node> nodeIid) {
710         txInvoker.invoke(new HwvtepGlobalRemoveCommand(nodeIid));
711     }
712
713     private enum ConnectionReconciliationTriggers {
714         /*
715         Reconciliation trigger for scenario where controller's attempt
716         to connect to switch fails on config data store notification
717         */
718         ON_CONTROLLER_INITIATED_CONNECTION_FAILURE,
719
720         /*
721         Reconciliation trigger for the scenario where controller
722         initiated connection disconnects.
723         */
724         ON_DISCONNECT
725     }
726 }