Bug 5221 - passive connection not reconnected if ovs service is restarted 85/36985/1
authorAnil Vishnoi <vishnoianil@gmail.com>
Thu, 10 Mar 2016 07:35:37 +0000 (23:35 -0800)
committerSam Hague <shague@redhat.com>
Fri, 1 Apr 2016 00:52:33 +0000 (00:52 +0000)
Change-Id: I830aeeed4ba7e51adb0ad952c11fd1603d207b1b
Signed-off-by: Anil Vishnoi <vishnoianil@gmail.com>
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionManager.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbDataChangeListener.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/ReconciliationManager.java [new file with mode: 0644]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/ReconciliationTask.java [new file with mode: 0644]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/ReconciliationTaskManager.java [new file with mode: 0644]
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/connection/ConnectionReconciliationTask.java [new file with mode: 0644]
southbound/southbound-impl/src/test/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionManagerTest.java
southbound/southbound-impl/src/test/java/org/opendaylight/ovsdb/southbound/OvsdbDataChangeListenerTest.java

index 6f7f8595745acec5b96236bec85e50c0c21f7d03..a3ee6cd37ab5ae91d695ad4d23f941670e1a9161 100644 (file)
@@ -18,7 +18,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -42,6 +45,9 @@ import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
 import org.opendaylight.ovsdb.schema.openvswitch.OpenVSwitch;
+import org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationManager;
+import org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationTask;
+import org.opendaylight.ovsdb.southbound.reconciliation.connection.ConnectionReconciliationTask;
 import org.opendaylight.ovsdb.southbound.transactions.md.OvsdbNodeRemoveCommand;
 import org.opendaylight.ovsdb.southbound.transactions.md.TransactionCommand;
 import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
@@ -75,6 +81,7 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
     private EntityOwnershipService entityOwnershipService;
     private OvsdbDeviceEntityOwnershipListener ovsdbDeviceEntityOwnershipListener;
     private OvsdbConnection ovsdbConnection;
+    private final ReconciliationManager reconciliationManager;
 
     public OvsdbConnectionManager(DataBroker db,TransactionInvoker txInvoker,
                                   EntityOwnershipService entityOwnershipService,
@@ -84,6 +91,7 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
         this.entityOwnershipService = entityOwnershipService;
         this.ovsdbDeviceEntityOwnershipListener = new OvsdbDeviceEntityOwnershipListener(this, entityOwnershipService);
         this.ovsdbConnection = ovsdbConnection;
+        this.reconciliationManager = new ReconciliationManager(db);
     }
 
     @Override
@@ -162,10 +170,14 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
             //Controller initiated connection can be terminated from switch side.
             //So cleanup the instance identifier cache.
             removeInstanceIdentifier(key);
+            retryConnection(ovsdbConnectionInstance.getInstanceIdentifier(),
+                    ovsdbConnectionInstance.getOvsdbNodeAugmentation(),
+                    ConnectionReconciliationTriggers.ON_DISCONNECT);
         } else {
             LOG.warn("disconnected : Connection instance not found for OVSDB Node {} ", key);
         }
         LOG.trace("OvsdbConnectionManager: exit disconnected client: {}", client);
+
     }
 
     public OvsdbClient connect(InstanceIdentifier<Node> iid,
@@ -325,6 +337,20 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
         return ovsdbConnectionInstance.getHasDeviceOwnership();
     }
 
