2 * Copyright (c) 2015, 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.hwvtepsouthbound;
10 import static java.util.Objects.requireNonNull;
11 import static org.opendaylight.ovsdb.lib.operations.Operations.op;
13 import com.google.common.collect.Maps;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.net.ConnectException;
19 import java.net.InetAddress;
20 import java.net.UnknownHostException;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
25 import java.util.Optional;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.opendaylight.mdsal.binding.api.DataBroker;
32 import org.opendaylight.mdsal.binding.api.ReadTransaction;
33 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
34 import org.opendaylight.mdsal.eos.binding.api.Entity;
35 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipCandidateRegistration;
36 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange;
37 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener;
38 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListenerRegistration;
39 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
40 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
41 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
42 import org.opendaylight.ovsdb.hwvtepsouthbound.events.ClientConnected;
43 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationManager;
44 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationTask;
45 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.configuration.HwvtepReconciliationTask;
46 import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection.ConnectionReconciliationTask;
47 import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue;
48 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.HwvtepGlobalRemoveCommand;
49 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvoker;
50 import org.opendaylight.ovsdb.lib.OvsdbClient;
51 import org.opendaylight.ovsdb.lib.OvsdbConnection;
52 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
53 import org.opendaylight.ovsdb.lib.operations.Operation;
54 import org.opendaylight.ovsdb.lib.operations.OperationResult;
55 import org.opendaylight.ovsdb.lib.operations.Select;
56 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
57 import org.opendaylight.ovsdb.lib.schema.typed.TypedDatabaseSchema;
58 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
59 import org.opendaylight.ovsdb.schema.hardwarevtep.Global;
60 import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionHistory;
61 import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionType;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalSwitchAttributes;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.ConnectionInfo;
66 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
67 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
71 public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoCloseable {
72 private final Map<ConnectionInfo, HwvtepConnectionInstance> clients = new ConcurrentHashMap<>();
73 private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionManager.class);
74 private static final String ENTITY_TYPE = "hwvtep";
75 private static final int DB_FETCH_TIMEOUT = 1000;
76 private static final int TRANSACTION_HISTORY_CAPACITY = 10000;
77 private static final int TRANSACTION_HISTORY_WATERMARK = 7500;
79 private final DataBroker db;
80 private final TransactionInvoker txInvoker;
81 private final Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers = new ConcurrentHashMap<>();
82 private final Map<Entity, HwvtepConnectionInstance> entityConnectionMap = new ConcurrentHashMap<>();
83 private final EntityOwnershipService entityOwnershipService;
84 private final HwvtepDeviceEntityOwnershipListener hwvtepDeviceEntityOwnershipListener;
85 private final ReconciliationManager reconciliationManager;
86 private final Map<InstanceIdentifier<Node>, HwvtepConnectionInstance> nodeIidVsConnectionInstance =
87 new ConcurrentHashMap<>();
88 private final HwvtepOperGlobalListener hwvtepOperGlobalListener;
89 private final Map<InstanceIdentifier<Node>, TransactionHistory> controllerTxHistory = new ConcurrentHashMap<>();
90 private final Map<InstanceIdentifier<Node>, TransactionHistory> deviceUpdateHistory = new ConcurrentHashMap<>();
91 private final OvsdbConnection ovsdbConnectionService;
92 private final Map<OvsdbClient, OvsdbClient> alreadyProcessedClients = new ConcurrentHashMap<>();
/**
 * Creates the manager, stores its collaborators and creates the ownership listener,
 * the reconciliation manager and the operational global-node listener.
 *
 * @param db data broker handed to the reconciliation manager, the global listener and connection instances
 * @param txInvoker invoker used to run md-sal transaction commands
 * @param entityOwnershipService service used for candidate and listener registration
 * @param ovsdbConnectionService library service used for controller-initiated connections
 */
