TSC-101: Fixup Augmentable and Identifiable methods change
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / reconciliation / ReconciliationManager.java
1 /*
2  * Copyright © 2016, 2017 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.ovsdb.southbound.reconciliation;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.cache.CacheBuilder;
12 import com.google.common.cache.CacheLoader;
13 import com.google.common.cache.LoadingCache;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import com.google.common.util.concurrent.UncheckedExecutionException;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.Map;
19 import java.util.NoSuchElementException;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
30 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.ovsdb.southbound.InstanceIdentifierCodec;
33 import org.opendaylight.ovsdb.southbound.OvsdbConnectionInstance;
34 import org.opendaylight.ovsdb.southbound.OvsdbConnectionManager;
35 import org.opendaylight.ovsdb.southbound.SouthboundMapper;
36 import org.opendaylight.ovsdb.southbound.ovsdb.transact.TransactUtils;
37 import org.opendaylight.ovsdb.southbound.reconciliation.configuration.TerminationPointConfigReconciliationTask;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
43 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * This class provides the implementation of ovsdb southbound plugins
49  * configuration reconciliation engine. This engine provide interfaces
50  * to enqueue (one time retry)/ enqueueForRetry(periodic retry)/ dequeue
51  * (remove from retry queue) reconciliation task. Reconciliation task can
52  * be a connection reconciliation or configuration reconciliation of any
53  * ovsdb managed resource like bridge, termination point etc. This engine
54  * execute all the reconciliation task through a fixed size thread pool.
55  * If submitted task need to be retry after a periodic interval they are
56  * submitted to a single thread executor to periodically wake up and check
57  * if task is ready for execution.
58  * Ideally, addition of any type of reconciliation task should not require
59  * any change in this reconciliation manager execution engine.
60  * <p>
61  * 3-Node Cluster:
62  * Reconciliation manager is agnostic of whether it's running in single
63  * node cluster or 3-node cluster. It's a responsibility of the task
64  * submitter to make sure that it submit the task for reconciliation only
65  * if it's an owner of that device EXCEPT controller initiated Connection.
66  * Reconciliation of controller initiated connection should be done by all
67  * the 3-nodes in the cluster, because connection to individual controller
68  * can be interrupted for various reason.
69  * </p>
70  */
71 public class ReconciliationManager implements AutoCloseable {
72     private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManager.class);
73
74     private static final int NO_OF_RECONCILER = 10;
75     private static final int RECON_TASK_QUEUE_SIZE = 5000;
76     private static final long BRIDGE_CACHE_TIMEOUT_IN_SECONDS = 30;
77
78     private final DataBroker db;
79     private final InstanceIdentifierCodec instanceIdentifierCodec;
80     private final ExecutorService reconcilers;
81     private final ScheduledExecutorService taskTriager;
82
83     // Timeout cache contains the list of bridges to be reconciled for termination points
84     private LoadingCache<NodeKey, NodeConnectionMetadata> bridgeNodeCache = null;
85
86     // Listens for new bridge creations in the operational DS
87     private ListenerRegistration<BridgeCreatedDataTreeChangeListener> bridgeCreatedDataTreeChangeRegistration = null;
88
89     private final ReconciliationTaskManager reconTaskManager = new ReconciliationTaskManager();
90
91     public ReconciliationManager(final DataBroker db, final InstanceIdentifierCodec instanceIdentifierCodec) {
92         this.db = db;
93         this.instanceIdentifierCodec = instanceIdentifierCodec;
94         reconcilers = SpecialExecutors.newBoundedCachedThreadPool(NO_OF_RECONCILER, RECON_TASK_QUEUE_SIZE,
95                 "ovsdb-reconciler", getClass());
96
97         ThreadFactory threadFact = new ThreadFactoryBuilder()
98                 .setNameFormat("ovsdb-recon-task-triager-%d").build();
99         taskTriager = Executors.newSingleThreadScheduledExecutor(threadFact);
100
101         bridgeNodeCache = buildBridgeNodeCache();
102     }
103
104     public boolean isEnqueued(final ReconciliationTask task) {
105         return reconTaskManager.isTaskQueued(task);
106     }
107
108     public void enqueue(final ReconciliationTask task) {
109         LOG.trace("Reconciliation task submitted for execution {}",task);
110         reconTaskManager.cacheTask(task, reconcilers.submit(task));
111     }
112
113     public void enqueueForRetry(final ReconciliationTask task) {
114         LOG.trace("Reconciliation task re-queued for re-execution {}",task);
115         reconTaskManager.cacheTask(task, taskTriager.schedule(
116                 task::checkReadinessAndProcess, task.retryDelayInMills(), TimeUnit.MILLISECONDS
117             )
118         );
119     }
120
121     public void dequeue(final ReconciliationTask task) {
122         reconTaskManager.cancelTask(task);
123     }
124
125     public DataBroker getDb() {
126         return db;
127     }
128
129     @Override
130     public void close() throws Exception {
131         if (this.reconcilers != null) {
132             this.reconcilers.shutdownNow();
133         }
134
135         if (this.taskTriager != null) {
136             this.taskTriager.shutdownNow();
137         }
138     }
139
140     /**
141      * This method reconciles Termination Point configurations for the given list of bridge nodes.
142      *
143      * @param connectionManager OvsdbConnectionManager object
144      * @param connectionInstance OvsdbConnectionInstance object
145      * @param bridgeNodes list of bridge nodes be reconciled for termination points
146      */
147     public void reconcileTerminationPoints(final OvsdbConnectionManager connectionManager,
148                                            final OvsdbConnectionInstance connectionInstance,
149                                            final List<Node> bridgeNodes) {
150         LOG.debug("Reconcile Termination Point Configuration for Bridges {}", bridgeNodes);
151         Preconditions.checkNotNull(bridgeNodes, "Bridge Node list must not be null");
152         if (!bridgeNodes.isEmpty()) {
153             for (Node node : bridgeNodes) {
154                 bridgeNodeCache.put(node.key(),
155                         new NodeConnectionMetadata(node, connectionManager, connectionInstance));
156             }
157             registerBridgeCreatedDataTreeChangeListener();
158         }
159     }
160
161     public void cancelTerminationPointReconciliation() {
162         cleanupBridgeCreatedDataTreeChangeRegistration();
163         for (NodeConnectionMetadata nodeConnectionMetadata : bridgeNodeCache.asMap().values()) {
164             if (nodeConnectionMetadata.getNodeIid() != null) {
165                 dequeue(new TerminationPointConfigReconciliationTask(
166                         this,
167                         nodeConnectionMetadata.getConnectionManager(),
168                         nodeConnectionMetadata.getNode(),
169                         nodeConnectionMetadata.getNodeIid(),
170                         nodeConnectionMetadata.getConnectionInstance(),
171                         instanceIdentifierCodec
172                 ));
173             }
174         }
175         bridgeNodeCache.invalidateAll();
176     }
177
178     private synchronized void registerBridgeCreatedDataTreeChangeListener() {
179         if (bridgeCreatedDataTreeChangeRegistration == null) {
180             BridgeCreatedDataTreeChangeListener bridgeCreatedDataTreeChangeListener =
181                     new BridgeCreatedDataTreeChangeListener();
182             InstanceIdentifier<Node> path = SouthboundMapper.createTopologyInstanceIdentifier()
183                     .child(Node.class);
184             DataTreeIdentifier<Node> dataTreeIdentifier =
185                     new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, path);
186
187             bridgeCreatedDataTreeChangeRegistration = db.registerDataTreeChangeListener(dataTreeIdentifier,
188                     bridgeCreatedDataTreeChangeListener);
189         }
190     }
191
192     private LoadingCache<NodeKey, NodeConnectionMetadata> buildBridgeNodeCache() {
193         return CacheBuilder.newBuilder()
194                 .expireAfterWrite(BRIDGE_CACHE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
195                 .build(new CacheLoader<NodeKey, NodeConnectionMetadata>() {
196                     @Override
197                     public NodeConnectionMetadata load(NodeKey nodeKey) throws Exception {
198                         // the termination points are explicitly added to the cache, retrieving bridges that are not in
199                         // the cache results in NoSuchElementException
200                         throw new NoSuchElementException();
201                     }
202                 });
203     }
204
205     /**
206      * This class listens for bridge creations in the operational data store.
207      * If the newly created bridge is in the 'bridgeNodeCache', termination point reconciliation for the bridge
208      * is triggered and the bridge entry is removed from the cache.
209      * Once cache is empty, either being removed explicitly or expired, the the listener de-registered.
210      */
211     class BridgeCreatedDataTreeChangeListener implements ClusteredDataTreeChangeListener<Node> {
212         @Override
213         public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Node>> changes) {
214             bridgeNodeCache.cleanUp();
215             if (!bridgeNodeCache.asMap().isEmpty()) {
216                 Map<InstanceIdentifier<OvsdbBridgeAugmentation>, OvsdbBridgeAugmentation> nodes =
217                         TransactUtils.extractCreated(changes, OvsdbBridgeAugmentation.class);
218                 for (Map.Entry<InstanceIdentifier<OvsdbBridgeAugmentation>, OvsdbBridgeAugmentation> entry :
219                         nodes.entrySet()) {
220                     InstanceIdentifier<?> bridgeIid = entry.getKey();
221                     NodeKey nodeKey = bridgeIid.firstKeyOf(Node.class);
222                     try {
223                         NodeConnectionMetadata bridgeNodeMetaData = bridgeNodeCache.get(nodeKey);
224                         bridgeNodeMetaData.setNodeIid(bridgeIid);
225                         TerminationPointConfigReconciliationTask tpReconciliationTask =
226                                 new TerminationPointConfigReconciliationTask(ReconciliationManager.this,
227                                         bridgeNodeMetaData.getConnectionManager(),
228                                         bridgeNodeMetaData.getNode(),
229                                         bridgeIid,
230                                         bridgeNodeMetaData.getConnectionInstance(),
231                                         instanceIdentifierCodec);
232                         enqueue(tpReconciliationTask);
233                         bridgeNodeCache.invalidate(nodeKey);
234                     } catch (UncheckedExecutionException ex) {
235                         // Ignore NoSuchElementException which indicates bridge node is not in the list of
236                         // pending reconciliation
237                         if (!(ex.getCause() instanceof NoSuchElementException)) {
238                             LOG.error("Error getting Termination Point node from LoadingCache", ex);
239                         }
240
241                     } catch (ExecutionException ex) {
242                         LOG.error("Error getting Termination Point node from LoadingCache", ex);
243                     }
244                     if (bridgeNodeCache.asMap().isEmpty()) {
245                         LOG.debug("De-registering for bridge creation event");
246                         cleanupBridgeCreatedDataTreeChangeRegistration();
247                     }
248                 }
249             } else {
250                 LOG.debug("Cache expired - De-registering for bridge creation event");
251                 cleanupBridgeCreatedDataTreeChangeRegistration();
252             }
253         }
254     }
255
256     private void cleanupBridgeCreatedDataTreeChangeRegistration() {
257         if (bridgeCreatedDataTreeChangeRegistration != null) {
258             bridgeCreatedDataTreeChangeRegistration.close();
259             bridgeCreatedDataTreeChangeRegistration = null;
260         }
261     }
262
263     private static class NodeConnectionMetadata {
264         private final Node node;
265         private InstanceIdentifier<?> nodeIid;
266         private final OvsdbConnectionManager connectionManager;
267         private final OvsdbConnectionInstance connectionInstance;
268
269         NodeConnectionMetadata(Node node,
270                                OvsdbConnectionManager connectionManager,
271                                OvsdbConnectionInstance connectionInstance) {
272             this.node = node;
273             this.connectionManager = connectionManager;
274             this.connectionInstance = connectionInstance;
275         }
276
277         public Node getNode() {
278             return node;
279         }
280
281         public OvsdbConnectionManager getConnectionManager() {
282             return connectionManager;
283         }
284
285         public OvsdbConnectionInstance getConnectionInstance() {
286             return connectionInstance;
287         }
288
289         public void setNodeIid(InstanceIdentifier<?> nodeIid) {
290             this.nodeIid = nodeIid;
291         }
292
293         public InstanceIdentifier<?> getNodeIid() {
294             return nodeIid;
295         }
296     }
297 }