import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
private HwvtepDeviceInfo deviceInfo;
private DataBroker dataBroker;
private final HwvtepConnectionManager hwvtepConnectionManager;
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ private final SettableFuture<Boolean> reconciliationFt = SettableFuture.create();
+ private final AtomicBoolean firstUpdateTriggered = new AtomicBoolean(false);
HwvtepConnectionInstance (HwvtepConnectionManager hwvtepConnectionManager, ConnectionInfo key, OvsdbClient client,
InstanceIdentifier<Node> iid, TransactionInvoker txInvoker, DataBroker dataBroker) {
this.hwvtepTableReader = new HwvtepTableReader(this);
}
- public synchronized void transact(TransactCommand command) {
- for (TransactInvoker transactInvoker: transactInvokers.values()) {
- transactInvoker.invoke(command);
+ public void transact(final TransactCommand command) {
+ String nodeId = getNodeId().getValue();
+ boolean firstUpdate = firstUpdateTriggered.compareAndSet(false, true);
+ if (reconciliationFt.isDone()) {
+ transact(command, false);
+ } else {
+ LOG.info("Job waiting for reconciliation {}", nodeId);
+ Futures.addCallback(reconciliationFt, new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(Boolean aBoolean) {
+ LOG.info("Running the job waiting for reconciliation {}", nodeId);
+ transact(command, false);
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.info("Running the job waiting for reconciliation {}", nodeId);
+ transact(command, false);
+ }
+ });
+ if (firstUpdate) {
+ LOG.info("Scheduling the reconciliation timeout task {}", nodeId);
+ scheduledExecutorService.schedule( () -> reconciliationFt.set(Boolean.TRUE),
+ HwvtepSouthboundConstants.CONFIG_NODE_UPDATE_MAX_DELAY_MS, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ public synchronized void transact(TransactCommand command, boolean reconcile) {
+ try {
+ for (TransactInvoker transactInvoker : transactInvokers.values()) {
+ transactInvoker.invoke(command);
+ }
+ } finally {
+ if (reconcile) {
+ reconciliationFt.set(Boolean.TRUE);
+ }
}
}
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.commons.lang3.reflect.FieldUtils;
import org.junit.After;
import org.junit.Before;
field(HwvtepConnectionInstance.class, "instanceIdentifier").set(connectionInstance, nodeIid);
field(HwvtepConnectionInstance.class, "txInvoker").set(connectionInstance, transactionInvoker);
field(HwvtepConnectionInstance.class, "client").set(connectionInstance, ovsdbClient);
+ SettableFuture<Boolean> reconciliationFt = SettableFuture.create();
+ reconciliationFt.set(Boolean.TRUE);
+ field(HwvtepConnectionInstance.class, "reconciliationFt").set(connectionInstance, reconciliationFt);
+ field(HwvtepConnectionInstance.class, "firstUpdateTriggered").set(connectionInstance,
+ new AtomicBoolean(Boolean.TRUE));
doReturn(nodeIid).when(connectionInstance).getInstanceIdentifier();
doReturn(listenableDbSchema).when(connectionInstance).getSchema(anyString());
doReturn(dataBroker).when(connectionInstance).getDataBroker();