2 * Copyright © 2016, 2017 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.ovsdb.southbound.reconciliation;
10 import com.google.common.base.Preconditions;
11 import com.google.common.cache.CacheBuilder;
12 import com.google.common.cache.CacheLoader;
13 import com.google.common.cache.LoadingCache;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import com.google.common.util.concurrent.UncheckedExecutionException;
16 import java.util.Collection;
17 import java.util.List;
19 import java.util.NoSuchElementException;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26 import javax.annotation.Nonnull;
27 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
28 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
30 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.ovsdb.southbound.InstanceIdentifierCodec;
33 import org.opendaylight.ovsdb.southbound.OvsdbConnectionInstance;
34 import org.opendaylight.ovsdb.southbound.OvsdbConnectionManager;
35 import org.opendaylight.ovsdb.southbound.SouthboundMapper;
36 import org.opendaylight.ovsdb.southbound.ovsdb.transact.TransactUtils;
37 import org.opendaylight.ovsdb.southbound.reconciliation.configuration.TerminationPointConfigReconciliationTask;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.ovsdb.rev150105.OvsdbBridgeAugmentation;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
43 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * This class provides the implementation of ovsdb southbound plugins
49 * configuration reconciliation engine. This engine provide interfaces
50 * to enqueue (one time retry)/ enqueueForRetry(periodic retry)/ dequeue
51 * (remove from retry queue) reconciliation task. Reconciliation task can
52 * be a connection reconciliation or configuration reconciliation of any
53 * ovsdb managed resource like bridge, termination point etc. This engine
54 * execute all the reconciliation task through a fixed size thread pool.
55 * If submitted task need to be retry after a periodic interval they are
56 * submitted to a single thread executor to periodically wake up and check
57 * if task is ready for execution.
58 * Ideally, addition of any type of reconciliation task should not require
59 * any change in this reconciliation manager execution engine.
62 * Reconciliation manager is agnostic of whether it's running in single
63 * node cluster or 3-node cluster. It's a responsibility of the task
64 * submitter to make sure that it submit the task for reconciliation only
65 * if it's an owner of that device EXCEPT controller initiated Connection.
66 * Reconciliation of controller initiated connection should be done by all
67 * the 3-nodes in the cluster, because connection to individual controller
68 * can be interrupted for various reason.
71 public class ReconciliationManager implements AutoCloseable {
72 private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManager.class);
74 private static final int NO_OF_RECONCILER = 10;
75 private static final int RECON_TASK_QUEUE_SIZE = 5000;
76 private static final long BRIDGE_CACHE_TIMEOUT_IN_SECONDS = 30;
78 private final DataBroker db;
79 private final InstanceIdentifierCodec instanceIdentifierCodec;
80 private final ExecutorService reconcilers;
81 private final ScheduledExecutorService taskTriager;
83 // Timeout cache contains the list of bridges to be reconciled for termination points
84 private LoadingCache<NodeKey, NodeConnectionMetadata> bridgeNodeCache = null;
86 // Listens for new bridge creations in the operational DS
87 private ListenerRegistration<BridgeCreatedDataTreeChangeListener> bridgeCreatedDataTreeChangeRegistration = null;
89 private final ReconciliationTaskManager reconTaskManager = new ReconciliationTaskManager();
91 public ReconciliationManager(final DataBroker db, final InstanceIdentifierCodec instanceIdentifierCodec) {
93 this.instanceIdentifierCodec = instanceIdentifierCodec;
94 reconcilers = SpecialExecutors.newBoundedCachedThreadPool(NO_OF_RECONCILER, RECON_TASK_QUEUE_SIZE,
95 "ovsdb-reconciler", getClass());
97 ThreadFactory threadFact = new ThreadFactoryBuilder()
98 .setNameFormat("ovsdb-recon-task-triager-%d").build();
99 taskTriager = Executors.newSingleThreadScheduledExecutor(threadFact);
101 bridgeNodeCache = buildBridgeNodeCache();
104 public boolean isEnqueued(final ReconciliationTask task) {
105 return reconTaskManager.isTaskQueued(task);
108 public void enqueue(final ReconciliationTask task) {
109 LOG.trace("Reconciliation task submitted for execution {}",task);
110 reconTaskManager.cacheTask(task, reconcilers.submit(task));
113 public void enqueueForRetry(final ReconciliationTask task) {
114 LOG.trace("Reconciliation task re-queued for re-execution {}",task);
115 reconTaskManager.cacheTask(task, taskTriager.schedule(
116 task::checkReadinessAndProcess, task.retryDelayInMills(), TimeUnit.MILLISECONDS
121 public void dequeue(final ReconciliationTask task) {
122 reconTaskManager.cancelTask(task);
125 public DataBroker getDb() {
130 public void close() throws Exception {
131 if (this.reconcilers != null) {
132 this.reconcilers.shutdownNow();
135 if (this.taskTriager != null) {
136 this.taskTriager.shutdownNow();
141 * This method reconciles Termination Point configurations for the given list of bridge nodes.
143 * @param connectionManager OvsdbConnectionManager object
144 * @param connectionInstance OvsdbConnectionInstance object
145 * @param bridgeNodes list of bridge nodes be reconciled for termination points
147 public void reconcileTerminationPoints(final OvsdbConnectionManager connectionManager,
148 final OvsdbConnectionInstance connectionInstance,
149 final List<Node> bridgeNodes) {
150 LOG.debug("Reconcile Termination Point Configuration for Bridges {}", bridgeNodes);
151 Preconditions.checkNotNull(bridgeNodes, "Bridge Node list must not be null");
152 if (!bridgeNodes.isEmpty()) {
153 for (Node node : bridgeNodes) {
154 bridgeNodeCache.put(node.getKey(),
155 new NodeConnectionMetadata(node, connectionManager, connectionInstance));
157 registerBridgeCreatedDataTreeChangeListener();
161 public void cancelTerminationPointReconciliation() {
162 cleanupBridgeCreatedDataTreeChangeRegistration();
163 for (NodeConnectionMetadata nodeConnectionMetadata : bridgeNodeCache.asMap().values()) {
164 if (nodeConnectionMetadata.getNodeIid() != null) {
165 dequeue(new TerminationPointConfigReconciliationTask(
167 nodeConnectionMetadata.getConnectionManager(),
168 nodeConnectionMetadata.getNode(),
169 nodeConnectionMetadata.getNodeIid(),
170 nodeConnectionMetadata.getConnectionInstance(),
171 instanceIdentifierCodec
175 bridgeNodeCache.invalidateAll();
178 private synchronized void registerBridgeCreatedDataTreeChangeListener() {
179 if (bridgeCreatedDataTreeChangeRegistration == null) {
180 BridgeCreatedDataTreeChangeListener bridgeCreatedDataTreeChangeListener =
181 new BridgeCreatedDataTreeChangeListener();
182 InstanceIdentifier<Node> path = SouthboundMapper.createTopologyInstanceIdentifier()
184 DataTreeIdentifier<Node> dataTreeIdentifier =
185 new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, path);
187 bridgeCreatedDataTreeChangeRegistration = db.registerDataTreeChangeListener(dataTreeIdentifier,
188 bridgeCreatedDataTreeChangeListener);
192 private LoadingCache<NodeKey, NodeConnectionMetadata> buildBridgeNodeCache() {
193 return CacheBuilder.newBuilder()
194 .expireAfterWrite(BRIDGE_CACHE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
195 .build(new CacheLoader<NodeKey, NodeConnectionMetadata>() {
197 public NodeConnectionMetadata load(NodeKey nodeKey) throws Exception {
198 // the termination points are explicitly added to the cache, retrieving bridges that are not in
199 // the cache results in NoSuchElementException
200 throw new NoSuchElementException();
206 * This class listens for bridge creations in the operational data store.
207 * If the newly created bridge is in the 'bridgeNodeCache', termination point reconciliation for the bridge
208 * is triggered and the bridge entry is removed from the cache.
209 * Once cache is empty, either being removed explicitly or expired, the the listener de-registered.
211 class BridgeCreatedDataTreeChangeListener implements ClusteredDataTreeChangeListener<Node> {
213 public void onDataTreeChanged(@Nonnull Collection<DataTreeModification<Node>> changes) {
214 bridgeNodeCache.cleanUp();
215 if (!bridgeNodeCache.asMap().isEmpty()) {
216 Map<InstanceIdentifier<OvsdbBridgeAugmentation>, OvsdbBridgeAugmentation> nodes =
217 TransactUtils.extractCreated(changes, OvsdbBridgeAugmentation.class);
218 for (Map.Entry<InstanceIdentifier<OvsdbBridgeAugmentation>, OvsdbBridgeAugmentation> entry :
220 InstanceIdentifier<?> bridgeIid = entry.getKey();
221 NodeKey nodeKey = bridgeIid.firstKeyOf(Node.class);
223 NodeConnectionMetadata bridgeNodeMetaData = bridgeNodeCache.get(nodeKey);
224 bridgeNodeMetaData.setNodeIid(bridgeIid);
225 TerminationPointConfigReconciliationTask tpReconciliationTask =
226 new TerminationPointConfigReconciliationTask(ReconciliationManager.this,
227 bridgeNodeMetaData.getConnectionManager(),
228 bridgeNodeMetaData.getNode(),
230 bridgeNodeMetaData.getConnectionInstance(),
231 instanceIdentifierCodec);
232 enqueue(tpReconciliationTask);
233 bridgeNodeCache.invalidate(nodeKey);
234 } catch (UncheckedExecutionException ex) {
235 // Ignore NoSuchElementException which indicates bridge node is not in the list of
236 // pending reconciliation
237 if (!(ex.getCause() instanceof NoSuchElementException)) {
238 LOG.error("Error getting Termination Point node from LoadingCache", ex);
241 } catch (ExecutionException ex) {
242 LOG.error("Error getting Termination Point node from LoadingCache", ex);
244 if (bridgeNodeCache.asMap().isEmpty()) {
245 LOG.debug("De-registering for bridge creation event");
246 cleanupBridgeCreatedDataTreeChangeRegistration();
250 LOG.debug("Cache expired - De-registering for bridge creation event");
251 cleanupBridgeCreatedDataTreeChangeRegistration();
256 private void cleanupBridgeCreatedDataTreeChangeRegistration() {
257 if (bridgeCreatedDataTreeChangeRegistration != null) {
258 bridgeCreatedDataTreeChangeRegistration.close();
259 bridgeCreatedDataTreeChangeRegistration = null;
263 private static class NodeConnectionMetadata {
264 private final Node node;
265 private InstanceIdentifier<?> nodeIid;
266 private final OvsdbConnectionManager connectionManager;
267 private final OvsdbConnectionInstance connectionInstance;
269 NodeConnectionMetadata(Node node,
270 OvsdbConnectionManager connectionManager,
271 OvsdbConnectionInstance connectionInstance) {
273 this.connectionManager = connectionManager;
274 this.connectionInstance = connectionInstance;
277 public Node getNode() {
281 public OvsdbConnectionManager getConnectionManager() {
282 return connectionManager;
285 public OvsdbConnectionInstance getConnectionInstance() {
286 return connectionInstance;
289 public void setNodeIid(InstanceIdentifier<?> nodeIid) {
290 this.nodeIid = nodeIid;
293 public InstanceIdentifier<?> getNodeIid() {