BUG 5479: HWVtep Southbound doesn't retry connection 13/39513/2
authorVishal Thapar <vishal.thapar@ericsson.com>
Wed, 25 May 2016 04:17:51 +0000 (09:47 +0530)
committerVishal Thapar <vishal.thapar@ericsson.com>
Fri, 27 May 2016 12:23:14 +0000 (12:23 +0000)
This fix is hwvtep counterpart of
https://git.opendaylight.org/gerrit/#/c/36028/ which fixes BUG 5221.

Wiki:
https://wiki.opendaylight.org/view/OVSDB_Integration:OVSDB_SB_Reconciliation

Change-Id: I497dbfa339de157d12afe94ce5c36b35ffda5dd9
Signed-off-by: Vishal Thapar <vishal.thapar@ericsson.com>
(cherry picked from commit 9d80362e33fb2d6a2c0dc4b7994c976e46e8799f)

hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionManager.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepDataChangeListener.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/ReconciliationManager.java [new file with mode: 0644]
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/ReconciliationTask.java [new file with mode: 0644]
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/ReconciliationTaskManager.java [new file with mode: 0644]
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/connection/ConnectionReconciliationTask.java [new file with mode: 0644]

index e3b124a9bab4a2b433042cc96fb68b6927c84e58..9a605a63e9d1512c1a1e4d8e4f43a838af2f13fb 100644 (file)
@@ -296,6 +296,9 @@ public class HwvtepConnectionInstance implements OvsdbClient{
         this.initialCreatedData = hwvtepGlobalData;
     }
 
+    public HwvtepGlobalAugmentation getHwvtepGlobalAugmentation() {
+        return this.initialCreatedData;
+    }
     public HwvtepDeviceInfo getDeviceInfo() {
         return this.deviceInfo;
     }
index 6ccb3f5990a68b584794ddc70942d83f70edf825..7c99add047b64d75dd33469dd1d17d28acfec54c 100644 (file)
@@ -19,8 +19,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
@@ -31,6 +33,10 @@ import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipL
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationManager;
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationTask;
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection.ConnectionReconciliationTask;
 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.HwvtepGlobalRemoveCommand;
 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionCommand;
 import org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md.TransactionInvoker;
@@ -56,6 +62,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 
 public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoCloseable{
     private Map<ConnectionInfo, HwvtepConnectionInstance> clients = new ConcurrentHashMap<>();
@@ -68,6 +77,7 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo
     private Map<Entity, HwvtepConnectionInstance> entityConnectionMap = new ConcurrentHashMap<>();
     private EntityOwnershipService entityOwnershipService;
     private HwvtepDeviceEntityOwnershipListener hwvtepDeviceEntityOwnershipListener;
+    private final ReconciliationManager reconciliationManager;
 
     public HwvtepConnectionManager(DataBroker db, TransactionInvoker txInvoker,
                     EntityOwnershipService entityOwnershipService) {
@@ -75,6 +85,7 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo
         this.txInvoker = txInvoker;
         this.entityOwnershipService = entityOwnershipService;
         this.hwvtepDeviceEntityOwnershipListener = new HwvtepDeviceEntityOwnershipListener(this,entityOwnershipService);
+        this.reconciliationManager = new ReconciliationManager(db);
     }
 
     @Override
@@ -134,6 +145,9 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo
             //Controller initiated connection can be terminated from switch side.
             //So cleanup the instance identifier cache.
             removeInstanceIdentifier(key);
+            retryConnection(hwvtepConnectionInstance.getInstanceIdentifier(),
+                    hwvtepConnectionInstance.getHwvtepGlobalAugmentation(),
+                    ConnectionReconciliationTriggers.ON_DISCONNECT);
         } else {
             LOG.warn("HWVTEP disconnected event did not find connection instance for {}", key);
         }
@@ -360,6 +374,68 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo
         entityConnectionMap.remove(hwvtepConnectionInstance.getConnectedEntity());
     }
 
