make reconciliation the first transaction 54/66554/3
authorK.V Suneelu Verma <k.v.suneelu.verma@ericsson.com>
Mon, 18 Dec 2017 07:05:22 +0000 (12:35 +0530)
committerSam Hague <shague@redhat.com>
Fri, 16 Feb 2018 02:45:02 +0000 (02:45 +0000)
jira bug ovsdb-441

When the south bound device disconnects and connects back reconciliation
task will try to reconcile to the device.
At the same time if the application pushes some more config and that gets
executed,
then the reconciliation task if executed later will end up corrupting the
device state.
Ensure that reconciliation transaction is the first towards the device

Change-Id: I5b5df1e8cdc3d96e7aa518a5012759d39afe048f
Signed-off-by: K.V Suneelu Verma <k.v.suneelu.verma@ericsson.com>
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepSouthboundConstants.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/configuration/HwvtepReconciliationTask.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DataChangeListenerTestBase.java

index 5d07adf5ee8dbbc1695a0f550f9ad08501244aa8..203a4598ac5fccb33e8098434fe0c57f7c322098 100644 (file)
@@ -15,9 +15,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.Nonnull;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
@@ -70,6 +77,9 @@ public class HwvtepConnectionInstance {
     private HwvtepDeviceInfo deviceInfo;
     private DataBroker dataBroker;
     private final HwvtepConnectionManager hwvtepConnectionManager;
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    private final SettableFuture<Boolean> reconciliationFt = SettableFuture.create();
+    private final AtomicBoolean firstUpdateTriggered = new AtomicBoolean(false);
 
     HwvtepConnectionInstance (HwvtepConnectionManager hwvtepConnectionManager, ConnectionInfo key, OvsdbClient client,
                               InstanceIdentifier<Node> iid, TransactionInvoker txInvoker, DataBroker dataBroker) {
@@ -83,9 +93,43 @@ public class HwvtepConnectionInstance {
         this.hwvtepTableReader = new HwvtepTableReader(this);
     }
 
-    public synchronized void transact(TransactCommand command) {
-        for (TransactInvoker transactInvoker: transactInvokers.values()) {
-            transactInvoker.invoke(command);
+    public void transact(final TransactCommand command) {
+        String nodeId = getNodeId().getValue();
+        boolean firstUpdate = firstUpdateTriggered.compareAndSet(false, true);
+        if (reconciliationFt.isDone()) {
+            transact(command, false);
+        } else {
+            LOG.info("Job waiting for reconciliation {}", nodeId);
+            Futures.addCallback(reconciliationFt, new FutureCallback<Boolean>() {
+                @Override
+                public void onSuccess(Boolean aBoolean) {
+                    LOG.info("Running the job waiting for reconciliation {}", nodeId);
+                    transact(command, false);
+                }
+
+                @Override
+                public void onFailure(Throwable throwable) {
+                    LOG.info("Running the job waiting for reconciliation {}", nodeId);
+                    transact(command, false);
+                }
+            });
+            if (firstUpdate) {
+                LOG.info("Scheduling the reconciliation timeout task {}", nodeId);
+                scheduledExecutorService.schedule( () -> reconciliationFt.set(Boolean.TRUE),
+                        HwvtepSouthboundConstants.CONFIG_NODE_UPDATE_MAX_DELAY_MS, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    public synchronized void transact(TransactCommand command, boolean reconcile) {
+        try {
+            for (TransactInvoker transactInvoker : transactInvokers.values()) {
+                transactInvoker.invoke(command);
+            }
+        } finally {
+            if (reconcile) {
+                reconciliationFt.set(Boolean.TRUE);
+            }
         }
     }
 
index de47d3023d7b96e945923964c1b9b98a285ec1db..a6e2a20fe00f5facfe364e902ff7f63660df1be8 100644 (file)
@@ -53,4 +53,6 @@ public class HwvtepSouthboundConstants {
             "hwvtep.intransit.job.expiry.time.millis", 10000);
     public static final long IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS = Integer.getInteger(
             "hwvtep.intransit.job.check.period.millis", 30000);
+    public static final long CONFIG_NODE_UPDATE_MAX_DELAY_MS = Integer.getInteger(
+            "config.node.update.max.delay.ms", 10000);
 }
index aa8f092a80ae450154ed122567da74864534806d..6f25165c3cb704330d12f3004338ba228df766ce 100644 (file)
@@ -58,7 +58,8 @@ public class HwvtepReconciliationTask extends ReconciliationTask {
         HwvtepOperationalState hwvtepOperationalState = new HwvtepOperationalState(db, connectionInstance, changes,
                 globalOperNode, psNode);
         hwvtepOperationalState.setInReconciliation(true);
-        connectionInstance.transact(new TransactCommandAggregator(hwvtepOperationalState,changes));
+        boolean reconcile = true;
+        connectionInstance.transact(new TransactCommandAggregator(hwvtepOperationalState,changes), reconcile);
     }
 
     @Override
index 6bd9eed7e6f6315701795abef0e8429f8b153fab..59501ec12e37034c272968b4d8404c6a231288ec 100644 (file)
@@ -20,6 +20,8 @@ import static org.powermock.api.support.membermodification.MemberModifier.suppre
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Field;
@@ -27,6 +29,8 @@ import java.lang.reflect.Modifier;
 import java.net.InetAddress;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -192,6 +196,11 @@ public class DataChangeListenerTestBase extends AbstractDataBrokerTest {
         field(HwvtepConnectionInstance.class, "instanceIdentifier").set(connectionInstance, nodeIid);
         field(HwvtepConnectionInstance.class, "txInvoker").set(connectionInstance, transactionInvoker);
         field(HwvtepConnectionInstance.class, "client").set(connectionInstance, ovsdbClient);
+        SettableFuture<Boolean> reconciliationFt = SettableFuture.create();
+        reconciliationFt.set(Boolean.TRUE);
+        field(HwvtepConnectionInstance.class, "reconciliationFt").set(connectionInstance, reconciliationFt);
+        field(HwvtepConnectionInstance.class, "firstUpdateTriggered").set(connectionInstance,
+                new AtomicBoolean(Boolean.TRUE));
         doReturn(nodeIid).when(connectionInstance).getInstanceIdentifier();
         doReturn(listenableDbSchema).when(connectionInstance).getSchema(anyString());
         doReturn(dataBroker).when(connectionInstance).getDataBroker();