Migrate to Objects.requireNonNull()
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / HwvtepConnectionManager.java
1 /*
2  * Copyright (c) 2015, 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.hwvtepsouthbound;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.ovsdb.lib.operations.Operations.op;
12
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.net.ConnectException;
18 import java.net.InetAddress;
19 import java.net.UnknownHostException;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Optional;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.eclipse.jdt.annotation.NonNull;
30 import org.opendaylight.mdsal.binding.api.DataBroker;
31 import org.opendaylight.mdsal.binding.api.ReadTransaction;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.eos.binding.api.Entity;
34 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipCandidateRegistration;
35 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange;
36 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener;
37 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListenerRegistration;
38 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
39 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
40 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
41 import org.opendaylight.ovsdb.hwvtepsouthbound.events.ClientConnected;
42 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationManager;
43 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationTask;
44 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.configuration.HwvtepReconciliationTask;
45 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection.ConnectionReconciliationTask;
46 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue;
47 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.HwvtepGlobalRemoveCommand;
48 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvoker;
49 import org.opendaylight.ovsdb.lib.OvsdbClient;
50 import org.opendaylight.ovsdb.lib.OvsdbConnection;
51 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
52 import org.opendaylight.ovsdb.lib.operations.Operation;
53 import org.opendaylight.ovsdb.lib.operations.OperationResult;
54 import org.opendaylight.ovsdb.lib.operations.Select;
55 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
56 import org.opendaylight.ovsdb.lib.schema.typed.TypedDatabaseSchema;
57 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
58 import org.opendaylight.ovsdb.schema.hardwarevtep.Global;
59 import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionHistory;
60 import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionType;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalSwitchAttributes;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.ConnectionInfo;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoCloseable {
71     private final Map<ConnectionInfo, HwvtepConnectionInstance> clients = new ConcurrentHashMap<>();
72     private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionManager.class);
73     private static final String ENTITY_TYPE = "hwvtep";
74     private static final int DB_FETCH_TIMEOUT = 1000;
75     private static final int TRANSACTION_HISTORY_CAPACITY = 10000;
76     private static final int TRANSACTION_HISTORY_WATERMARK = 7500;
77
78     private final DataBroker db;
79     private final TransactionInvoker txInvoker;
80     private final Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers = new ConcurrentHashMap<>();
81     private final Map<Entity, HwvtepConnectionInstance> entityConnectionMap = new ConcurrentHashMap<>();
82     private final EntityOwnershipService entityOwnershipService;
83     private final HwvtepDeviceEntityOwnershipListener hwvtepDeviceEntityOwnershipListener;
84     private final ReconciliationManager reconciliationManager;
85     private final Map<InstanceIdentifier<Node>, HwvtepConnectionInstance> nodeIidVsConnectionInstance =
86             new ConcurrentHashMap<>();
87     private final HwvtepOperGlobalListener hwvtepOperGlobalListener;
88     private final Map<InstanceIdentifier<Node>, TransactionHistory> controllerTxHistory = new ConcurrentHashMap<>();
89     private final Map<InstanceIdentifier<Node>, TransactionHistory> deviceUpdateHistory = new ConcurrentHashMap<>();
90     private final OvsdbConnection ovsdbConnectionService;
91     private final Map<OvsdbClient, OvsdbClient> alreadyProcessedClients = new ConcurrentHashMap<>();
92
93     public HwvtepConnectionManager(final DataBroker db, final TransactionInvoker txInvoker,
94                     final EntityOwnershipService entityOwnershipService, final OvsdbConnection ovsdbConnectionService) {
95         this.db = db;
96         this.txInvoker = txInvoker;
97         this.entityOwnershipService = entityOwnershipService;
98         this.hwvtepDeviceEntityOwnershipListener = new HwvtepDeviceEntityOwnershipListener(this,entityOwnershipService);
99         this.reconciliationManager = new ReconciliationManager(db);
100         this.hwvtepOperGlobalListener = new HwvtepOperGlobalListener(db, this);
101         this.ovsdbConnectionService = ovsdbConnectionService;
102     }
103
104     @Override
105     public void close() throws Exception {
106         if (hwvtepDeviceEntityOwnershipListener != null) {
107             hwvtepDeviceEntityOwnershipListener.close();
108         }
109         if (hwvtepOperGlobalListener != null) {
110             hwvtepOperGlobalListener.close();
111         }
112
113         for (HwvtepConnectionInstance client: clients.values()) {
114             client.disconnect();
115         }
116         DependencyQueue.close();
117     }
118
119     @Override
120     public void connected(final OvsdbClient externalClient) {
121         if (alreadyProcessedClients.containsKey(externalClient)) {
122             LOG.info("Hwvtep Library already connected {} from {}:{} to {}:{} to this, hence skipping the processing",
123                     externalClient.getConnectionInfo().getType(),
124                     externalClient.getConnectionInfo().getRemoteAddress(),
125                     externalClient.getConnectionInfo().getRemotePort(),
126                     externalClient.getConnectionInfo().getLocalAddress(),
127                     externalClient.getConnectionInfo().getLocalPort());
128             return;
129         }
130         alreadyProcessedClients.put(externalClient, externalClient);
131         HwvtepConnectionInstance hwClient = null;
132         try {
133             List<String> databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS);
134             if (databases != null && !databases.isEmpty() && databases.contains(HwvtepSchemaConstants.HARDWARE_VTEP)) {
135                 LOG.info("Hwvtep Library connected {} from {}:{} to {}:{}",
136                         externalClient.getConnectionInfo().getType(),
137                         externalClient.getConnectionInfo().getRemoteAddress(),
138                         externalClient.getConnectionInfo().getRemotePort(),
139                         externalClient.getConnectionInfo().getLocalAddress(),
140                         externalClient.getConnectionInfo().getLocalPort());
141                 hwClient = connectedButCallBacksNotRegistered(externalClient);
142                 registerEntityForOwnership(hwClient);
143                 HwvtepOperGlobalListener.runAfterTimeoutIfNodeNotCreated(hwClient.getInstanceIdentifier(), () -> {
144                     externalClient.disconnect();
145                     disconnected(externalClient);
146                 });
147             }
148         } catch (InterruptedException | ExecutionException | TimeoutException e) {
149             LOG.warn("Unable to fetch Database list from device {}. Disconnecting from the device.",
150                     externalClient.getConnectionInfo().getRemoteAddress(), e);
151             externalClient.disconnect();
152         }
153     }
154
155     @Override
156     @SuppressFBWarnings("REC_CATCH_EXCEPTION")
157     @SuppressWarnings("checkstyle:IllegalCatch")
158     public void disconnected(final OvsdbClient client) {
159         alreadyProcessedClients.remove(client);
160         HwvtepConnectionInstance hwvtepConnectionInstance = null;
161         try {
162             LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store",
163                     client.getConnectionInfo().getType(),
164                     client.getConnectionInfo().getRemoteAddress(),
165                     client.getConnectionInfo().getRemotePort(),
166                     client.getConnectionInfo().getLocalAddress(),
167                     client.getConnectionInfo().getLocalPort());
168             ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(client);
169             hwvtepConnectionInstance = getConnectionInstance(key);
170             if (hwvtepConnectionInstance != null) {
171                 if (hwvtepConnectionInstance.getInstanceIdentifier() != null) {
172                     int port = hwvtepConnectionInstance.getOvsdbClient().getConnectionInfo().getRemotePort();
173                     deviceUpdateHistory.get(hwvtepConnectionInstance.getInstanceIdentifier()).addToHistory(
174                             TransactionType.DELETE, new ClientConnected(client.getConnectionInfo().getRemotePort()));
175                     LOG.info("CONTROLLER - {} {}", TransactionType.DELETE, new ClientConnected(port));
176                 }
177
178
179                 // Unregister Entity ownership as soon as possible ,so this instance should
180                 // not be used as a candidate in Entity election (given that this instance is
181                 // about to disconnect as well), if current owner get disconnected from
182                 // HWVTEP device.
183                 if (hwvtepConnectionInstance.getHasDeviceOwnership()) {
184                     unregisterEntityForOwnership(hwvtepConnectionInstance);
185                     LOG.info("Client disconnected from the Leader. Delete the Hvtep Node {} ",
186                         hwvtepConnectionInstance.getInstanceIdentifier());
187                     txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null));
188                 } else {
189                     unregisterEntityForOwnership(hwvtepConnectionInstance);
190                     LOG.info("Client disconnected from the Follower. Not deleteing the Hvtep Node {} ",
191                         hwvtepConnectionInstance.getInstanceIdentifier());
192
193                     //Do not delete if client disconnected from follower HwvtepGlobalRemoveCommand
194                 }
195
196                 removeConnectionInstance(key);
197
198                 //Controller initiated connection can be terminated from switch side.
199                 //So cleanup the instance identifier cache.
200                 removeInstanceIdentifier(key);
201                 removeConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier());
202                 retryConnection(hwvtepConnectionInstance.getInstanceIdentifier(),
203                         hwvtepConnectionInstance.getHwvtepGlobalAugmentation(),
204                         ConnectionReconciliationTriggers.ON_DISCONNECT);
205             } else {
206                 LOG.warn("HWVTEP disconnected event did not find connection instance for {}", key);
207             }
208             LOG.trace("HwvtepConnectionManager exit disconnected client: {}", client);
209         } catch (Exception e) {
210             LOG.error("Failed to execute disconnected ",e);
211         }
212     }
213
214     public OvsdbClient connect(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepGlobal)
215             throws UnknownHostException, ConnectException {
216         LOG.info("Connecting to {}", HwvtepSouthboundUtil.connectionInfoToString(hwvtepGlobal.getConnectionInfo()));
217         InetAddress ip = HwvtepSouthboundMapper.createInetAddress(hwvtepGlobal.getConnectionInfo().getRemoteIp());
218         OvsdbClient client = ovsdbConnectionService
219                         .connect(ip, hwvtepGlobal.getConnectionInfo().getRemotePort().getValue().toJava());
220         if (client != null) {
221             putInstanceIdentifier(hwvtepGlobal.getConnectionInfo(), iid.firstIdentifierOf(Node.class));
222             HwvtepConnectionInstance hwvtepConnectionInstance = connectedButCallBacksNotRegistered(client);
223             hwvtepConnectionInstance.setHwvtepGlobalAugmentation(hwvtepGlobal);
224             hwvtepConnectionInstance.setInstanceIdentifier(iid.firstIdentifierOf(Node.class));
225
226             // Register Cluster Ownership for ConnectionInfo
227             registerEntityForOwnership(hwvtepConnectionInstance);
228         } else {
229             LOG.warn("Failed to connect to OVSDB node: {}", hwvtepGlobal.getConnectionInfo());
230         }
231         return client;
232     }
233
234     public void disconnect(final HwvtepGlobalAugmentation ovsdbNode) throws UnknownHostException {
235         LOG.info("Diconnecting from {}", HwvtepSouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo()));
236         HwvtepConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo());
237         if (client != null) {
238             client.disconnect();
239             // Unregister Cluster Ownership for ConnectionInfo
240             unregisterEntityForOwnership(client);
241             removeInstanceIdentifier(ovsdbNode.getConnectionInfo());
242         }
243     }
244
245     public HwvtepConnectionInstance connectedButCallBacksNotRegistered(final OvsdbClient externalClient) {
246         LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
247                 externalClient.getConnectionInfo().getRemotePort());
248         ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(externalClient);
249         HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstance(key);
250
251         // Check if existing hwvtepConnectionInstance for the OvsdbClient present.
252         // In such cases, we will see if the hwvtepConnectionInstance has same externalClient.
253         if (hwvtepConnectionInstance != null) {
254             if (hwvtepConnectionInstance.hasOvsdbClient(externalClient)) {
255                 LOG.info("HWVTEP Connection Instance {} already exists for client {}", key, externalClient);
256                 return hwvtepConnectionInstance;
257             }
258             LOG.info("HWVTEP Connection Instance {} being replaced with client {}", key, externalClient);
259             hwvtepConnectionInstance.disconnect();
260
261             // Unregister Cluster Ownership for ConnectionInfo
262             // Because the hwvtepConnectionInstance is about to be completely replaced!
263             unregisterEntityForOwnership(hwvtepConnectionInstance);
264
265             removeConnectionInstance(key);
266         }
267
268         hwvtepConnectionInstance = new HwvtepConnectionInstance(this, key,
269                 externalClient, getInstanceIdentifier(key), txInvoker, db);
270         hwvtepConnectionInstance.createTransactInvokers();
271         return hwvtepConnectionInstance;
272     }
273
274     private void putConnectionInstance(final ConnectionInfo key,final HwvtepConnectionInstance instance) {
275         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
276         clients.put(connectionInfo, instance);
277         LOG.info("Clients after put: {}", clients);
278     }
279
280     void putConnectionInstance(final InstanceIdentifier<Node> nodeIid,
281                                        final HwvtepConnectionInstance connectionInstance) {
282         nodeIidVsConnectionInstance.put(nodeIid, connectionInstance);
283     }
284
285     public HwvtepConnectionInstance getConnectionInstance(final ConnectionInfo key) {
286         if (key == null) {
287             return null;
288         }
289         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
290         return clients.get(connectionInfo);
291     }
292
293     public HwvtepConnectionInstance getConnectionInstance(final Node node) {
294         requireNonNull(node);
295         HwvtepGlobalAugmentation hwvtepGlobal = node.augmentation(HwvtepGlobalAugmentation.class);
296         PhysicalSwitchAugmentation switchNode = node.augmentation(PhysicalSwitchAugmentation.class);
297         if (hwvtepGlobal != null) {
298             if (hwvtepGlobal.getConnectionInfo() != null) {
299                 LOG.debug("Get the ConnectionInfo from HwvtepGlobal {}", hwvtepGlobal.getConnectionInfo());
300                 return getConnectionInstance(hwvtepGlobal.getConnectionInfo());
301             } else {
302                 //TODO: Case of user configured connection for now
303                 //TODO: We could get it from Managers also.
304                 return null;
305             }
306         }
307         else if (switchNode != null) {
308             LOG.debug("Get the ConnectionInfo from PhysicalSwitch");
309             return getConnectionInstance(switchNode);
310         } else {
311             LOG.warn("This is not a node that gives any hint how to find its OVSDB Manager: {}",node);
312             return null;
313         }
314     }
315
316     public HwvtepConnectionInstance getConnectionInstance(final HwvtepPhysicalSwitchAttributes node) {
317         Optional<HwvtepGlobalAugmentation> optional = HwvtepSouthboundUtil.getManagingNode(db, node);
318         if (optional.isPresent()) {
319             return getConnectionInstance(optional.get().getConnectionInfo());
320         } else {
321             return null;
322         }
323     }
324
325     public HwvtepConnectionInstance getConnectionInstanceFromNodeIid(final InstanceIdentifier<Node> nodeIid) {
326         HwvtepConnectionInstance hwvtepConnectionInstance = nodeIidVsConnectionInstance.get(nodeIid);
327         if (hwvtepConnectionInstance != null) {
328             return hwvtepConnectionInstance;
329         }
330         InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtil.getGlobalNodeIid(nodeIid);
331         if (globalNodeIid != null) {
332             LOG.debug("Get the ConnectionInfo from HwvtepGlobal : {}", globalNodeIid);
333             return nodeIidVsConnectionInstance.get(globalNodeIid);
334         }
335         return null;
336     }
337
338     public void stopConfigurationReconciliation(final InstanceIdentifier<Node> nodeIid) {
339         final ReconciliationTask task = new HwvtepReconciliationTask(
340                 reconciliationManager, HwvtepConnectionManager.this, nodeIid, null, null, db);
341
342         reconciliationManager.dequeue(task);
343     }
344
345     public void reconcileConfigurations(final HwvtepConnectionInstance client, final Node psNode) {
346         final InstanceIdentifier<Node> nodeIid = client.getInstanceIdentifier();
347         final ReconciliationTask task = new HwvtepReconciliationTask(
348                 reconciliationManager, HwvtepConnectionManager.this, nodeIid, psNode, client, db);
349
350         reconciliationManager.enqueue(task);
351     }
352
353     private void removeConnectionInstance(final ConnectionInfo key) {
354         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
355         clients.remove(connectionInfo);
356         LOG.info("Clients after remove: {}", clients);
357     }
358
359     private void removeConnectionInstance(final InstanceIdentifier<Node> nodeIid) {
360         if (nodeIid != null) {
361             nodeIidVsConnectionInstance.remove(nodeIid);
362         }
363     }
364
365     private void putInstanceIdentifier(final ConnectionInfo key,final InstanceIdentifier<Node> iid) {
366         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
367         instanceIdentifiers.put(connectionInfo, iid);
368     }
369
370     public InstanceIdentifier<Node> getInstanceIdentifier(final ConnectionInfo key) {
371         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
372         return instanceIdentifiers.get(connectionInfo);
373     }
374
375     private void removeInstanceIdentifier(final ConnectionInfo key) {
376         ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
377         instanceIdentifiers.remove(connectionInfo);
378     }
379
380     public OvsdbClient getClient(final ConnectionInfo connectionInfo) {
381         return getConnectionInstance(connectionInfo).getOvsdbClient();
382     }
383
384     @SuppressWarnings("checkstyle:IllegalCatch")
385     private void registerCallbacks(final HwvtepConnectionInstance hwvtepConnectionInstance) {
386         LOG.info("HWVTEP entity {} is owned by this controller registering callbacks",
387                 hwvtepConnectionInstance.getConnectionInfo());
388         try {
389             hwvtepOperGlobalListener.runAfterNodeDeleted(
390                 hwvtepConnectionInstance.getInstanceIdentifier(), () -> {
391                     cleanupOperationalNode(hwvtepConnectionInstance.getInstanceIdentifier());
392                     hwvtepConnectionInstance.registerCallbacks();
393                     return null;
394                 });
395         } catch (Exception e) {
396             LOG.error("Failed to register callbacks for HWVTEP entity {} ",
397                     hwvtepConnectionInstance.getConnectionInfo(), e);
398         }
399     }
400
401
402     private void registerEntityForOwnership(final HwvtepConnectionInstance hwvtepConnectionInstance) {
403
404         Entity candidateEntity = getEntityFromConnectionInstance(hwvtepConnectionInstance);
405         if (entityConnectionMap.get(candidateEntity) != null) {
406             InstanceIdentifier<Node> iid = hwvtepConnectionInstance.getInstanceIdentifier();
407             LOG.info("Calling disconnect before processing new connection for {}", candidateEntity);
408             disconnected(entityConnectionMap.get(candidateEntity).getOvsdbClient());
409             hwvtepConnectionInstance.setInstanceIdentifier(iid);
410             putConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier(), hwvtepConnectionInstance);
411         }
412         entityConnectionMap.put(candidateEntity, hwvtepConnectionInstance);
413         hwvtepConnectionInstance.setConnectedEntity(candidateEntity);
414
415         try {
416             EntityOwnershipCandidateRegistration registration =
417                     entityOwnershipService.registerCandidate(candidateEntity);
418             hwvtepConnectionInstance.setDeviceOwnershipCandidateRegistration(registration);
419             LOG.info("HWVTEP entity {} is registered for ownership.", candidateEntity);
420         } catch (CandidateAlreadyRegisteredException e) {
421             LOG.warn("OVSDB entity {} was already registered for ownership", candidateEntity, e);
422         }
423         handleOwnershipState(candidateEntity, hwvtepConnectionInstance);
424     }
425
426     private void handleOwnershipState(final Entity candidateEntity,
427             final HwvtepConnectionInstance hwvtepConnectionInstance) {
428         //If entity already has owner, it won't get notification from EntityOwnershipService
429         //so cache the connection instances.
430         java.util.Optional<EntityOwnershipState> ownershipStateOpt =
431                 entityOwnershipService.getOwnershipState(candidateEntity);
432         if (ownershipStateOpt.isPresent()) {
433             EntityOwnershipState ownershipState = ownershipStateOpt.get();
434             putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
435             if (ownershipState != EntityOwnershipState.NO_OWNER) {
436                 hwvtepConnectionInstance.setHasDeviceOwnership(ownershipState == EntityOwnershipState.IS_OWNER);
437                 if (ownershipState != EntityOwnershipState.IS_OWNER) {
438                     LOG.info("HWVTEP entity {} is already owned by other southbound plugin "
439                                     + "instance, so *this* instance is NOT an OWNER of the device",
440                             hwvtepConnectionInstance.getConnectionInfo());
441                 } else {
442                     registerCallbacks(hwvtepConnectionInstance);
443                 }
444             }
445         }
446     }
447
448     private static Global getHwvtepGlobalTableEntry(final HwvtepConnectionInstance connectionInstance) {
449         final TypedDatabaseSchema dbSchema;
450         try {
451             dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
452         } catch (InterruptedException | ExecutionException e) {
453             LOG.warn("Not able to fetch schema for database {} from device {}",
454                     HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e);
455             return null;
456         }
457
458         GenericTableSchema hwvtepSchema = dbSchema.getTableSchema(Global.class);
459         Select<GenericTableSchema> selectOperation = op.select(hwvtepSchema);
460         selectOperation.setColumns(hwvtepSchema.getColumnList());
461
462         ArrayList<Operation> operations = new ArrayList<>(2);
463         operations.add(selectOperation);
464         operations.add(op.comment("Fetching hardware_vtep table rows"));
465
466         final List<OperationResult> results;
467         try {
468             results = connectionInstance.transact(dbSchema, operations).get();
469         } catch (InterruptedException | ExecutionException e) {
470             LOG.warn("Not able to fetch hardware_vtep table row from device {}", connectionInstance.getConnectionInfo(),
471                 e);
472             return null;
473         }
474
475         final Global globalRow;
476         if (results != null) {
477             OperationResult selectResult = results.get(0);
478             globalRow = TyperUtils.getTypedRowWrapper(dbSchema,Global.class,selectResult.getRows().get(0));
479         } else {
480             globalRow = null;
481         }
482         LOG.info("Fetched global {} from hardware_vtep schema", globalRow);
483         return globalRow;
484     }
485
486     private Entity getEntityFromConnectionInstance(@NonNull final HwvtepConnectionInstance hwvtepConnectionInstance) {
487         InstanceIdentifier<Node> iid = hwvtepConnectionInstance.getInstanceIdentifier();
488         if (iid == null) {
489             //TODO: Is Global the right one?
490             Global hwvtepGlobalRow = getHwvtepGlobalTableEntry(hwvtepConnectionInstance);
491             iid = HwvtepSouthboundMapper.getInstanceIdentifier(hwvtepGlobalRow);
492             /* Let's set the iid now */
493             hwvtepConnectionInstance.setInstanceIdentifier(iid);
494             LOG.trace("InstanceIdentifier {} generated for device "
495                     + "connection {}",iid, hwvtepConnectionInstance.getConnectionInfo());
496             controllerTxHistory.putIfAbsent(iid,
497                     new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK));
498             deviceUpdateHistory.putIfAbsent(iid,
499                     new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK));
500             TransactionHistory controllerLog = controllerTxHistory.get(iid);
501             TransactionHistory deviceLog = deviceUpdateHistory.get(iid);
502             int port = hwvtepConnectionInstance.getOvsdbClient().getConnectionInfo().getRemotePort();
503             deviceLog.addToHistory(TransactionType.ADD, new ClientConnected(port));
504             LOG.info("CONTROLLER - {} {}", TransactionType.ADD, new ClientConnected(port));
505             hwvtepConnectionInstance.setControllerTxHistory(controllerLog);
506             hwvtepConnectionInstance.setDeviceUpdateHistory(deviceLog);
507         }
508         Entity deviceEntity = new Entity(ENTITY_TYPE, iid);
509         LOG.debug("Entity {} created for device connection {}",
510                 deviceEntity, hwvtepConnectionInstance.getConnectionInfo());
511         return deviceEntity;
512     }
513
514     private void unregisterEntityForOwnership(final HwvtepConnectionInstance hwvtepConnectionInstance) {
515         hwvtepConnectionInstance.closeDeviceOwnershipCandidateRegistration();
516         entityConnectionMap.remove(hwvtepConnectionInstance.getConnectedEntity());
517     }
518
519     public void reconcileConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode) {
520         this.retryConnection(iid, hwvtepNode,
521                 ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE);
522     }
523
524     public void stopConnectionReconciliationIfActive(final InstanceIdentifier<?> iid,
525             final HwvtepGlobalAugmentation hwvtepNode) {
526         final ReconciliationTask task = new ConnectionReconciliationTask(
527                 reconciliationManager,
528                 this,
529                 iid,
530                 hwvtepNode);
531         reconciliationManager.dequeue(task);
532     }
533
534     private void retryConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode,
535                                  final ConnectionReconciliationTriggers trigger) {
536         if (hwvtepNode == null) {
537             //switch initiated connection
538             return;
539         }
540         final ReconciliationTask task = new ConnectionReconciliationTask(
541                 reconciliationManager,
542                 this,
543                 iid,
544                 hwvtepNode);
545
546         if (reconciliationManager.isEnqueued(task)) {
547             return;
548         }
549
550         switch (trigger) {
551             case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE:
552                 reconciliationManager.enqueueForRetry(task);
553                 break;
554             case ON_DISCONNECT: {
555                 ReadTransaction tx = db.newReadOnlyTransaction();
556                 FluentFuture<Optional<Node>> readNodeFuture =
557                         tx.read(LogicalDatastoreType.CONFIGURATION, iid);
558
559                 readNodeFuture.addCallback(new FutureCallback<Optional<Node>>() {
560                     @Override
561                     public void onSuccess(final Optional<Node> node) {
562                         if (node.isPresent()) {
563                             HwvtepGlobalAugmentation augmentation = node.get()
564                                     .augmentation(HwvtepGlobalAugmentation.class);
565                             if (augmentation == null || augmentation.getConnectionInfo() == null) {
566                                 return;
567                             }
568                             LOG.info(
569                                 "Disconnected/Failed connection {} was controller initiated, attempting reconnection",
570                                 hwvtepNode.getConnectionInfo());
571                             reconciliationManager.enqueue(task);
572
573                         } else {
574                             LOG.debug("Connection {} was switch initiated, no reconciliation is required",
575                                 iid.firstKeyOf(Node.class).getNodeId());
576                         }
577                     }
578
579                     @Override
580                     public void onFailure(final Throwable ex) {
581                         LOG.warn("Read Config/DS for Node failed! {}", iid, ex);
582                     }
583                 }, MoreExecutors.directExecutor());
584                 break;
585             }
586             default:
587                 break;
588         }
589     }
590
591     public void handleOwnershipChanged(final EntityOwnershipChange ownershipChange) {
592         HwvtepConnectionInstance hwvtepConnectionInstance =
593                 getConnectionInstanceFromEntity(ownershipChange.getEntity());
594         LOG.info("handleOwnershipChanged: {} event received for device {}",
595                 ownershipChange, hwvtepConnectionInstance != null ? hwvtepConnectionInstance.getConnectionInfo()
596                         : "THAT'S NOT REGISTERED BY THIS SOUTHBOUND PLUGIN INSTANCE");
597
598         if (hwvtepConnectionInstance == null) {
599             if (ownershipChange.getState().isOwner()) {
600                 LOG.warn("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
601             } else {
602                 // EntityOwnershipService sends notification to all the nodes, irrespective of whether
603                 // that instance registered for the device ownership or not. It is to make sure that
604                 // If all the controller instance that was connected to the device are down, so the
605                 // running instance can clear up the operational data store even though it was not
606                 // connected to the device.
607                 LOG.debug("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
608             }
609
610             // If entity has no owner, clean up the operational data store (it's possible because owner controller
611             // might went down abruptly and didn't get a chance to clean up the operational data store.
612             if (!ownershipChange.getState().hasOwner()) {
613                 LOG.info("{} has no owner, cleaning up the operational data store", ownershipChange.getEntity());
614                 // Below code might look weird but it's required. We want to give first opportunity to the
615                 // previous owner of the device to clean up the operational data store if there is no owner now.
616                 // That way we will avoid lot of nasty md-sal exceptions because of concurrent delete.
617                 InstanceIdentifier<Node> nodeIid =
618                         (InstanceIdentifier<Node>) ownershipChange.getEntity().getIdentifier();
619                 hwvtepOperGlobalListener.scheduleOldConnectionNodeDelete(nodeIid);
620                 /*
621                 Assuming node1 was the owner earlier.
622                 If the owner relinquished he would have cleaned it already in which case the above would be a no op
623                 If the owner crashed then the above would clean the node after the scheduled delay
624                 The live nodes (two and three) will try to cleanup but that is ok one of them ends up cleaning.
625                 But if the southbound connects again that connection can itself trigger the pending cleanup and
626                 the above op would become noop again.
627
628                 In summary
629                 In The following cases it would be a noop
630                 1) The southbound connects again within the scheduled cleanup delay.
631                 2) The owner node1 which is not crashed cleaned the node properly.
632
633                 In the following case both node2 and node3 will try to clean it (one of them will succeed ).
634                  1) node1 which was the owner crashed
635                  */
636             }
637             return;
638         }
639         //Connection detail need to be cached, irrespective of ownership result.
640         putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
641
642         if (ownershipChange.getState().isOwner() == hwvtepConnectionInstance.getHasDeviceOwnership()) {
643             LOG.debug("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}",
644                     hwvtepConnectionInstance.getConnectionInfo(), hwvtepConnectionInstance.getHasDeviceOwnership());
645             return;
646         }
647
648         hwvtepConnectionInstance.setHasDeviceOwnership(ownershipChange.getState().isOwner());
649         // You were not an owner, but now you are
650         if (ownershipChange.getState().isOwner()) {
651             LOG.info("handleOwnershipChanged: *this* southbound plugin instance is owner of device {}",
652                     hwvtepConnectionInstance.getConnectionInfo());
653
654             //*this* instance of southbound plugin is owner of the device,
655             //so register for monitor callbacks
656             registerCallbacks(hwvtepConnectionInstance);
657
658         } else {
659             //You were owner of the device, but now you are not. With the current ownership
660             //grant mechanism, this scenario should not occur. Because this scenario will occur
661             //when this controller went down or switch flap the connection, but in both the case
662             //it will go through the re-registration process. We need to implement this condition
663             //when clustering service implement a ownership grant strategy which can revoke the
664             //device ownership for load balancing the devices across the instances.
665             //Once this condition occur, we should unregister the callback.
666             LOG.error("handleOwnershipChanged: *this* southbound plugin instance is no longer the owner of device {}",
667                     hwvtepConnectionInstance.getNodeId().getValue());
668         }
669     }
670
671     private HwvtepConnectionInstance getConnectionInstanceFromEntity(final Entity entity) {
672         return entityConnectionMap.get(entity);
673     }
674
675     public Map<InstanceIdentifier<Node>, TransactionHistory> getControllerTxHistory() {
676         return controllerTxHistory;
677     }
678
679     public Map<InstanceIdentifier<Node>, TransactionHistory> getDeviceUpdateHistory() {
680         return deviceUpdateHistory;
681     }
682
683     private static class HwvtepDeviceEntityOwnershipListener implements EntityOwnershipListener {
684         private final HwvtepConnectionManager hcm;
685         private final EntityOwnershipListenerRegistration listenerRegistration;
686
687         HwvtepDeviceEntityOwnershipListener(final HwvtepConnectionManager hcm,
688                 final EntityOwnershipService entityOwnershipService) {
689             this.hcm = hcm;
690             listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this);
691         }
692
693         public void close() {
694             listenerRegistration.close();
695         }
696
697         @Override
698         public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
699             hcm.handleOwnershipChanged(ownershipChange);
700         }
701     }
702
703     private enum ConnectionReconciliationTriggers {
704         /*
705         Reconciliation trigger for scenario where controller's attempt
706         to connect to switch fails on config data store notification
707         */
708         ON_CONTROLLER_INITIATED_CONNECTION_FAILURE,
709
710         /*
711         Reconciliation trigger for the scenario where controller
712         initiated connection disconnects.
713         */
714         ON_DISCONNECT
715     }
716
717     public Map<InstanceIdentifier<Node>, HwvtepConnectionInstance> getAllConnectedInstances() {
718         return Collections.unmodifiableMap(nodeIidVsConnectionInstance);
719     }
720
721     public void cleanupOperationalNode(final InstanceIdentifier<Node> nodeIid) {
722         txInvoker.invoke(new HwvtepGlobalRemoveCommand(nodeIid));
723     }
724 }