X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=southbound%2Fsouthbound-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fovsdb%2Fsouthbound%2FOvsdbConnectionManager.java;h=b512ce250096474b38694d0192e0f60a3b456f9c;hb=d97430282ae6a1fc03a3e8e80db8666c29dd8284;hp=2ac0f496e23754a2b5c78f6c7035f9d88ba788cb;hpb=b2096a5af7375360917cedcede1101a3aa9bf5b7;p=ovsdb.git diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionManager.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionManager.java index 2ac0f496e..b512ce250 100644 --- a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionManager.java +++ b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionManager.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright © 2014, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -9,6 +9,14 @@ package org.opendaylight.ovsdb.southbound; import static org.opendaylight.ovsdb.lib.operations.Operations.op; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import java.net.ConnectException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -16,22 +24,21 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; - -import javax.annotation.Nonnull; - +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; -import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; -import org.opendaylight.controller.md.sal.common.api.clustering.Entity; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.mdsal.eos.binding.api.Entity; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipCandidateRegistration; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListenerRegistration; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService; +import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException; +import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState; import org.opendaylight.ovsdb.lib.OvsdbClient; import org.opendaylight.ovsdb.lib.OvsdbConnection; import org.opendaylight.ovsdb.lib.OvsdbConnectionListener; @@ -42,6 +49,10 @@ import org.opendaylight.ovsdb.lib.schema.DatabaseSchema; import org.opendaylight.ovsdb.lib.schema.GenericTableSchema; import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils; import org.opendaylight.ovsdb.schema.openvswitch.OpenVSwitch; +import org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationManager; +import org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationTask; +import org.opendaylight.ovsdb.southbound.reconciliation.configuration.BridgeConfigReconciliationTask; +import org.opendaylight.ovsdb.southbound.reconciliation.connection.ConnectionReconciliationTask; import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbNodeRemoveCommand; import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAttributes; @@ -51,52 +62,64 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.re import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ManagedNodeEntry; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.CheckedFuture; - public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoCloseable { - private Map clients = + private final Map clients = new ConcurrentHashMap<>(); private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionManager.class); private static final String ENTITY_TYPE = "ovsdb"; + private static final int DB_FETCH_TIMEOUT = 1000; - private DataBroker db; - private TransactionInvoker txInvoker; - private Map> instanceIdentifiers = + private final DataBroker db; + private final TransactionInvoker txInvoker; + private final Map> instanceIdentifiers = new ConcurrentHashMap<>(); - private Map entityConnectionMap = + private final Map, OvsdbConnectionInstance> nodeIdVsConnectionInstance = new ConcurrentHashMap<>(); - private EntityOwnershipService entityOwnershipService; - private OvsdbDeviceEntityOwnershipListener ovsdbDeviceEntityOwnershipListener; - private OvsdbConnection ovsdbConnection; - - public OvsdbConnectionManager(DataBroker db,TransactionInvoker txInvoker, - EntityOwnershipService entityOwnershipService, - OvsdbConnection ovsdbConnection) { + private final Map entityConnectionMap = + new ConcurrentHashMap<>(); + private final EntityOwnershipService entityOwnershipService; + private final OvsdbDeviceEntityOwnershipListener ovsdbDeviceEntityOwnershipListener; + private final OvsdbConnection ovsdbConnection; + private final ReconciliationManager reconciliationManager; + private final InstanceIdentifierCodec instanceIdentifierCodec; + + public OvsdbConnectionManager(final DataBroker db,final TransactionInvoker txInvoker, + final EntityOwnershipService entityOwnershipService, + final OvsdbConnection ovsdbConnection, + final InstanceIdentifierCodec instanceIdentifierCodec) { this.db = db; this.txInvoker = txInvoker; this.entityOwnershipService = entityOwnershipService; this.ovsdbDeviceEntityOwnershipListener = new OvsdbDeviceEntityOwnershipListener(this, entityOwnershipService); this.ovsdbConnection = ovsdbConnection; + this.reconciliationManager = new ReconciliationManager(db, instanceIdentifierCodec); + this.instanceIdentifierCodec = instanceIdentifierCodec; } @Override - public void connected(@Nonnull final OvsdbClient externalClient) { + public void connected(final OvsdbClient externalClient) { LOG.info("Library connected {} from {}:{} to {}:{}", externalClient.getConnectionInfo().getType(), externalClient.getConnectionInfo().getRemoteAddress(), externalClient.getConnectionInfo().getRemotePort(), externalClient.getConnectionInfo().getLocalAddress(), externalClient.getConnectionInfo().getLocalPort()); - OvsdbConnectionInstance client = connectedButCallBacksNotRegistered(externalClient); + try { + List databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS); + if (databases.contains(SouthboundConstants.OPEN_V_SWITCH)) { + OvsdbConnectionInstance client = connectedButCallBacksNotRegistered(externalClient); + // Register Cluster Ownership for ConnectionInfo + registerEntityForOwnership(client); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.warn("Unable to fetch Database list from device {}. Disconnecting from the device.", + externalClient.getConnectionInfo().getRemoteAddress(), e); + externalClient.disconnect(); + } - // Register Cluster Ownership for ConnectionInfo - registerEntityForOwnership(client); } public OvsdbConnectionInstance connectedButCallBacksNotRegistered(final OvsdbClient externalClient) { @@ -121,6 +144,8 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos ovsdbConnectionInstance.disconnect(); removeConnectionInstance(key); + + stopBridgeConfigReconciliationIfActive(ovsdbConnectionInstance.getInstanceIdentifier()); } ovsdbConnectionInstance = new OvsdbConnectionInstance(key, externalClient, txInvoker, @@ -130,7 +155,7 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos } @Override - public void disconnected(OvsdbClient client) { + public void disconnected(final OvsdbClient client) { LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store", client.getConnectionInfo().getType(), client.getConnectionInfo().getRemoteAddress(), @@ -144,19 +169,55 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos // not be used as a candidate in Entity election (given that this instance is // about to disconnect as well), if current owner get disconnected from // OVSDB device. - unregisterEntityForOwnership(ovsdbConnectionInstance); - - txInvoker.invoke(new OvsdbNodeRemoveCommand(ovsdbConnectionInstance, null, null)); - + if (ovsdbConnectionInstance.getHasDeviceOwnership()) { + LOG.info("Library disconnected {} this controller instance has ownership", key); + deleteOperNodeAndReleaseOwnership(ovsdbConnectionInstance); + } else { + LOG.info("Library disconnected {} this controller does not have ownership", key); + unregisterEntityForOwnership(ovsdbConnectionInstance); + } removeConnectionInstance(key); + + //Controller initiated connection can be terminated from switch side. + //So cleanup the instance identifier cache. + removeInstanceIdentifier(key); + nodeIdVsConnectionInstance.remove(ovsdbConnectionInstance.getInstanceIdentifier(), + ovsdbConnectionInstance); + stopBridgeConfigReconciliationIfActive(ovsdbConnectionInstance.getInstanceIdentifier()); + retryConnection(ovsdbConnectionInstance.getInstanceIdentifier(), + ovsdbConnectionInstance.getOvsdbNodeAugmentation(), + ConnectionReconciliationTriggers.ON_DISCONNECT); } else { LOG.warn("disconnected : Connection instance not found for OVSDB Node {} ", key); } LOG.trace("OvsdbConnectionManager: exit disconnected client: {}", client); } - public OvsdbClient connect(InstanceIdentifier iid, - OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException { + private void deleteOperNodeAndReleaseOwnership(final OvsdbConnectionInstance ovsdbConnectionInstance) { + ovsdbConnectionInstance.setHasDeviceOwnership(false); + final InstanceIdentifier nodeIid = ovsdbConnectionInstance.getInstanceIdentifier(); + //remove the node from oper only if it has ownership + txInvoker.invoke(new OvsdbNodeRemoveCommand(ovsdbConnectionInstance, null, null) { + + @Override + public void onSuccess() { + super.onSuccess(); + LOG.debug("Successfully removed node {} from oper", nodeIid); + //Giveup the ownership only after cleanup is done + unregisterEntityForOwnership(ovsdbConnectionInstance); + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.debug("Failed to remove node {} from oper", nodeIid); + super.onFailure(throwable); + unregisterEntityForOwnership(ovsdbConnectionInstance); + } + }); + } + + public OvsdbClient connect(final InstanceIdentifier iid, + final OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException, ConnectException { LOG.info("Connecting to {}", SouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo())); // TODO handle case where we already have a connection @@ -180,16 +241,18 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos return client; } - public void disconnect(OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException { + public void disconnect(final OvsdbNodeAugmentation ovsdbNode) throws UnknownHostException { LOG.info("Disconnecting from {}", SouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo())); OvsdbConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo()); if (client != null) { // Unregister Cluster Onwership for ConnectionInfo - unregisterEntityForOwnership(client); + deleteOperNodeAndReleaseOwnership(client); client.disconnect(); removeInstanceIdentifier(ovsdbNode.getConnectionInfo()); + + stopBridgeConfigReconciliationIfActive(client.getInstanceIdentifier()); } else { LOG.debug("disconnect : connection instance not found for {}",ovsdbNode.getConnectionInfo()); } @@ -210,47 +273,49 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos } */ @Override - public void close() throws Exception { + public void close() { if (ovsdbDeviceEntityOwnershipListener != null) { ovsdbDeviceEntityOwnershipListener.close(); } - for (OvsdbClient client: clients.values()) { + for (OvsdbConnectionInstance client: clients.values()) { client.disconnect(); } } - private void putConnectionInstance(ConnectionInfo key,OvsdbConnectionInstance instance) { + @VisibleForTesting + void putConnectionInstance(final ConnectionInfo key,final OvsdbConnectionInstance instance) { ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key); clients.put(connectionInfo, instance); } - private void removeConnectionInstance(ConnectionInfo key) { + private void removeConnectionInstance(final ConnectionInfo key) { ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key); clients.remove(connectionInfo); } - private void putInstanceIdentifier(ConnectionInfo key,InstanceIdentifier iid) { + @VisibleForTesting + void putInstanceIdentifier(final ConnectionInfo key, final InstanceIdentifier iid) { ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key); instanceIdentifiers.put(connectionInfo, iid); } - private void removeInstanceIdentifier(ConnectionInfo key) { + private void removeInstanceIdentifier(final ConnectionInfo key) { ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key); instanceIdentifiers.remove(connectionInfo); } - public OvsdbConnectionInstance getConnectionInstance(ConnectionInfo key) { + public InstanceIdentifier getInstanceIdentifier(final ConnectionInfo key) { ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key); - return clients.get(connectionInfo); + return instanceIdentifiers.get(connectionInfo); } - public InstanceIdentifier getInstanceIdentifier(ConnectionInfo key) { + public OvsdbConnectionInstance getConnectionInstance(final ConnectionInfo key) { ConnectionInfo connectionInfo = SouthboundMapper.suppressLocalIpPort(key); - return instanceIdentifiers.get(connectionInfo); + return clients.get(connectionInfo); } - public OvsdbConnectionInstance getConnectionInstance(OvsdbBridgeAttributes mn) { + public OvsdbConnectionInstance getConnectionInstance(final OvsdbBridgeAttributes mn) { Optional optional = SouthboundUtil.getManagingNode(db, mn); if (optional.isPresent()) { return getConnectionInstance(optional.get().getConnectionInfo()); @@ -259,10 +324,10 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos } } - public OvsdbConnectionInstance getConnectionInstance(Node node) { + public OvsdbConnectionInstance getConnectionInstance(final Node node) { Preconditions.checkNotNull(node); - OvsdbNodeAugmentation ovsdbNode = node.getAugmentation(OvsdbNodeAugmentation.class); - OvsdbBridgeAugmentation ovsdbManagedNode = node.getAugmentation(OvsdbBridgeAugmentation.class); + OvsdbNodeAugmentation ovsdbNode = node.augmentation(OvsdbNodeAugmentation.class); + OvsdbBridgeAugmentation ovsdbManagedNode = node.augmentation(OvsdbBridgeAugmentation.class); if (ovsdbNode != null) { return getConnectionInstance(ovsdbNode.getConnectionInfo()); } else if (ovsdbManagedNode != null) { @@ -273,38 +338,45 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos } } - public OvsdbConnectionInstance getConnectionInstance(InstanceIdentifier nodePath) { + public OvsdbConnectionInstance getConnectionInstance(final InstanceIdentifier nodePath) { + if (nodeIdVsConnectionInstance.get(nodePath) != null) { + return nodeIdVsConnectionInstance.get(nodePath); + } try { ReadOnlyTransaction transaction = db.newReadOnlyTransaction(); CheckedFuture, ReadFailedException> nodeFuture = transaction.read( LogicalDatastoreType.OPERATIONAL, nodePath); transaction.close(); Optional optional = nodeFuture.get(); - if (optional != null && optional.isPresent() && optional.get() != null) { + if (optional.isPresent()) { return this.getConnectionInstance(optional.get()); } else { - LOG.warn("Found non-topological node {} on path {}",optional); + LOG.debug("Node was not found on the path in the operational DS: {}", nodePath); return null; } - } catch (Exception e) { + } catch (InterruptedException | ExecutionException e) { LOG.warn("Failed to get Ovsdb Node {}",nodePath, e); return null; } } - public OvsdbClient getClient(ConnectionInfo connectionInfo) { - return getConnectionInstance(connectionInfo); + public OvsdbClient getClient(final ConnectionInfo connectionInfo) { + OvsdbConnectionInstance connectionInstance = getConnectionInstance(connectionInfo); + if (connectionInstance != null) { + return connectionInstance.getOvsdbClient(); + } + return null; } - public OvsdbClient getClient(OvsdbBridgeAttributes mn) { - return getConnectionInstance(mn); + public OvsdbClient getClient(final OvsdbBridgeAttributes mn) { + return getConnectionInstance(mn).getOvsdbClient(); } - public OvsdbClient getClient(Node node) { - return getConnectionInstance(node); + public OvsdbClient getClient(final Node node) { + return getConnectionInstance(node).getOvsdbClient(); } - public Boolean getHasDeviceOwnership(ConnectionInfo connectionInfo) { + public Boolean getHasDeviceOwnership(final ConnectionInfo connectionInfo) { OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstance(connectionInfo); if (ovsdbConnectionInstance == null) { return Boolean.FALSE; @@ -312,14 +384,37 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos return ovsdbConnectionInstance.getHasDeviceOwnership(); } - private void handleOwnershipChanged(EntityOwnershipChange ownershipChange) { + public void reconcileConnection(final InstanceIdentifier iid, final OvsdbNodeAugmentation ovsdbNode) { + this.retryConnection(iid, ovsdbNode, + ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE); + + } + + public void stopConnectionReconciliationIfActive(final InstanceIdentifier iid, + final OvsdbNodeAugmentation ovsdbNode) { + final ReconciliationTask task = new ConnectionReconciliationTask( + reconciliationManager, + this, + iid, + ovsdbNode); + reconciliationManager.dequeue(task); + } + + public void stopBridgeConfigReconciliationIfActive(final InstanceIdentifier iid) { + final ReconciliationTask task = + new BridgeConfigReconciliationTask(reconciliationManager, this, iid, null, instanceIdentifierCodec); + reconciliationManager.dequeue(task); + reconciliationManager.cancelTerminationPointReconciliation(); + } + + private void handleOwnershipChanged(final EntityOwnershipChange ownershipChange) { OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstanceFromEntity(ownershipChange.getEntity()); LOG.debug("handleOwnershipChanged: {} event received for device {}", ownershipChange, ovsdbConnectionInstance != null ? ovsdbConnectionInstance.getConnectionInfo() : "that's currently NOT registered by *this* southbound plugin instance"); if (ovsdbConnectionInstance == null) { - if (ownershipChange.isOwner()) { + if (ownershipChange.getState().isOwner()) { LOG.warn("handleOwnershipChanged: *this* instance is elected as an owner of the device {} but it " + "is NOT registered for ownership", ownershipChange.getEntity()); } else { @@ -333,7 +428,7 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos // If entity has no owner, clean up the operational data store (it's possible because owner controller // might went down abruptly and didn't get a chance to clean up the operational data store. - if (!ownershipChange.hasOwner()) { + if (!ownershipChange.getState().hasOwner()) { LOG.info("{} has no owner, cleaning up the operational data store", ownershipChange.getEntity()); cleanEntityOperationalData(ownershipChange.getEntity()); } @@ -342,24 +437,25 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos //Connection detail need to be cached, irrespective of ownership result. putConnectionInstance(ovsdbConnectionInstance.getMDConnectionInfo(),ovsdbConnectionInstance); - if (ownershipChange.isOwner() == ovsdbConnectionInstance.getHasDeviceOwnership()) { + if (ownershipChange.getState().isOwner() == ovsdbConnectionInstance.getHasDeviceOwnership()) { LOG.info("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}", ovsdbConnectionInstance.getConnectionInfo(), ovsdbConnectionInstance.getHasDeviceOwnership() - ? SouthboundConstants.OWNERSHIPSTATES.OWNER.getState() - : SouthboundConstants.OWNERSHIPSTATES.NONOWNER.getState()); + ? SouthboundConstants.OwnershipStates.OWNER.getState() + : SouthboundConstants.OwnershipStates.NONOWNER.getState()); return; } - ovsdbConnectionInstance.setHasDeviceOwnership(ownershipChange.isOwner()); + ovsdbConnectionInstance.setHasDeviceOwnership(ownershipChange.getState().isOwner()); // You were not an owner, but now you are - if (ownershipChange.isOwner()) { + if (ownershipChange.getState().isOwner()) { LOG.info("handleOwnershipChanged: *this* southbound plugin instance is an OWNER of the device {}", ovsdbConnectionInstance.getConnectionInfo()); //*this* instance of southbound plugin is owner of the device, //so register for monitor callbacks - ovsdbConnectionInstance.registerCallbacks(); + ovsdbConnectionInstance.registerCallbacks(instanceIdentifierCodec); + reconcileBridgeConfigurations(ovsdbConnectionInstance); } else { //You were owner of the device, but now you are not. With the current ownership //grant mechanism, this scenario should not occur. Because this scenario will occur @@ -374,42 +470,44 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos } } - private void cleanEntityOperationalData(Entity entity) { + private void cleanEntityOperationalData(final Entity entity) { //Do explicit cleanup rather than using OvsdbNodeRemoveCommand, because there // are chances that other controller instance went down abruptly and it does // not clear manager entry, which OvsdbNodeRemoveCommand look for before cleanup. - InstanceIdentifier nodeIid = (InstanceIdentifier) SouthboundUtil - .getInstanceIdentifierCodec().bindingDeserializer(entity.getId()); - - final ReadWriteTransaction transaction = db.newReadWriteTransaction(); - Optional ovsdbNodeOpt = SouthboundUtil.readNode(transaction,nodeIid); - if ( ovsdbNodeOpt.isPresent() ) { - Node ovsdbNode = ovsdbNodeOpt.get(); - OvsdbNodeAugmentation nodeAugmentation = ovsdbNode.getAugmentation(OvsdbNodeAugmentation.class); - if (nodeAugmentation != null) { - if (nodeAugmentation.getManagedNodeEntry() != null) { - for (ManagedNodeEntry managedNode : nodeAugmentation.getManagedNodeEntry()) { - transaction.delete( - LogicalDatastoreType.OPERATIONAL, managedNode.getBridgeRef().getValue()); + @SuppressWarnings("unchecked") + final InstanceIdentifier nodeIid = (InstanceIdentifier) entity.getIdentifier(); + + txInvoker.invoke(transaction -> { + Optional ovsdbNodeOpt = SouthboundUtil.readNode(transaction, nodeIid); + if (ovsdbNodeOpt.isPresent()) { + Node ovsdbNode = ovsdbNodeOpt.get(); + OvsdbNodeAugmentation nodeAugmentation = ovsdbNode.augmentation(OvsdbNodeAugmentation.class); + if (nodeAugmentation != null) { + if (nodeAugmentation.getManagedNodeEntry() != null) { + for (ManagedNodeEntry managedNode : nodeAugmentation.getManagedNodeEntry()) { + transaction.delete( + LogicalDatastoreType.OPERATIONAL, managedNode.getBridgeRef().getValue()); + } + } else { + LOG.debug("{} had no managed nodes", ovsdbNode.getNodeId().getValue()); } - } else { - LOG.debug("{} had no managed nodes", ovsdbNode.getNodeId().getValue()); } + transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeIid); } - SouthboundUtil.deleteNode(transaction, nodeIid); - } + }); + } - private OpenVSwitch getOpenVswitchTableEntry(OvsdbConnectionInstance connectionInstance) { + private OpenVSwitch getOpenVswitchTableEntry(final OvsdbConnectionInstance connectionInstance) { DatabaseSchema dbSchema = null; OpenVSwitch openVSwitchRow = null; try { - dbSchema = connectionInstance.getSchema(OvsdbSchemaContants.databaseName).get(); + dbSchema = connectionInstance.getSchema(OvsdbSchemaContants.DATABASE_NAME).get(); } catch (InterruptedException | ExecutionException e) { LOG.warn("Not able to fetch schema for database {} from device {}", - OvsdbSchemaContants.databaseName,connectionInstance.getConnectionInfo(),e); + OvsdbSchemaContants.DATABASE_NAME,connectionInstance.getConnectionInfo(),e); } if (dbSchema != null) { GenericTableSchema openVSwitchSchema = TyperUtils.getTableSchema(dbSchema, OpenVSwitch.class); @@ -424,7 +522,7 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos operations.add(op.comment("Fetching Open_VSwitch table rows")); try { List results = connectionInstance.transact(dbSchema, operations).get(); - if (results != null ) { + if (results != null) { OperationResult selectResult = results.get(0); openVSwitchRow = TyperUtils.getTypedRowWrapper( dbSchema,OpenVSwitch.class,selectResult.getRows().get(0)); @@ -437,33 +535,41 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos } return openVSwitchRow; } - private Entity getEntityFromConnectionInstance(@Nonnull OvsdbConnectionInstance ovsdbConnectionInstance) { + + private Entity getEntityFromConnectionInstance(@NonNull final OvsdbConnectionInstance ovsdbConnectionInstance) { InstanceIdentifier iid = ovsdbConnectionInstance.getInstanceIdentifier(); - if ( iid == null ) { + if (iid == null) { /* Switch initiated connection won't have iid, till it gets OpenVSwitch * table update but update callback is always registered after ownership * is granted. So we are explicitly fetch the row here to get the iid. */ OpenVSwitch openvswitchRow = getOpenVswitchTableEntry(ovsdbConnectionInstance); - iid = SouthboundMapper.getInstanceIdentifier(openvswitchRow); + iid = SouthboundMapper.getInstanceIdentifier(instanceIdentifierCodec, openvswitchRow); LOG.info("InstanceIdentifier {} generated for device " + "connection {}",iid,ovsdbConnectionInstance.getConnectionInfo()); ovsdbConnectionInstance.setInstanceIdentifier(iid); } - YangInstanceIdentifier entityId = SouthboundUtil.getInstanceIdentifierCodec().getYangInstanceIdentifier(iid); - Entity deviceEntity = new Entity(ENTITY_TYPE, entityId); + Entity deviceEntity = new Entity(ENTITY_TYPE, iid); LOG.debug("Entity {} created for device connection {}", deviceEntity, ovsdbConnectionInstance.getConnectionInfo()); return deviceEntity; } - private OvsdbConnectionInstance getConnectionInstanceFromEntity(Entity entity) { + private OvsdbConnectionInstance getConnectionInstanceFromEntity(final Entity entity) { return entityConnectionMap.get(entity); } - private void registerEntityForOwnership(OvsdbConnectionInstance ovsdbConnectionInstance) { + private void registerEntityForOwnership(final OvsdbConnectionInstance ovsdbConnectionInstance) { + putConnectionInstance(ovsdbConnectionInstance.getMDConnectionInfo(), ovsdbConnectionInstance); Entity candidateEntity = getEntityFromConnectionInstance(ovsdbConnectionInstance); + if (entityConnectionMap.containsKey(candidateEntity)) { + LOG.error("Old connection still hanging for {}", candidateEntity); + disconnected(ovsdbConnectionInstance.getOvsdbClient()); + //TODO do cleanup for old connection or stale check + } + nodeIdVsConnectionInstance.put((InstanceIdentifier) candidateEntity.getIdentifier(), + ovsdbConnectionInstance); entityConnectionMap.put(candidateEntity, ovsdbConnectionInstance); ovsdbConnectionInstance.setConnectedEntity(candidateEntity); try { @@ -472,44 +578,113 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos ovsdbConnectionInstance.setDeviceOwnershipCandidateRegistration(registration); LOG.info("OVSDB entity {} is registered for ownership.", candidateEntity); - //If entity already has owner, it won't get notification from EntityOwnershipService - //so cache the connection instances. - Optional ownershipStateOpt = - entityOwnershipService.getOwnershipState(candidateEntity); - if (ownershipStateOpt.isPresent()) { - EntityOwnershipState ownershipState = ownershipStateOpt.get(); - if (ownershipState.hasOwner() && !ownershipState.isOwner()) { - LOG.info("OVSDB entity {} is already owned by other southbound plugin " - + "instance, so *this* instance is NOT an OWNER of the device", - ovsdbConnectionInstance.getConnectionInfo()); - putConnectionInstance(ovsdbConnectionInstance.getMDConnectionInfo(),ovsdbConnectionInstance); - } - } } catch (CandidateAlreadyRegisteredException e) { LOG.warn("OVSDB entity {} was already registered for ownership", candidateEntity, e); } - + //If entity already has owner, it won't get notification from EntityOwnershipService + java.util.Optional ownershipStateOpt = + entityOwnershipService.getOwnershipState(candidateEntity); + if (ownershipStateOpt.isPresent()) { + EntityOwnershipState ownershipState = ownershipStateOpt.get(); + if (ownershipState == EntityOwnershipState.OWNED_BY_OTHER) { + ovsdbConnectionInstance.setHasDeviceOwnership(false); + } else if (ownershipState == EntityOwnershipState.IS_OWNER) { + ovsdbConnectionInstance.setHasDeviceOwnership(true); + ovsdbConnectionInstance.registerCallbacks(instanceIdentifierCodec); + } + } } - private void unregisterEntityForOwnership(OvsdbConnectionInstance ovsdbConnectionInstance) { + private void unregisterEntityForOwnership(final OvsdbConnectionInstance ovsdbConnectionInstance) { ovsdbConnectionInstance.closeDeviceOwnershipCandidateRegistration(); - entityConnectionMap.remove(ovsdbConnectionInstance.getConnectedEntity()); + entityConnectionMap.remove(ovsdbConnectionInstance.getConnectedEntity(), ovsdbConnectionInstance); + } + + private void retryConnection(final InstanceIdentifier iid, final OvsdbNodeAugmentation ovsdbNode, + final ConnectionReconciliationTriggers trigger) { + final ReconciliationTask task = new ConnectionReconciliationTask( + reconciliationManager, + this, + iid, + ovsdbNode); + + if (reconciliationManager.isEnqueued(task)) { + return; + } + switch (trigger) { + case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE: + reconciliationManager.enqueueForRetry(task); + break; + case ON_DISCONNECT: { + CheckedFuture, ReadFailedException> readNodeFuture; + try (ReadOnlyTransaction tx = db.newReadOnlyTransaction()) { + readNodeFuture = tx.read(LogicalDatastoreType.CONFIGURATION, iid); + } + Futures.addCallback(readNodeFuture, new FutureCallback>() { + @Override + public void onSuccess(final Optional node) { + if (node.isPresent()) { + LOG.info("Disconnected/Failed connection {} was controller initiated, attempting " + + "reconnection", ovsdbNode.getConnectionInfo()); + reconciliationManager.enqueue(task); + + } else { + LOG.debug("Connection {} was switch initiated, no reconciliation is required", + iid.firstKeyOf(Node.class).getNodeId()); + } + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("Read Config/DS for Node failed! {}", iid, throwable); + } + }, MoreExecutors.directExecutor()); + break; + } + default: + break; + } + } + + private void reconcileBridgeConfigurations(final OvsdbConnectionInstance client) { + final InstanceIdentifier nodeIid = client.getInstanceIdentifier(); + final ReconciliationTask task = new BridgeConfigReconciliationTask( + reconciliationManager, OvsdbConnectionManager.this, nodeIid, client, instanceIdentifierCodec); + + reconciliationManager.enqueue(task); } - private class OvsdbDeviceEntityOwnershipListener implements EntityOwnershipListener { - private OvsdbConnectionManager cm; - private EntityOwnershipListenerRegistration listenerRegistration; + private static class OvsdbDeviceEntityOwnershipListener implements EntityOwnershipListener { + private final OvsdbConnectionManager cm; + private final EntityOwnershipListenerRegistration listenerRegistration; - OvsdbDeviceEntityOwnershipListener(OvsdbConnectionManager cm, EntityOwnershipService entityOwnershipService) { + OvsdbDeviceEntityOwnershipListener(final OvsdbConnectionManager cm, + final EntityOwnershipService entityOwnershipService) { this.cm = cm; listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this); } + public void close() { listenerRegistration.close(); } + @Override - public void ownershipChanged(EntityOwnershipChange ownershipChange) { + public void ownershipChanged(final EntityOwnershipChange ownershipChange) { cm.handleOwnershipChanged(ownershipChange); } } + + private enum ConnectionReconciliationTriggers { + /* + Reconciliation trigger for scenario where controller's attempt + to connect to switch fails on config data store notification + */ + ON_CONTROLLER_INITIATED_CONNECTION_FAILURE, + + /* + Reconciliation trigger for the scenario where controller + initiated connection disconnects. + */ + ON_DISCONNECT + } }