From: K.V Suneelu Verma Date: Mon, 18 Dec 2017 07:05:22 +0000 (+0530) Subject: make reconciliation the first transaction X-Git-Tag: release/oxygen~6 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=e0b34c00c9b5b32f23003377d5197c6f6169c7a6;p=ovsdb.git make reconciliation the first transaction jira bug ovsdb-441 When the south bound device disconnects and connects back reconciliation task will try to reconcile to the device. At the same time if the application pushes some more config and that gets executed, then the reconciliation task if executed later will end up corrupting the device state. Ensure that reconciliation transaction is the first towards the device Change-Id: I5b5df1e8cdc3d96e7aa518a5012759d39afe048f Signed-off-by: K.V Suneelu Verma --- diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java index 5d07adf5e..203a4598a 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java @@ -15,9 +15,16 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nonnull; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration; @@ -70,6 +77,9 @@ public class HwvtepConnectionInstance { private HwvtepDeviceInfo deviceInfo; private DataBroker dataBroker; private final HwvtepConnectionManager hwvtepConnectionManager; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final SettableFuture reconciliationFt = SettableFuture.create(); + private final AtomicBoolean firstUpdateTriggered = new AtomicBoolean(false); HwvtepConnectionInstance (HwvtepConnectionManager hwvtepConnectionManager, ConnectionInfo key, OvsdbClient client, InstanceIdentifier iid, TransactionInvoker txInvoker, DataBroker dataBroker) { @@ -83,9 +93,43 @@ public class HwvtepConnectionInstance { this.hwvtepTableReader = new HwvtepTableReader(this); } - public synchronized void transact(TransactCommand command) { - for (TransactInvoker transactInvoker: transactInvokers.values()) { - transactInvoker.invoke(command); + public void transact(final TransactCommand command) { + String nodeId = getNodeId().getValue(); + boolean firstUpdate = firstUpdateTriggered.compareAndSet(false, true); + if (reconciliationFt.isDone()) { + transact(command, false); + } else { + LOG.info("Job waiting for reconciliation {}", nodeId); + Futures.addCallback(reconciliationFt, new FutureCallback() { + @Override + public void onSuccess(Boolean aBoolean) { + LOG.info("Running the job waiting for reconciliation {}", nodeId); + transact(command, false); + } + + @Override + public void onFailure(Throwable throwable) { + LOG.info("Running the job waiting for reconciliation {}", nodeId); + transact(command, false); + } + }); + if (firstUpdate) { + LOG.info("Scheduling the reconciliation timeout task {}", nodeId); + scheduledExecutorService.schedule( () -> reconciliationFt.set(Boolean.TRUE), + HwvtepSouthboundConstants.CONFIG_NODE_UPDATE_MAX_DELAY_MS, TimeUnit.MILLISECONDS); + } + } + } + + public synchronized void transact(TransactCommand command, boolean reconcile) { + try { + for (TransactInvoker transactInvoker : transactInvokers.values()) { + transactInvoker.invoke(command); + } + } finally { + if (reconcile) { + reconciliationFt.set(Boolean.TRUE); + } } } diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepSouthboundConstants.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepSouthboundConstants.java index de47d3023..a6e2a20fe 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepSouthboundConstants.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepSouthboundConstants.java @@ -53,4 +53,6 @@ public class HwvtepSouthboundConstants { "hwvtep.intransit.job.expiry.time.millis", 10000); public static final long IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS = Integer.getInteger( "hwvtep.intransit.job.check.period.millis", 30000); + public static final long CONFIG_NODE_UPDATE_MAX_DELAY_MS = Integer.getInteger( + "config.node.update.max.delay.ms", 10000); } diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/configuration/HwvtepReconciliationTask.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/configuration/HwvtepReconciliationTask.java index aa8f092a8..6f25165c3 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/configuration/HwvtepReconciliationTask.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/reconciliation/configuration/HwvtepReconciliationTask.java @@ -58,7 +58,8 @@ public class HwvtepReconciliationTask extends ReconciliationTask { HwvtepOperationalState hwvtepOperationalState = new HwvtepOperationalState(db, connectionInstance, changes, globalOperNode, psNode); hwvtepOperationalState.setInReconciliation(true); - connectionInstance.transact(new TransactCommandAggregator(hwvtepOperationalState,changes)); + boolean reconcile = true; + connectionInstance.transact(new TransactCommandAggregator(hwvtepOperationalState,changes), reconcile); } @Override diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DataChangeListenerTestBase.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DataChangeListenerTestBase.java index 6bd9eed7e..59501ec12 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DataChangeListenerTestBase.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/test/java/org/opendaylight/ovsdb/hwvtepsouthbound/DataChangeListenerTestBase.java @@ -20,6 +20,8 @@ import static org.powermock.api.support.membermodification.MemberModifier.suppre import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Field; @@ -27,6 +29,8 @@ import java.lang.reflect.Modifier; import java.net.InetAddress; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.commons.lang3.reflect.FieldUtils; import org.junit.After; import org.junit.Before; @@ -192,6 +196,11 @@ public class DataChangeListenerTestBase extends AbstractDataBrokerTest { field(HwvtepConnectionInstance.class, "instanceIdentifier").set(connectionInstance, nodeIid); field(HwvtepConnectionInstance.class, "txInvoker").set(connectionInstance, transactionInvoker); field(HwvtepConnectionInstance.class, "client").set(connectionInstance, ovsdbClient); + SettableFuture reconciliationFt = SettableFuture.create(); + reconciliationFt.set(Boolean.TRUE); + field(HwvtepConnectionInstance.class, "reconciliationFt").set(connectionInstance, reconciliationFt); + field(HwvtepConnectionInstance.class, "firstUpdateTriggered").set(connectionInstance, + new AtomicBoolean(Boolean.TRUE)); doReturn(nodeIid).when(connectionInstance).getInstanceIdentifier(); doReturn(listenableDbSchema).when(connectionInstance).getSchema(anyString()); doReturn(dataBroker).when(connectionInstance).getDataBroker();