public HwvtepConnectionManager(final DataBroker db, final TransactionInvoker txInvoker,
        final EntityOwnershipService entityOwnershipService, final OvsdbConnection ovsdbConnectionService) {
    // The final field must be assigned: it is read below and by several other methods.
    this.db = db;
    this.txInvoker = txInvoker;
    this.entityOwnershipService = entityOwnershipService;
    this.hwvtepDeviceEntityOwnershipListener = new HwvtepDeviceEntityOwnershipListener(this, entityOwnershipService);
    this.reconciliationManager = new ReconciliationManager(db);
    this.hwvtepOperGlobalListener = new HwvtepOperGlobalListener(db, this);
    this.ovsdbConnectionService = ovsdbConnectionService;
}
106 public void close() throws Exception {
107 if (hwvtepDeviceEntityOwnershipListener != null) {
108 hwvtepDeviceEntityOwnershipListener.close();
110 if (hwvtepOperGlobalListener != null) {
111 hwvtepOperGlobalListener.close();
114 for (HwvtepConnectionInstance client: clients.values()) {
117 DependencyQueue.close();
121 public void connected(final OvsdbClient externalClient) {
122 if (alreadyProcessedClients.containsKey(externalClient)) {
123 LOG.info("Hwvtep Library already connected {} from {}:{} to {}:{} to this, hence skipping the processing",
124 externalClient.getConnectionInfo().getType(),
125 externalClient.getConnectionInfo().getRemoteAddress(),
126 externalClient.getConnectionInfo().getRemotePort(),
127 externalClient.getConnectionInfo().getLocalAddress(),
128 externalClient.getConnectionInfo().getLocalPort());
131 alreadyProcessedClients.put(externalClient, externalClient);
132 HwvtepConnectionInstance hwClient = null;
134 List<String> databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS);
135 if (databases != null && !databases.isEmpty() && databases.contains(HwvtepSchemaConstants.HARDWARE_VTEP)) {
136 LOG.info("Hwvtep Library connected {} from {}:{} to {}:{}",
137 externalClient.getConnectionInfo().getType(),
138 externalClient.getConnectionInfo().getRemoteAddress(),
139 externalClient.getConnectionInfo().getRemotePort(),
140 externalClient.getConnectionInfo().getLocalAddress(),
141 externalClient.getConnectionInfo().getLocalPort());
142 hwClient = connectedButCallBacksNotRegistered(externalClient);
143 registerEntityForOwnership(hwClient);
144 HwvtepOperGlobalListener.runAfterTimeoutIfNodeNotCreated(hwClient.getInstanceIdentifier(), () -> {
145 externalClient.disconnect();
146 disconnected(externalClient);
149 } catch (InterruptedException | ExecutionException | TimeoutException e) {
150 LOG.warn("Unable to fetch Database list from device {}. Disconnecting from the device.",
151 externalClient.getConnectionInfo().getRemoteAddress(), e);
152 externalClient.disconnect();
157 @SuppressFBWarnings("REC_CATCH_EXCEPTION")
158 @SuppressWarnings("checkstyle:IllegalCatch")
159 public void disconnected(final OvsdbClient client) {
160 alreadyProcessedClients.remove(client);
161 HwvtepConnectionInstance hwvtepConnectionInstance = null;
163 LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store",
164 client.getConnectionInfo().getType(),
165 client.getConnectionInfo().getRemoteAddress(),
166 client.getConnectionInfo().getRemotePort(),
167 client.getConnectionInfo().getLocalAddress(),
168 client.getConnectionInfo().getLocalPort());
169 ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(client);
170 hwvtepConnectionInstance = getConnectionInstance(key);
171 if (hwvtepConnectionInstance != null) {
172 if (hwvtepConnectionInstance.getInstanceIdentifier() != null) {
173 int port = hwvtepConnectionInstance.getOvsdbClient().getConnectionInfo().getRemotePort();
174 deviceUpdateHistory.get(hwvtepConnectionInstance.getInstanceIdentifier()).addToHistory(
175 TransactionType.DELETE, new ClientConnected(client.getConnectionInfo().getRemotePort()));
176 LOG.info("CONTROLLER - {} {}", TransactionType.DELETE, new ClientConnected(port));
180 // Unregister Entity ownership as soon as possible ,so this instance should
181 // not be used as a candidate in Entity election (given that this instance is
182 // about to disconnect as well), if current owner get disconnected from
184 if (hwvtepConnectionInstance.getHasDeviceOwnership()) {
185 unregisterEntityForOwnership(hwvtepConnectionInstance);
186 LOG.info("Client disconnected from the Leader. Delete the Hvtep Node {} ",
187 hwvtepConnectionInstance.getInstanceIdentifier());
188 txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null));
190 unregisterEntityForOwnership(hwvtepConnectionInstance);
191 LOG.info("Client disconnected from the Follower. Not deleteing the Hvtep Node {} ",
192 hwvtepConnectionInstance.getInstanceIdentifier());
194 //Do not delete if client disconnected from follower HwvtepGlobalRemoveCommand
197 removeConnectionInstance(key);
199 //Controller initiated connection can be terminated from switch side.
200 //So cleanup the instance identifier cache.
201 removeInstanceIdentifier(key);
202 removeConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier());
203 retryConnection(hwvtepConnectionInstance.getInstanceIdentifier(),
204 hwvtepConnectionInstance.getHwvtepGlobalAugmentation(),
205 ConnectionReconciliationTriggers.ON_DISCONNECT);
207 LOG.warn("HWVTEP disconnected event did not find connection instance for {}", key);
209 LOG.trace("HwvtepConnectionManager exit disconnected client: {}", client);
210 } catch (Exception e) {
211 LOG.error("Failed to execute disconnected ",e);
215 public OvsdbClient connect(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepGlobal)
216 throws UnknownHostException, ConnectException {
217 LOG.info("Connecting to {}", HwvtepSouthboundUtil.connectionInfoToString(hwvtepGlobal.getConnectionInfo()));
218 InetAddress ip = HwvtepSouthboundMapper.createInetAddress(hwvtepGlobal.getConnectionInfo().getRemoteIp());
219 OvsdbClient client = ovsdbConnectionService
220 .connect(ip, hwvtepGlobal.getConnectionInfo().getRemotePort().getValue().toJava());
221 if (client != null) {
222 putInstanceIdentifier(hwvtepGlobal.getConnectionInfo(), iid.firstIdentifierOf(Node.class));
223 HwvtepConnectionInstance hwvtepConnectionInstance = connectedButCallBacksNotRegistered(client);
224 hwvtepConnectionInstance.setHwvtepGlobalAugmentation(hwvtepGlobal);
225 hwvtepConnectionInstance.setInstanceIdentifier(iid.firstIdentifierOf(Node.class));
227 // Register Cluster Ownership for ConnectionInfo
228 registerEntityForOwnership(hwvtepConnectionInstance);
230 LOG.warn("Failed to connect to OVSDB node: {}", hwvtepGlobal.getConnectionInfo());
235 public void disconnect(final HwvtepGlobalAugmentation ovsdbNode) throws UnknownHostException {
236 LOG.info("Diconnecting from {}", HwvtepSouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo()));
237 HwvtepConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo());
238 if (client != null) {
240 // Unregister Cluster Ownership for ConnectionInfo
241 unregisterEntityForOwnership(client);
242 removeInstanceIdentifier(ovsdbNode.getConnectionInfo());
246 public HwvtepConnectionInstance connectedButCallBacksNotRegistered(final OvsdbClient externalClient) {
247 LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
248 externalClient.getConnectionInfo().getRemotePort());
249 ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(externalClient);
250 HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstance(key);
252 // Check if existing hwvtepConnectionInstance for the OvsdbClient present.
253 // In such cases, we will see if the hwvtepConnectionInstance has same externalClient.
254 if (hwvtepConnectionInstance != null) {
255 if (hwvtepConnectionInstance.hasOvsdbClient(externalClient)) {
256 LOG.info("HWVTEP Connection Instance {} already exists for client {}", key, externalClient);
257 return hwvtepConnectionInstance;
259 LOG.info("HWVTEP Connection Instance {} being replaced with client {}", key, externalClient);
260 hwvtepConnectionInstance.disconnect();
262 // Unregister Cluster Ownership for ConnectionInfo
263 // Because the hwvtepConnectionInstance is about to be completely replaced!
264 unregisterEntityForOwnership(hwvtepConnectionInstance);
266 removeConnectionInstance(key);
269 hwvtepConnectionInstance = new HwvtepConnectionInstance(this, key,
270 externalClient, getInstanceIdentifier(key), txInvoker, db);
271 hwvtepConnectionInstance.createTransactInvokers();
272 return hwvtepConnectionInstance;
275 private void putConnectionInstance(final ConnectionInfo key,final HwvtepConnectionInstance instance) {
276 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
277 clients.put(connectionInfo, instance);
278 LOG.info("Clients after put: {}", clients);
281 void putConnectionInstance(final InstanceIdentifier<Node> nodeIid,
282 final HwvtepConnectionInstance connectionInstance) {
283 nodeIidVsConnectionInstance.put(nodeIid, connectionInstance);
286 public HwvtepConnectionInstance getConnectionInstance(final ConnectionInfo key) {
290 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
291 return clients.get(connectionInfo);
294 public HwvtepConnectionInstance getConnectionInstance(final Node node) {
295 requireNonNull(node);
296 HwvtepGlobalAugmentation hwvtepGlobal = node.augmentation(HwvtepGlobalAugmentation.class);
297 PhysicalSwitchAugmentation switchNode = node.augmentation(PhysicalSwitchAugmentation.class);
298 if (hwvtepGlobal != null) {
299 if (hwvtepGlobal.getConnectionInfo() != null) {
300 LOG.debug("Get the ConnectionInfo from HwvtepGlobal {}", hwvtepGlobal.getConnectionInfo());
301 return getConnectionInstance(hwvtepGlobal.getConnectionInfo());
303 //TODO: Case of user configured connection for now
304 //TODO: We could get it from Managers also.
308 else if (switchNode != null) {
309 LOG.debug("Get the ConnectionInfo from PhysicalSwitch");
310 return getConnectionInstance(switchNode);
312 LOG.warn("This is not a node that gives any hint how to find its OVSDB Manager: {}",node);
317 public HwvtepConnectionInstance getConnectionInstance(final HwvtepPhysicalSwitchAttributes node) {
318 Optional<HwvtepGlobalAugmentation> optional = HwvtepSouthboundUtil.getManagingNode(db, node);
319 if (optional.isPresent()) {
320 return getConnectionInstance(optional.get().getConnectionInfo());
326 public HwvtepConnectionInstance getConnectionInstanceFromNodeIid(final InstanceIdentifier<Node> nodeIid) {
327 HwvtepConnectionInstance hwvtepConnectionInstance = nodeIidVsConnectionInstance.get(nodeIid);
328 if (hwvtepConnectionInstance != null) {
329 return hwvtepConnectionInstance;
331 InstanceIdentifier<Node> globalNodeIid = HwvtepSouthboundUtil.getGlobalNodeIid(nodeIid);
332 if (globalNodeIid != null) {
333 LOG.debug("Get the ConnectionInfo from HwvtepGlobal : {}", globalNodeIid);
334 return nodeIidVsConnectionInstance.get(globalNodeIid);
339 public void stopConfigurationReconciliation(final InstanceIdentifier<Node> nodeIid) {
340 final ReconciliationTask task = new HwvtepReconciliationTask(
341 reconciliationManager, HwvtepConnectionManager.this, nodeIid, null, null, db);
343 reconciliationManager.dequeue(task);
346 public void reconcileConfigurations(final HwvtepConnectionInstance client, final Node psNode) {
347 final InstanceIdentifier<Node> nodeIid = client.getInstanceIdentifier();
348 final ReconciliationTask task = new HwvtepReconciliationTask(
349 reconciliationManager, HwvtepConnectionManager.this, nodeIid, psNode, client, db);
351 reconciliationManager.enqueue(task);
354 private void removeConnectionInstance(final ConnectionInfo key) {
355 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
356 clients.remove(connectionInfo);
357 LOG.info("Clients after remove: {}", clients);
360 private void removeConnectionInstance(final InstanceIdentifier<Node> nodeIid) {
361 if (nodeIid != null) {
362 nodeIidVsConnectionInstance.remove(nodeIid);
366 private void putInstanceIdentifier(final ConnectionInfo key,final InstanceIdentifier<Node> iid) {
367 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
368 instanceIdentifiers.put(connectionInfo, iid);
371 public InstanceIdentifier<Node> getInstanceIdentifier(final ConnectionInfo key) {
372 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
373 return instanceIdentifiers.get(connectionInfo);
376 private void removeInstanceIdentifier(final ConnectionInfo key) {
377 ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key);
378 instanceIdentifiers.remove(connectionInfo);
381 public OvsdbClient getClient(final ConnectionInfo connectionInfo) {
382 return getConnectionInstance(connectionInfo).getOvsdbClient();
385 @SuppressWarnings("checkstyle:IllegalCatch")
386 private void registerCallbacks(final HwvtepConnectionInstance hwvtepConnectionInstance) {
387 LOG.info("HWVTEP entity {} is owned by this controller registering callbacks",
388 hwvtepConnectionInstance.getConnectionInfo());
390 hwvtepOperGlobalListener.runAfterNodeDeleted(
391 hwvtepConnectionInstance.getInstanceIdentifier(), () -> {
392 cleanupOperationalNode(hwvtepConnectionInstance.getInstanceIdentifier());
393 hwvtepConnectionInstance.registerCallbacks();
396 } catch (Exception e) {
397 LOG.error("Failed to register callbacks for HWVTEP entity {} ",
398 hwvtepConnectionInstance.getConnectionInfo(), e);
403 private void registerEntityForOwnership(final HwvtepConnectionInstance hwvtepConnectionInstance) {
405 Entity candidateEntity = getEntityFromConnectionInstance(hwvtepConnectionInstance);
406 if (entityConnectionMap.get(candidateEntity) != null) {
407 InstanceIdentifier<Node> iid = hwvtepConnectionInstance.getInstanceIdentifier();
408 LOG.info("Calling disconnect before processing new connection for {}", candidateEntity);
409 disconnected(entityConnectionMap.get(candidateEntity).getOvsdbClient());
410 hwvtepConnectionInstance.setInstanceIdentifier(iid);
411 putConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier(), hwvtepConnectionInstance);
413 entityConnectionMap.put(candidateEntity, hwvtepConnectionInstance);
414 hwvtepConnectionInstance.setConnectedEntity(candidateEntity);
417 EntityOwnershipCandidateRegistration registration =
418 entityOwnershipService.registerCandidate(candidateEntity);
419 hwvtepConnectionInstance.setDeviceOwnershipCandidateRegistration(registration);
420 LOG.info("HWVTEP entity {} is registered for ownership.", candidateEntity);
421 } catch (CandidateAlreadyRegisteredException e) {
422 LOG.warn("OVSDB entity {} was already registered for ownership", candidateEntity, e);
424 handleOwnershipState(candidateEntity, hwvtepConnectionInstance);
427 private void handleOwnershipState(final Entity candidateEntity,
428 final HwvtepConnectionInstance hwvtepConnectionInstance) {
429 //If entity already has owner, it won't get notification from EntityOwnershipService
430 //so cache the connection instances.
431 java.util.Optional<EntityOwnershipState> ownershipStateOpt =
432 entityOwnershipService.getOwnershipState(candidateEntity);
433 if (ownershipStateOpt.isPresent()) {
434 EntityOwnershipState ownershipState = ownershipStateOpt.get();
435 putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
436 if (ownershipState != EntityOwnershipState.NO_OWNER) {
437 hwvtepConnectionInstance.setHasDeviceOwnership(ownershipState == EntityOwnershipState.IS_OWNER);
438 if (ownershipState != EntityOwnershipState.IS_OWNER) {
439 LOG.info("HWVTEP entity {} is already owned by other southbound plugin "
440 + "instance, so *this* instance is NOT an OWNER of the device",
441 hwvtepConnectionInstance.getConnectionInfo());
443 registerCallbacks(hwvtepConnectionInstance);
449 private static Global getHwvtepGlobalTableEntry(final HwvtepConnectionInstance connectionInstance) {
450 final TypedDatabaseSchema dbSchema;
452 dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get();
453 } catch (InterruptedException | ExecutionException e) {
454 LOG.warn("Not able to fetch schema for database {} from device {}",
455 HwvtepSchemaConstants.HARDWARE_VTEP, connectionInstance.getConnectionInfo(), e);
459 GenericTableSchema hwvtepSchema = dbSchema.getTableSchema(Global.class);
460 Select<GenericTableSchema> selectOperation = op.select(hwvtepSchema);
461 selectOperation.setColumns(hwvtepSchema.getColumnList());
463 ArrayList<Operation> operations = new ArrayList<>(2);
464 operations.add(selectOperation);
465 operations.add(op.comment("Fetching hardware_vtep table rows"));
467 final List<OperationResult> results;
469 results = connectionInstance.transact(dbSchema, operations).get();
470 } catch (InterruptedException | ExecutionException e) {
471 LOG.warn("Not able to fetch hardware_vtep table row from device {}", connectionInstance.getConnectionInfo(),
476 final Global globalRow;
477 if (results != null) {
478 OperationResult selectResult = results.get(0);
479 globalRow = TyperUtils.getTypedRowWrapper(dbSchema,Global.class,selectResult.getRows().get(0));
483 LOG.info("Fetched global {} from hardware_vtep schema", globalRow);
487 private Entity getEntityFromConnectionInstance(@NonNull final HwvtepConnectionInstance hwvtepConnectionInstance) {
488 InstanceIdentifier<Node> iid = hwvtepConnectionInstance.getInstanceIdentifier();
490 //TODO: Is Global the right one?
491 Global hwvtepGlobalRow = getHwvtepGlobalTableEntry(hwvtepConnectionInstance);
492 iid = HwvtepSouthboundMapper.getInstanceIdentifier(hwvtepGlobalRow);
493 /* Let's set the iid now */
494 hwvtepConnectionInstance.setInstanceIdentifier(iid);
495 LOG.trace("InstanceIdentifier {} generated for device "
496 + "connection {}",iid, hwvtepConnectionInstance.getConnectionInfo());
497 controllerTxHistory.putIfAbsent(iid,
498 new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK));
499 deviceUpdateHistory.putIfAbsent(iid,
500 new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK));
501 TransactionHistory controllerLog = controllerTxHistory.get(iid);
502 TransactionHistory deviceLog = deviceUpdateHistory.get(iid);
503 int port = hwvtepConnectionInstance.getOvsdbClient().getConnectionInfo().getRemotePort();
504 deviceLog.addToHistory(TransactionType.ADD, new ClientConnected(port));
505 LOG.info("CONTROLLER - {} {}", TransactionType.ADD, new ClientConnected(port));
506 hwvtepConnectionInstance.setControllerTxHistory(controllerLog);
507 hwvtepConnectionInstance.setDeviceUpdateHistory(deviceLog);
509 Entity deviceEntity = new Entity(ENTITY_TYPE, iid);
510 LOG.debug("Entity {} created for device connection {}",
511 deviceEntity, hwvtepConnectionInstance.getConnectionInfo());
515 private void unregisterEntityForOwnership(final HwvtepConnectionInstance hwvtepConnectionInstance) {
516 hwvtepConnectionInstance.closeDeviceOwnershipCandidateRegistration();
517 entityConnectionMap.remove(hwvtepConnectionInstance.getConnectedEntity());
520 public void reconcileConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode) {
521 this.retryConnection(iid, hwvtepNode,
522 ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE);
525 public void stopConnectionReconciliationIfActive(final InstanceIdentifier<?> iid,
526 final HwvtepGlobalAugmentation hwvtepNode) {
527 final ReconciliationTask task = new ConnectionReconciliationTask(
528 reconciliationManager,
532 reconciliationManager.dequeue(task);
535 private void retryConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode,
536 final ConnectionReconciliationTriggers trigger) {
537 if (hwvtepNode == null) {
538 //switch initiated connection
541 final ReconciliationTask task = new ConnectionReconciliationTask(
542 reconciliationManager,
547 if (reconciliationManager.isEnqueued(task)) {
552 case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE:
553 reconciliationManager.enqueueForRetry(task);
555 case ON_DISCONNECT: {
556 ReadTransaction tx = db.newReadOnlyTransaction();
557 FluentFuture<Optional<Node>> readNodeFuture =
558 tx.read(LogicalDatastoreType.CONFIGURATION, iid);
560 readNodeFuture.addCallback(new FutureCallback<Optional<Node>>() {
562 public void onSuccess(final Optional<Node> node) {
563 if (node.isPresent()) {
564 HwvtepGlobalAugmentation augmentation = node.get()
565 .augmentation(HwvtepGlobalAugmentation.class);
566 if (augmentation == null || augmentation.getConnectionInfo() == null) {
570 "Disconnected/Failed connection {} was controller initiated, attempting reconnection",
571 hwvtepNode.getConnectionInfo());
572 reconciliationManager.enqueue(task);
575 LOG.debug("Connection {} was switch initiated, no reconciliation is required",
576 iid.firstKeyOf(Node.class).getNodeId());
581 public void onFailure(final Throwable ex) {
582 LOG.warn("Read Config/DS for Node failed! {}", iid, ex);
584 }, MoreExecutors.directExecutor());
592 public void handleOwnershipChanged(final EntityOwnershipChange ownershipChange) {
593 HwvtepConnectionInstance hwvtepConnectionInstance =
594 getConnectionInstanceFromEntity(ownershipChange.getEntity());
595 LOG.info("handleOwnershipChanged: {} event received for device {}",
596 ownershipChange, hwvtepConnectionInstance != null ? hwvtepConnectionInstance.getConnectionInfo()
597 : "THAT'S NOT REGISTERED BY THIS SOUTHBOUND PLUGIN INSTANCE");
599 if (hwvtepConnectionInstance == null) {
600 if (ownershipChange.getState().isOwner()) {
601 LOG.warn("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
603 // EntityOwnershipService sends notification to all the nodes, irrespective of whether
604 // that instance registered for the device ownership or not. It is to make sure that
605 // If all the controller instance that was connected to the device are down, so the
606 // running instance can clear up the operational data store even though it was not
607 // connected to the device.
608 LOG.debug("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity());
611 // If entity has no owner, clean up the operational data store (it's possible because owner controller
612 // might went down abruptly and didn't get a chance to clean up the operational data store.
613 if (!ownershipChange.getState().hasOwner()) {
614 LOG.info("{} has no owner, cleaning up the operational data store", ownershipChange.getEntity());
615 // Below code might look weird but it's required. We want to give first opportunity to the
616 // previous owner of the device to clean up the operational data store if there is no owner now.
617 // That way we will avoid lot of nasty md-sal exceptions because of concurrent delete.
618 InstanceIdentifier<Node> nodeIid =
619 (InstanceIdentifier<Node>) ownershipChange.getEntity().getIdentifier();
620 hwvtepOperGlobalListener.scheduleOldConnectionNodeDelete(nodeIid);
622 Assuming node1 was the owner earlier.
623 If the owner relinquished he would have cleaned it already in which case the above would be a no op
624 If the owner crashed then the above would clean the node after the scheduled delay
625 The live nodes (two and three) will try to cleanup but that is ok one of them ends up cleaning.
626 But if the southbound connects again that connection can itself trigger the pending cleanup and
627 the above op would become noop again.
630 In The following cases it would be a noop
631 1) The southbound connects again within the scheduled cleanup delay.
632 2) The owner node1 which is not crashed cleaned the node properly.
634 In the following case both node2 and node3 will try to clean it (one of them will succeed ).
635 1) node1 which was the owner crashed
640 //Connection detail need to be cached, irrespective of ownership result.
641 putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance);
643 if (ownershipChange.getState().isOwner() == hwvtepConnectionInstance.getHasDeviceOwnership()) {
644 LOG.debug("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}",
645 hwvtepConnectionInstance.getConnectionInfo(), hwvtepConnectionInstance.getHasDeviceOwnership());
649 hwvtepConnectionInstance.setHasDeviceOwnership(ownershipChange.getState().isOwner());
650 // You were not an owner, but now you are
651 if (ownershipChange.getState().isOwner()) {
652 LOG.info("handleOwnershipChanged: *this* southbound plugin instance is owner of device {}",
653 hwvtepConnectionInstance.getConnectionInfo());
655 //*this* instance of southbound plugin is owner of the device,
656 //so register for monitor callbacks
657 registerCallbacks(hwvtepConnectionInstance);
660 //You were owner of the device, but now you are not. With the current ownership
661 //grant mechanism, this scenario should not occur. Because this scenario will occur
662 //when this controller went down or switch flap the connection, but in both the case
663 //it will go through the re-registration process. We need to implement this condition
664 //when clustering service implement a ownership grant strategy which can revoke the
665 //device ownership for load balancing the devices across the instances.
666 //Once this condition occur, we should unregister the callback.
667 LOG.error("handleOwnershipChanged: *this* southbound plugin instance is no longer the owner of device {}",
668 hwvtepConnectionInstance.getNodeId().getValue());
672 private HwvtepConnectionInstance getConnectionInstanceFromEntity(final Entity entity) {
673 return entityConnectionMap.get(entity);
676 private static class HwvtepDeviceEntityOwnershipListener implements EntityOwnershipListener {
677 private final HwvtepConnectionManager hcm;
678 private final EntityOwnershipListenerRegistration listenerRegistration;
680 HwvtepDeviceEntityOwnershipListener(final HwvtepConnectionManager hcm,
681 final EntityOwnershipService entityOwnershipService) {
683 listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this);
686 public void close() {
687 listenerRegistration.close();
691 public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
692 hcm.handleOwnershipChanged(ownershipChange);
696 final Map<InstanceIdentifier<Node>, HwvtepDeviceInfo> allConnectedInstances() {
697 return Maps.transformValues(Collections.unmodifiableMap(nodeIidVsConnectionInstance),
698 HwvtepConnectionInstance::getDeviceInfo);
701 final Map<InstanceIdentifier<Node>, TransactionHistory> controllerTxHistory() {
702 return Collections.unmodifiableMap(controllerTxHistory);
705 final Map<InstanceIdentifier<Node>, TransactionHistory> deviceUpdateHistory() {
706 return Collections.unmodifiableMap(deviceUpdateHistory);
709 public void cleanupOperationalNode(final InstanceIdentifier<Node> nodeIid) {
710 txInvoker.invoke(new HwvtepGlobalRemoveCommand(nodeIid));
713 private enum ConnectionReconciliationTriggers {
715 Reconciliation trigger for scenario where controller's attempt
716 to connect to switch fails on config data store notification
718 ON_CONTROLLER_INITIATED_CONNECTION_FAILURE,
721 Reconciliation trigger for the scenario where controller
722 initiated connection disconnects.