bug 8029 handle expired in transit entries
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transact / DependencyQueue.java
1 /*
2  * Copyright © 2016, 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.hwvtepsouthbound.transact;
10
11 import com.google.common.util.concurrent.ThreadFactoryBuilder;
12 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionInstance;
13 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
14 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
15 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 import java.util.ArrayList;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28
29 public class DependencyQueue {
30
31     private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class);
32     private static final ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d").
33             build();
34     private static final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact);
35
36     private final LinkedBlockingQueue<DependentJob> configWaitQueue = new LinkedBlockingQueue<>(
37             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
38     private final LinkedBlockingQueue<DependentJob> opWaitQueue = new LinkedBlockingQueue<>(
39             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
40     private final HwvtepDeviceInfo deviceInfo;
41     private ScheduledFuture expiredTasksMonitorJob;
42
43     @SuppressWarnings("unchecked")
44     public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) {
45         this.deviceInfo = hwvtepDeviceInfo;
46         expiredTasksMonitorJob = executorService.scheduleWithFixedDelay(() -> {
47             try {
48                 LOG.debug("Processing dependencies");
49                 if (!deviceInfo.getConnectionInstance().getOvsdbClient().isActive()) {
50                     expiredTasksMonitorJob.cancel(false);
51                 }
52                 deviceInfo.onOperDataAvailable();
53             } catch (Throwable e) {
54                 //If execution of one run throws error , subsequent runs are suppressed, hence catching the throwable
55                 LOG.error("Failed to process dependencies", e);
56             }
57         }, 0, HwvtepSouthboundConstants.IN_TRANSIT_STATE_CHECK_PERIOD_MILLIS, TimeUnit.MILLISECONDS);
58     }
59
60     /**
61      * Tries to add the job to the waiting queue
62      * @param waitingJob The job to be enqueued
63      * @return true if it is successfully added to the queue
64      */
65     public boolean addToQueue(DependentJob waitingJob) {
66         boolean addedToQueue;
67         if (waitingJob instanceof DependentJob.ConfigWaitingJob) {
68             addedToQueue = configWaitQueue.offer(waitingJob);
69         } else {
70             addedToQueue = opWaitQueue.offer(waitingJob);
71         }
72         if (addedToQueue) {
73             LOG.debug("Added the waiting job {} to queue", waitingJob.getKey());
74         } else {
75             LOG.error("Failed to add the waiting job to queue {}", waitingJob.getKey());
76         }
77         return addedToQueue;
78     }
79
80     /**
81      * Checks if any config data dependent jobs are ready to be processed and process them
82      * @param connectionInstance The connection instance
83      */
84     public void processReadyJobsFromConfigQueue(HwvtepConnectionInstance connectionInstance) {
85         processReadyJobs(connectionInstance, configWaitQueue);
86     }
87
88     /**
89      * Checks if any operational data dependent jobs are ready to be processed and process them
90      * @param connectionInstance The connection instance
91      */
92     public void processReadyJobsFromOpQueue(HwvtepConnectionInstance connectionInstance) {
93         processReadyJobs(connectionInstance, opWaitQueue);
94     }
95
96     private void processReadyJobs(final HwvtepConnectionInstance hwvtepConnectionInstance,
97                                   LinkedBlockingQueue<DependentJob> queue) {
98         final List<DependentJob> readyJobs =  getReadyJobs(queue);
99         if (readyJobs.size() > 0) {
100             executorService.submit(() -> hwvtepConnectionInstance.transact(new TransactCommand() {
101                 HwvtepOperationalState operationalState;
102                 @Override
103                 public void execute(TransactionBuilder transactionBuilder) {
104                     this.operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
105                     for (DependentJob job : readyJobs) {
106                         job.onDependencyResolved(operationalState, transactionBuilder);
107                     }
108                 }
109
110                 @Override
111                 public void onFailure(TransactionBuilder deviceTransaction) {
112                     readyJobs.forEach((job) -> job.onFailure(deviceTransaction));
113                     operationalState.clearIntransitKeys();
114                 }
115
116                 @Override
117                 public void onSuccess(TransactionBuilder deviceTransaction) {
118                     readyJobs.forEach((job) -> job.onSuccess(deviceTransaction));
119                     operationalState.getDeviceInfo().onOperDataAvailable();
120                 }
121             }));
122         }
123     }
124
125     private List<DependentJob> getReadyJobs(LinkedBlockingQueue<DependentJob> queue) {
126         List<DependentJob> readyJobs = new ArrayList<>();
127         Iterator<DependentJob> jobIterator = queue.iterator();
128         while(jobIterator.hasNext()) {
129             DependentJob job = jobIterator.next();
130             long currentTime = System.currentTimeMillis();
131
132             //first check if its dependencies are met later check for expired status
133             if (job.areDependenciesMet(deviceInfo)) {
134                 jobIterator.remove();
135                 readyJobs.add(job);
136                 continue;
137             }
138             if (job.isExpired(currentTime)) {
139                 jobIterator.remove();
140                 continue;
141             }
142         }
143         return readyJobs;
144     }
145
146     public static void close() {
147         executorService.shutdown();
148     }
149
150     public void submit(Runnable runnable) {
151         executorService.submit(runnable);
152     }
153 }