731335f12a153b760f75f55313285192d67bcd0e
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / reconciliation / ReconciliationManager.java
1 /*
2  * Copyright © 2016, 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.hwvtepsouthbound.reconciliation;
9
10 import com.google.common.util.concurrent.ThreadFactoryBuilder;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.ScheduledExecutorService;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.TimeUnit;
16 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
17 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 /**
22  * Copied from org.opendaylight.ovsdb.southbound.reconciliation.ReconciliationManager.
23  *
24  * <p>
25  * This class provides the implementation of ovsdb southbound plugins
26  * configuration reconciliation engine. This engine provide interfaces
27  * to enqueue (one time retry)/ enqueueForRetry(periodic retry)/ dequeue
28  * (remove from retry queue) reconciliation task. Reconciliation task can
29  * be a connection reconciliation or configuration reconciliation of any
30  * ovsdb managed resource like bridge, termination point etc. This engine
31  * execute all the reconciliation task through a fixed size thread pool.
32  * If submitted task need to be retry after a periodic interval they are
33  * submitted to a single thread executor to periodically wake up and check
34  * if task is ready for execution.
35  * Ideally, addition of any type of reconciliation task should not require
36  * any change in this reconciliation manager execution engine.
37  *
38  * <p>
39  * 3-Node Cluster:
40  * Reconciliation manager is agnostic of whether it's running in single
41  * node cluster or 3-node cluster. It's a responsibility of the task
42  * submitter to make sure that it submit the task for reconciliation only
43  * if it's an owner of that device EXCEPT controller initiated Connection.
44  * Reconciliation of controller initiated connection should be done by all
45  * the 3-nodes in the cluster, because connection to individual controller
46  * can be interrupted for various reason.
47  *
48  * <p>
49  * Created by Anil Vishnoi (avishnoi@Brocade.com) on 3/9/16.
50  */
51 public class ReconciliationManager implements AutoCloseable {
52     private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManager.class);
53
54     private static final int NO_OF_RECONCILER = 10;
55     private static final int RECON_TASK_QUEUE_SIZE = 5000;
56
57     private final DataBroker db;
58     private final ExecutorService reconcilers;
59     private final ScheduledExecutorService taskTriager;
60
61     private final ReconciliationTaskManager reconTaskManager = new ReconciliationTaskManager();
62
63     public ReconciliationManager(final DataBroker db) {
64         this.db = db;
65         reconcilers =
66                 SpecialExecutors.newBoundedCachedThreadPool(NO_OF_RECONCILER, RECON_TASK_QUEUE_SIZE, "ovsdb-reconciler",
67                         getClass());
68
69         ThreadFactory threadFact = new ThreadFactoryBuilder()
70                 .setNameFormat("ovsdb-recon-task-triager-%d").build();
71         taskTriager = Executors.newSingleThreadScheduledExecutor(threadFact);
72     }
73
74     public boolean isEnqueued(final ReconciliationTask task) {
75         return reconTaskManager.isTaskQueued(task);
76     }
77
78     public void enqueue(final ReconciliationTask task) {
79         LOG.trace("Reconciliation task submitted for execution {}",task);
80         reconTaskManager.cacheTask(task, reconcilers.submit(task));
81     }
82
83     public void enqueueForRetry(final ReconciliationTask task) {
84         LOG.trace("Reconciliation task re-queued for re-execution {}",task);
85         reconTaskManager.cacheTask(task, taskTriager.schedule(
86                 task::checkReadinessAndProcess, task.retryDelayInMills(), TimeUnit.MILLISECONDS
87             )
88         );
89     }
90
91     public void dequeue(final ReconciliationTask task) {
92         reconTaskManager.cancelTask(task);
93     }
94
95     public DataBroker getDb() {
96         return db;
97     }
98
99     @Override
100     public void close() throws Exception {
101         if (this.reconcilers != null) {
102             this.reconcilers.shutdownNow();
103         }
104
105         if (this.taskTriager != null) {
106             this.taskTriager.shutdownNow();
107         }
108     }
109 }