bug 8029 handle expired in transit entries
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transact / DependencyQueue.java
index 65af6be9ead37d85c4edf18176729371dfbf3a00..e0fca77a936afcf7361d5a8acfd326e47c509ea0 100644 (file)
@@ -13,34 +13,48 @@ import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionInstance;
 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
-import org.opendaylight.yangtools.yang.binding.Identifiable;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 public class DependencyQueue {
 
     private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class);
     private static final ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d").
             build();
-    private static final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact);
+    private static final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact);
 
     private final LinkedBlockingQueue<DependentJob> configWaitQueue = new LinkedBlockingQueue<>(
             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
     private final LinkedBlockingQueue<DependentJob> opWaitQueue = new LinkedBlockingQueue<>(
             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
     private final HwvtepDeviceInfo deviceInfo;
+    private ScheduledFuture expiredTasksMonitorJob;
 
+    @SuppressWarnings("unchecked")
     public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) {
         this.deviceInfo = hwvtepDeviceInfo;
+        expiredTasksMonitorJob = executorService.scheduleWithFixedDelay(() -> {
+            try {
+                LOG.debug("Processing dependencies");
+                if (!deviceInfo.getConnectionInstance().getOvsdbClient().isActive()) {
+                    expiredTasksMonitorJob.cancel(false);
+                }
+                deviceInfo.onOperDataAvailable();
+            } catch (Throwable e) {
+                //If execution of one run throws error , subsequent runs are suppressed, hence catching the throwable
+                LOG.error("Failed to process dependencies", e);
+            }
+        }, 0, HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -84,22 +98,25 @@ public class DependencyQueue {
         final List<DependentJob> readyJobs =  getReadyJobs(queue);
         if (readyJobs.size() > 0) {
             executorService.submit(() -> hwvtepConnectionInstance.transact(new TransactCommand() {
+                HwvtepOperationalState operationalState;
                 @Override
                 public void execute(TransactionBuilder transactionBuilder) {
-                    HwvtepOperationalState operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
+                    this.operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
                     for (DependentJob job : readyJobs) {
                         job.onDependencyResolved(operationalState, transactionBuilder);
                     }
                 }
 
                 @Override
-                public void onConfigUpdate(TransactionBuilder transaction, InstanceIdentifier nodeIid,
-                                           Identifiable data, InstanceIdentifier key, Object... extraData) {
+                public void onFailure(TransactionBuilder deviceTransaction) {
+                    readyJobs.forEach((job) -> job.onFailure(deviceTransaction));
+                    operationalState.clearIntransitKeys();
                 }
 
                 @Override
-                public void doDeviceTransaction(TransactionBuilder transaction, InstanceIdentifier nodeIid,
-                                                Identifiable data, InstanceIdentifier key, Object... extraData) {
+                public void onSuccess(TransactionBuilder deviceTransaction) {
+                    readyJobs.forEach((job) -> job.onSuccess(deviceTransaction));
+                    operationalState.getDeviceInfo().onOperDataAvailable();
                 }
             }));
         }
@@ -111,13 +128,16 @@ public class DependencyQueue {
         while(jobIterator.hasNext()) {
             DependentJob job = jobIterator.next();
             long currentTime = System.currentTimeMillis();
-            if (job.isExpired(currentTime)) {
+
+            //first check if its dependencies are met later check for expired status
+            if (job.areDependenciesMet(deviceInfo)) {
                 jobIterator.remove();
+                readyJobs.add(job);
                 continue;
             }
-            if (job.areDependenciesMet(deviceInfo)) {
+            if (job.isExpired(currentTime)) {
                 jobIterator.remove();
-                readyJobs.add(job);
+                continue;
             }
         }
         return readyJobs;