2 * Copyright © 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.southbound;
10 import static org.opendaylight.ovsdb.lib.operations.Operations.op;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Optional;
14 import com.google.common.base.Preconditions;
15 import com.google.common.util.concurrent.CheckedFuture;
16 import com.google.common.util.concurrent.FutureCallback;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
20 import java.net.ConnectException;
21 import java.net.InetAddress;
22 import java.net.UnknownHostException;
23 import java.util.ArrayList;
24 import java.util.List;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
32 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
35 import org.opendaylight.mdsal.eos.binding.api.Entity;
36 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipCandidateRegistration;
37 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange;
38 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener;
39 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListenerRegistration;
40 import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService;
41 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
42 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
43 import org.opendaylight.ovsdb.lib.OvsdbClient;
44 import org.opendaylight.ovsdb.lib.OvsdbConnection;
45 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
46 import org.opendaylight.ovsdb.lib.operations.Operation;
47 import org.opendaylight.ovsdb.lib.operations.OperationResult;
48 import org.opendaylight.ovsdb.lib.operations.Select;
49 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
50 import org.opendaylight.ovsdb.lib.schema.typed.TypedDatabaseSchema;
51 import org.opendaylight.ovsdb.schema.openvswitch.OpenVSwitch;
52 import org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationManager;
53 import org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationTask;
54 import org.opendaylight.ovsdb.southbound.reconciliation.configuration.BridgeConfigReconciliationTask;
55 import org.opendaylight.ovsdb.southbound.reconciliation.connection.ConnectionReconciliationTask;
56 import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbNodeRemoveCommand;
57 import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
58 import org.opendaylight.serviceutils.upgrade.UpgradeState;
59 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAttributes;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ManagedNodeEntry;
64 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
65 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
69 public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoCloseable {
71 private final Map<ConnectionInfo, OvsdbConnectionInstance> clients =
72 new ConcurrentHashMap<>();
73 private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionManager.class);
74 private static final String ENTITY_TYPE = "ovsdb";
75 private static final int DB_FETCH_TIMEOUT = 1000;
77 private final DataBroker db;
78 private final TransactionInvoker txInvoker;
79 private final Map<OvsdbClient, OvsdbClient> alreadyProcessedClients = new ConcurrentHashMap<>();
80 private final Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers =
81 new ConcurrentHashMap<>();
82 private final Map<InstanceIdentifier<Node>, OvsdbConnectionInstance> nodeIdVsConnectionInstance =
83 new ConcurrentHashMap<>();
84 private final Map<Entity, OvsdbConnectionInstance> entityConnectionMap =
85 new ConcurrentHashMap<>();
86 private final EntityOwnershipService entityOwnershipService;
87 private final OvsdbDeviceEntityOwnershipListener ovsdbDeviceEntityOwnershipListener;
88 private final OvsdbConnection ovsdbConnection;
89 private final ReconciliationManager reconciliationManager;
90 private final InstanceIdentifierCodec instanceIdentifierCodec;
91 private final UpgradeState upgradeState;
93 public OvsdbConnectionManager(final DataBroker db,final TransactionInvoker txInvoker,
94 final EntityOwnershipService entityOwnershipService,
95 final OvsdbConnection ovsdbConnection,
96 final InstanceIdentifierCodec instanceIdentifierCodec,
97 final UpgradeState upgradeState) {
99 this.txInvoker = txInvoker;
100 this.entityOwnershipService = entityOwnershipService;
101 this.ovsdbDeviceEntityOwnershipListener = new OvsdbDeviceEntityOwnershipListener(this, entityOwnershipService);
102 this.ovsdbConnection = ovsdbConnection;
103 this.reconciliationManager = new ReconciliationManager(db, instanceIdentifierCodec);
104 this.instanceIdentifierCodec = instanceIdentifierCodec;
105 this.upgradeState = upgradeState;
109 public void connected(final OvsdbClient externalClient) {
110 if (alreadyProcessedClients.containsKey(externalClient)) {
111 LOG.info("OvsdbConnectionManager Library already connected {} from {}:{} to {}:{} "
112 + "to this, hence skipping the processing",
113 externalClient.getConnectionInfo().getType(),
114 externalClient.getConnectionInfo().getRemoteAddress(),
115 externalClient.getConnectionInfo().getRemotePort(),
116 externalClient.getConnectionInfo().getLocalAddress(),
117 externalClient.getConnectionInfo().getLocalPort());
120 alreadyProcessedClients.put(externalClient, externalClient);
122 LOG.info("Library connected {} from {}:{} to {}:{}",
123 externalClient.getConnectionInfo().getType(),
124 externalClient.getConnectionInfo().getRemoteAddress(),
125 externalClient.getConnectionInfo().getRemotePort(),
126 externalClient.getConnectionInfo().getLocalAddress(),
127 externalClient.getConnectionInfo().getLocalPort());
129 List<String> databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS);
130 if (databases.contains(SouthboundConstants.OPEN_V_SWITCH)) {
131 OvsdbConnectionInstance client = connectedButCallBacksNotRegistered(externalClient);
132 // Register Cluster Ownership for ConnectionInfo
133 registerEntityForOwnership(client);
135 } catch (InterruptedException | ExecutionException | TimeoutException e) {
136 LOG.warn("Unable to fetch Database list from device {}. Disconnecting from the device.",
137 externalClient.getConnectionInfo().getRemoteAddress(), e);
138 externalClient.disconnect();
143 public OvsdbConnectionInstance connectedButCallBacksNotRegistered(final OvsdbClient externalClient) {
144 LOG.info("OVSDB Connection from {}:{}",externalClient.getConnectionInfo().getRemoteAddress(),
145 externalClient.getConnectionInfo().getRemotePort());
146 ConnectionInfo key = SouthboundMapper.createConnectionInfo(externalClient);
147 OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(key);
149 // Check if existing ovsdbConnectionInstance for the OvsdbClient present.
150 // In such cases, we will see if the ovsdbConnectionInstance has same externalClient.
151 if (ovsdbConnectionInstance != null) {
152 if (ovsdbConnectionInstance.hasOvsdbClient(externalClient)) {
153 LOG.warn("OVSDB Connection Instance {} already exists for client {}", key, externalClient);
154 return ovsdbConnectionInstance;
156 LOG.warn("OVSDB Connection Instance {} being replaced with client {}", key, externalClient);
158 // Unregister Cluster Ownership for ConnectionInfo
159 // Because the ovsdbConnectionInstance is about to be completely replaced!
160 unregisterEntityForOwnership(ovsdbConnectionInstance);
162 ovsdbConnectionInstance.disconnect();
164 removeConnectionInstance(key);
166 stopBridgeConfigReconciliationIfActive(ovsdbConnectionInstance.getInstanceIdentifier());
169 ovsdbConnectionInstance = new OvsdbConnectionInstance(key, externalClient, txInvoker,
170 getInstanceIdentifier(key));
171 ovsdbConnectionInstance.createTransactInvokers();
172 return ovsdbConnectionInstance;
176 public void disconnected(final OvsdbClient client) {
177 alreadyProcessedClients.remove(client);
178 LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store",
179 client.getConnectionInfo().getType(),
180 client.getConnectionInfo().getRemoteAddress(),
181 client.getConnectionInfo().getRemotePort(),
182 client.getConnectionInfo().getLocalAddress(),
183 client.getConnectionInfo().getLocalPort());
184 ConnectionInfo key = SouthboundMapper.createConnectionInfo(client);
185 OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(key);
186 if (ovsdbConnectionInstance != null) {
187 // Unregister Entity ownership as soon as possible ,so this instance should
188 // not be used as a candidate in Entity election (given that this instance is
189 // about to disconnect as well), if current owner get disconnected from
191 if (ovsdbConnectionInstance.getHasDeviceOwnership()) {
192 LOG.info("Library disconnected {} this controller instance has ownership", key);
193 deleteOperNodeAndReleaseOwnership(ovsdbConnectionInstance);
195 LOG.info("Library disconnected {} this controller does not have ownership", key);
196 unregisterEntityForOwnership(ovsdbConnectionInstance);
198 removeConnectionInstance(key);
200 //Controller initiated connection can be terminated from switch side.
201 //So cleanup the instance identifier cache.
202 removeInstanceIdentifier(key);
203 nodeIdVsConnectionInstance.remove(ovsdbConnectionInstance.getInstanceIdentifier(),
204 ovsdbConnectionInstance);
205 stopBridgeConfigReconciliationIfActive(ovsdbConnectionInstance.getInstanceIdentifier());
206 retryConnection(ovsdbConnectionInstance.getInstanceIdentifier(),
207 ovsdbConnectionInstance.getOvsdbNodeAugmentation(),
208 ConnectionReconciliationTriggers.ON_DISCONNECT);
210 LOG.warn("disconnected : Connection instance not found for OVSDB Node {} ", key);
212 LOG.trace("OvsdbConnectionManager: exit disconnected client: {}", client);
215 private void deleteOperNodeAndReleaseOwnership(final OvsdbConnectionInstance ovsdbConnectionInstance) {
216 ovsdbConnectionInstance.setHasDeviceOwnership(false);
217 final InstanceIdentifier nodeIid = ovsdbConnectionInstance.getInstanceIdentifier();
218 //remove the node from oper only if it has ownership
219 txInvoker.invoke(new OvsdbNodeRemoveCommand(ovsdbConnectionInstance, null, null) {
222 public void onSuccess() {
224 LOG.debug("Successfully removed node {} from oper", nodeIid);
225 //Giveup the ownership only after cleanup is done
226 unregisterEntityForOwnership(ovsdbConnectionInstance);
230 public void onFailure(final Throwable throwable) {
231 LOG.debug("Failed to remove node {} from oper", nodeIid);
232 super.onFailure(throwable);
233 unregisterEntityForOwnership(ovsdbConnectionInstance);
238 public OvsdbClient connect(final InstanceIdentifier<Node> iid,
239 final OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException, ConnectException {
240 LOG.info("Connecting to {}", SouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo()));
242 // TODO handle case where we already have a connection
243 // TODO use transaction chains to handle ordering issues between disconnected
244 // TODO and connected when writing to the operational store
245 InetAddress ip = SouthboundMapper.createInetAddress(ovsdbNode.getConnectionInfo().getRemoteIp());
246 OvsdbClient client = ovsdbConnection.connect(ip,
247 ovsdbNode.getConnectionInfo().getRemotePort().getValue().toJava());
248 // For connections from the controller to the ovs instance, the library doesn't call
249 // this method for us
250 if (client != null) {
251 putInstanceIdentifier(ovsdbNode.getConnectionInfo(), iid.firstIdentifierOf(Node.class));
252 OvsdbConnectionInstance ovsdbConnectionInstance = connectedButCallBacksNotRegistered(client);
253 ovsdbConnectionInstance.setOvsdbNodeAugmentation(ovsdbNode);
255 // Register Cluster Ownership for ConnectionInfo
256 registerEntityForOwnership(ovsdbConnectionInstance);
258 LOG.warn("Failed to connect to OVSDB Node {}", ovsdbNode.getConnectionInfo());
263 public void disconnect(final OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException {
264 LOG.info("Disconnecting from {}", SouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo()));
265 OvsdbConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo());
266 if (client != null) {
267 // Unregister Cluster Onwership for ConnectionInfo
268 deleteOperNodeAndReleaseOwnership(client);
272 removeInstanceIdentifier(ovsdbNode.getConnectionInfo());
274 stopBridgeConfigReconciliationIfActive(client.getInstanceIdentifier());
276 LOG.debug("disconnect : connection instance not found for {}",ovsdbNode.getConnectionInfo());
280 /* public void init(ConnectionInfo key) {
281 OvsdbConnectionInstance client = getConnectionInstance(key);
283 // TODO (FF): make sure that this cluster instance is the 'entity owner' fo the given OvsdbConnectionInstance ?
285 if (client != null) {
287 * Note: registerCallbacks() is idemPotent... so if you call it repeatedly all is safe,
288 * it only registersCallbacks on the *first* call.
290 client.registerCallbacks();
295 public void close() {
296 if (ovsdbDeviceEntityOwnershipListener != null) {
297 ovsdbDeviceEntityOwnershipListener.close();
300 for (OvsdbConnectionInstance client: clients.values()) {
306 void putConnectionInstance(final ConnectionInfo key,final OvsdbConnectionInstance instance) {
307 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
308 clients.put(connectionInfo, instance);
311 private void removeConnectionInstance(final ConnectionInfo key) {
312 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
313 clients.remove(connectionInfo);
317 void putInstanceIdentifier(final ConnectionInfo key, final InstanceIdentifier<Node> iid) {
318 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
319 instanceIdentifiers.put(connectionInfo, iid);
322 private void removeInstanceIdentifier(final ConnectionInfo key) {
323 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
324 instanceIdentifiers.remove(connectionInfo);
327 public InstanceIdentifier<Node> getInstanceIdentifier(final ConnectionInfo key) {
328 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
329 return instanceIdentifiers.get(connectionInfo);
332 public OvsdbConnectionInstance getConnectionInstance(final ConnectionInfo key) {
333 ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key);
334 return clients.get(connectionInfo);
337 public OvsdbConnectionInstance getConnectionInstance(final OvsdbBridgeAttributes mn) {
338 Optional<OvsdbNodeAugmentation> optional = SouthboundUtil.getManagingNode(db, mn);
339 if (optional.isPresent()) {
340 return getConnectionInstance(optional.get().getConnectionInfo());
346 public OvsdbConnectionInstance getConnectionInstance(final Node node) {
347 Preconditions.checkNotNull(node);
348 OvsdbNodeAugmentation ovsdbNode = node.augmentation(OvsdbNodeAugmentation.class);
349 OvsdbBridgeAugmentation ovsdbManagedNode = node.augmentation(OvsdbBridgeAugmentation.class);
350 if (ovsdbNode != null) {
351 return getConnectionInstance(ovsdbNode.getConnectionInfo());
352 } else if (ovsdbManagedNode != null) {
353 return getConnectionInstance(ovsdbManagedNode);
355 LOG.warn("This is not a node that gives any hint how to find its OVSDB Manager: {}",node);
360 public OvsdbConnectionInstance getConnectionInstance(final InstanceIdentifier<Node> nodePath) {
361 if (nodeIdVsConnectionInstance.get(nodePath) != null) {
362 return nodeIdVsConnectionInstance.get(nodePath);
365 ReadOnlyTransaction transaction = db.newReadOnlyTransaction();
366 CheckedFuture<Optional<Node>, ReadFailedException> nodeFuture = transaction.read(
367 LogicalDatastoreType.OPERATIONAL, nodePath);
369 Optional<Node> optional = nodeFuture.get();
370 if (optional.isPresent()) {
371 return this.getConnectionInstance(optional.get());
373 LOG.debug("Node was not found on the path in the operational DS: {}", nodePath);
376 } catch (InterruptedException | ExecutionException e) {
377 LOG.warn("Failed to get Ovsdb Node {}",nodePath, e);
382 public OvsdbClient getClient(final ConnectionInfo connectionInfo) {
383 OvsdbConnectionInstance connectionInstance = getConnectionInstance(connectionInfo);
384 if (connectionInstance != null) {
385 return connectionInstance.getOvsdbClient();
390 public OvsdbClient getClient(final OvsdbBridgeAttributes mn) {
391 return getConnectionInstance(mn).getOvsdbClient();
394 public OvsdbClient getClient(final Node node) {
395 return getConnectionInstance(node).getOvsdbClient();
398 public Boolean getHasDeviceOwnership(final ConnectionInfo connectionInfo) {
399 OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(connectionInfo);
400 if (ovsdbConnectionInstance == null) {
401 return Boolean.FALSE;
403 return ovsdbConnectionInstance.getHasDeviceOwnership();
406 public void reconcileConnection(final InstanceIdentifier<Node> iid, final OvsdbNodeAugmentation ovsdbNode) {
407 this.retryConnection(iid, ovsdbNode,
408 ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE);
412 public void stopConnectionReconciliationIfActive(final InstanceIdentifier<Node> iid,
413 final OvsdbNodeAugmentation ovsdbNode) {
414 final ReconciliationTask task = new ConnectionReconciliationTask(
415 reconciliationManager,
419 reconciliationManager.dequeue(task);
422 public void stopBridgeConfigReconciliationIfActive(final InstanceIdentifier<Node> iid) {
423 final ReconciliationTask task =
424 new BridgeConfigReconciliationTask(reconciliationManager, this, iid, null, instanceIdentifierCodec);
425 reconciliationManager.dequeue(task);
426 reconciliationManager.cancelTerminationPointReconciliation();
429 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
430 justification = "https://github.com/spotbugs/spotbugs/issues/811")
431 private void handleOwnershipChanged(final EntityOwnershipChange ownershipChange) {
432 OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstanceFromEntity(ownershipChange.getEntity());
433 LOG.debug("handleOwnershipChanged: {} event received for device {}",
434 ownershipChange, ovsdbConnectionInstance != null ? ovsdbConnectionInstance.getConnectionInfo()
435 : "that's currently NOT registered by *this* southbound plugin instance");
437 if (ovsdbConnectionInstance == null) {
438 if (ownershipChange.getState().isOwner()) {
439 LOG.warn("handleOwnershipChanged: *this* instance is elected as an owner of the device {} but it "
440 + "is NOT registered for ownership", ownershipChange.getEntity());
442 // EntityOwnershipService sends notification to all the nodes, irrespective of whether
443 // that instance registered for the device ownership or not. It is to make sure that
444 // If all the controller instance that was connected to the device are down, so the
445 // running instance can clear up the operational data store even though it was not
446 // connected to the device.
447 LOG.debug("handleOwnershipChanged: No connection instance found for {}", ownershipChange.getEntity());
450 // If entity has no owner, clean up the operational data store (it's possible because owner controller
451 // might went down abruptly and didn't get a chance to clean up the operational data store.
452 if (!ownershipChange.getState().hasOwner()) {
453 LOG.info("{} has no owner, cleaning up the operational data store", ownershipChange.getEntity());
454 cleanEntityOperationalData(ownershipChange.getEntity());
458 //Connection detail need to be cached, irrespective of ownership result.
459 putConnectionInstance(ovsdbConnectionInstance.getMDConnectionInfo(),ovsdbConnectionInstance);
461 if (ownershipChange.getState().isOwner() == ovsdbConnectionInstance.getHasDeviceOwnership()) {
462 LOG.info("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}",
463 ovsdbConnectionInstance.getConnectionInfo(), ovsdbConnectionInstance.getHasDeviceOwnership()
464 ? SouthboundConstants.OwnershipStates.OWNER.getState()
465 : SouthboundConstants.OwnershipStates.NONOWNER.getState());
469 ovsdbConnectionInstance.setHasDeviceOwnership(ownershipChange.getState().isOwner());
470 // You were not an owner, but now you are
471 if (ownershipChange.getState().isOwner()) {
472 LOG.info("handleOwnershipChanged: *this* southbound plugin instance is an OWNER of the device {}",
473 ovsdbConnectionInstance.getConnectionInfo());
475 //*this* instance of southbound plugin is owner of the device,
476 //so register for monitor callbacks
477 ovsdbConnectionInstance.registerCallbacks(instanceIdentifierCodec);
478 LOG.trace("isUpgradeInProgress {}", upgradeState.isUpgradeInProgress());
479 if (!upgradeState.isUpgradeInProgress()) {
480 reconcileBridgeConfigurations(ovsdbConnectionInstance);
483 //You were owner of the device, but now you are not. With the current ownership
484 //grant mechanism, this scenario should not occur. Because this scenario will occur
485 //when this controller went down or switch flap the connection, but in both the case
486 //it will go through the re-registration process. We need to implement this condition
487 //when clustering service implement a ownership grant strategy which can revoke the
488 //device ownership for load balancing the devices across the instances.
489 //Once this condition occur, we should unregister the callback.
490 LOG.error("handleOwnershipChanged: *this* southbound plugin instance is no longer the owner of device {}."
491 + "This should NOT happen.",
492 ovsdbConnectionInstance.getNodeId().getValue());
496 private void cleanEntityOperationalData(final Entity entity) {
498 //Do explicit cleanup rather than using OvsdbNodeRemoveCommand, because there
499 // are chances that other controller instance went down abruptly and it does
500 // not clear manager entry, which OvsdbNodeRemoveCommand look for before cleanup.
502 @SuppressWarnings("unchecked")
503 final InstanceIdentifier<Node> nodeIid = (InstanceIdentifier<Node>) entity.getIdentifier();
505 txInvoker.invoke(transaction -> {
506 Optional<Node> ovsdbNodeOpt = SouthboundUtil.readNode(transaction, nodeIid);
507 if (ovsdbNodeOpt.isPresent()) {
508 Node ovsdbNode = ovsdbNodeOpt.get();
509 OvsdbNodeAugmentation nodeAugmentation = ovsdbNode.augmentation(OvsdbNodeAugmentation.class);
510 if (nodeAugmentation != null) {
511 if (nodeAugmentation.getManagedNodeEntry() != null) {
512 for (ManagedNodeEntry managedNode : nodeAugmentation.getManagedNodeEntry()) {
514 LogicalDatastoreType.OPERATIONAL, managedNode.getBridgeRef().getValue());
517 LOG.debug("{} had no managed nodes", ovsdbNode.getNodeId().getValue());
520 transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeIid);
526 private static OpenVSwitch getOpenVswitchTableEntry(final OvsdbConnectionInstance connectionInstance) {
527 final TypedDatabaseSchema dbSchema;
529 dbSchema = connectionInstance.getSchema(OvsdbSchemaContants.DATABASE_NAME).get();
530 } catch (InterruptedException | ExecutionException e) {
531 LOG.warn("Not able to fetch schema for database {} from device {}",
532 OvsdbSchemaContants.DATABASE_NAME,connectionInstance.getConnectionInfo(),e);
536 final GenericTableSchema openVSwitchSchema = dbSchema.getTableSchema(OpenVSwitch.class);
537 final Select<GenericTableSchema> selectOperation = op.select(openVSwitchSchema);
538 selectOperation.setColumns(openVSwitchSchema.getColumnList());
540 List<Operation> operations = new ArrayList<>();
541 operations.add(selectOperation);
542 operations.add(op.comment("Fetching Open_VSwitch table rows"));
543 final List<OperationResult> results;
545 results = connectionInstance.transact(dbSchema, operations).get();
546 } catch (InterruptedException | ExecutionException e) {
547 LOG.warn("Not able to fetch OpenVswitch table row from device {}", connectionInstance.getConnectionInfo(),
552 return results == null || results.isEmpty() ? null
553 : dbSchema.getTypedRowWrapper(OpenVSwitch.class, results.get(0).getRows().get(0));
556 private Entity getEntityFromConnectionInstance(@NonNull final OvsdbConnectionInstance ovsdbConnectionInstance) {
557 InstanceIdentifier<Node> iid = ovsdbConnectionInstance.getInstanceIdentifier();
559 /* Switch initiated connection won't have iid, till it gets OpenVSwitch
560 * table update but update callback is always registered after ownership
561 * is granted. So we are explicitly fetch the row here to get the iid.
563 OpenVSwitch openvswitchRow = getOpenVswitchTableEntry(ovsdbConnectionInstance);
564 iid = SouthboundMapper.getInstanceIdentifier(instanceIdentifierCodec, openvswitchRow);
565 LOG.info("InstanceIdentifier {} generated for device "
566 + "connection {}",iid,ovsdbConnectionInstance.getConnectionInfo());
567 ovsdbConnectionInstance.setInstanceIdentifier(iid);
569 Entity deviceEntity = new Entity(ENTITY_TYPE, iid);
570 LOG.debug("Entity {} created for device connection {}",
571 deviceEntity, ovsdbConnectionInstance.getConnectionInfo());
575 private OvsdbConnectionInstance getConnectionInstanceFromEntity(final Entity entity) {
576 return entityConnectionMap.get(entity);
579 private void registerEntityForOwnership(final OvsdbConnectionInstance ovsdbConnectionInstance) {
580 putConnectionInstance(ovsdbConnectionInstance.getMDConnectionInfo(), ovsdbConnectionInstance);
582 Entity candidateEntity = getEntityFromConnectionInstance(ovsdbConnectionInstance);
583 if (entityConnectionMap.containsKey(candidateEntity)) {
584 LOG.error("Old connection still hanging for {}", candidateEntity);
585 disconnected(ovsdbConnectionInstance.getOvsdbClient());
586 //TODO do cleanup for old connection or stale check
588 nodeIdVsConnectionInstance.put((InstanceIdentifier<Node>) candidateEntity.getIdentifier(),
589 ovsdbConnectionInstance);
590 entityConnectionMap.put(candidateEntity, ovsdbConnectionInstance);
591 ovsdbConnectionInstance.setConnectedEntity(candidateEntity);
593 EntityOwnershipCandidateRegistration registration =
594 entityOwnershipService.registerCandidate(candidateEntity);
595 ovsdbConnectionInstance.setDeviceOwnershipCandidateRegistration(registration);
596 LOG.info("OVSDB entity {} is registered for ownership.", candidateEntity);
598 } catch (CandidateAlreadyRegisteredException e) {
599 LOG.warn("OVSDB entity {} was already registered for ownership", candidateEntity, e);
601 //If entity already has owner, it won't get notification from EntityOwnershipService
602 java.util.Optional<EntityOwnershipState> ownershipStateOpt =
603 entityOwnershipService.getOwnershipState(candidateEntity);
604 if (ownershipStateOpt.isPresent()) {
605 EntityOwnershipState ownershipState = ownershipStateOpt.get();
606 if (ownershipState == EntityOwnershipState.OWNED_BY_OTHER) {
607 ovsdbConnectionInstance.setHasDeviceOwnership(false);
608 } else if (ownershipState == EntityOwnershipState.IS_OWNER) {
609 ovsdbConnectionInstance.setHasDeviceOwnership(true);
610 ovsdbConnectionInstance.registerCallbacks(instanceIdentifierCodec);
615 private void unregisterEntityForOwnership(final OvsdbConnectionInstance ovsdbConnectionInstance) {
616 ovsdbConnectionInstance.closeDeviceOwnershipCandidateRegistration();
617 entityConnectionMap.remove(ovsdbConnectionInstance.getConnectedEntity(), ovsdbConnectionInstance);
620 private void retryConnection(final InstanceIdentifier<Node> iid, final OvsdbNodeAugmentation ovsdbNode,
621 final ConnectionReconciliationTriggers trigger) {
622 final ReconciliationTask task = new ConnectionReconciliationTask(
623 reconciliationManager,
628 if (reconciliationManager.isEnqueued(task)) {
632 case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE:
633 reconciliationManager.enqueueForRetry(task);
635 case ON_DISCONNECT: {
636 CheckedFuture<Optional<Node>, ReadFailedException> readNodeFuture;
637 try (ReadOnlyTransaction tx = db.newReadOnlyTransaction()) {
638 readNodeFuture = tx.read(LogicalDatastoreType.CONFIGURATION, iid);
640 Futures.addCallback(readNodeFuture, new FutureCallback<Optional<Node>>() {
642 public void onSuccess(final Optional<Node> node) {
643 if (node.isPresent()) {
644 LOG.info("Disconnected/Failed connection {} was controller initiated, attempting "
645 + "reconnection", ovsdbNode.getConnectionInfo());
646 reconciliationManager.enqueue(task);
649 LOG.debug("Connection {} was switch initiated, no reconciliation is required",
650 iid.firstKeyOf(Node.class).getNodeId());
655 public void onFailure(final Throwable throwable) {
656 LOG.warn("Read Config/DS for Node failed! {}", iid, throwable);
658 }, MoreExecutors.directExecutor());
666 public void reconcileBridgeConfigurations(final OvsdbConnectionInstance client) {
667 final InstanceIdentifier<Node> nodeIid = client.getInstanceIdentifier();
668 final ReconciliationTask task = new BridgeConfigReconciliationTask(
669 reconciliationManager, OvsdbConnectionManager.this, nodeIid, client, instanceIdentifierCodec);
671 reconciliationManager.enqueue(task);
674 private static class OvsdbDeviceEntityOwnershipListener implements EntityOwnershipListener {
675 private final OvsdbConnectionManager cm;
676 private final EntityOwnershipListenerRegistration listenerRegistration;
678 OvsdbDeviceEntityOwnershipListener(final OvsdbConnectionManager cm,
679 final EntityOwnershipService entityOwnershipService) {
681 listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this);
684 public void close() {
685 listenerRegistration.close();
689 public void ownershipChanged(final EntityOwnershipChange ownershipChange) {
690 cm.handleOwnershipChanged(ownershipChange);
694 private enum ConnectionReconciliationTriggers {
696 Reconciliation trigger for scenario where controller's attempt
697 to connect to switch fails on config data store notification
699 ON_CONTROLLER_INITIATED_CONNECTION_FAILURE,
702 Reconciliation trigger for the scenario where controller
703 initiated connection disconnects.
708 public Map<ConnectionInfo, OvsdbConnectionInstance> getClients() {