import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
-import org.opendaylight.yangtools.yang.binding.Identifiable;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
public class DependencyQueue {
private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class);
private static final ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d").
build();
- private static final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact);
+ private static final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact);
private final LinkedBlockingQueue<DependentJob> configWaitQueue = new LinkedBlockingQueue<>(
HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
private final LinkedBlockingQueue<DependentJob> opWaitQueue = new LinkedBlockingQueue<>(
HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
private final HwvtepDeviceInfo deviceInfo;
+ private ScheduledFuture expiredTasksMonitorJob;
+ @SuppressWarnings("unchecked")
public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) {
this.deviceInfo = hwvtepDeviceInfo;
+ expiredTasksMonitorJob = executorService.scheduleWithFixedDelay(() -> {
+ try {
+ LOG.debug("Processing dependencies");
+ if (!deviceInfo.getConnectionInstance().getOvsdbClient().isActive()) {
+ expiredTasksMonitorJob.cancel(false);
+ }
+ deviceInfo.onOperDataAvailable();
+ } catch (Throwable e) {
+ //If execution of one run throws error , subsequent runs are suppressed, hence catching the throwable
+ LOG.error("Failed to process dependencies", e);
+ }
+ }, 0, HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
}
/**
final List<DependentJob> readyJobs = getReadyJobs(queue);
if (readyJobs.size() > 0) {
executorService.submit(() -> hwvtepConnectionInstance.transact(new TransactCommand() {
+ HwvtepOperationalState operationalState;
@Override
public void execute(TransactionBuilder transactionBuilder) {
- HwvtepOperationalState operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
+ this.operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
for (DependentJob job : readyJobs) {
job.onDependencyResolved(operationalState, transactionBuilder);
}
}
@Override
- public void onConfigUpdate(TransactionBuilder transaction, InstanceIdentifier nodeIid,
- Identifiable data, InstanceIdentifier key, Object... extraData) {
+ public void onFailure(TransactionBuilder deviceTransaction) {
+ readyJobs.forEach((job) -> job.onFailure(deviceTransaction));
+ operationalState.clearIntransitKeys();
}
@Override
- public void doDeviceTransaction(TransactionBuilder transaction, InstanceIdentifier nodeIid,
- Identifiable data, InstanceIdentifier key, Object... extraData) {
+ public void onSuccess(TransactionBuilder deviceTransaction) {
+ readyJobs.forEach((job) -> job.onSuccess(deviceTransaction));
+ operationalState.getDeviceInfo().onOperDataAvailable();
}
}));
}
while(jobIterator.hasNext()) {
DependentJob job = jobIterator.next();
long currentTime = System.currentTimeMillis();
- if (job.isExpired(currentTime)) {
+
+ //first check if its dependencies are met later check for expired status
+ if (job.areDependenciesMet(deviceInfo)) {
jobIterator.remove();
+ readyJobs.add(job);
continue;
}
- if (job.areDependenciesMet(deviceInfo)) {
+ if (job.isExpired(currentTime)) {
jobIterator.remove();
- readyJobs.add(job);
+ continue;
}
}
return readyJobs;