2 * Copyright © 2016, 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.hwvtepsouthbound.transact;
10 import com.google.common.util.concurrent.ThreadFactoryBuilder;
11 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
12 import java.util.ArrayList;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.ScheduledExecutorService;
18 import java.util.concurrent.ScheduledFuture;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.atomic.AtomicInteger;
21 import java.util.concurrent.atomic.AtomicReference;
22 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionInstance;
23 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
24 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
25 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 public class DependencyQueue {
31 private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class);
32 private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(
33 new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d").build());
35 private final LinkedBlockingQueue<DependentJob> configWaitQueue = new LinkedBlockingQueue<>(
36 HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
37 private final LinkedBlockingQueue<DependentJob> opWaitQueue = new LinkedBlockingQueue<>(
38 HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
39 private final HwvtepDeviceInfo deviceInfo;
41 @SuppressWarnings("checkstyle:IllegalCatch")
42 public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) {
43 this.deviceInfo = hwvtepDeviceInfo;
45 final AtomicReference<ScheduledFuture<?>> expiredTasksMonitorJob = new AtomicReference<>();
46 expiredTasksMonitorJob.set(EXECUTOR_SERVICE.scheduleWithFixedDelay(() -> {
48 LOG.debug("Processing dependencies");
49 if (!deviceInfo.getConnectionInstance().getOvsdbClient().isActive()) {
50 if (expiredTasksMonitorJob.get() != null) {
51 expiredTasksMonitorJob.get().cancel(false);
54 deviceInfo.onOperDataAvailable();
55 } catch (RuntimeException e) {
56 //If execution of one run throws error , subsequent runs are suppressed, hence catching the throwable
57 LOG.error("Failed to process dependencies", e);
59 }, 0, HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS, TimeUnit.MILLISECONDS));
63 * Tries to add the job to the waiting queue.
65 * @param waitingJob The job to be enqueued
66 * @return true if it is successfully added to the queue
68 public boolean addToQueue(DependentJob waitingJob) {
70 if (waitingJob instanceof DependentJob.ConfigWaitingJob) {
71 addedToQueue = configWaitQueue.offer(waitingJob);
73 addedToQueue = opWaitQueue.offer(waitingJob);
76 LOG.debug("Added the waiting job {} to queue", waitingJob.getKey());
78 LOG.error("Failed to add the waiting job to queue {}", waitingJob.getKey());
84 * Checks if any config data dependent jobs are ready to be processed and process them.
86 * @param connectionInstance The connection instance
88 public void processReadyJobsFromConfigQueue(HwvtepConnectionInstance connectionInstance) {
89 processReadyJobs(connectionInstance, configWaitQueue);
93 * Checks if any operational data dependent jobs are ready to be processed and process them.
95 * @param connectionInstance The connection instance
97 public void processReadyJobsFromOpQueue(HwvtepConnectionInstance connectionInstance) {
98 processReadyJobs(connectionInstance, opWaitQueue);
101 @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
102 private void processReadyJobs(final HwvtepConnectionInstance hwvtepConnectionInstance,
103 LinkedBlockingQueue<DependentJob> queue) {
104 final List<DependentJob> readyJobs = getReadyJobs(queue);
105 readyJobs.forEach((job) -> {
106 EXECUTOR_SERVICE.execute(() ->
107 hwvtepConnectionInstance.transact(new TransactCommand() {
108 HwvtepOperationalState operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
109 AtomicInteger retryCount = new AtomicInteger(5);
112 public boolean retry() {
113 return retryCount.decrementAndGet() > 0;
117 public void execute(TransactionBuilder transactionBuilder) {
118 deviceInfo.clearKeyFromDependencyQueue(job.getKey());
119 if (operationalState.getConnectionInstance() != null
120 && operationalState.getConnectionInstance().isActive()) {
121 job.onDependencyResolved(operationalState, transactionBuilder);
126 @SuppressFBWarnings(value = "UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR")
127 public void onFailure(TransactionBuilder tx) {
129 operationalState.clearIntransitKeys();
134 public void onSuccess(TransactionBuilder tx) {
136 operationalState.getDeviceInfo().onOperDataAvailable();
142 private List<DependentJob> getReadyJobs(LinkedBlockingQueue<DependentJob> queue) {
143 List<DependentJob> readyJobs = new ArrayList<>();
144 Iterator<DependentJob> jobIterator = queue.iterator();
145 while (jobIterator.hasNext()) {
146 DependentJob job = jobIterator.next();
147 long currentTime = System.currentTimeMillis();
148 if (job.areDependenciesMet(deviceInfo)) {
149 jobIterator.remove();
153 if (job.isExpired(currentTime)) {
154 deviceInfo.clearKeyFromDependencyQueue(job.getKey());
155 jobIterator.remove();
162 public static void close() {
163 EXECUTOR_SERVICE.shutdown();
166 public void submit(Runnable runnable) {
167 EXECUTOR_SERVICE.execute(runnable);