Bug 8055: use lambdas instead of anonymous classes
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / reconciliation / ReconciliationManager.java
1 /*
2  * Copyright © 2016, 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation;
9
10 import com.google.common.util.concurrent.ThreadFactoryBuilder;
11 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
12 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15
16 import java.util.concurrent.*;
17
18 /**
19  * Copied from org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationManager
20  *
21  * This class provides the implementation of ovsdb southbound plugins
22  * configuration reconciliation engine. This engine provide interfaces
23  * to enqueue (one time retry)/ enqueueForRetry(periodic retry)/ dequeue
24  * (remove from retry queue) reconciliation task. Reconciliation task can
25  * be a connection reconciliation or configuration reconciliation of any
26  * ovsdb managed resource like bridge, termination point etc. This engine
27  * execute all the reconciliation task through a fixed size thread pool.
28  * If submitted task need to be retry after a periodic interval they are
29  * submitted to a single thread executor to periodically wake up and check
30  * if task is ready for execution.
31  * Ideally, addition of any type of reconciliation task should not require
32  * any change in this reconciliation manager execution engine.
33  *
34  * 3-Node Cluster:
35  * Reconciliation manager is agnostic of whether it's running in single
36  * node cluster or 3-node cluster. It's a responsibility of the task
37  * submitter to make sure that it submit the task for reconciliation only
38  * if it's an owner of that device EXCEPT controller initiated Connection.
39  * Reconciliation of controller initiated connection should be done by all
40  * the 3-nodes in the cluster, because connection to individual controller
41  * can be interrupted for various reason.
42  *
43  * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/9/16.
44  */
45 public class ReconciliationManager implements AutoCloseable {
46     private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManager.class);
47
48     private static final int NO_OF_RECONCILER = 10;
49     private static final int RECON_TASK_QUEUE_SIZE = 5000;
50
51     private final DataBroker db;
52     private final ExecutorService reconcilers;
53     private final ScheduledExecutorService taskTriager;
54
55     private final ReconciliationTaskManager reconTaskManager = new ReconciliationTaskManager();
56
57     public ReconciliationManager(final DataBroker db) {
58         this.db = db;
59         reconcilers = SpecialExecutors.newBoundedCachedThreadPool(NO_OF_RECONCILER, RECON_TASK_QUEUE_SIZE, "ovsdb-reconciler");
60
61         ThreadFactory threadFact = new ThreadFactoryBuilder()
62                 .setNameFormat("ovsdb-recon-task-triager-%d").build();
63         taskTriager = Executors.newSingleThreadScheduledExecutor(threadFact);
64     }
65
66     public boolean isEnqueued(final ReconciliationTask task) {
67         return reconTaskManager.isTaskQueued(task);
68     }
69
70     public void enqueue(final ReconciliationTask task) {
71         LOG.trace("Reconciliation task submitted for execution {}",task);
72         reconTaskManager.cacheTask(task, reconcilers.submit(task));
73     }
74
75     public void enqueueForRetry(final ReconciliationTask task) {
76         LOG.trace("Reconciliation task re-queued for re-execution {}",task);
77         reconTaskManager.cacheTask(task, taskTriager.schedule(
78                 () -> task.checkReadinessAndProcess(), task.retryDelayInMills(), TimeUnit.MILLISECONDS
79             )
80         );
81     }
82
83     public void dequeue(final ReconciliationTask task) {
84         reconTaskManager.cancelTask(task);
85     }
86
87     public DataBroker getDb() {
88         return db;
89     }
90
91     @Override
92     public void close() throws Exception {
93         if (this.reconcilers != null) {
94             this.reconcilers.shutdownNow();
95         }
96
97         if (this.taskTriager != null) {
98             this.taskTriager.shutdownNow();
99         }
100     }
101 }