this.initialCreatedData = hwvtepGlobalData;
}
+ public HwvtepGlobalAugmentation getHwvtepGlobalAugmentation() {
+ return this.initialCreatedData;
+ }
public HwvtepDeviceInfo getDeviceInfo() {
return this.deviceInfo;
}
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationManager;
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationTask;
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection.ConnectionReconciliationTask;
import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.HwvtepGlobalRemoveCommand;
import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionCommand;
import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvoker;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoCloseable{
private Map<ConnectionInfo, HwvtepConnectionInstance> clients = new ConcurrentHashMap<>();
private Map<Entity, HwvtepConnectionInstance> entityConnectionMap = new ConcurrentHashMap<>();
private EntityOwnershipService entityOwnershipService;
private HwvtepDeviceEntityOwnershipListener hwvtepDeviceEntityOwnershipListener;
+ private final ReconciliationManager reconciliationManager;
public HwvtepConnectionManager(DataBroker db, TransactionInvoker txInvoker,
EntityOwnershipService entityOwnershipService) {
this.txInvoker = txInvoker;
this.entityOwnershipService = entityOwnershipService;
this.hwvtepDeviceEntityOwnershipListener = new HwvtepDeviceEntityOwnershipListener(this,entityOwnershipService);
+ this.reconciliationManager = new ReconciliationManager(db);
}
@Override
//Controller initiated connection can be terminated from switch side.
//So cleanup the instance identifier cache.
removeInstanceIdentifier(key);
+ retryConnection(hwvtepConnectionInstance.getInstanceIdentifier(),
+ hwvtepConnectionInstance.getHwvtepGlobalAugmentation(),
+ ConnectionReconciliationTriggers.ON_DISCONNECT);
} else {
LOG.warn("HWVTEP disconnected event did not find connection instance for {}", key);
}
entityConnectionMap.remove(hwvtepConnectionInstance.getConnectedEntity());
}
+ public void reconcileConnection(InstanceIdentifier<Node> iid, HwvtepGlobalAugmentation hwvtepNode) {
+ this.retryConnection(iid, hwvtepNode,
+ ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE);
+ }
+
+ public void stopConnectionReconciliationIfActive(InstanceIdentifier<?> iid, HwvtepGlobalAugmentation hwvtepNode) {
+ final ReconciliationTask task = new ConnectionReconciliationTask(
+ reconciliationManager,
+ this,
+ iid,
+ hwvtepNode);
+ reconciliationManager.dequeue(task);
+ }
+
+ private void retryConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode,
+ ConnectionReconciliationTriggers trigger) {
+ final ReconciliationTask task = new ConnectionReconciliationTask(
+ reconciliationManager,
+ this,
+ iid,
+ hwvtepNode);
+
+ if(reconciliationManager.isEnqueued(task)){
+ return;
+ }
+ switch(trigger){
+ case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE:
+ reconciliationManager.enqueueForRetry(task);
+ break;
+ case ON_DISCONNECT:
+ {
+ ReadOnlyTransaction tx = db.newReadOnlyTransaction();
+ CheckedFuture<Optional<Node>, ReadFailedException> readNodeFuture =
+ tx.read(LogicalDatastoreType.CONFIGURATION, iid);
+
+ final HwvtepConnectionManager connectionManager = this;
+ Futures.addCallback(readNodeFuture, new FutureCallback<Optional<Node>>() {
+ @Override
+ public void onSuccess(@Nullable Optional<Node> node) {
+ if (node.isPresent()) {
+ LOG.info("Disconnected/Failed connection {} was controller initiated, attempting " +
+ "reconnection", hwvtepNode.getConnectionInfo());
+ reconciliationManager.enqueue(task);
+
+ } else {
+ LOG.debug("Connection {} was switch initiated, no reconciliation is required"
+ , iid.firstKeyOf(Node.class).getNodeId());
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.warn("Read Config/DS for Node failed! {}", iid, t);
+ }
+ });
+ break;
+ }
+ default:
+ break;
+ }
+ }
+
public void handleOwnershipChanged(EntityOwnershipChange ownershipChange) {
HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstanceFromEntity(ownershipChange.getEntity());
LOG.info("handleOwnershipChanged: {} event received for device {}",
hcm.handleOwnershipChanged(ownershipChange);
}
}
+
+ private enum ConnectionReconciliationTriggers {
+ /*
+ Reconciliation trigger for scenario where controller's attempt
+ to connect to switch fails on config data store notification
+ */
+ ON_CONTROLLER_INITIATED_CONNECTION_FAILURE,
+
+ /*
+ Reconciliation trigger for the scenario where controller
+ initiated connection disconnects.
+ */
+ ON_DISCONNECT
+ }
}
+ "to same device, hence dropping the request {}", connection, hwvtepGlobal);
} else {
try {
- hcm.connect(HwvtepSouthboundMapper.createInstanceIdentifier(node.getNodeId()), hwvtepGlobal);
+ OvsdbClient client = hcm.connect(key, hwvtepGlobal);
} catch (UnknownHostException e) {
- LOG.warn("Failed to connect to OVSDB node", e);
+ LOG.warn("Failed to connect to HWVTEP node", e);
}
}
}
if (client == null) {
try {
hcm.disconnect(hgOriginal);
- hcm.connect(HwvtepSouthboundMapper.createInstanceIdentifier(original.getNodeId()), hgUpdated);
+ hcm.stopConnectionReconciliationIfActive(key, hgOriginal);
+ OvsdbClient newClient = hcm.connect(key, hgUpdated);
+ if (newClient == null) {
+ hcm.reconcileConnection(key, hgUpdated);
+ }
} catch (UnknownHostException e) {
- LOG.warn("Failed to update connection on OVSDB Node", e);
+ LOG.warn("Failed to update connection on HWVTEP Node", e);
}
}
}
}
private void updateData(Collection<DataTreeModification<Node>> changes) {
- /* TODO:
+ /* TODO:
* Get connection instances for each change
* Update data for each connection
* Requires Command patterns. TBD.
if (hgDeleted != null) {
try {
hcm.disconnect(hgDeleted);
+ hcm.stopConnectionReconciliationIfActive(key, hgDeleted);
} catch (UnknownHostException e) {
- LOG.warn("Failed to disconnect OVSDB Node", e);
+ LOG.warn("Failed to disconnect HWVTEP Node", e);
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * Copied from org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationManager
+ *
+ * This class provides the implementation of ovsdb southbound plugins
+ * configuration reconciliation engine. This engine provide interfaces
+ * to enqueue (one time retry)/ enqueueForRetry(periodic retry)/ dequeue
+ * (remove from retry queue) reconciliation task. Reconciliation task can
+ * be a connection reconciliation or configuration reconciliation of any
+ * ovsdb managed resource like bridge, termination point etc. This engine
+ * execute all the reconciliation task through a fixed size thread pool.
+ * If submitted task need to be retry after a periodic interval they are
+ * submitted to a single thread executor to periodically wake up and check
+ * if task is ready for execution.
+ * Ideally, addition of any type of reconciliation task should not require
+ * any change in this reconciliation manager execution engine.
+ *
+ * 3-Node Cluster:
+ * Reconciliation manager is agnostic of whether it's running in single
+ * node cluster or 3-node cluster. It's a responsibility of the task
+ * submitter to make sure that it submit the task for reconciliation only
+ * if it's an owner of that device EXCEPT controller initiated Connection.
+ * Reconciliation of controller initiated connection should be done by all
+ * the 3-nodes in the cluster, because connection to individual controller
+ * can be interrupted for various reason.
+ *
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/9/16.
+ */
+public class ReconciliationManager implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManager.class);
+
+ private static final int NO_OF_RECONCILER = 10;
+ private static final int RECON_TASK_QUEUE_SIZE = 5000;
+
+ private final DataBroker db;
+ private final ExecutorService reconcilers;
+ private final ScheduledExecutorService taskTriager;
+
+ private final ReconciliationTaskManager reconTaskManager = new ReconciliationTaskManager();
+
+ public ReconciliationManager(final DataBroker db) {
+ this.db = db;
+ reconcilers = SpecialExecutors.newBoundedCachedThreadPool(NO_OF_RECONCILER, RECON_TASK_QUEUE_SIZE, "ovsdb-reconciler");
+
+ ThreadFactory threadFact = new ThreadFactoryBuilder()
+ .setNameFormat("ovsdb-recon-task-triager-%d").build();
+ taskTriager = Executors.newSingleThreadScheduledExecutor(threadFact);
+ }
+
+ public boolean isEnqueued(final ReconciliationTask task) {
+ return reconTaskManager.isTaskQueued(task);
+ }
+
+ public void enqueue(final ReconciliationTask task) {
+ LOG.trace("Reconciliation task submitted for execution {}",task);
+ reconTaskManager.cacheTask(task, reconcilers.submit(task));
+ }
+
+ public void enqueueForRetry(final ReconciliationTask task) {
+ LOG.trace("Reconciliation task re-queued for re-execution {}",task);
+ reconTaskManager.cacheTask(task, taskTriager.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ task.checkReadinessAndProcess();
+ }
+ }, task.retryDelayInMills(), TimeUnit.MILLISECONDS
+ )
+ );
+ }
+
+ public void dequeue(final ReconciliationTask task) {
+ reconTaskManager.cancelTask(task);
+ }
+
+ public DataBroker getDb() {
+ return db;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (this.reconcilers != null) {
+ this.reconcilers.shutdownNow();
+ }
+
+ if (this.taskTriager != null) {
+ this.taskTriager.shutdownNow();
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation;
+
+import com.google.common.base.Preconditions;
+
+import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionManager;
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection.ConnectionReconciliationTask;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Copied from org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationTask
+ *
+ * Abstract implementation of a reconciliation task. Each new type of
+ * resource configuration reconciliation task should extend this class
+ * and implement the abstract methods.
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/9/16.
+ */
+public abstract class ReconciliationTask implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReconciliationTask.class);
+
+ protected final ReconciliationManager reconciliationManager;
+ protected final HwvtepConnectionManager connectionManager;
+ protected final InstanceIdentifier<?> nodeIid;
+ protected final DataObject configData;
+
+ protected ReconciliationTask(ReconciliationManager reconciliationManager, HwvtepConnectionManager connectionManager,
+ InstanceIdentifier<?> nodeIid,
+ DataObject configData) {
+ Preconditions.checkNotNull(reconciliationManager, "Reconciliation manager must not be null");
+ Preconditions.checkNotNull(connectionManager, "Connection manager must not be null");
+ Preconditions.checkNotNull(nodeIid, "Node Iid must not be null");
+ this.reconciliationManager = reconciliationManager;
+ this.connectionManager = connectionManager;
+ this.nodeIid = nodeIid;
+ this.configData = configData;
+ }
+
+ /**
+ * Method contains task reconciliation logic. Please refer to
+ * {@link ConnectionReconciliationTask#reconcileConfiguration(HwvtepConnectionManager)}
+ * for example.
+ * @param connectionManager Connection manager to get connection instance of the device
+ * @return True if reconciliation was successful, else false
+ */
+ public abstract boolean reconcileConfiguration(HwvtepConnectionManager connectionManager);
+
+ /**
+ * Extended task should implement the logic that decides whether retry for the task
+ * is required or not. If retry is required but it does not requires any delay, submit
+ * the task immediately using {@link ReconciliationManager#enqueue(ReconciliationTask)}.
+ * If retry requires delay, use {@link ReconciliationManager#enqueueForRetry(ReconciliationTask)}
+ * and specify the delay using {@link #retryDelayInMills()}.
+ * If data store operation is required to decide if the task need retry, please implement
+ * it as an async operation and submit the task on the callback of the future.
+ * <p>
+ * Note:Please do not write blocking data store operations
+ * {@link ConnectionReconciliationTask#doRetry(boolean)}
+ * </p>
+ * @param wasPreviousAttemptSuccessful Status of the previous attempt
+ */
+ public abstract void doRetry(boolean wasPreviousAttemptSuccessful);
+
+ /**
+ * Extended task should implement the logic that check the readiness of the task
+ * for execution. If task is ready for the execution, submit it for immediate
+ * execution using {@link ReconciliationManager#enqueue(ReconciliationTask)}.
+ * If task is not ready for execution yet, enqueue it again for delayed execution
+ * using {@link ReconciliationManager#enqueueForRetry(ReconciliationTask)}.
+ * To check the readiness of the task, if the data store operation is required, please
+ * implement it as an async operation and submit the task on the callback of the future.
+ * <p>
+ * Note:Please do not write blocking data store operations
+ * {@link ConnectionReconciliationTask#doRetry(boolean)}
+ * </p>
+ */
+ public abstract void checkReadinessAndProcess();
+
+ /**
+ * Method returns the time interval for retrying the failed task.
+ * {@link ReconciliationTask#doRetry(boolean)}
+ * @return time
+ */
+ public abstract long retryDelayInMills();
+
+ @Override
+ public void run() {
+ boolean status = this.reconcileConfiguration(connectionManager);
+ doRetry(status);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ReconciliationTask that = (ReconciliationTask) o;
+
+ return nodeIid.equals(that.nodeIid);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode() + nodeIid.hashCode();
+ }
+
+
+ @Override
+ public String toString() {
+ return "ReconciliationTask{ type=" + getClass().toString()+
+ ", nodeIid=" + nodeIid +
+ '}';
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+
+/**
+ * Copied from org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationTaskManager
+ *
+ * This class is a task cache manager that provides a cache to store
+ * the task that is queued for the reconciliation. Whenever new task
+ * is submitted to the reconciliation manager, it will be cached in
+ * the cache. If the reconciliation is successful or it's done with
+ * all the attempt of reconciliation,
+ *
+ * Caching of the task is required, because reconciliation task might
+ * require longer duration to reconcile and there is a possibility that
+ * meanwhile user can change the configuration in config data store while
+ * task is queued for the reconciliation. In that scenario, reconciliation
+ * manager should not attempt any further reconciliation attempt for that
+ * task. ReconciliationManager will call cancelTask() to cancel the task.
+ *
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/15/16.
+ */
+class ReconciliationTaskManager {
+ private static final Logger LOG = LoggerFactory.getLogger(ReconciliationTaskManager.class);
+
+ private final ConcurrentHashMap<ReconciliationTask,Future<?>> reconciliationTaskCache
+ = new ConcurrentHashMap();
+
+ public boolean isTaskQueued(ReconciliationTask task) {
+ return reconciliationTaskCache.containsKey(task);
+ }
+ public boolean cancelTask(ReconciliationTask task) {
+ if(reconciliationTaskCache.containsKey(task)){
+ Future<?> taskFuture = reconciliationTaskCache.remove(task);
+ if( !taskFuture.isDone() && !taskFuture.isCancelled() ) {
+ LOG.info("Reconciliation task is cancelled : {}",task);
+ return taskFuture.cancel(true);
+ }
+ }
+ return false;
+
+ }
+ public void cacheTask(ReconciliationTask task, Future<?> taskFuture) {
+ reconciliationTaskCache.put(task,taskFuture);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection;
+
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationManager;
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationTask;
+import org.opendaylight.ovsdb.lib.OvsdbClient;
+import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Copied from org.opendaylight.ovsdb.southbound.reconciliation.connection.ConnectionReconciliationTask
+ *
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/9/16.
+ */
+public class ConnectionReconciliationTask extends ReconciliationTask {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectionReconciliationTask.class);
+
+ private static final int RETRY_INTERVAL_FACTOR = 10000;
+ private static final int MAX_ATTEMPT = 10;
+
+ private AtomicInteger connectionAttempt = new AtomicInteger(0);
+
+ public ConnectionReconciliationTask(ReconciliationManager reconciliationManager, HwvtepConnectionManager
+ connectionManager, InstanceIdentifier<?> nodeIid, DataObject configData) {
+ super(reconciliationManager, connectionManager, nodeIid, configData);
+
+ }
+
+ @Override
+ public boolean reconcileConfiguration(HwvtepConnectionManager connectionManager) {
+ boolean result = false;
+ connectionAttempt.incrementAndGet();
+ InstanceIdentifier<Node> nIid = (InstanceIdentifier<Node>) nodeIid;
+ HwvtepGlobalAugmentation hwvtepNode = (HwvtepGlobalAugmentation)configData;
+
+ LOG.info("Retry({}) connection to Ovsdb Node {} ", connectionAttempt.get(), hwvtepNode.getConnectionInfo());
+ OvsdbClient client = null;
+ try {
+ client = connectionManager.connect(nIid, hwvtepNode);
+ if (client != null) {
+ LOG.info("Successfully connected to Hwvtep Node {} ", hwvtepNode.getConnectionInfo());
+ result = true;
+ } else {
+ LOG.warn("Connection retry({}) failed for {}.",
+ connectionAttempt.get(), hwvtepNode.getConnectionInfo());
+ }
+ } catch (UnknownHostException e) {
+ LOG.warn("Connection retry({}) failed with exception. ",connectionAttempt.get(), e);
+ }
+ return result;
+ }
+
+ @Override
+ public void doRetry(boolean wasLastAttemptSuccessful) {
+
+ if( !wasLastAttemptSuccessful && connectionAttempt.get() <= MAX_ATTEMPT ) {
+ reconciliationManager.enqueueForRetry(ConnectionReconciliationTask.this);
+ } else {
+ reconciliationManager.dequeue(this);
+ }
+ }
+
+ @Override
+ public void checkReadinessAndProcess() {
+ reconciliationManager.enqueue(this);
+ }
+
+ @Override
+ public long retryDelayInMills() {
+ return connectionAttempt.get() * RETRY_INTERVAL_FACTOR;
+ }
+}