+    public void reconcileConnection(InstanceIdentifier<Node> iid, OvsdbNodeAugmentation ovsdbNode){
+        this.retryConnection(iid, ovsdbNode,
+                ConnectionReconciliationTriggers.ON_CONTROLLER_INITIATED_CONNECTION_FAILURE);
+
+    }
+
+    public void stopConnectionReconciliationIfActive(InstanceIdentifier<?> iid, OvsdbNodeAugmentation ovsdbNode) {
+        final ReconciliationTask task = new ConnectionReconciliationTask(
+                reconciliationManager,
+                this,
+                iid,
+                ovsdbNode);
+        reconciliationManager.dequeue(task);
+    }
     private void handleOwnershipChanged(EntityOwnershipChange ownershipChange) {
         OvsdbConnectionInstance ovsdbConnectionInstance = getConnectionInstanceFromEntity(ownershipChange.getEntity());
         LOG.debug("handleOwnershipChanged: {} event received for device {}",
@@ -515,6 +541,54 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
         entityConnectionMap.remove(ovsdbConnectionInstance.getConnectedEntity());
     }
 
+    private void retryConnection(final InstanceIdentifier<Node> iid, final OvsdbNodeAugmentation ovsdbNode,
+                                 ConnectionReconciliationTriggers trigger) {
+        final ReconciliationTask task = new ConnectionReconciliationTask(
+                reconciliationManager,
+                this,
+                iid,
+                ovsdbNode);
+
+        if(reconciliationManager.isEnqueued(task)){
+            return;
+        }
+        switch(trigger){
+            case ON_CONTROLLER_INITIATED_CONNECTION_FAILURE:
+                reconciliationManager.enqueueForRetry(task);
+                break;
+            case ON_DISCONNECT:
+            {
+                ReadOnlyTransaction tx = db.newReadOnlyTransaction();
+                CheckedFuture<Optional<Node>, ReadFailedException> readNodeFuture =
+                        tx.read(LogicalDatastoreType.CONFIGURATION, iid);
+
+                final OvsdbConnectionManager connectionManager = this;
+                Futures.addCallback(readNodeFuture, new FutureCallback<Optional<Node>>() {
+                    @Override
+                    public void onSuccess(@Nullable Optional<Node> node) {
+                        if (node.isPresent()) {
+                            LOG.info("Disconnected/Failed connection {} was controller initiated, attempting " +
+                                    "reconnection", ovsdbNode.getConnectionInfo());
+                            reconciliationManager.enqueue(task);
+
+                        } else {
+                            LOG.debug("Connection {} was switch initiated, no reconciliation is required"
+                                    , ovsdbNode.getConnectionInfo());
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        LOG.warn("Read Config/DS for Node failed! {}", iid, t);
+                    }
+                });
+                break;
+            }
+            default:
+                break;
+        }
+    }
+
     private class OvsdbDeviceEntityOwnershipListener implements EntityOwnershipListener {
         private OvsdbConnectionManager cm;
         private EntityOwnershipListenerRegistration listenerRegistration;
@@ -531,4 +605,18 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
             cm.handleOwnershipChanged(ownershipChange);
         }
     }
+
+    private enum ConnectionReconciliationTriggers {
+        /*
+        Reconciliation trigger for scenario where controller's attempt
+        to connect to switch fails on config data store notification
+        */
+        ON_CONTROLLER_INITIATED_CONNECTION_FAILURE,
+
+        /*
+        Reconciliation trigger for the scenario where controller
+        initiated connection disconnects.
+        */
+        ON_DISCONNECT
+    }
 }
