X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=hwvtepsouthbound%2Fhwvtepsouthbound-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fovsdb%2Fhwvtepsouthbound%2FHwvtepConnectionManager.java;h=8a7514aa43864c8f5fcdb78b01f011b98e9c520c;hb=8268b2a995fad8fc9b64671f3e46c046d0c4ba99;hp=bac5a71b763b33f4f6fc890bd08aac89dd146c4c;hpb=11a849cd291f8e74725745becbb4d6d94192ee6b;p=ovsdb.git diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionManager.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionManager.java index bac5a71b7..8a7514aa4 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionManager.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionManager.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Ericsson India Global Services Pvt Ltd. and others. All rights reserved. + * Copyright (c) 2015, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -10,6 +10,13 @@ package org.opendaylight.ovsdb.hwvtepsouthbound; import static org.opendaylight.ovsdb.lib.operations.Operations.op; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import java.net.ConnectException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -17,21 +24,28 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; - +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; - import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction; -import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; -import org.opendaylight.controller.md.sal.common.api.clustering.Entity; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; -import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.mdsal.eos.binding.api.Entity; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipCandidateRegistration; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListener; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListenerRegistration; +import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService; +import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException; +import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState; +import org.opendaylight.ovsdb.hwvtepsouthbound.events.ClientConnected; +import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationManager; +import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationTask; +import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.configuration.HwvtepReconciliationTask; +import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection.ConnectionReconciliationTask; +import org.opendaylight.ovsdb.hwvtepsouthbound.transact.DependencyQueue; import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.HwvtepGlobalRemoveCommand; -import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.PhysicalSwitchRemoveCommand; import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvoker; import org.opendaylight.ovsdb.lib.OvsdbClient; import org.opendaylight.ovsdb.lib.OvsdbConnectionListener; @@ -43,31 +57,37 @@ import org.opendaylight.ovsdb.lib.schema.DatabaseSchema; import org.opendaylight.ovsdb.lib.schema.GenericTableSchema; import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils; import org.opendaylight.ovsdb.schema.hardwarevtep.Global; +import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionHistory; +import org.opendaylight.ovsdb.utils.mdsal.utils.TransactionType; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepPhysicalSwitchAttributes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.PhysicalSwitchAugmentation; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.hwvtep.global.attributes.ConnectionInfo; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; - -public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoCloseable{ - private Map clients = - new ConcurrentHashMap(); +public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoCloseable { + private final Map clients = new ConcurrentHashMap<>(); private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionManager.class); private static final String ENTITY_TYPE = "hwvtep"; - - private DataBroker db; - private TransactionInvoker txInvoker; - private Map> instanceIdentifiers = - new ConcurrentHashMap>(); - private Map entityConnectionMap = - new ConcurrentHashMap<>(); - private EntityOwnershipService entityOwnershipService; - private HwvtepDeviceEntityOwnershipListener hwvtepDeviceEntityOwnershipListener; + private static final int DB_FETCH_TIMEOUT = 1000; + private static final int TRANSACTION_HISTORY_CAPACITY = 10000; + private static final int TRANSACTION_HISTORY_WATERMARK = 7500; + + private final DataBroker db; + private final TransactionInvoker txInvoker; + private final Map> instanceIdentifiers = new ConcurrentHashMap<>(); + private final Map entityConnectionMap = new ConcurrentHashMap<>(); + private final EntityOwnershipService entityOwnershipService; + private final HwvtepDeviceEntityOwnershipListener hwvtepDeviceEntityOwnershipListener; + private final ReconciliationManager reconciliationManager; + private final Map, HwvtepConnectionInstance> nodeIidVsConnectionInstance = + new ConcurrentHashMap<>(); + private final HwvtepOperGlobalListener hwvtepOperGlobalListener; + private final Map, TransactionHistory> controllerTxHistory = new ConcurrentHashMap<>(); + private final Map, TransactionHistory> deviceUpdateHistory = new ConcurrentHashMap<>(); public HwvtepConnectionManager(DataBroker db, TransactionInvoker txInvoker, EntityOwnershipService entityOwnershipService) { @@ -75,6 +95,8 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo this.txInvoker = txInvoker; this.entityOwnershipService = entityOwnershipService; this.hwvtepDeviceEntityOwnershipListener = new HwvtepDeviceEntityOwnershipListener(this,entityOwnershipService); + this.reconciliationManager = new ReconciliationManager(db); + this.hwvtepOperGlobalListener = new HwvtepOperGlobalListener(db, this); } @Override @@ -83,43 +105,81 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo hwvtepDeviceEntityOwnershipListener.close(); } - for (OvsdbClient client: clients.values()) { + for (HwvtepConnectionInstance client: clients.values()) { client.disconnect(); } + DependencyQueue.close(); } @Override - public void connected(@Nonnull final OvsdbClient client) { - HwvtepConnectionInstance hwClient = connectedButCallBacksNotRegistered(client); - registerEntityForOwnership(hwClient); - LOG.trace("connected client: {}", client); + public void connected(@Nonnull final OvsdbClient externalClient) { + LOG.info("Library connected {} from {}:{} to {}:{}", + externalClient.getConnectionInfo().getType(), + externalClient.getConnectionInfo().getRemoteAddress(), + externalClient.getConnectionInfo().getRemotePort(), + externalClient.getConnectionInfo().getLocalAddress(), + externalClient.getConnectionInfo().getLocalPort()); + try { + List databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS); + if (databases.contains(HwvtepSchemaConstants.HARDWARE_VTEP)) { + HwvtepConnectionInstance hwClient = connectedButCallBacksNotRegistered(externalClient); + registerEntityForOwnership(hwClient); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.warn("Unable to fetch Database list from device {}. Disconnecting from the device.", + externalClient.getConnectionInfo().getRemoteAddress(), e); + externalClient.disconnect(); + } } @Override public void disconnected(OvsdbClient client) { - LOG.info("HWVTEP Disconnected from {}:{}. Cleaning up the operational data store" - ,client.getConnectionInfo().getRemoteAddress(), - client.getConnectionInfo().getRemotePort()); + LOG.info("Library disconnected {} from {}:{} to {}:{}. Cleaning up the operational data store", + client.getConnectionInfo().getType(), + client.getConnectionInfo().getRemoteAddress(), + client.getConnectionInfo().getRemotePort(), + client.getConnectionInfo().getLocalAddress(), + client.getConnectionInfo().getLocalPort()); ConnectionInfo key = HwvtepSouthboundMapper.createConnectionInfo(client); HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstance(key); if (hwvtepConnectionInstance != null) { - //TODO: remove all the hwvtep nodes - txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null)); + deviceUpdateHistory.get(hwvtepConnectionInstance.getInstanceIdentifier()).addToHistory( + TransactionType.DELETE, new ClientConnected(client.getConnectionInfo().getRemotePort())); + + // Unregister Entity ownership as soon as possible ,so this instance should + // not be used as a candidate in Entity election (given that this instance is + // about to disconnect as well), if current owner get disconnected from + // HWVTEP device. + if (hwvtepConnectionInstance.getHasDeviceOwnership()) { + unregisterEntityForOwnership(hwvtepConnectionInstance); + txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null)); + } else { + unregisterEntityForOwnership(hwvtepConnectionInstance); + //Do not delete if client disconnected from follower HwvtepGlobalRemoveCommand + } + removeConnectionInstance(key); - // Unregister Cluster Ownership for ConnectionInfo - unregisterEntityForOwnership(hwvtepConnectionInstance); + //Controller initiated connection can be terminated from switch side. + //So cleanup the instance identifier cache. + removeInstanceIdentifier(key); + removeConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier()); + retryConnection(hwvtepConnectionInstance.getInstanceIdentifier(), + hwvtepConnectionInstance.getHwvtepGlobalAugmentation(), + ConnectionReconciliationTriggers.ON_DISCONNECT); } else { LOG.warn("HWVTEP disconnected event did not find connection instance for {}", key); } - LOG.trace("disconnected client: {}", client); + LOG.trace("HwvtepConnectionManager exit disconnected client: {}", client); } - public OvsdbClient connect(InstanceIdentifier iid, HwvtepGlobalAugmentation hwvtepGlobal) throws UnknownHostException { + public OvsdbClient connect(InstanceIdentifier iid, + HwvtepGlobalAugmentation hwvtepGlobal) throws UnknownHostException, ConnectException { + LOG.info("Connecting to {}", HwvtepSouthboundUtil.connectionInfoToString(hwvtepGlobal.getConnectionInfo())); InetAddress ip = HwvtepSouthboundMapper.createInetAddress(hwvtepGlobal.getConnectionInfo().getRemoteIp()); OvsdbClient client = OvsdbConnectionService.getService() .connect(ip, hwvtepGlobal.getConnectionInfo().getRemotePort().getValue()); - if(client != null) { + if (client != null) { putInstanceIdentifier(hwvtepGlobal.getConnectionInfo(), iid.firstIdentifierOf(Node.class)); HwvtepConnectionInstance hwvtepConnectionInstance = connectedButCallBacksNotRegistered(client); hwvtepConnectionInstance.setHwvtepGlobalAugmentation(hwvtepGlobal); @@ -132,7 +192,9 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo } return client; } + public void disconnect(HwvtepGlobalAugmentation ovsdbNode) throws UnknownHostException { + LOG.info("Diconnecting from {}", HwvtepSouthboundUtil.connectionInfoToString(ovsdbNode.getConnectionInfo())); HwvtepConnectionInstance client = getConnectionInstance(ovsdbNode.getConnectionInfo()); if (client != null) { client.disconnect(); @@ -165,8 +227,8 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo removeConnectionInstance(key); } - hwvtepConnectionInstance = new HwvtepConnectionInstance(key, externalClient, getInstanceIdentifier(key), - txInvoker); + hwvtepConnectionInstance = new HwvtepConnectionInstance(this, key, externalClient, getInstanceIdentifier(key), + txInvoker, db); hwvtepConnectionInstance.createTransactInvokers(); return hwvtepConnectionInstance; } @@ -177,28 +239,88 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo LOG.info("Clients after put: {}", clients); } - public HwvtepConnectionInstance getConnectionInstance(ConnectionInfo key) { + void putConnectionInstance(final InstanceIdentifier nodeIid, + final HwvtepConnectionInstance connectionInstance) { + nodeIidVsConnectionInstance.put(nodeIid, connectionInstance); + } + + public HwvtepConnectionInstance getConnectionInstanceFromNodeIid(final InstanceIdentifier nodeIid) { + HwvtepConnectionInstance hwvtepConnectionInstance = nodeIidVsConnectionInstance.get(nodeIid); + if (hwvtepConnectionInstance != null) { + return hwvtepConnectionInstance; + } + InstanceIdentifier globalNodeIid = HwvtepSouthboundUtil.getGlobalNodeIid(nodeIid); + if (globalNodeIid != null) { + return nodeIidVsConnectionInstance.get(globalNodeIid); + } + return null; + } + + public HwvtepConnectionInstance getConnectionInstance(final ConnectionInfo key) { + if (key == null) { + return null; + } ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key); return clients.get(connectionInfo); } + public HwvtepConnectionInstance getConnectionInstance(Node node) { Preconditions.checkNotNull(node); - HwvtepGlobalAugmentation hwvtepGlobal = node.getAugmentation(HwvtepGlobalAugmentation.class); + HwvtepGlobalAugmentation hwvtepGlobal = node.augmentation(HwvtepGlobalAugmentation.class); + PhysicalSwitchAugmentation switchNode = node.augmentation(PhysicalSwitchAugmentation.class); if (hwvtepGlobal != null) { - return getConnectionInstance(hwvtepGlobal.getConnectionInfo()); - } //TODO: We could get it from Managers also. - else { + if (hwvtepGlobal.getConnectionInfo() != null) { + return getConnectionInstance(hwvtepGlobal.getConnectionInfo()); + } else { + //TODO: Case of user configured connection for now + //TODO: We could get it from Managers also. + return null; + } + } + else if (switchNode != null) { + return getConnectionInstance(switchNode); + } else { LOG.warn("This is not a node that gives any hint how to find its OVSDB Manager: {}",node); return null; } } + public HwvtepConnectionInstance getConnectionInstance(HwvtepPhysicalSwitchAttributes node) { + Optional optional = HwvtepSouthboundUtil.getManagingNode(db, node); + if (optional.isPresent()) { + return getConnectionInstance(optional.get().getConnectionInfo()); + } else { + return null; + } + } + + public void stopConfigurationReconciliation(final InstanceIdentifier nodeIid) { + final ReconciliationTask task = new HwvtepReconciliationTask( + reconciliationManager, HwvtepConnectionManager.this, nodeIid, null, null, db); + + reconciliationManager.dequeue(task); + } + + public void reconcileConfigurations(final HwvtepConnectionInstance client, Node psNode) { + final InstanceIdentifier nodeIid = client.getInstanceIdentifier(); + final ReconciliationTask task = new HwvtepReconciliationTask( + reconciliationManager, HwvtepConnectionManager.this, nodeIid, psNode, client, db); + + reconciliationManager.enqueue(task); + } + private void removeConnectionInstance(ConnectionInfo key) { ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key); clients.remove(connectionInfo); LOG.info("Clients after remove: {}", clients); } + private void removeConnectionInstance(final InstanceIdentifier nodeIid) { + if (nodeIid != null) { + nodeIidVsConnectionInstance.remove(nodeIid); + } + } + private void putInstanceIdentifier(ConnectionInfo key,InstanceIdentifier iid) { ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key); instanceIdentifiers.put(connectionInfo, iid); @@ -206,8 +328,7 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo public InstanceIdentifier getInstanceIdentifier(ConnectionInfo key) { ConnectionInfo connectionInfo = HwvtepSouthboundMapper.suppressLocalIpPort(key); - InstanceIdentifier iid = instanceIdentifiers.get(connectionInfo); - return iid; + return instanceIdentifiers.get(connectionInfo); } private void removeInstanceIdentifier(ConnectionInfo key) { @@ -216,12 +337,16 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo } public OvsdbClient getClient(ConnectionInfo connectionInfo) { - return getConnectionInstance(connectionInfo); + return getConnectionInstance(connectionInfo).getOvsdbClient(); } private void registerEntityForOwnership(HwvtepConnectionInstance hwvtepConnectionInstance) { Entity candidateEntity = getEntityFromConnectionInstance(hwvtepConnectionInstance); + if (entityConnectionMap.get(candidateEntity) != null) { + disconnected(entityConnectionMap.get(candidateEntity).getOvsdbClient()); + putConnectionInstance(hwvtepConnectionInstance.getInstanceIdentifier(), hwvtepConnectionInstance); + } entityConnectionMap.put(candidateEntity, hwvtepConnectionInstance); hwvtepConnectionInstance.setConnectedEntity(candidateEntity); @@ -233,23 +358,39 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo //If entity already has owner, it won't get notification from EntityOwnershipService //so cache the connection instances. - Optional ownershipStateOpt = - entityOwnershipService.getOwnershipState(candidateEntity); - if (ownershipStateOpt.isPresent()) { - EntityOwnershipState ownershipState = ownershipStateOpt.get(); - if (ownershipState.hasOwner() && !ownershipState.isOwner()) { - if (getConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo()) != null) { - LOG.info("OVSDB entity {} is already owned by other southbound plugin " - + "instance, so *this* instance is NOT an OWNER of the device", - hwvtepConnectionInstance.getConnectionInfo()); - putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(),hwvtepConnectionInstance); - } + handleOwnershipState(candidateEntity, hwvtepConnectionInstance); + } catch (CandidateAlreadyRegisteredException e) { + LOG.warn("OVSDB entity {} was already registered for ownership", candidateEntity, e); + } + + } + + private void handleOwnershipState(Entity candidateEntity, HwvtepConnectionInstance hwvtepConnectionInstance) { + //If entity already has owner, it won't get notification from EntityOwnershipService + //so cache the connection instances. + Optional ownershipStateOpt = + entityOwnershipService.getOwnershipState(candidateEntity); + if (ownershipStateOpt.isPresent()) { + EntityOwnershipState ownershipState = ownershipStateOpt.get(); + putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance); + if (ownershipState != EntityOwnershipState.NO_OWNER) { + hwvtepConnectionInstance.setHasDeviceOwnership(ownershipState == EntityOwnershipState.IS_OWNER); + if (ownershipState != EntityOwnershipState.IS_OWNER) { + LOG.info("HWVTEP entity {} is already owned by other southbound plugin " + + "instance, so *this* instance is NOT an OWNER of the device", + hwvtepConnectionInstance.getConnectionInfo()); + } else { + afterTakingOwnership(hwvtepConnectionInstance); } } - } catch (CandidateAlreadyRegisteredException e) { - LOG.warn("OVSDB entity {} was already registered for {} ownership", candidateEntity, e); } + } + private void afterTakingOwnership(HwvtepConnectionInstance hwvtepConnectionInstance) { + txInvoker.invoke(new HwvtepGlobalRemoveCommand(hwvtepConnectionInstance, null, null)); + putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance); + hwvtepConnectionInstance.setHasDeviceOwnership(true); + hwvtepConnectionInstance.registerCallbacks(); } private Global getHwvtepGlobalTableEntry(HwvtepConnectionInstance connectionInstance) { @@ -257,28 +398,27 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo Global globalRow = null; try { - dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.databaseName).get(); + dbSchema = connectionInstance.getSchema(HwvtepSchemaConstants.HARDWARE_VTEP).get(); } catch (InterruptedException | ExecutionException e) { LOG.warn("Not able to fetch schema for database {} from device {}", - HwvtepSchemaConstants.databaseName,connectionInstance.getConnectionInfo(),e); + HwvtepSchemaConstants.HARDWARE_VTEP,connectionInstance.getConnectionInfo(),e); } if (dbSchema != null) { GenericTableSchema hwvtepSchema = TyperUtils.getTableSchema(dbSchema, Global.class); - List hwvtepTableColumn = new ArrayList(); + List hwvtepTableColumn = new ArrayList<>(); hwvtepTableColumn.addAll(hwvtepSchema.getColumns()); Select selectOperation = op.select(hwvtepSchema); - selectOperation.setColumns(hwvtepTableColumn);; + selectOperation.setColumns(hwvtepTableColumn); - ArrayList operations = new ArrayList(); + ArrayList operations = new ArrayList<>(); operations.add(selectOperation); operations.add(op.comment("Fetching hardware_vtep table rows")); - List results = null; try { - results = connectionInstance.transact(dbSchema, operations).get(); - if (results != null ) { + List results = connectionInstance.transact(dbSchema, operations).get(); + if (results != null) { OperationResult selectResult = results.get(0); globalRow = TyperUtils.getTypedRowWrapper( dbSchema,Global.class,selectResult.getRows().get(0)); @@ -293,9 +433,8 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo } private Entity getEntityFromConnectionInstance(@Nonnull HwvtepConnectionInstance hwvtepConnectionInstance) { - YangInstanceIdentifier entityId = null; InstanceIdentifier iid = hwvtepConnectionInstance.getInstanceIdentifier(); - if ( iid == null ) { + if (iid == null) { //TODO: Is Global the right one? Global hwvtepGlobalRow = getHwvtepGlobalTableEntry(hwvtepConnectionInstance); iid = HwvtepSouthboundMapper.getInstanceIdentifier(hwvtepGlobalRow); @@ -303,27 +442,105 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo hwvtepConnectionInstance.setInstanceIdentifier(iid); LOG.info("InstanceIdentifier {} generated for device " + "connection {}",iid, hwvtepConnectionInstance.getConnectionInfo()); - + controllerTxHistory.putIfAbsent(iid, + new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK)); + deviceUpdateHistory.putIfAbsent(iid, + new TransactionHistory(TRANSACTION_HISTORY_CAPACITY, TRANSACTION_HISTORY_WATERMARK)); + TransactionHistory controllerLog = controllerTxHistory.get(iid); + TransactionHistory deviceLog = deviceUpdateHistory.get(iid); + int port = hwvtepConnectionInstance.getOvsdbClient().getConnectionInfo().getRemotePort(); + deviceLog.addToHistory(TransactionType.ADD, new ClientConnected(port)); + hwvtepConnectionInstance.setControllerTxHistory(controllerLog); + hwvtepConnectionInstance.setDeviceUpdateHistory(deviceLog); } - entityId = HwvtepSouthboundUtil.getInstanceIdentifierCodec().getYangInstanceIdentifier(iid); - Entity deviceEntity = new Entity(ENTITY_TYPE, entityId); + Entity deviceEntity = new Entity(ENTITY_TYPE, iid); LOG.debug("Entity {} created for device connection {}", deviceEntity, hwvtepConnectionInstance.getConnectionInfo()); return deviceEntity; } + private void unregisterEntityForOwnership(HwvtepConnectionInstance hwvtepConnectionInstance) { hwvtepConnectionInstance.closeDeviceOwnershipCandidateRegistration(); entityConnectionMap.remove(hwvtepConnectionInstance.getConnectedEntity()); } + public void reconcileConnection(InstanceIdentifier iid, HwvtepGlobalAugmentation hwvtepNode) { + this.retryConnection(iid, hwvtepNode, + ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE); + } + + public void stopConnectionReconciliationIfActive(InstanceIdentifier iid, HwvtepGlobalAugmentation hwvtepNode) { + final ReconciliationTask task = new ConnectionReconciliationTask( + reconciliationManager, + this, + iid, + hwvtepNode); + reconciliationManager.dequeue(task); + } + + private void retryConnection(final InstanceIdentifier iid, final HwvtepGlobalAugmentation hwvtepNode, + ConnectionReconciliationTriggers trigger) { + final ReconciliationTask task = new ConnectionReconciliationTask( + reconciliationManager, + this, + iid, + hwvtepNode); + + if (reconciliationManager.isEnqueued(task)) { + return; + } + + switch (trigger) { + case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE: + reconciliationManager.enqueueForRetry(task); + break; + case ON_DISCONNECT: + { + ReadOnlyTransaction tx = db.newReadOnlyTransaction(); + CheckedFuture, ReadFailedException> readNodeFuture = + tx.read(LogicalDatastoreType.CONFIGURATION, iid); + + Futures.addCallback(readNodeFuture, new FutureCallback>() { + @Override + public void onSuccess(@Nonnull Optional node) { + if (node.isPresent()) { + HwvtepGlobalAugmentation augmentation = node.get() + .augmentation(HwvtepGlobalAugmentation.class); + if (augmentation == null || augmentation.getConnectionInfo() == null) { + return; + } + LOG.info( + "Disconnected/Failed connection {} was controller initiated, attempting reconnection", + hwvtepNode.getConnectionInfo()); + reconciliationManager.enqueue(task); + + } else { + LOG.debug("Connection {} was switch initiated, no reconciliation is required", + iid.firstKeyOf(Node.class).getNodeId()); + } + } + + @Override + public void onFailure(Throwable ex) { + LOG.warn("Read Config/DS for Node failed! {}", iid, ex); + } + }, MoreExecutors.directExecutor()); + break; + } + default: + break; + } + } + public void handleOwnershipChanged(EntityOwnershipChange ownershipChange) { - HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstanceFromEntity(ownershipChange.getEntity()); + HwvtepConnectionInstance hwvtepConnectionInstance = + getConnectionInstanceFromEntity(ownershipChange.getEntity()); LOG.info("handleOwnershipChanged: {} event received for device {}", ownershipChange, hwvtepConnectionInstance != null ? hwvtepConnectionInstance.getConnectionInfo() : "THAT'S NOT REGISTERED BY THIS SOUTHBOUND PLUGIN INSTANCE"); if (hwvtepConnectionInstance == null) { - if (ownershipChange.isOwner()) { + if (ownershipChange.getState().isOwner()) { LOG.warn("handleOwnershipChanged: found no connection instance for {}", ownershipChange.getEntity()); } else { // EntityOwnershipService sends notification to all the nodes, irrespective of whether @@ -336,37 +553,31 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo // If entity has no owner, clean up the operational data store (it's possible because owner controller // might went down abruptly and didn't get a chance to clean up the operational data store. - if (!ownershipChange.hasOwner()) { + if (!ownershipChange.getState().hasOwner()) { LOG.debug("{} has no owner, cleaning up the operational data store", ownershipChange.getEntity()); - // Below code might look weird but it's required. We want to give first opportunity to the - // previous owner of the device to clean up the operational data store if there is no owner now. - // That way we will avoid lot of nasty md-sal exceptions because of concurrent delete. - if (ownershipChange.wasOwner()) { - cleanEntityOperationalData(ownershipChange.getEntity()); - } // If first cleanEntityOperationalData() was called, this call will be no-op. cleanEntityOperationalData(ownershipChange.getEntity()); } return; } //Connection detail need to be cached, irrespective of ownership result. - putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(),hwvtepConnectionInstance); + putConnectionInstance(hwvtepConnectionInstance.getMDConnectionInfo(), hwvtepConnectionInstance); - if (ownershipChange.isOwner() == hwvtepConnectionInstance.getHasDeviceOwnership()) { + if (ownershipChange.getState().isOwner() == hwvtepConnectionInstance.getHasDeviceOwnership()) { LOG.debug("handleOwnershipChanged: no change in ownership for {}. Ownership status is : {}", hwvtepConnectionInstance.getConnectionInfo(), hwvtepConnectionInstance.getHasDeviceOwnership()); return; } - hwvtepConnectionInstance.setHasDeviceOwnership(ownershipChange.isOwner()); + hwvtepConnectionInstance.setHasDeviceOwnership(ownershipChange.getState().isOwner()); // You were not an owner, but now you are - if (ownershipChange.isOwner()) { + if (ownershipChange.getState().isOwner()) { LOG.info("handleOwnershipChanged: *this* southbound plugin instance is owner of device {}", hwvtepConnectionInstance.getConnectionInfo()); //*this* instance of southbound plugin is owner of the device, //so register for monitor callbacks - hwvtepConnectionInstance.registerCallbacks(); + afterTakingOwnership(hwvtepConnectionInstance); } else { //You were owner of the device, but now you are not. With the current ownership @@ -382,35 +593,54 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo } private void cleanEntityOperationalData(Entity entity) { - InstanceIdentifier nodeIid = (InstanceIdentifier) HwvtepSouthboundUtil - .getInstanceIdentifierCodec().bindingDeserializer(entity.getId()); - - final ReadWriteTransaction transaction = db.newReadWriteTransaction(); - Optional node = HwvtepSouthboundUtil.readNode(transaction, nodeIid); - if (node.isPresent()) { - HwvtepSouthboundUtil.deleteNode(transaction, nodeIid); - } + @SuppressWarnings("unchecked") + final InstanceIdentifier nodeIid = (InstanceIdentifier) entity.getIdentifier(); + txInvoker.invoke(new HwvtepGlobalRemoveCommand(nodeIid)); } private HwvtepConnectionInstance getConnectionInstanceFromEntity(Entity entity) { return entityConnectionMap.get(entity); } - private class HwvtepDeviceEntityOwnershipListener implements EntityOwnershipListener { - private HwvtepConnectionManager hcm; - private EntityOwnershipListenerRegistration listenerRegistration; + public Map, TransactionHistory> getControllerTxHistory() { + return controllerTxHistory; + } + + public Map, TransactionHistory> getDeviceUpdateHistory() { + return deviceUpdateHistory; + } + + private static class HwvtepDeviceEntityOwnershipListener implements EntityOwnershipListener { + private final HwvtepConnectionManager hcm; + private final EntityOwnershipListenerRegistration listenerRegistration; - HwvtepDeviceEntityOwnershipListener(HwvtepConnectionManager hcm, EntityOwnershipService entityOwnershipService) { + HwvtepDeviceEntityOwnershipListener(HwvtepConnectionManager hcm, + EntityOwnershipService entityOwnershipService) { this.hcm = hcm; listenerRegistration = entityOwnershipService.registerListener(ENTITY_TYPE, this); } + public void close() { listenerRegistration.close(); } + @Override public void ownershipChanged(EntityOwnershipChange ownershipChange) { hcm.handleOwnershipChanged(ownershipChange); } } + private enum ConnectionReconciliationTriggers { + /* + Reconciliation trigger for scenario where controller's attempt + to connect to switch fails on config data store notification + */ + ON_CONTROLLER_INITIATED_CONNECTION_FAILURE, + + /* + Reconciliation trigger for the scenario where controller + initiated connection disconnects. + */ + ON_DISCONNECT + } }