f6cd3aa7e624f336d4ac2e186b94c0c894af843a
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transact / DependencyQueue.java
1 /*
2  * Copyright © 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.hwvtepsouthbound.transact;
10
11 import com.google.common.util.concurrent.ThreadFactoryBuilder;
12 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
13 import java.util.ArrayList;
14 import java.util.Iterator;
15 import java.util.List;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.ScheduledExecutorService;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.atomic.AtomicReference;
23
24 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionInstance;
25 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
26 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
27 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 public class DependencyQueue {
32
33     private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class);
34     private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(
35             new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d").build());
36
37     private final LinkedBlockingQueue<DependentJob> configWaitQueue = new LinkedBlockingQueue<>(
38             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
39     private final LinkedBlockingQueue<DependentJob> opWaitQueue = new LinkedBlockingQueue<>(
40             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
41     private final HwvtepDeviceInfo deviceInfo;
42
43     @SuppressWarnings("checkstyle:IllegalCatch")
44     public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) {
45         this.deviceInfo = hwvtepDeviceInfo;
46
47         final AtomicReference<ScheduledFuture<?>> expiredTasksMonitorJob = new AtomicReference<>();
48         expiredTasksMonitorJob.set(EXECUTOR_SERVICE.scheduleWithFixedDelay(() -> {
49             try {
50                 LOG.debug("Processing dependencies");
51                 if (!deviceInfo.getConnectionInstance().getOvsdbClient().isActive()) {
52                     if (expiredTasksMonitorJob.get() != null) {
53                         expiredTasksMonitorJob.get().cancel(false);
54                     }
55                 }
56                 deviceInfo.onOperDataAvailable();
57             } catch (RuntimeException e) {
58                 //If execution of one run throws error , subsequent runs are suppressed, hence catching the throwable
59                 LOG.error("Failed to process dependencies", e);
60             }
61         }, 0, HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS, TimeUnit.MILLISECONDS));
62     }
63
64     /**
65      * Tries to add the job to the waiting queue.
66      *
67      * @param waitingJob The job to be enqueued
68      * @return true if it is successfully added to the queue
69      */
70     public boolean addToQueue(DependentJob waitingJob) {
71         boolean addedToQueue;
72         if (waitingJob instanceof DependentJob.ConfigWaitingJob) {
73             addedToQueue = configWaitQueue.offer(waitingJob);
74         } else {
75             addedToQueue = opWaitQueue.offer(waitingJob);
76         }
77         if (addedToQueue) {
78             LOG.debug("Added the waiting job {} to queue", waitingJob.getKey());
79         } else {
80             LOG.error("Failed to add the waiting job to queue {}", waitingJob.getKey());
81         }
82         return addedToQueue;
83     }
84
85     /**
86      * Checks if any config data dependent jobs are ready to be processed and process them.
87      *
88      * @param connectionInstance The connection instance
89      */
90     public void processReadyJobsFromConfigQueue(HwvtepConnectionInstance connectionInstance) {
91         processReadyJobs(connectionInstance, configWaitQueue);
92     }
93
94     /**
95      * Checks if any operational data dependent jobs are ready to be processed and process them.
96      *
97      * @param connectionInstance The connection instance
98      */
99     public void processReadyJobsFromOpQueue(HwvtepConnectionInstance connectionInstance) {
100         processReadyJobs(connectionInstance, opWaitQueue);
101     }
102
103     @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
104     private void processReadyJobs(final HwvtepConnectionInstance hwvtepConnectionInstance,
105                                   LinkedBlockingQueue<DependentJob> queue) {
106         final List<DependentJob> readyJobs = getReadyJobs(queue);
107         readyJobs.forEach((job) -> {
108             EXECUTOR_SERVICE.execute(() ->
109                 hwvtepConnectionInstance.transact(new TransactCommand() {
110                     HwvtepOperationalState operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
111                     AtomicInteger retryCount = new AtomicInteger(5);
112
113                     @Override
114                     public boolean retry() {
115                         return retryCount.decrementAndGet() > 0;
116                     }
117
118                     @Override
119                     public void execute(TransactionBuilder transactionBuilder) {
120                         deviceInfo.clearKeyFromDependencyQueue(job.getKey());
121                         if (operationalState.getConnectionInstance() != null
122                                 && operationalState.getConnectionInstance().isActive()) {
123                             job.onDependencyResolved(operationalState, transactionBuilder);
124                         }
125                     }
126
127                     @Override
128                     @SuppressFBWarnings(value = "UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR")
129                     public void onFailure(TransactionBuilder tx) {
130                         job.onFailure();
131                         operationalState.clearIntransitKeys();
132
133                     }
134
135                     @Override
136                     public void onSuccess(TransactionBuilder tx) {
137                         job.onSuccess();
138                         operationalState.getDeviceInfo().onOperDataAvailable();
139                     }
140                 }));
141         });
142     }
143
144     private List<DependentJob> getReadyJobs(LinkedBlockingQueue<DependentJob> queue) {
145         List<DependentJob> readyJobs = new ArrayList<>();
146         Iterator<DependentJob> jobIterator = queue.iterator();
147         while (jobIterator.hasNext()) {
148             DependentJob job = jobIterator.next();
149             long currentTime = System.currentTimeMillis();
150             if (job.areDependenciesMet(deviceInfo)) {
151                 jobIterator.remove();
152                 readyJobs.add(job);
153                 continue;
154             }
155             if (job.isExpired(currentTime)) {
156                 deviceInfo.clearKeyFromDependencyQueue(job.getKey());
157                 jobIterator.remove();
158                 continue;
159             }
160         }
161         return readyJobs;
162     }
163
164     public static void close() {
165         EXECUTOR_SERVICE.shutdown();
166     }
167
168     public void submit(Runnable runnable) {
169         EXECUTOR_SERVICE.execute(runnable);
170     }
171 }