bug 6579 removed boilerplate code
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transact / DependencyQueue.java
1 /*
2  * Copyright (c) 2016 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.ovsdb.hwvtepsouthbound.transact;
10
11 import com.google.common.util.concurrent.ThreadFactoryBuilder;
12 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepConnectionInstance;
13 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepDeviceInfo;
14 import org.opendaylight.ovsdb.hwvtepsouthbound.HwvtepSouthboundConstants;
15 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
16 import org.opendaylight.yangtools.yang.binding.Identifiable;
17 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadFactory;
28
29 public class DependencyQueue {
30
31     private static final Logger LOG = LoggerFactory.getLogger(DependencyQueue.class);
32     private static final ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("hwvtep-waiting-job-%d").
33             build();
34     private static final ExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFact);
35
36     private final LinkedBlockingQueue<DependentJob> configWaitQueue = new LinkedBlockingQueue<>(
37             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
38     private final LinkedBlockingQueue<DependentJob> opWaitQueue = new LinkedBlockingQueue<>(
39             HwvtepSouthboundConstants.WAITING_QUEUE_CAPACITY);
40     private final HwvtepDeviceInfo deviceInfo;
41
42     public DependencyQueue(HwvtepDeviceInfo hwvtepDeviceInfo) {
43         this.deviceInfo = hwvtepDeviceInfo;
44     }
45
46     /**
47      * Tries to add the job to the waiting queue
48      * @param waitingJob The job to be enqueued
49      * @return true if it is successfully added to the queue
50      */
51     public boolean addToQueue(DependentJob waitingJob) {
52         boolean addedToQueue;
53         if (waitingJob instanceof DependentJob.ConfigWaitingJob) {
54             addedToQueue = configWaitQueue.offer(waitingJob);
55         } else {
56             addedToQueue = opWaitQueue.offer(waitingJob);
57         }
58         if (addedToQueue) {
59             LOG.debug("Added the waiting job {} to queue", waitingJob.getKey());
60         } else {
61             LOG.error("Failed to add the waiting job to queue {}", waitingJob.getKey());
62         }
63         return addedToQueue;
64     }
65
66     /**
67      * Checks if any config data dependent jobs are ready to be processed and process them
68      * @param connectionInstance The connection instance
69      */
70     public void processReadyJobsFromConfigQueue(HwvtepConnectionInstance connectionInstance) {
71         processReadyJobs(connectionInstance, configWaitQueue);
72     }
73
74     /**
75      * Checks if any operational data dependent jobs are ready to be processed and process them
76      * @param connectionInstance The connection instance
77      */
78     public void processReadyJobsFromOpQueue(HwvtepConnectionInstance connectionInstance) {
79         processReadyJobs(connectionInstance, opWaitQueue);
80     }
81
82     private void processReadyJobs(final HwvtepConnectionInstance hwvtepConnectionInstance,
83                                   LinkedBlockingQueue<DependentJob> queue) {
84         final List<DependentJob> readyJobs =  getReadyJobs(queue);
85         if (readyJobs.size() > 0) {
86             executorService.submit(new Runnable() {
87                 @Override
88                 public void run() {
89                     hwvtepConnectionInstance.transact(new TransactCommand() {
90                         @Override
91                         public void execute(TransactionBuilder transactionBuilder) {
92                             HwvtepOperationalState operationalState = new HwvtepOperationalState(hwvtepConnectionInstance);
93                             for (DependentJob job : readyJobs) {
94                                 job.onDependencyResolved(operationalState, transactionBuilder);
95                             }
96                         }
97
98                         @Override
99                         public void onConfigUpdate(TransactionBuilder transaction, InstanceIdentifier nodeIid,
100                                                    Identifiable data, InstanceIdentifier key, Object... extraData) {
101                         }
102
103                         @Override
104                         public void doDeviceTransaction(TransactionBuilder transaction, InstanceIdentifier nodeIid,
105                                                         Identifiable data, InstanceIdentifier key, Object... extraData) {
106                         }
107                     });
108                 }
109             });
110         }
111     }
112
113     private List<DependentJob> getReadyJobs(LinkedBlockingQueue<DependentJob> queue) {
114         List<DependentJob> readyJobs = new ArrayList<>();
115         Iterator<DependentJob> jobIterator = queue.iterator();
116         while(jobIterator.hasNext()) {
117             DependentJob job = jobIterator.next();
118             long currentTime = System.currentTimeMillis();
119             if (job.isExpired(currentTime)) {
120                 jobIterator.remove();
121                 continue;
122             }
123             if (job.areDependenciesMet(deviceInfo)) {
124                 jobIterator.remove();
125                 readyJobs.add(job);
126             }
127         }
128         return readyJobs;
129     }
130
131     public static void close() {
132         executorService.shutdown();
133     }
134
135     public void submit(Runnable runnable) {
136         executorService.submit(runnable);
137     }
138 }