2 * Copyright © 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.ovsdb.hwvtepsouthbound.transact;
11 import com.google.common.util.concurrent.ThreadFactoryBuilder;
12 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13 import java.util.ArrayList;
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.atomic.AtomicReference;
24 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionInstance;
25 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
26 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
27 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 public class DependencyQueue {
// Logger for this class.
33 private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class);
// Single-threaded scheduled executor. It is static, so it is shared by every DependencyQueue
// instance; its threads are named from the format "hwvtep-waiting-job-%d".
34 private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(
35 new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d").build());
// Bounded queue (capacity WAITING_QUEUE_CAPACITY) that addToQueue fills with jobs that are
// instances of DependentJob.ConfigWaitingJob.
37 private final LinkedBlockingQueue<DependentJob> configWaitQueue = new LinkedBlockingQueue<>(
38 HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
// Bounded queue of the same capacity; addToQueue offers jobs to it on its other branch
// (presumably every job that is not a ConfigWaitingJob -- confirm against addToQueue).
39 private final LinkedBlockingQueue<DependentJob> opWaitQueue = new LinkedBlockingQueue<>(
40 HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
// Device info supplied to the constructor; used for dependency checks and key clean-up.
41 private final HwvtepDeviceInfo deviceInfo;
// Creates the queue for the given device info and schedules a recurring task on the shared
// static executor. The task starts with an initial delay of 0 and is re-run with a fixed delay of
// HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS milliseconds after each run ends.
// NOTE(review): some lines of this constructor (the `try {` opener and the closing braces) are not
// visible in this view, so the exact nesting of the statements below should be confirmed.
43 @SuppressWarnings("checkstyle:IllegalCatch")
44 public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) {
45 this.deviceInfo = hwvtepDeviceInfo;
// Holder through which the scheduled lambda can reach its own ScheduledFuture and cancel itself.
47 final AtomicReference<ScheduledFuture<?>> expiredTasksMonitorJob = new AtomicReference<>();
48 expiredTasksMonitorJob.set(EXECUTOR_SERVICE.scheduleWithFixedDelay(() -> {
50 LOG.debug("Processing dependencies");
// Once the OVSDB client behind the device's connection instance is no longer active,
// cancel the periodic task.
51 if (!deviceInfo.getConnectionInstance().getOvsdbClient().isActive()) {
// The holder can still be empty here: with an initial delay of 0 the first run may presumably
// start before set(...) above has stored the future.
52 if (expiredTasksMonitorJob.get() != null) {
// cancel(false) does not interrupt a run that is already executing; it only prevents later runs.
53 expiredTasksMonitorJob.get().cancel(false);
56 deviceInfo.onOperDataAvailable();
// NOTE(review): only RuntimeException is caught here, although the comment below says "throwable";
// an Error thrown by a run would still suppress the subsequent runs.
57 } catch (RuntimeException e) {
58 //If execution of one run throws error , subsequent runs are suppressed, hence catching the throwable
59 LOG.error("Failed to process dependencies", e);
61 }, 0, HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS, TimeUnit.MILLISECONDS));
65 * Tries to add the job to the waiting queue.
67 * @param waitingJob The job to be enqueued
68 * @return true if it is successfully added to the queue
// Offers the job to one of the two bounded waiting queues and logs the outcome.
// LinkedBlockingQueue.offer does not block: it returns false when the queue is at capacity.
70 public boolean addToQueue(DependentJob waitingJob) {
// Jobs that are instances of DependentJob.ConfigWaitingJob go to the config queue.
72 if (waitingJob instanceof DependentJob.ConfigWaitingJob) {
73 addedToQueue = configWaitQueue.offer(waitingJob);
// NOTE(review): the branch keyword before the next statement is not visible in this view;
// presumably it is the else-branch, i.e. every other job type goes to the op queue -- confirm.
75 addedToQueue = opWaitQueue.offer(waitingJob);
// Outcome logging: debug when the job was accepted, error when it was not (presumably selected by
// the value of addedToQueue; the condition itself is not visible here).
78 LOG.debug("Added the waiting job {} to queue", waitingJob.getKey());
80 LOG.error("Failed to add the waiting job to queue {}", waitingJob.getKey());
86 * Checks if any config data dependent jobs are ready to be processed and process them.
88 * @param connectionInstance The connection instance
// Delegates to processReadyJobs, passing the config waiting queue.
90 public void processReadyJobsFromConfigQueue(HwvtepConnectionInstance connectionInstance) {
91 processReadyJobs(connectionInstance, configWaitQueue);
95 * Checks if any operational data dependent jobs are ready to be processed and process them.
97 * @param connectionInstance The connection instance
// Delegates to processReadyJobs, passing the operational waiting queue.
99 public void processReadyJobsFromOpQueue(HwvtepConnectionInstance connectionInstance) {
100 processReadyJobs(connectionInstance, opWaitQueue);
// Obtains the ready jobs of the given queue from getReadyJobs and, for each one, submits a task to
// the shared static executor. That task calls hwvtepConnectionInstance.transact with an anonymous
// TransactCommand wrapping the job.
// NOTE(review): several lines of the anonymous class (annotations, closing braces and possibly
// short statements) are not visible in this view.
103 @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
104 private void processReadyJobs(final HwvtepConnectionInstance hwvtepConnectionInstance,
105 LinkedBlockingQueue<DependentJob> queue) {
106 final List<DependentJob> readyJobs = getReadyJobs(queue);
107 readyJobs.forEach((job) -> {
108 EXECUTOR_SERVICE.execute(() ->
109 hwvtepConnectionInstance.transact(new TransactCommand() {
// Per-command state: an operational-state object built for this connection instance and a
// retry counter that starts at 5.
110 HwvtepOperationalState operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
111 AtomicInteger retryCount = new AtomicInteger(5);
// Decrements the counter on every call; with a start value of 5 the first four calls return true
// and every later call returns false.
114 public boolean retry() {
115 return retryCount.decrementAndGet() > 0;
// Calls deviceInfo.clearKeyFromDependencyQueue with the job's key, then hands the transaction
// builder to the job only when the operational state has a connection instance that is active.
119 public void execute(TransactionBuilder transactionBuilder) {
120 deviceInfo.clearKeyFromDependencyQueue(job.getKey());
121 if (operationalState.getConnectionInstance() != null
122 && operationalState.getConnectionInstance().isActive()) {
123 job.onDependencyResolved(operationalState, transactionBuilder);
// Failure callback: calls clearIntransitKeys() on this command's operational state.
128 @SuppressFBWarnings(value = "UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR")
129 public void onFailure(TransactionBuilder tx) {
131 operationalState.clearIntransitKeys();
// Success callback: calls onOperDataAvailable() on the device info held by the operational state.
136 public void onSuccess(TransactionBuilder tx) {
138 operationalState.getDeviceInfo().onOperDataAvailable();
// Walks the given queue once and removes, through the iterator, every job whose dependencies are
// met and every job that reports itself as expired.
// NOTE(review): the statements that add jobs to readyJobs and return the list are not visible in
// this view; presumably the jobs whose dependencies are met are the ones returned -- confirm.
144 private List<DependentJob> getReadyJobs(LinkedBlockingQueue<DependentJob> queue) {
145 List<DependentJob> readyJobs = new ArrayList<>();
// LinkedBlockingQueue iterators are weakly consistent and support remove().
146 Iterator<DependentJob> jobIterator = queue.iterator();
147 while (jobIterator.hasNext()) {
148 DependentJob job = jobIterator.next();
// The timestamp is taken once per job and is only used for the expiry check below.
149 long currentTime = System.currentTimeMillis();
150 if (job.areDependenciesMet(deviceInfo)) {
151 jobIterator.remove();
// Expired jobs are removed from the queue after deviceInfo.clearKeyFromDependencyQueue is called
// with their key.
155 if (job.isExpired(currentTime)) {
156 deviceInfo.clearKeyFromDependencyQueue(job.getKey());
157 jobIterator.remove();
// Shuts down the shared static executor. Because EXECUTOR_SERVICE is static, this affects every
// DependencyQueue instance. shutdown() stops accepting new tasks; previously submitted tasks are
// still executed.
164 public static void close() {
165 EXECUTOR_SERVICE.shutdown();
// Hands the given task to the shared static executor for execution.
// NOTE(review): after close() has shut the executor down, execute(...) presumably rejects the task
// (RejectedExecutionException under the default handler) -- confirm callers cannot hit this.
168 public void submit(Runnable runnable) {
169 EXECUTOR_SERVICE.execute(runnable);