2 * Copyright (c) 2015, 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.hwvtepsouthbound;
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.ovsdb.lib.operations.Operations.op;
13 import com.google.common.util.concurrent.FluentFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.net.ConnectException;
18 import java.net.InetAddress;
19 import java.net.UnknownHostException;
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
24 import java.util.Optional;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.eclipse.jdt.annotation.NonNull;
30 import org.opendaylight.mdsal.binding.api.DataBroker;
31 import org.opendaylight.mdsal.binding.api.ReadTransaction;
32 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
33 import org.opendaylight.mdsal.eos.binding.api.Entity;
34 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipCandidateRegistration;
35 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange;
36 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener;
37 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListenerRegistration;
38 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
39 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
40 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
41 import org.opendaylight.ovsdb.hwvtepsouthbound.events.ClientConnected;
42 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationManager;
43 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationTask;
44 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.configuration.HwvtepReconciliationTask;
45 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection.ConnectionReconciliationTask;
46 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue;
47 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.HwvtepGlobalRemoveCommand;
48 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvoker;
49 import org.opendaylight.ovsdb.lib.OvsdbClient;
50 import org.opendaylight.ovsdb.lib.OvsdbConnection;
51 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
52 import org.opendaylight.ovsdb.lib.operations.Operation;
53 import org.opendaylight.ovsdb.lib.operations.OperationResult;
54 import org.opendaylight.ovsdb.lib.operations.Select;
55 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
56 import org.opendaylight.ovsdb.lib.schema.typed.TypedDatabaseSchema;
57 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
58 import org.opendaylight.ovsdb.schema.hardwarevtep.Global;
59 import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionHistory;
60 import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionType;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalSwitchAttributes;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.ConnectionInfo;
65 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
66 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
70 public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoCloseable {
71 private final Map<ConnectionInfo, HwvtepConnectionInstance> clients = new ConcurrentHashMap<>();
72 private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionManager.class);
73 private static final String ENTITY_TYPE = "hwvtep";
74 private static final int DB_FETCH_TIMEOUT = 1000;
75 private static final int TRANSACTION_HISTORY_CAPACITY = 10000;
76 private static final int TRANSACTION_HISTORY_WATERMARK = 7500;
78 private final DataBroker db;
79 private final TransactionInvoker txInvoker;
80 private final Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers = new ConcurrentHashMap<>();
81 private final Map<Entity, HwvtepConnectionInstance> entityConnectionMap = new ConcurrentHashMap<>();
82 private final EntityOwnershipService entityOwnershipService;
83 private final HwvtepDeviceEntityOwnershipListener hwvtepDeviceEntityOwnershipListener;
84 private final ReconciliationManager reconciliationManager;
85 private final Map<InstanceIdentifier<Node>, HwvtepConnectionInstance> nodeIidVsConnectionInstance =
86 new ConcurrentHashMap<>();
87 private final HwvtepOperGlobalListener hwvtepOperGlobalListener;
88 private final Map<InstanceIdentifier<Node>, TransactionHistory> controllerTxHistory = new ConcurrentHashMap<>();
89 private final Map<InstanceIdentifier<Node>, TransactionHistory> deviceUpdateHistory = new ConcurrentHashMap<>();
90 private final OvsdbConnection ovsdbConnectionService;
91 private final Map<OvsdbClient, OvsdbClient> alreadyProcessedClients = new ConcurrentHashMap<>();
93 public HwvtepConnectionManager(final DataBroker db, final TransactionInvoker txInvoker,
94 final EntityOwnershipService entityOwnershipService, final OvsdbConnection ovsdbConnectionService) {
96 this.txInvoker = txInvoker;
97 this.entityOwnershipService = entityOwnershipService;
98 this.hwvtepDeviceEntityOwnershipListener = new HwvtepDeviceEntityOwnershipListener(this,entityOwnershipService);
99 this.reconciliationManager = new ReconciliationManager(db);
100 this.hwvtepOperGlobalListener = new HwvtepOperGlobalListener(db, this);
101 this.ovsdbConnectionService = ovsdbConnectionService;
105 public void close() throws Exception {
106 if (hwvtepDeviceEntityOwnershipListener != null) {
107 hwvtepDeviceEntityOwnershipListener.close();
109 if (hwvtepOperGlobalListener != null) {
110 hwvtepOperGlobalListener.close();
113 for (HwvtepConnectionInstance client: clients.values()) {
116 DependencyQueue.close();
120 public void connected(final OvsdbClient externalClient) {
121 if (alreadyProcessedClients.containsKey(externalClient)) {
122 LOG.info("Hwvtep Library already connected {} from {}:{} to {}:{} to this, hence skipping the processing",
123 externalClient.getConnectionInfo().getType(),
124 externalClient.getConnectionInfo().getRemoteAddress(),
125 externalClient.getConnectionInfo().getRemotePort(),
126 externalClient.getConnectionInfo().getLocalAddress(),
127 externalClient.getConnectionInfo().getLocalPort());
130 alreadyProcessedClients.put(externalClient, externalClient);
131 HwvtepConnectionInstance hwClient = null;
133 List<String> databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS);
134 if (databases != null && !databases.isEmpty() && databases.contains(HwvtepSchemaConstants.HARDWARE_VTEP)) {
135 LOG.info("Hwvtep Library connected {} from {}:{} to {}:{}",
136 externalClient.getConnectionInfo().getType(),
137 externalClient.getConnectionInfo().getRemoteAddress(),
138 externalClient.getConnectionInfo().getRemotePort(),
139 externalClient.getConnectionInfo().getLocalAddress(),
140 externalClient.getConnectionInfo().getLocalPort());
141 hwClient = connectedButCallBacksNotRegistered(externalClient);
142 registerEntityForOwnership(hwClient);
143 HwvtepOperGlobalListener.runAfterTimeoutIfNodeNotCreated(hwClient.getInstanceIdentifier(), () -> {
144 externalClient.disconnect();
145 disconnected(externalClient);
148 } catch (InterruptedException | ExecutionException | TimeoutException e) {
149 LOG.warn("Unable to fetch Database list from device {}. Disconnecting from the device.",
150 externalClient.getConnectionInfo().getRemoteAddress(), e);
151 externalClient.disconnect();
156 @SuppressFBWarnings("REC_CATCH_EXCEPTION")
157 @SuppressWarnings("checkstyle:IllegalCatch")
158 public void disconnected(final OvsdbClient client) {
159 alreadyProcessedClients.remove(client);
160 HwvtepConnectionInstance hwvtepConnectionInstance = null;
162 LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store",
163 client.getConnectionInfo().getType(),
164 client.getConnectionInfo().getRemoteAddress(),
165 client.getConnectionInfo().getRemotePort(),
166 client.getConnectionInfo().getLocalAddress(),
167 client.getConnectionInfo().getLocalPort());
168 ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(client);
169 hwvtepConnectionInstance = getConnectionInstance(key);
170 if (hwvtepConnectionInstance != null) {
171 if (hwvtepConnectionInstance.getInstanceIdentifier() != null) {
172 int port = hwvtepConnectionInstance.getOvsdbClient().getConnectionInfo().getRemotePort();
173 deviceUpdateHistory.get(hwvtepConnectionInstance.getInstanceIdentifier()).addToHistory(
174 TransactionType.DELETE, new ClientConnected(client.getConnectionInfo().getRemotePort()));
175 LOG.info("CONTROLLER - {} {}", TransactionType.DELETE, new ClientConnected(port));
179 // Unregister Entity ownership as soon as possible ,so this instance should
180 // not be used as a candidate in Entity election (given that this instance is
181 // about to disconnect as well), if current owner get disconnected from
183 if (hwvtepConnectionInstance.getHasDeviceOwnership()) {
184 unregisterEntityForOwnership(hwvtepConnectionInstance);
185 LOG.info("Client disconnected from the Leader. Delete the Hvtep Node {} ",
186 hwvtepConnectionInstance.getInstanceIdentifier());
187 txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null));
189 unregisterEntityForOwnership(hwvtepConnectionInstance);
190 LOG.info("Client disconnected from the Follower. Not deleteing the Hvtep Node {} ",
191 hwvtepConnectionInstance.getInstanceIdentifier());
193 //Do not delete if client disconnected from follower HwvtepGlobalRemoveCommand
196 removeConnectionInstance(key);
198 //Controller initiated connection can be terminated from switch side.
199 //So cleanup the instance identifier cache.
200 removeInstanceIdentifier(key);
201 removeConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier());
202 retryConnection(hwvtepConnectionInstance.getInstanceIdentifier(),
203 hwvtepConnectionInstance.getHwvtepGlobalAugmentation(),
204 ConnectionReconciliationTriggers.ON_DISCONNECT);
206 LOG.warn("HWVTEP disconnected event did not find connection instance for {}", key);
208 LOG.trace("HwvtepConnectionManager exit disconnected client: {}", client);
209 } catch (Exception e) {
210 LOG.error("Failed to execute disconnected ",e);
214 public OvsdbClient connect(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepGlobal)
215 throws UnknownHostException, ConnectException {
216 LOG.info("Connecting to {}", HwvtepSouthboundUtil.connectionInfoToString(hwvtepGlobal.getConnectionInfo()));
217 InetAddress ip = HwvtepSouthboundMapper.createInetAddress(hwvtepGlobal.getConnectionInfo().getRemoteIp());
218 OvsdbClient client = ovsdbConnectionService
219 .connect(ip, hwvtepGlobal.getConnectionInfo().getRemotePort().getValue().toJava());
220 if (client != null) {
221 putInstanceIdentifier(hwvtepGlobal.getConnectionInfo(), iid.firstIdentifierOf(Node.class));
222 HwvtepConnectionInstance hwvtepConnectionInstance = connectedButCallBacksNotRegistered(client);
223 hwvtepConnectionInstance.setHwvtepGlobalAugmentation(hwvtepGlobal);
224 hwvtepConnectionInstance.setInstanceIdentifier(iid.firstIdentifierOf(Node.class));
226 // Register Cluster Ownership for ConnectionInfo
227 registerEntityForOwnership(hwvtepConnectionInstance);
229 LOG.warn("Failed to connect to OVSDB node: {}", hwvtepGlobal.getConnectionInfo());
234 public void disconnect(final HwvtepGlobalAugmentation ovsdbNode) throws UnknownHostException {
235 LOG.info("Diconnecting from {}", HwvtepSouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo()));
236 HwvtepConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo());
237 if (client != null) {
239 // Unregister Cluster Ownership for ConnectionInfo
240 unregisterEntityForOwnership(client);
241 removeInstanceIdentifier(ovsdbNode.getConnectionInfo());
245 public HwvtepConnectionInstance connectedButCallBacksNotRegistered(final OvsdbClient externalClient) {
246 LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
247 externalClient.getConnectionInfo().getRemotePort());
248 ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(externalClient);
249 HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstance(key);
251 // Check if existing hwvtepConnectionInstance for the OvsdbClient present.
252 // In such cases, we will see if the hwvtepConnectionInstance has same externalClient.
253 if (hwvtepConnectionInstance != null) {
254 if (hwvtepConnectionInstance.hasOvsdbClient(externalClient)) {
255 LOG.info("HWVTEP Connection Instance {} already exists for client {}", key, externalClient);
256 return hwvtepConnectionInstance;
258 LOG.info("HWVTEP Connection Instance {} being replaced with client {}", key, externalClient);
259 hwvtepConnectionInstance.disconnect();
261 // Unregister Cluster Ownership for ConnectionInfo
262 // Because the hwvtepConnectionInstance is about to be completely replaced!
263 unregisterEntityForOwnership(hwvtepConnectionInstance);
265 removeConnectionInstance(key);
268 hwvtepConnectionInstance = new HwvtepConnectionInstance(this, key,
269 externalClient, getInstanceIdentifier(key), txInvoker, db);
270 hwvtepConnectionInstance.createTransactInvokers();
271 return hwvtepConnectionInstance;
274 private void putConnectionInstance(final ConnectionInfo key,final HwvtepConnectionInstance instance) {
275 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
276 clients.put(connectionInfo, instance);
277 LOG.info("Clients after put: {}", clients);
280 void putConnectionInstance(final InstanceIdentifier<Node> nodeIid,
281 final HwvtepConnectionInstance connectionInstance) {
282 nodeIidVsConnectionInstance.put(nodeIid, connectionInstance);
285 public HwvtepConnectionInstance getConnectionInstance(final ConnectionInfo key) {
289 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
290 return clients.get(connectionInfo);
293 public HwvtepConnectionInstance getConnectionInstance(final Node node) {
294 requireNonNull(node);
295 HwvtepGlobalAugmentation hwvtepGlobal = node.augmentation(HwvtepGlobalAugmentation.class);
296 PhysicalSwitchAugmentation switchNode = node.augmentation(PhysicalSwitchAugmentation.class);
297 if (hwvtepGlobal != null) {
298 if (hwvtepGlobal.getConnectionInfo() != null) {
299 LOG.debug("Get the ConnectionInfo from HwvtepGlobal {}", hwvtepGlobal.getConnectionInfo());
300 return getConnectionInstance(hwvtepGlobal.getConnectionInfo());
302 //TODO: Case of user configured connection for now
303 //TODO: We could get it from Managers also.
307 else if (switchNode != null) {
308 LOG.debug("Get the ConnectionInfo from PhysicalSwitch");
309 return getConnectionInstance(switchNode);
311 LOG.warn("This is not a node that gives any hint how to find its OVSDB Manager: {}",node);
316 public HwvtepConnectionInstance getConnectionInstance(final HwvtepPhysicalSwitchAttributes node) {
317 Optional<HwvtepGlobalAugmentation> optional = HwvtepSouthboundUtil.getManagingNode(db, node);
318 if (optional.isPresent()) {
319 return getConnectionInstance(optional.get().getConnectionInfo());
325 public HwvtepConnectionInstance getConnectionInstanceFromNodeIid(final InstanceIdentifier<Node> nodeIid) {
326 HwvtepConnectionInstance hwvtepConnectionInstance = nodeIidVsConnectionInstance.get(nodeIid);
327 if (hwvtepConnectionInstance != null) {
328 return hwvtepConnectionInstance;
330 InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtil.getGlobalNodeIid(nodeIid);
331 if (globalNodeIid != null) {
332 LOG.debug("Get the ConnectionInfo from HwvtepGlobal : {}", globalNodeIid);
333 return nodeIidVsConnectionInstance.get(globalNodeIid);
338 public void stopConfigurationReconciliation(final InstanceIdentifier<Node> nodeIid) {
339 final ReconciliationTask task = new HwvtepReconciliationTask(
340 reconciliationManager, HwvtepConnectionManager.this, nodeIid, null, null, db);
342 reconciliationManager.dequeue(task);
345 public void reconcileConfigurations(final HwvtepConnectionInstance client, final Node psNode) {
346 final InstanceIdentifier<Node> nodeIid = client.getInstanceIdentifier();
347 final ReconciliationTask task = new HwvtepReconciliationTask(
348 reconciliationManager, HwvtepConnectionManager.this, nodeIid, psNode, client, db);
350 reconciliationManager.enqueue(task);
353 private void removeConnectionInstance(final ConnectionInfo key) {
354 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
355 clients.remove(connectionInfo);
356 LOG.info("Clients after remove: {}", clients);
359 private void removeConnectionInstance(final InstanceIdentifier<Node> nodeIid) {
360 if (nodeIid != null) {
361 nodeIidVsConnectionInstance.remove(nodeIid);
365 private void putInstanceIdentifier(final ConnectionInfo key,final InstanceIdentifier<Node> iid) {
366 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
367 instanceIdentifiers.put(connectionInfo, iid);
370 public InstanceIdentifier<Node> getInstanceIdentifier(final ConnectionInfo key) {
371 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
372 return instanceIdentifiers.get(connectionInfo);
375 private void removeInstanceIdentifier(final ConnectionInfo key) {
376 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
377 instanceIdentifiers.remove(connectionInfo);
380 public OvsdbClient getClient(final ConnectionInfo connectionInfo) {
381 return getConnectionInstance(connectionInfo).getOvsdbClient();
384 @SuppressWarnings("checkstyle:IllegalCatch")
385 private void registerCallbacks(final HwvtepConnectionInstance hwvtepConnectionInstance) {
386 LOG.info("HWVTEP entity {} is owned by this controller registering callbacks",
387 hwvtepConnectionInstance.getConnectionInfo());
389 hwvtepOperGlobalListener.runAfterNodeDeleted(
390 hwvtepConnectionInstance.getInstanceIdentifier(), () -> {
391 cleanupOperationalNode(hwvtepConnectionInstance.getInstanceIdentifier());
392 hwvtepConnectionInstance.registerCallbacks();
395 } catch (Exception e) {
396 LOG.error("Failed to register callbacks for HWVTEP entity {} ",
397 hwvtepConnectionInstance.getConnectionInfo(), e);
402 private void registerEntityForOwnership(final HwvtepConnectionInstance hwvtepConnectionInstance) {
404 Entity candidateEntity = getEntityFromConnectionInstance(hwvtepConnectionInstance);
405 if (entityConnectionMap.get(candidateEntity) != null) {
406 InstanceIdentifier<Node> iid = hwvtepConnectionInstance.getInstanceIdentifier();
407 LOG.info("Calling disconnect before processing new connection for {}", candidateEntity);
408 disconnected(entityConnectionMap.get(candidateEntity).getOvsdbClient());
409 hwvtepConnectionInstance.setInstanceIdentifier(iid);
410 putConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier(), hwvtepConnectionInstance);
412 entityConnectionMap.put(candidateEntity, hwvtepConnectionInstance);
413 hwvtepConnectionInstance.setConnectedEntity(candidateEntity);
416 EntityOwnershipCandidateRegistration registration =
417 entityOwnershipService.registerCandidate(candidateEntity);
418 hwvtepConnectionInstance.setDeviceOwnershipCandidateRegistration(registration);
419 LOG.info("HWVTEP entity {} is registered for ownership.", candidateEntity);
420 } catch (CandidateAlreadyRegisteredException e) {
421 LOG.warn("OVSDB entity {} was already registered for ownership", candidateEntity, e);
423 handleOwnershipState(candidateEntity, hwvtepConnectionInstance);
426 private void handleOwnershipState(final Entity candidateEntity,
427 final HwvtepConnectionInstance hwvtepConnectionInstance) {
428 //If entity already has owner, it won't get notification from EntityOwnershipService
429 //so cache the connection instances.
430 java.util.Optional<EntityOwnershipState> ownershipStateOpt =
431 entityOwnershipService.getOwnershipState(candidateEntity);
432 if (ownershipStateOpt.isPresent()) {
433 EntityOwnershipState ownershipState = ownershipStateOpt.get();
434 putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
435 if (ownershipState != EntityOwnershipState.NO_OWNER) {
436 hwvtepConnectionInstance.setHasDeviceOwnership(ownershipState == EntityOwnershipState.IS_OWNER);
437 if (ownershipState != EntityOwnershipState.IS_OWNER) {
438 LOG.info("HWVTEP entity {} is already owned by other southbound plugin "
439 + "instance, so *this* instance is NOT an OWNER of the device",
440 hwvtepConnectionInstance.getConnectionInfo());
442 registerCallbacks(hwvtepConnectionInstance);
448 private static Global getHwvtepGlobalTableEntry(final HwvtepConnectionInstance connectionInstance) {
449 final TypedDatabaseSchema dbSchema;
451 dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
452 } catch (InterruptedException | ExecutionException e) {
453 LOG.warn("Not able to fetch schema for database {} from device {}",
454 HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e);
458 GenericTableSchema hwvtepSchema = dbSchema.getTableSchema(Global.class);
459 Select<GenericTableSchema> selectOperation = op.select(hwvtepSchema);
460 selectOperation.setColumns(hwvtepSchema.getColumnList());
462 ArrayList<Operation> operations = new ArrayList<>(2);
463 operations.add(selectOperation);
464 operations.add(op.comment("Fetching hardware_vtep table rows"));
466 final List<OperationResult> results;
468 results = connectionInstance.transact(dbSchema, operations).get();
469 } catch (InterruptedException | ExecutionException e) {
470 LOG.warn("Not able to fetch hardware_vtep table row from device {}", connectionInstance.getConnectionInfo(),
475 final Global globalRow;
476 if (results != null) {
477 OperationResult selectResult = results.get(0);
478 globalRow = TyperUtils.getTypedRowWrapper(dbSchema,Global.class,selectResult.getRows().get(0));
482 LOG.info("Fetched global {} from hardware_vtep schema", globalRow);
486 private Entity getEntityFromConnectionInstance(@NonNull final HwvtepConnectionInstance hwvtepConnectionInstance) {
487 InstanceIdentifier<Node> iid = hwvtepConnectionInstance.getInstanceIdentifier();
489 //TODO: Is Global the right one?
490 Global hwvtepGlobalRow = getHwvtepGlobalTableEntry(hwvtepConnectionInstance);
491 iid = HwvtepSouthboundMapper.getInstanceIdentifier(hwvtepGlobalRow);
492 /* Let's set the iid now */
493 hwvtepConnectionInstance.setInstanceIdentifier(iid);
494 LOG.trace("InstanceIdentifier {} generated for device "
495 + "connection {}",iid, hwvtepConnectionInstance.getConnectionInfo());
496 controllerTxHistory.putIfAbsent(iid,
497 new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK));
498 deviceUpdateHistory.putIfAbsent(iid,
499 new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK));
500 TransactionHistory controllerLog = controllerTxHistory.get(iid);
501 TransactionHistory deviceLog = deviceUpdateHistory.get(iid);
502 int port = hwvtepConnectionInstance.getOvsdbClient().getConnectionInfo().getRemotePort();
503 deviceLog.addToHistory(TransactionType.ADD, new ClientConnected(port));
504 LOG.info("CONTROLLER - {} {}", TransactionType.ADD, new ClientConnected(port));
505 hwvtepConnectionInstance.setControllerTxHistory(controllerLog);
506 hwvtepConnectionInstance.setDeviceUpdateHistory(deviceLog);
508 Entity deviceEntity = new Entity(ENTITY_TYPE, iid);
509 LOG.debug("Entity {} created for device connection {}",
510 deviceEntity, hwvtepConnectionInstance.getConnectionInfo());
514 private void unregisterEntityForOwnership(final HwvtepConnectionInstance hwvtepConnectionInstance) {
515 hwvtepConnectionInstance.closeDeviceOwnershipCandidateRegistration();
516 entityConnectionMap.remove(hwvtepConnectionInstance.getConnectedEntity());
519 public void reconcileConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode) {
520 this.retryConnection(iid, hwvtepNode,
521 ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE);
524 public void stopConnectionReconciliationIfActive(final InstanceIdentifier<?> iid,
525 final HwvtepGlobalAugmentation hwvtepNode) {
526 final ReconciliationTask task = new ConnectionReconciliationTask(
527 reconciliationManager,
531 reconciliationManager.dequeue(task);
534 private void retryConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode,
535 final ConnectionReconciliationTriggers trigger) {
536 if (hwvtepNode == null) {
537 //switch initiated connection
540 final ReconciliationTask task = new ConnectionReconciliationTask(
541 reconciliationManager,
546 if (reconciliationManager.isEnqueued(task)) {
551 case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE:
552 reconciliationManager.enqueueForRetry(task);
554 case ON_DISCONNECT: {
555 ReadTransaction tx = db.newReadOnlyTransaction();
556 FluentFuture<Optional<Node>> readNodeFuture =
557 tx.read(LogicalDatastoreType.CONFIGURATION, iid);
559 readNodeFuture.addCallback(new FutureCallback<Optional<Node>>() {
561 public void onSuccess(final Optional<Node> node) {
562 if (node.isPresent()) {
563 HwvtepGlobalAugmentation augmentation = node.get()
564 .augmentation(HwvtepGlobalAugmentation.class);
565 if (augmentation == null || augmentation.getConnectionInfo() == null) {
569 "Disconnected/Failed connection {} was controller initiated, attempting reconnection",
570 hwvtepNode.getConnectionInfo());
571 reconciliationManager.enqueue(task);
574 LOG.debug("Connection {} was switch initiated, no reconciliation is required",
575 iid.firstKeyOf(Node.class).getNodeId());
580 public void onFailure(final Throwable ex) {
581 LOG.warn("Read Config/DS for Node failed! {}", iid, ex);
583 }, MoreExecutors.directExecutor());
591 public void handleOwnershipChanged(final EntityOwnershipChange ownershipChange) {
592 HwvtepConnectionInstance hwvtepConnectionInstance =
593 getConnectionInstanceFromEntity(ownershipChange.getEntity());
594 LOG.info("handleOwnershipChanged: {} event received for device {}",
595 ownershipChange, hwvtepConnectionInstance != null ? hwvtepConnectionInstance.getConnectionInfo()
596 : "THAT'S NOT REGISTERED BY THIS SOUTHBOUND PLUGIN INSTANCE");
598 if (hwvtepConnectionInstance == null) {
599 if (ownershipChange.getState().isOwner()) {
600 LOG.warn("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
602 // EntityOwnershipService sends notification to all the nodes, irrespective of whether
603 // that instance registered for the device ownership or not. It is to make sure that
604 // If all the controller instance that was connected to the device are down, so the
605 // running instance can clear up the operational data store even though it was not
606 // connected to the device.
607 LOG.debug("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
610 // If entity has no owner, clean up the operational data store (it's possible because owner controller
611 // might went down abruptly and didn't get a chance to clean up the operational data store.
612 if (!ownershipChange.getState().hasOwner()) {
613 LOG.info("{} has no owner, cleaning up the operational data store", ownershipChange.getEntity());
614 // Below code might look weird but it's required. We want to give first opportunity to the
615 // previous owner of the device to clean up the operational data store if there is no owner now.
616 // That way we will avoid lot of nasty md-sal exceptions because of concurrent delete.
617 InstanceIdentifier<Node> nodeIid =
618 (InstanceIdentifier<Node>) ownershipChange.getEntity().getIdentifier();
619 hwvtepOperGlobalListener.scheduleOldConnectionNodeDelete(nodeIid);
621 Assuming node1 was the owner earlier.
622 If the owner relinquished he would have cleaned it already in which case the above would be a no op
623 If the owner crashed then the above would clean the node after the scheduled delay
624 The live nodes (two and three) will try to cleanup but that is ok one of them ends up cleaning.
625 But if the southbound connects again that connection can itself trigger the pending cleanup and
626 the above op would become noop again.
629 In The following cases it would be a noop
630 1) The southbound connects again within the scheduled cleanup delay.
631 2) The owner node1 which is not crashed cleaned the node properly.
633 In the following case both node2 and node3 will try to clean it (one of them will succeed ).
634 1) node1 which was the owner crashed
639 //Connection detail need to be cached, irrespective of ownership result.
640 putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
642 if (ownershipChange.getState().isOwner() == hwvtepConnectionInstance.getHasDeviceOwnership()) {
643 LOG.debug("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}",
644 hwvtepConnectionInstance.getConnectionInfo(), hwvtepConnectionInstance.getHasDeviceOwnership());
648 hwvtepConnectionInstance.setHasDeviceOwnership(ownershipChange.getState().isOwner());
649 // You were not an owner, but now you are
650 if (ownershipChange.getState().isOwner()) {
651 LOG.info("handleOwnershipChanged: *this* southbound plugin instance is owner of device {}",
652 hwvtepConnectionInstance.getConnectionInfo());
654 //*this* instance of southbound plugin is owner of the device,
655 //so register for monitor callbacks
656 registerCallbacks(hwvtepConnectionInstance);
659 //You were owner of the device, but now you are not. With the current ownership
660 //grant mechanism, this scenario should not occur. Because this scenario will occur
661 //when this controller went down or switch flap the connection, but in both the case
662 //it will go through the re-registration process. We need to implement this condition
663 //when clustering service implement a ownership grant strategy which can revoke the
664 //device ownership for load balancing the devices across the instances.
665 //Once this condition occur, we should unregister the callback.
666 LOG.error("handleOwnershipChanged: *this* southbound plugin instance is no longer the owner of device {}",
667 hwvtepConnectionInstance.getNodeId().getValue());
671 private HwvtepConnectionInstance getConnectionInstanceFromEntity(final Entity entity) {
672 return entityConnectionMap.get(entity);
675 public Map<InstanceIdentifier<Node>, TransactionHistory> getControllerTxHistory() {
676 return controllerTxHistory;
679 public Map<InstanceIdentifier<Node>, TransactionHistory> getDeviceUpdateHistory() {
680 return deviceUpdateHistory;
683 private static class HwvtepDeviceEntityOwnershipListener implements EntityOwnershipListener {
684 private final HwvtepConnectionManager hcm;
685 private final EntityOwnershipListenerRegistration listenerRegistration;
687 HwvtepDeviceEntityOwnershipListener(final HwvtepConnectionManager hcm,
688 final EntityOwnershipService entityOwnershipService) {
690 listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this);
693 public void close() {
694 listenerRegistration.close();
698 public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
699 hcm.handleOwnershipChanged(ownershipChange);
703 private enum ConnectionReconciliationTriggers {
705 Reconciliation trigger for scenario where controller's attempt
706 to connect to switch fails on config data store notification
708 ON_CONTROLLER_INITIATED_CONNECTION_FAILURE,
711 Reconciliation trigger for the scenario where controller
712 initiated connection disconnects.
717 public Map<InstanceIdentifier<Node>, HwvtepConnectionInstance> getAllConnectedInstances() {
718 return Collections.unmodifiableMap(nodeIidVsConnectionInstance);
721 public void cleanupOperationalNode(final InstanceIdentifier<Node> nodeIid) {
722 txInvoker.invoke(new HwvtepGlobalRemoveCommand(nodeIid));