+    public void reconcileConnection(InstanceIdentifier<Node> iid, HwvtepGlobalAugmentation hwvtepNode) {
+        this.retryConnection(iid, hwvtepNode,
+                ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE);
+        }
+
+    public void stopConnectionReconciliationIfActive(InstanceIdentifier<?> iid, HwvtepGlobalAugmentation hwvtepNode) {
+        final ReconciliationTask task = new ConnectionReconciliationTask(
+                reconciliationManager,
+                this,
+                iid,
+                hwvtepNode);
+        reconciliationManager.dequeue(task);
+    }
+
+    private void retryConnection(final InstanceIdentifier<Node> iid, final HwvtepGlobalAugmentation hwvtepNode,
+                                 ConnectionReconciliationTriggers trigger) {
+        final ReconciliationTask task = new ConnectionReconciliationTask(
+                reconciliationManager,
+                this,
+                iid,
+                hwvtepNode);
+
+        if(reconciliationManager.isEnqueued(task)){
+            return;
+        }
+        switch(trigger){
+            case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE:
+                reconciliationManager.enqueueForRetry(task);
+                break;
+            case ON_DISCONNECT:
+            {
+                ReadOnlyTransaction tx = db.newReadOnlyTransaction();
+                CheckedFuture<Optional<Node>, ReadFailedException> readNodeFuture =
+                        tx.read(LogicalDatastoreType.CONFIGURATION, iid);
+
+                final HwvtepConnectionManager connectionManager = this;
+                Futures.addCallback(readNodeFuture, new FutureCallback<Optional<Node>>() {
+                    @Override
+                    public void onSuccess(@Nullable Optional<Node> node) {
+                        if (node.isPresent()) {
+                            LOG.info("Disconnected/Failed connection {} was controller initiated, attempting " +
+                                    "reconnection", hwvtepNode.getConnectionInfo());
+                            reconciliationManager.enqueue(task);
+
+                        } else {
+                            LOG.debug("Connection {} was switch initiated, no reconciliation is required"
+                                    , iid.firstKeyOf(Node.class).getNodeId());
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        LOG.warn("Read Config/DS for Node failed! {}", iid, t);
+                    }
+                });
+                break;
+            }
+            default:
+                break;
+        }
+    }
+
     public void handleOwnershipChanged(EntityOwnershipChange ownershipChange) {
         HwvtepConnectionInstance hwvtepConnectionInstance = getConnectionInstanceFromEntity(ownershipChange.getEntity());
         LOG.info("handleOwnershipChanged: {} event received for device {}",
@@ -458,4 +534,18 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo
             hcm.handleOwnershipChanged(ownershipChange);
         }
     }
+
+    private enum ConnectionReconciliationTriggers {
+        /*
+        Reconciliation trigger for scenario where controller's attempt
+        to connect to switch fails on config data store notification
+        */
+        ON_CONTROLLER_INITIATED_CONNECTION_FAILURE,
+
+        /*
+        Reconciliation trigger for the scenario where controller
+        initiated connection disconnects.
+        */
+        ON_DISCONNECT
+    }
 }
index 0da5f37cb26f28be55e74f35d26712f89b6098f5..ce6d6150697fd24dc13bbfb7186d9c8ad24aa43e 100644 (file)
@@ -133,9 +133,9 @@ public class HwvtepDataChangeListener implements ClusteredDataTreeChangeListener
                                         + "to same device, hence dropping the request {}", connection, hwvtepGlobal);
                     } else {
                         try {
-                            hcm.connect(HwvtepSouthboundMapper.createInstanceIdentifier(node.getNodeId()), hwvtepGlobal);
+                            OvsdbClient client = hcm.connect(key, hwvtepGlobal);
                         } catch (UnknownHostException e) {
-                            LOG.warn("Failed to connect to OVSDB node", e);
+                            LOG.warn("Failed to connect to HWVTEP node", e);
                         }
                     }
                 }
@@ -159,9 +159,13 @@ public class HwvtepDataChangeListener implements ClusteredDataTreeChangeListener
                     if (client == null) {
                         try {
                             hcm.disconnect(hgOriginal);
-                            hcm.connect(HwvtepSouthboundMapper.createInstanceIdentifier(original.getNodeId()), hgUpdated);
+                            hcm.stopConnectionReconciliationIfActive(key, hgOriginal);
+                            OvsdbClient newClient = hcm.connect(key, hgUpdated);
+                            if (newClient == null) {
+                                hcm.reconcileConnection(key, hgUpdated);
+                            }
                         } catch (UnknownHostException e) {
-                            LOG.warn("Failed to update connection on OVSDB Node", e);
+                            LOG.warn("Failed to update connection on HWVTEP Node", e);
                         }
                     }
                 }