index 9e2c5e7f30da82aeee0af88519136fb90f8947b1..8951ca31f2a37520da4aa055add1f577e4e3ad23 100644 (file)
@@ -110,6 +110,9 @@ public class OvsdbDataChangeListener implements ClusteredDataChangeListener, Aut
             if (originalDataObject.get(instanceIdentifier) instanceof OvsdbNodeAugmentation) {
                 try {
                     cm.disconnect((OvsdbNodeAugmentation) originalDataObject.get(instanceIdentifier));
+                    cm.stopConnectionReconciliationIfActive(
+                            instanceIdentifier.firstIdentifierOf(Node.class),
+                            (OvsdbNodeAugmentation) originalDataObject.get(instanceIdentifier));
                 } catch (UnknownHostException e) {
                     LOG.warn("Failed to disconnect ovsdbNode", e);
                 }
@@ -128,7 +131,15 @@ public class OvsdbDataChangeListener implements ClusteredDataChangeListener, Aut
                         if (original.getValue() instanceof OvsdbNodeAugmentation) {
                             try {
                                 cm.disconnect((OvsdbNodeAugmentation) original.getValue());
-                                cm.connect((InstanceIdentifier<Node>) original.getKey(),value);
+                                cm.stopConnectionReconciliationIfActive(
+                                        original.getKey().firstIdentifierOf(Node.class),
+                                        (OvsdbNodeAugmentation) original.getValue());
+
+                                OvsdbClient newClient = cm.connect((InstanceIdentifier<Node>) original.getKey(),value);
+                                if(newClient == null) {
+                                    cm.reconcileConnection(original.getKey().firstIdentifierOf(Node.class),value);
+                                }
+
                             } catch (UnknownHostException e) {
                                 LOG.warn("Failed to disconnect to ovsdbNode", e);
                             }
@@ -152,8 +163,12 @@ public class OvsdbDataChangeListener implements ClusteredDataChangeListener, Aut
                               + "to same device, hence dropping the request {}", key, ovsdbNode);
                 } else {
                     try {
-                        cm.connect((InstanceIdentifier<Node>) created.getKey(),
+                        OvsdbClient client = cm.connect((InstanceIdentifier<Node>) created.getKey(),
                                 (OvsdbNodeAugmentation) created.getValue());
+                        if(client == null) {
+                            cm.reconcileConnection(created.getKey().firstIdentifierOf(Node.class),
+                                    (OvsdbNodeAugmentation) created.getValue());
+                        }
                     } catch (UnknownHostException e) {
                         LOG.warn("Failed to connect to ovsdbNode", e);
                     }
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/ReconciliationManager.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/ReconciliationManager.java
new file mode 100644 (file)
index 0000000..aad6527
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.southbound.reconciliation;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+
+/**
+ * This class provides the implementation of ovsdb southbound plugins
+ * configuration reconciliation engine. This engine provide interfaces
+ * to enqueue (one time retry)/ enqueueForRetry(periodic retry)/ dequeue
+ * (remove from retry queue) reconciliation task. Reconciliation task can
+ * be a connection reconciliation or configuration reconciliation of any
+ * ovsdb managed resource like bridge, termination point etc. This engine
+ * execute all the reconciliation task through a fixed size thread pool.
+ * If submitted task need to be retry after a periodic interval they are
+ * submitted to a single thread executor to periodically wake up and check
+ * if task is ready for execution.
+ * Ideally, addition of any type of reconciliation task should not require
+ * any change in this reconciliation manager execution engine.
+ *
+ * 3-Node Cluster:
+ * Reconciliation manager is agnostic of whether it's running in single
+ * node cluster or 3-node cluster. It's a responsibility of the task
+ * submitter to make sure that it submit the task for reconciliation only
+ * if it's an owner of that device EXCEPT controller initiated Connection.
+ * Reconciliation of controller initiated connection should be done by all
+ * the 3-nodes in the cluster, because connection to individual controller
+ * can be interrupted for various reason.
+ *
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/9/16.
+ */
+public class ReconciliationManager implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManager.class);
+
+    private static final int NO_OF_RECONCILER = 10;
+    private static final int RECON_TASK_QUEUE_SIZE = 5000;
+
+    private final DataBroker db;
+    private final ExecutorService reconcilers;
+    private final ScheduledExecutorService taskTriager;
+
+    private final ReconciliationTaskManager reconTaskManager = new ReconciliationTaskManager();
+
+    public ReconciliationManager(final DataBroker db) {
+        this.db = db;
+        reconcilers = SpecialExecutors.newBoundedCachedThreadPool(NO_OF_RECONCILER, RECON_TASK_QUEUE_SIZE, "ovsdb-reconciler");
+
+        ThreadFactory threadFact = new ThreadFactoryBuilder()
+                .setNameFormat("ovsdb-recon-task-triager-%d").build();
+        taskTriager = Executors.newSingleThreadScheduledExecutor(threadFact);
+    }
+
+    public boolean isEnqueued(final ReconciliationTask task) {
+        return reconTaskManager.isTaskQueued(task);
+    }
+
+    public void enqueue(final ReconciliationTask task) {
+        LOG.trace("Reconciliation task submitted for execution {}",task);
+        reconTaskManager.cacheTask(task, reconcilers.submit(task));
+    }
+
+    public void enqueueForRetry(final ReconciliationTask task) {
+        LOG.trace("Reconciliation task re-queued for re-execution {}",task);
+        reconTaskManager.cacheTask(task, taskTriager.schedule(
+                new Runnable() {
+                    @Override
+                    public void run() {
+                        task.checkReadinessAndProcess();
+                    }
+                }, task.retryDelayInMills(), TimeUnit.MILLISECONDS
+            )
+        );
+    }
+
+    public void dequeue(final ReconciliationTask task) {
+        reconTaskManager.cancelTask(task);
+    }
+
+    public DataBroker getDb() {
+        return db;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (this.reconcilers != null) {
+            this.reconcilers.shutdownNow();
+        }
+
+        if (this.taskTriager != null) {
+            this.taskTriager.shutdownNow();
+        }
+    }
+}
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/ReconciliationTask.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/ReconciliationTask.java
new file mode 100644 (file)
index 0000000..a1e3f2d
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.southbound.reconciliation;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.ovsdb.southbound.OvsdbConnectionManager;
+import org.opendaylight.ovsdb.southbound.reconciliation.connection.ConnectionReconciliationTask;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract implementation of a reconciliation task. Each new type of
+ * resource configuration reconciliation task should extend this class
+ * and implement the abstract methods.
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/9/16.
+ */
+public abstract class ReconciliationTask implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReconciliationTask.class);
+
+    protected final ReconciliationManager reconciliationManager;
+    protected final OvsdbConnectionManager connectionManager;
+    protected final InstanceIdentifier<?> nodeIid;
+    protected final DataObject configData;
+
+    protected ReconciliationTask(ReconciliationManager reconciliationManager, OvsdbConnectionManager connectionManager,
+                                 InstanceIdentifier<?> nodeIid,
+                                 DataObject configData) {
+        Preconditions.checkNotNull(reconciliationManager, "Reconciliation manager must not be null");
+        Preconditions.checkNotNull(connectionManager, "Connection manager must not be null");
+        Preconditions.checkNotNull(nodeIid, "Node Iid must not be null");
+        this.reconciliationManager = reconciliationManager;
+        this.connectionManager = connectionManager;
+        this.nodeIid = nodeIid;
+        this.configData = configData;
+    }
+
+    /**
+     * Method contains task reconciliation logic. Please refer to
+     * {@link ConnectionReconciliationTask#reconcileConfiguration(OvsdbConnectionManager)}
+     * for example.
+     * @param connectionManager Connection manager to get connection instance of the device
+     * @return True if reconciliation was successful, else false
+     */
+    public abstract boolean reconcileConfiguration(OvsdbConnectionManager connectionManager);
+
+    /**
+     * Extended task should implement the logic that decides whether retry for the task
+     * is required or not. If retry is required but it does not requires any delay, submit
+     * the task immediately using {@link ReconciliationManager#enqueue(ReconciliationTask)}.
+     * If retry requires delay, use {@link ReconciliationManager#enqueueForRetry(ReconciliationTask)}
+     * and specify the delay using {@link #retryDelayInMills()}.
+     * If data store operation is required to decide if the task need retry, please implement
+     * it as an async operation and submit the task on the callback of the future.
+     * <p>
+     * Note:Please do not write blocking data store operations
+     * {@link ConnectionReconciliationTask#doRetry(boolean)}
+     * </p>
+     * @param wasPreviousAttemptSuccessful Status of the previous attempt
+     */
+    public abstract void doRetry(boolean wasPreviousAttemptSuccessful);
+
+    /**
+     * Extended task should implement the logic that check the readiness of the task
+     * for execution. If task is ready for the execution, submit it for immediate
+     * execution using {@link ReconciliationManager#enqueue(ReconciliationTask)}.
+     * If task is not ready for execution yet, enqueue it again for delayed execution
+     * using {@link ReconciliationManager#enqueueForRetry(ReconciliationTask)}.
+     * To check the readiness of the task, if the data store operation is required, please
+     * implement it as an async operation and submit the task on the callback of the future.
+     * <p>
+     * Note:Please do not write blocking data store operations
+     * {@link ConnectionReconciliationTask#doRetry(boolean)}
+     * </p>
+     */
+    public abstract void checkReadinessAndProcess();
+
+    /**
+     * Method returns the time interval for retrying the failed task.
+     * {@link ReconciliationTask#doRetry(boolean)}
+     * @return time
+     */
+    public abstract long retryDelayInMills();
+
+    @Override
+    public void run() {
+        boolean status = this.reconcileConfiguration(connectionManager);
+        doRetry(status);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ReconciliationTask that = (ReconciliationTask) o;
+
+        return nodeIid.equals(that.nodeIid);
+
+    }
+
+    @Override
+    public int hashCode() {
+        return getClass().hashCode() + nodeIid.hashCode();
+    }
+
+
+    @Override
+    public String toString() {
+        return "ReconciliationTask{ type=" + getClass().toString()+
+                ", nodeIid=" + nodeIid +
+                '}';
+    }
+}
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/ReconciliationTaskManager.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/ReconciliationTaskManager.java
new file mode 100644 (file)
index 0000000..2a3e9c3
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.southbound.reconciliation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+
+/**
+ * This class is a task cache manager that provides a cache to store
+ * the task that is queued for the reconciliation. Whenever new task
+ * is submitted to the reconciliation manager, it will be cached in
+ * the cache. If the reconciliation is successful or it's done with
+ * all the attempt of reconciliation,
+ *
+ * Caching of the task is required, because reconciliation task might
+ * require longer duration to reconcile and there is a possibility that
+ * meanwhile user can change the configuration in config data store while
+ * task is queued for the reconciliation. In that scenario, reconciliation
+ * manager should not attempt any further reconciliation attempt for that
+ * task. ReconciliationManager will call cancelTask() to cancel the task.
+ *
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/15/16.
+ */
+class ReconciliationTaskManager {
+    private static final Logger LOG = LoggerFactory.getLogger(ReconciliationTaskManager.class);
+
+    private final ConcurrentHashMap<ReconciliationTask,Future<?>> reconciliationTaskCache
+            = new ConcurrentHashMap();
+
+    public boolean isTaskQueued(ReconciliationTask task) {
+        return reconciliationTaskCache.containsKey(task);
+    }
+    public boolean cancelTask(ReconciliationTask task) {
+        if(reconciliationTaskCache.containsKey(task)){
+            Future<?> taskFuture = reconciliationTaskCache.remove(task);
+            if( !taskFuture.isDone() && !taskFuture.isCancelled() ) {
+                LOG.info("Reconciliation task is cancelled : {}",task);
+                return taskFuture.cancel(true);
+            }
+        }
+        return false;
+
+    }
+    public void cacheTask(ReconciliationTask task, Future<?> taskFuture) {
+        reconciliationTaskCache.put(task,taskFuture);
+    }
+}
diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/connection/ConnectionReconciliationTask.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/reconciliation/connection/ConnectionReconciliationTask.java
new file mode 100644 (file)
index 0000000..f489936
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.southbound.reconciliation.connection;
+
+import org.opendaylight.ovsdb.lib.OvsdbClient;
+import org.opendaylight.ovsdb.southbound.OvsdbConnectionManager;
+import org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationManager;
+import org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationTask;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/9/16.
+ */
+public class ConnectionReconciliationTask extends ReconciliationTask {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ConnectionReconciliationTask.class);
+
+    private static final int RETRY_INTERVAL_FACTOR = 10000;
+    private static final int MAX_ATTEMPT = 10;
+
+    private AtomicInteger connectionAttempt = new AtomicInteger(0);
+
+    public ConnectionReconciliationTask(ReconciliationManager reconciliationManager, OvsdbConnectionManager
+            connectionManager, InstanceIdentifier<?> nodeIid, DataObject configData) {
+        super(reconciliationManager, connectionManager, nodeIid, configData);
+
+    }
+
+    @Override
+    public boolean reconcileConfiguration(OvsdbConnectionManager connectionManager) {
+        boolean result = false;
+        connectionAttempt.incrementAndGet();
+        InstanceIdentifier<Node> nIid = (InstanceIdentifier<Node>) nodeIid;
+        OvsdbNodeAugmentation ovsdbNode = (OvsdbNodeAugmentation)configData;
+
+        LOG.info("Retry({}) connection to Ovsdb Node {} ", connectionAttempt.get(), ovsdbNode.getConnectionInfo());
+        OvsdbClient client = null;
+        try {
+            client = connectionManager.connect(nIid, ovsdbNode);
+            if (client != null) {
+                LOG.info("Successfully connected to Ovsdb Node {} ", ovsdbNode.getConnectionInfo());
+                result = true;
+            } else {
+                LOG.warn("Connection retry({}) failed for {}.",
+                        connectionAttempt.get(), ovsdbNode.getConnectionInfo());
+            }
+        } catch (UnknownHostException e) {
+            LOG.warn("Connection retry({}) failed with exception. ",connectionAttempt.get(), e);
+        }
+        return result;
+    }
+
+    @Override
+    public void doRetry(boolean wasLastAttemptSuccessful) {
+
+        if( !wasLastAttemptSuccessful && connectionAttempt.get() <= MAX_ATTEMPT ) {
+            reconciliationManager.enqueueForRetry(ConnectionReconciliationTask.this);
+        } else {
+            reconciliationManager.dequeue(this);
+        }
+    }
+
+    @Override
+    public void checkReadinessAndProcess() {
+        reconciliationManager.enqueue(this);
+    }
+
+    @Override
+    public long retryDelayInMills() {
+        return connectionAttempt.get() * RETRY_INTERVAL_FACTOR;
+    }
+}
index e42998c3cfc724f93027a170bb8b146f940384c2..7bb7eef25f41fe5cd7862bb873f79d8b6460f2c5 100644 (file)
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.util.concurrent.CheckedFuture;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -30,6 +31,7 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
@@ -37,10 +39,12 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.ovsdb.lib.OvsdbClient;
 import org.opendaylight.ovsdb.lib.OvsdbConnection;
 import org.opendaylight.ovsdb.lib.impl.OvsdbConnectionService;
