Update MRI projects for Aluminium
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transact / DependencyQueue.java
1 /*
2  * Copyright © 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.hwvtepsouthbound.transact;
9
10 import com.google.common.util.concurrent.ThreadFactoryBuilder;
11 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
12 import java.util.ArrayList;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.ScheduledExecutorService;
18 import java.util.concurrent.ScheduledFuture;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.atomic.AtomicInteger;
21 import java.util.concurrent.atomic.AtomicReference;
22 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionInstance;
23 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
24 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
25 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 public class DependencyQueue {
30
31     private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class);
32     private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(
33             new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d").build());
34
35     private final LinkedBlockingQueue<DependentJob> configWaitQueue = new LinkedBlockingQueue<>(
36             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
37     private final LinkedBlockingQueue<DependentJob> opWaitQueue = new LinkedBlockingQueue<>(
38             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
39     private final HwvtepDeviceInfo deviceInfo;
40
41     @SuppressWarnings("checkstyle:IllegalCatch")
42     public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) {
43         this.deviceInfo = hwvtepDeviceInfo;
44
45         final AtomicReference<ScheduledFuture<?>> expiredTasksMonitorJob = new AtomicReference<>();
46         expiredTasksMonitorJob.set(EXECUTOR_SERVICE.scheduleWithFixedDelay(() -> {
47             try {
48                 LOG.debug("Processing dependencies");
49                 if (!deviceInfo.getConnectionInstance().getOvsdbClient().isActive()) {
50                     if (expiredTasksMonitorJob.get() != null) {
51                         expiredTasksMonitorJob.get().cancel(false);
52                     }
53                 }
54                 deviceInfo.onOperDataAvailable();
55             } catch (RuntimeException e) {
56                 //If execution of one run throws error , subsequent runs are suppressed, hence catching the throwable
57                 LOG.error("Failed to process dependencies", e);
58             }
59         }, 0, HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS, TimeUnit.MILLISECONDS));
60     }
61
62     /**
63      * Tries to add the job to the waiting queue.
64      *
65      * @param waitingJob The job to be enqueued
66      * @return true if it is successfully added to the queue
67      */
68     public boolean addToQueue(DependentJob waitingJob) {
69         boolean addedToQueue;
70         if (waitingJob instanceof DependentJob.ConfigWaitingJob) {
71             addedToQueue = configWaitQueue.offer(waitingJob);
72         } else {
73             addedToQueue = opWaitQueue.offer(waitingJob);
74         }
75         if (addedToQueue) {
76             LOG.debug("Added the waiting job {} to queue", waitingJob.getKey());
77         } else {
78             LOG.error("Failed to add the waiting job to queue {}", waitingJob.getKey());
79         }
80         return addedToQueue;
81     }
82
83     /**
84      * Checks if any config data dependent jobs are ready to be processed and process them.
85      *
86      * @param connectionInstance The connection instance
87      */
88     public void processReadyJobsFromConfigQueue(HwvtepConnectionInstance connectionInstance) {
89         processReadyJobs(connectionInstance, configWaitQueue);
90     }
91
92     /**
93      * Checks if any operational data dependent jobs are ready to be processed and process them.
94      *
95      * @param connectionInstance The connection instance
96      */
97     public void processReadyJobsFromOpQueue(HwvtepConnectionInstance connectionInstance) {
98         processReadyJobs(connectionInstance, opWaitQueue);
99     }
100
101     @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT")
102     private void processReadyJobs(final HwvtepConnectionInstance hwvtepConnectionInstance,
103                                   LinkedBlockingQueue<DependentJob> queue) {
104         final List<DependentJob> readyJobs = getReadyJobs(queue);
105         readyJobs.forEach((job) -> {
106             EXECUTOR_SERVICE.execute(() ->
107                 hwvtepConnectionInstance.transact(new TransactCommand() {
108                     HwvtepOperationalState operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
109                     AtomicInteger retryCount = new AtomicInteger(5);
110
111                     @Override
112                     public boolean retry() {
113                         return retryCount.decrementAndGet() > 0;
114                     }
115
116                     @Override
117                     public void execute(TransactionBuilder transactionBuilder) {
118                         deviceInfo.clearKeyFromDependencyQueue(job.getKey());
119                         if (operationalState.getConnectionInstance() != null
120                                 && operationalState.getConnectionInstance().isActive()) {
121                             job.onDependencyResolved(operationalState, transactionBuilder);
122                         }
123                     }
124
125                     @Override
126                     @SuppressFBWarnings(value = "UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR")
127                     public void onFailure(TransactionBuilder tx) {
128                         job.onFailure();
129                         operationalState.clearIntransitKeys();
130
131                     }
132
133                     @Override
134                     public void onSuccess(TransactionBuilder tx) {
135                         job.onSuccess();
136                         operationalState.getDeviceInfo().onOperDataAvailable();
137                     }
138                 }));
139         });
140     }
141
142     private List<DependentJob> getReadyJobs(LinkedBlockingQueue<DependentJob> queue) {
143         List<DependentJob> readyJobs = new ArrayList<>();
144         Iterator<DependentJob> jobIterator = queue.iterator();
145         while (jobIterator.hasNext()) {
146             DependentJob job = jobIterator.next();
147             long currentTime = System.currentTimeMillis();
148             if (job.areDependenciesMet(deviceInfo)) {
149                 jobIterator.remove();
150                 readyJobs.add(job);
151                 continue;
152             }
153             if (job.isExpired(currentTime)) {
154                 deviceInfo.clearKeyFromDependencyQueue(job.getKey());
155                 jobIterator.remove();
156                 continue;
157             }
158         }
159         return readyJobs;
160     }
161
162     public static void close() {
163         EXECUTOR_SERVICE.shutdown();
164     }
165
166     public void submit(Runnable runnable) {
167         EXECUTOR_SERVICE.execute(runnable);
168     }
169 }