2 * Copyright © 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.hwvtepsouthbound.transact;
11 import com.google.common.util.concurrent.ThreadFactoryBuilder;
12 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionInstance;
13 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
14 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
15 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
19 import java.util.ArrayList;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
29 public class DependencyQueue {
31 private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class);
32 private static final ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d").
34 private static final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact);
36 private final LinkedBlockingQueue<DependentJob> configWaitQueue = new LinkedBlockingQueue<>(
37 HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
38 private final LinkedBlockingQueue<DependentJob> opWaitQueue = new LinkedBlockingQueue<>(
39 HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
40 private final HwvtepDeviceInfo deviceInfo;
41 private ScheduledFuture expiredTasksMonitorJob;
43 @SuppressWarnings("unchecked")
44 public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) {
45 this.deviceInfo = hwvtepDeviceInfo;
46 expiredTasksMonitorJob = executorService.scheduleWithFixedDelay(() -> {
48 LOG.debug("Processing dependencies");
49 if (!deviceInfo.getConnectionInstance().getOvsdbClient().isActive()) {
50 expiredTasksMonitorJob.cancel(false);
52 deviceInfo.onOperDataAvailable();
53 } catch (Throwable e) {
54 //If execution of one run throws error , subsequent runs are suppressed, hence catching the throwable
55 LOG.error("Failed to process dependencies", e);
57 }, 0, HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
61 * Tries to add the job to the waiting queue
62 * @param waitingJob The job to be enqueued
63 * @return true if it is successfully added to the queue
65 public boolean addToQueue(DependentJob waitingJob) {
67 if (waitingJob instanceof DependentJob.ConfigWaitingJob) {
68 addedToQueue = configWaitQueue.offer(waitingJob);
70 addedToQueue = opWaitQueue.offer(waitingJob);
73 LOG.debug("Added the waiting job {} to queue", waitingJob.getKey());
75 LOG.error("Failed to add the waiting job to queue {}", waitingJob.getKey());
81 * Checks if any config data dependent jobs are ready to be processed and process them
82 * @param connectionInstance The connection instance
84 public void processReadyJobsFromConfigQueue(HwvtepConnectionInstance connectionInstance) {
85 processReadyJobs(connectionInstance, configWaitQueue);
89 * Checks if any operational data dependent jobs are ready to be processed and process them
90 * @param connectionInstance The connection instance
92 public void processReadyJobsFromOpQueue(HwvtepConnectionInstance connectionInstance) {
93 processReadyJobs(connectionInstance, opWaitQueue);
96 private void processReadyJobs(final HwvtepConnectionInstance hwvtepConnectionInstance,
97 LinkedBlockingQueue<DependentJob> queue) {
98 final List<DependentJob> readyJobs = getReadyJobs(queue);
99 if (readyJobs.size() > 0) {
100 executorService.submit(() -> hwvtepConnectionInstance.transact(new TransactCommand() {
101 HwvtepOperationalState operationalState;
103 public void execute(TransactionBuilder transactionBuilder) {
104 this.operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
105 for (DependentJob job : readyJobs) {
106 job.onDependencyResolved(operationalState, transactionBuilder);
111 public void onFailure(TransactionBuilder deviceTransaction) {
112 readyJobs.forEach((job) -> job.onFailure(deviceTransaction));
113 operationalState.clearIntransitKeys();
117 public void onSuccess(TransactionBuilder deviceTransaction) {
118 readyJobs.forEach((job) -> job.onSuccess(deviceTransaction));
119 operationalState.getDeviceInfo().onOperDataAvailable();
125 private List<DependentJob> getReadyJobs(LinkedBlockingQueue<DependentJob> queue) {
126 List<DependentJob> readyJobs = new ArrayList<>();
127 Iterator<DependentJob> jobIterator = queue.iterator();
128 while(jobIterator.hasNext()) {
129 DependentJob job = jobIterator.next();
130 long currentTime = System.currentTimeMillis();
132 //first check if its dependencies are met later check for expired status
133 if (job.areDependenciesMet(deviceInfo)) {
134 jobIterator.remove();
138 if (job.isExpired(currentTime)) {
139 jobIterator.remove();
146 public static void close() {
147 executorService.shutdown();
150 public void submit(Runnable runnable) {
151 executorService.submit(runnable);