+import org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationManager;
 import org.opendaylight.ovsdb.southbound.transactions.md.TransactionCommand;
 import org.opendaylight.ovsdb.southbound.transactions.md.TransactionInvoker;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.PortNumber;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.rev130405.services.service.Instance;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAttributes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo;
@@ -66,6 +70,7 @@ public class OvsdbConnectionManagerTest {
     @Mock private EntityOwnershipService entityOwnershipService;
     @Mock private OvsdbConnection ovsdbConnection;
     @Mock private OvsdbClient externalClient;
+    @Mock private ReconciliationManager reconciliationManager;
     private Map<ConnectionInfo,OvsdbConnectionInstance> clients;
     private Map<ConnectionInfo,InstanceIdentifier<Node>> instanceIdentifiers;
     private Map<Entity, OvsdbConnectionInstance> entityConnectionMap;
@@ -78,6 +83,7 @@ public class OvsdbConnectionManagerTest {
         MemberModifier.field(OvsdbConnectionManager.class, "db").set(ovsdbConnectionManager, db);
         MemberModifier.field(OvsdbConnectionManager.class, "txInvoker").set(ovsdbConnectionManager, txInvoker);
         MemberModifier.field(OvsdbConnectionManager.class, "entityOwnershipService").set(ovsdbConnectionManager, entityOwnershipService);
+        MemberModifier.field(OvsdbConnectionManager.class, "reconciliationManager").set(ovsdbConnectionManager, reconciliationManager);
         MemberModifier.field(OvsdbConnectionManager.class, "ovsdbConnection")
                 .set(ovsdbConnectionManager, ovsdbConnection);
         entityConnectionMap = new ConcurrentHashMap<>();
@@ -159,7 +165,13 @@ public class OvsdbConnectionManagerTest {
         instanceIdentifiers = new ConcurrentHashMap<>();
         MemberModifier.field(OvsdbConnectionManager.class, "instanceIdentifiers").set(ovsdbConnectionManager, instanceIdentifiers);
 
-
+        MemberModifier.suppress(MemberMatcher.method(OvsdbConnectionManager.class, "reconcileConnection",
+                InstanceIdentifier.class, OvsdbNodeAugmentation.class));
+        ReadOnlyTransaction tx = mock(ReadOnlyTransaction.class);
+        when(db.newReadOnlyTransaction()).thenReturn(tx);
+        when(tx.read(any(LogicalDatastoreType.class),any(InstanceIdentifier.class)))
+                .thenReturn(mock(CheckedFuture.class));
+        when(ovsdbConnectionInstance.getInstanceIdentifier()).thenReturn(mock(InstanceIdentifier.class));
         ovsdbConnectionManager.disconnected(externalClient);
         Map<ConnectionInfo,OvsdbConnectionInstance> testClients = Whitebox.getInternalState(ovsdbConnectionManager, "clients");
         assertEquals("Error, size of the hashmap is incorrect", 0, testClients.size());
index 4858eadfc914cc3c5bdf467aeddfc5ed42e0aad2..5cbf952a591d16316a6ced23a1b123450cc91cba 100644 (file)
@@ -9,13 +9,8 @@
 package org.opendaylight.ovsdb.southbound;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.*;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -39,8 +34,12 @@ import org.opendaylight.ovsdb.southbound.ovsdb.transact.TransactCommandAggregato
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbNodeAugmentation;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.ovsdb.node.attributes.ConnectionInfo;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
@@ -135,7 +134,10 @@ public class OvsdbDataChangeListenerTest {
         AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes = mock(AsyncDataChangeEvent.class);
         Map<InstanceIdentifier<?>, DataObject> originalDataObject = new HashMap<>();
         Set<InstanceIdentifier<?>> iiD = new HashSet<>();
-        InstanceIdentifier instanceIdentifier = mock(InstanceIdentifier.class);
+        InstanceIdentifier<?> instanceIdentifier = InstanceIdentifier.builder(NetworkTopology.class)
+                .child(Topology.class,new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
+                .child(Node.class,new NodeKey(new NodeId("ovsdb://127.0.0.1:16640")))
+                .build();
         OvsdbNodeAugmentation ovsdbNode = mock(OvsdbNodeAugmentation.class);
         iiD.add(instanceIdentifier);
         originalDataObject.put(instanceIdentifier, ovsdbNode);
@@ -166,7 +168,10 @@ public class OvsdbDataChangeListenerTest {
     public void testUpdateConnections() throws Exception {
         AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> changes = mock(AsyncDataChangeEvent.class);
         Map<InstanceIdentifier<?>, DataObject> map = new HashMap<>();
-        InstanceIdentifier instanceIdentifier = mock(InstanceIdentifier.class);
+        InstanceIdentifier<?> instanceIdentifier = InstanceIdentifier.builder(NetworkTopology.class)
+                .child(Topology.class,new TopologyKey(SouthboundConstants.OVSDB_TOPOLOGY_ID))
+                .child(Node.class,new NodeKey(new NodeId("ovsdb://127.0.0.1:16640")))
+                .build();
         OvsdbNodeAugmentation value = mock(OvsdbNodeAugmentation.class);
         map.put(instanceIdentifier, value);