@@ -170,7 +174,7 @@ public class HwvtepDataChangeListener implements ClusteredDataTreeChangeListener
     }
 
     private void updateData(Collection<DataTreeModification<Node>> changes) {
-        /* TODO: 
+        /* TODO:
          * Get connection instances for each change
          * Update data for each connection
          * Requires Command patterns. TBD.
@@ -193,8 +197,9 @@ public class HwvtepDataChangeListener implements ClusteredDataTreeChangeListener
                 if (hgDeleted != null) {
                     try {
                         hcm.disconnect(hgDeleted);
+                        hcm.stopConnectionReconciliationIfActive(key, hgDeleted);
                     } catch (UnknownHostException e) {
-                        LOG.warn("Failed to disconnect OVSDB Node", e);
+                        LOG.warn("Failed to disconnect HWVTEP Node", e);
                     }
                 }
             }
diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/ReconciliationManager.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/ReconciliationManager.java
new file mode 100644 (file)
index 0000000..397dd63
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * Copied from org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationManager
+ *
+ * This class provides the implementation of ovsdb southbound plugins
+ * configuration reconciliation engine. This engine provide interfaces
+ * to enqueue (one time retry)/ enqueueForRetry(periodic retry)/ dequeue
+ * (remove from retry queue) reconciliation task. Reconciliation task can
+ * be a connection reconciliation or configuration reconciliation of any
+ * ovsdb managed resource like bridge, termination point etc. This engine
+ * execute all the reconciliation task through a fixed size thread pool.
+ * If submitted task need to be retry after a periodic interval they are
+ * submitted to a single thread executor to periodically wake up and check
+ * if task is ready for execution.
+ * Ideally, addition of any type of reconciliation task should not require
+ * any change in this reconciliation manager execution engine.
+ *
+ * 3-Node Cluster:
+ * Reconciliation manager is agnostic of whether it's running in single
+ * node cluster or 3-node cluster. It's a responsibility of the task
+ * submitter to make sure that it submit the task for reconciliation only
+ * if it's an owner of that device EXCEPT controller initiated Connection.
+ * Reconciliation of controller initiated connection should be done by all
+ * the 3-nodes in the cluster, because connection to individual controller
+ * can be interrupted for various reason.
+ *
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/9/16.
+ */
+public class ReconciliationManager implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManager.class);
+
+    private static final int NO_OF_RECONCILER = 10;
+    private static final int RECON_TASK_QUEUE_SIZE = 5000;
+
+    private final DataBroker db;
+    private final ExecutorService reconcilers;
+    private final ScheduledExecutorService taskTriager;
+
+    private final ReconciliationTaskManager reconTaskManager = new ReconciliationTaskManager();
+
+    public ReconciliationManager(final DataBroker db) {
+        this.db = db;
+        reconcilers = SpecialExecutors.newBoundedCachedThreadPool(NO_OF_RECONCILER, RECON_TASK_QUEUE_SIZE, "ovsdb-reconciler");
+
+        ThreadFactory threadFact = new ThreadFactoryBuilder()
+                .setNameFormat("ovsdb-recon-task-triager-%d").build();
+        taskTriager = Executors.newSingleThreadScheduledExecutor(threadFact);
+    }
+
+    public boolean isEnqueued(final ReconciliationTask task) {
+        return reconTaskManager.isTaskQueued(task);
+    }
+
+    public void enqueue(final ReconciliationTask task) {
+        LOG.trace("Reconciliation task submitted for execution {}",task);
+        reconTaskManager.cacheTask(task, reconcilers.submit(task));
+    }
+
+    public void enqueueForRetry(final ReconciliationTask task) {
+        LOG.trace("Reconciliation task re-queued for re-execution {}",task);
+        reconTaskManager.cacheTask(task, taskTriager.schedule(
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        task.checkReadinessAndProcess();
+                    }
+                }, task.retryDelayInMills(), TimeUnit.MILLISECONDS
+            )
+        );
+    }
+
+    public void dequeue(final ReconciliationTask task) {
+        reconTaskManager.cancelTask(task);
+    }
+
+    public DataBroker getDb() {
+        return db;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (this.reconcilers != null) {
+            this.reconcilers.shutdownNow();
+        }
+
+        if (this.taskTriager != null) {
+            this.taskTriager.shutdownNow();
+        }
+    }
+}
diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/ReconciliationTask.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/ReconciliationTask.java
new file mode 100644 (file)
index 0000000..8a77ffb
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation;
+
+import com.google.common.base.Preconditions;
+
+import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionManager;
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection.ConnectionReconciliationTask;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Copied from org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationTask
+ *
+ * Abstract implementation of a reconciliation task. Each new type of
+ * resource configuration reconciliation task should extend this class
+ * and implement the abstract methods.
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/9/16.
+ */
+public abstract class ReconciliationTask implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReconciliationTask.class);
+
+    protected final ReconciliationManager reconciliationManager;
+    protected final HwvtepConnectionManager connectionManager;
+    protected final InstanceIdentifier<?> nodeIid;
+    protected final DataObject configData;
+
+    protected ReconciliationTask(ReconciliationManager reconciliationManager, HwvtepConnectionManager connectionManager,
+                                 InstanceIdentifier<?> nodeIid,
+                                 DataObject configData) {
+        Preconditions.checkNotNull(reconciliationManager, "Reconciliation manager must not be null");
+        Preconditions.checkNotNull(connectionManager, "Connection manager must not be null");
+        Preconditions.checkNotNull(nodeIid, "Node Iid must not be null");
+        this.reconciliationManager = reconciliationManager;
+        this.connectionManager = connectionManager;
+        this.nodeIid = nodeIid;
+        this.configData = configData;
+    }
+
+    /**
+     * Method contains task reconciliation logic. Please refer to
+     * {@link ConnectionReconciliationTask#reconcileConfiguration(HwvtepConnectionManager)}
+     * for example.
+     * @param connectionManager Connection manager to get connection instance of the device
+     * @return True if reconciliation was successful, else false
+     */
+    public abstract boolean reconcileConfiguration(HwvtepConnectionManager connectionManager);
+
+    /**
+     * Extended task should implement the logic that decides whether retry for the task
+     * is required or not. If retry is required but it does not requires any delay, submit
+     * the task immediately using {@link ReconciliationManager#enqueue(ReconciliationTask)}.
+     * If retry requires delay, use {@link ReconciliationManager#enqueueForRetry(ReconciliationTask)}
+     * and specify the delay using {@link #retryDelayInMills()}.
+     * If data store operation is required to decide if the task need retry, please implement
+     * it as an async operation and submit the task on the callback of the future.
+     * <p>
+     * Note:Please do not write blocking data store operations
+     * {@link ConnectionReconciliationTask#doRetry(boolean)}
+     * </p>
+     * @param wasPreviousAttemptSuccessful Status of the previous attempt
+     */
+    public abstract void doRetry(boolean wasPreviousAttemptSuccessful);
+
+    /**
+     * Extended task should implement the logic that check the readiness of the task
+     * for execution. If task is ready for the execution, submit it for immediate
+     * execution using {@link ReconciliationManager#enqueue(ReconciliationTask)}.
+     * If task is not ready for execution yet, enqueue it again for delayed execution
+     * using {@link ReconciliationManager#enqueueForRetry(ReconciliationTask)}.
+     * To check the readiness of the task, if the data store operation is required, please
+     * implement it as an async operation and submit the task on the callback of the future.
+     * <p>
+     * Note:Please do not write blocking data store operations
+     * {@link ConnectionReconciliationTask#doRetry(boolean)}
+     * </p>
+     */
+    public abstract void checkReadinessAndProcess();
+
+    /**
+     * Method returns the time interval for retrying the failed task.
+     * {@link ReconciliationTask#doRetry(boolean)}
+     * @return time
+     */
+    public abstract long retryDelayInMills();
+
+    @Override
+    public void run() {
+        boolean status = this.reconcileConfiguration(connectionManager);
+        doRetry(status);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ReconciliationTask that = (ReconciliationTask) o;
+
+        return nodeIid.equals(that.nodeIid);
+
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode() + nodeIid.hashCode();
+    }
+
+
+    @Override
+    public String toString() {
+        return "ReconciliationTask{ type=" + getClass().toString()+
+                ", nodeIid=" + nodeIid +
+                '}';
+    }
+}
diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/ReconciliationTaskManager.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/ReconciliationTaskManager.java
new file mode 100644 (file)
index 0000000..4838aae
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+
+/**
+ * Copied from org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationTaskManager
+ *
+ * This class is a task cache manager that provides a cache to store
+ * the task that is queued for the reconciliation. Whenever new task
+ * is submitted to the reconciliation manager, it will be cached in
+ * the cache. If the reconciliation is successful or it's done with
+ * all the attempt of reconciliation,
+ *
+ * Caching of the task is required, because reconciliation task might
+ * require longer duration to reconcile and there is a possibility that
+ * meanwhile user can change the configuration in config data store while
+ * task is queued for the reconciliation. In that scenario, reconciliation
+ * manager should not attempt any further reconciliation attempt for that
+ * task. ReconciliationManager will call cancelTask() to cancel the task.
+ *
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/15/16.
+ */
+class ReconciliationTaskManager {
+    private static final Logger LOG = LoggerFactory.getLogger(ReconciliationTaskManager.class);
+
+    private final ConcurrentHashMap<ReconciliationTask,Future<?>> reconciliationTaskCache
+            = new ConcurrentHashMap();
+
+    public boolean isTaskQueued(ReconciliationTask task) {
+        return reconciliationTaskCache.containsKey(task);
+    }
+    public boolean cancelTask(ReconciliationTask task) {
+        if(reconciliationTaskCache.containsKey(task)){
+            Future<?> taskFuture = reconciliationTaskCache.remove(task);
+            if( !taskFuture.isDone() && !taskFuture.isCancelled() ) {
+                LOG.info("Reconciliation task is cancelled : {}",task);
+                return taskFuture.cancel(true);
+            }
+        }
+        return false;
+
+    }
+    public void cacheTask(ReconciliationTask task, Future<?> taskFuture) {
+        reconciliationTaskCache.put(task,taskFuture);
+    }
+}
diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/connection/ConnectionReconciliationTask.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/connection/ConnectionReconciliationTask.java
new file mode 100644 (file)
index 0000000..ce02e8e
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.connection;
+
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationManager;
+import org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation.ReconciliationTask;
+import org.opendaylight.ovsdb.lib.OvsdbClient;
+import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.hwvtep.rev150901.HwvtepGlobalAugmentation;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Copied from org.opendaylight.ovsdb.southbound.reconciliation.connection.ConnectionReconciliationTask
+ *
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/9/16.
+ */
+public class ConnectionReconciliationTask extends ReconciliationTask {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConnectionReconciliationTask.class);
+
+    private static final int RETRY_INTERVAL_FACTOR = 10000;
+    private static final int MAX_ATTEMPT = 10;
+
+    private AtomicInteger connectionAttempt = new AtomicInteger(0);
+
+    public ConnectionReconciliationTask(ReconciliationManager reconciliationManager, HwvtepConnectionManager
+            connectionManager, InstanceIdentifier<?> nodeIid, DataObject configData) {
+        super(reconciliationManager, connectionManager, nodeIid, configData);
+
+    }
+
+    @Override
+    public boolean reconcileConfiguration(HwvtepConnectionManager connectionManager) {
+        boolean result = false;
+        connectionAttempt.incrementAndGet();
+        InstanceIdentifier<Node> nIid = (InstanceIdentifier<Node>) nodeIid;
+        HwvtepGlobalAugmentation hwvtepNode = (HwvtepGlobalAugmentation)configData;
+
+        LOG.info("Retry({}) connection to Ovsdb Node {} ", connectionAttempt.get(), hwvtepNode.getConnectionInfo());
+        OvsdbClient client = null;
+        try {
+            client = connectionManager.connect(nIid, hwvtepNode);
+            if (client != null) {
+                LOG.info("Successfully connected to Hwvtep Node {} ", hwvtepNode.getConnectionInfo());
+                result = true;
+            } else {
+                LOG.warn("Connection retry({}) failed for {}.",
+                        connectionAttempt.get(), hwvtepNode.getConnectionInfo());
+            }
+        } catch (UnknownHostException e) {
+            LOG.warn("Connection retry({}) failed with exception. ",connectionAttempt.get(), e);
+        }
+        return result;
+    }
+
+    @Override
+    public void doRetry(boolean wasLastAttemptSuccessful) {
+
+        if( !wasLastAttemptSuccessful && connectionAttempt.get() <= MAX_ATTEMPT ) {
+            reconciliationManager.enqueueForRetry(ConnectionReconciliationTask.this);
+        } else {
+            reconciliationManager.dequeue(this);
+        }
+    }
+
+    @Override
+    public void checkReadinessAndProcess() {
+        reconciliationManager.enqueue(this);
+    }
+
+    @Override
+    public long retryDelayInMills() {
+        return connectionAttempt.get() * RETRY_INTERVAL_FACTOR;
+    }
+}