2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.bgp.topology.provider;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.List;
19 import java.util.concurrent.atomic.AtomicBoolean;
20 import org.checkerframework.checker.lock.qual.GuardedBy;
21 import org.opendaylight.bgpcep.topology.TopologyReference;
22 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
23 import org.opendaylight.mdsal.binding.api.DataBroker;
24 import org.opendaylight.mdsal.binding.api.DataObjectModification;
25 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
26 import org.opendaylight.mdsal.binding.api.DataTreeModification;
27 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.mdsal.binding.api.TransactionChain;
29 import org.opendaylight.mdsal.binding.api.WriteTransaction;
30 import org.opendaylight.mdsal.common.api.CommitInfo;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.protocol.bgp.rib.RibReference;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.Route;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.LocRib;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.Tables;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.AddressFamily;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.SubsequentAddressFamily;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
45 import org.opendaylight.yangtools.concepts.Registration;
46 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
47 import org.opendaylight.yangtools.yang.common.Empty;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
/**
 * Base class for builders which mirror the routes of a single BGP Loc-RIB table, selected by an AFI/SAFI pair,
 * into a {@link Topology} in the operational datastore.
 *
 * <p>Subclasses supply the route wildcard to listen on and translate individual route creations and removals
 * into topology writes. This class owns the data tree change listener registration, the transaction chain used
 * for the writes, and the recovery logic (transaction chain reset and delayed listener restart) which runs after
 * a write failure.
 *
 * @param <T> the route type handled by this builder
 */
public abstract class AbstractTopologyBuilder<T extends Route>
        implements ClusteredDataTreeChangeListener<T>, TopologyReference, FutureCallback<Empty> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologyBuilder.class);
    // Default delay, in milliseconds (5 minutes), used when a listener restart is scheduled or postponed.
    private static final long LISTENER_RESET_LIMIT_IN_MILLSEC = 5 * 60 * 1000;
    // Default bound on how often a due listener restart may be postponed (see scheduleListenerRestart()).
    private static final int LISTENER_RESET_ENFORCE_COUNTER = 3;
    // Identifier of the topology written by this builder.
    private final InstanceIdentifier<Topology> topology;
    // RIB whose Loc-RIB table is observed.
    private final RibReference locRibReference;
    // Broker used to register the route listener and to create transaction chains.
    private final DataBroker dataProvider;
    // Address family / subsequent address family selecting the observed Loc-RIB table.
    private final AddressFamily afi;
    private final SubsequentAddressFamily safi;
    private final TopologyKey topologyKey;
    // Topology types written when the operational topology is initialized.
    private final TopologyTypes topologyTypes;
    // Delay added to the current time when a listener restart is scheduled, and by which it is postponed.
    private final long listenerResetLimitInMillsec;
    // Compared against listenerScheduledRestartEnforceCounter to bound the number of postponements.
    private final int listenerResetEnforceCounter;
    // Current listener registration; null while no listener is registered. Accessed from synchronized methods.
    private Registration listenerRegistration = null;
    // Transaction chain used for topology writes; null until initialized. Accessed from synchronized methods.
    private TransactionChain chain = null;
    // Set by the first close() call, which makes later close() calls no-ops.
    private final AtomicBoolean closed = new AtomicBoolean(false);
    // Wall-clock time (System.currentTimeMillis()) at which a scheduled listener restart becomes due;
    // 0 means no restart is scheduled.
    protected long listenerScheduledRestartTime = 0;
    // Incremented each time scheduleListenerRestart() finds the pending restart already due;
    // reset to 0 when the listener is restarted.
    protected int listenerScheduledRestartEnforceCounter = 0;
    // When true, route changes are written through the transaction chain; when false, routeChanged()
    // is invoked with a null transaction.
    protected boolean networkTopologyTransaction = true;
/**
 * Creates a topology builder.
 *
 * @param dataProvider broker used to register the route listener and to create transaction chains
 * @param locRibReference RIB whose Loc-RIB table is observed; must not be null
 * @param topologyId identifier of the topology to build; must not be null
 * @param types topology types written into the topology when it is initialized
 * @param afi address family selecting the observed Loc-RIB table
 * @param safi subsequent address family selecting the observed Loc-RIB table
 * @param listenerResetLimitInMillsec delay, in milliseconds, used when a listener restart is scheduled or
 *        postponed
 * @param listenerResetEnforceCounter bound on how often a due listener restart may be postponed
 * @throws NullPointerException if {@code locRibReference} or {@code topologyId} is null
 */
protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
        final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
        final SubsequentAddressFamily safi, final long listenerResetLimitInMillsec,
        final int listenerResetEnforceCounter) {
    this.dataProvider = dataProvider;
    this.locRibReference = requireNonNull(locRibReference);
    topologyKey = new TopologyKey(requireNonNull(topologyId));
    topologyTypes = types;
    this.listenerResetLimitInMillsec = listenerResetLimitInMillsec;
    this.listenerResetEnforceCounter = listenerResetEnforceCounter;
    topology = InstanceIdentifier.builder(NetworkTopology.class)
            .child(Topology.class, topologyKey)
/**
 * Creates a topology builder which uses the default listener restart delay (5 minutes) and the default bound
 * (3) on postponing a due listener restart.
 *
 * @param dataProvider broker used to register the route listener and to create transaction chains
 * @param locRibReference RIB whose Loc-RIB table is observed; must not be null
 * @param topologyId identifier of the topology to build; must not be null
 * @param types topology types written into the topology when it is initialized
 * @param afi address family selecting the observed Loc-RIB table
 * @param safi subsequent address family selecting the observed Loc-RIB table
 */
protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
        final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
        final SubsequentAddressFamily safi) {
    this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC,
            LISTENER_RESET_ENFORCE_COUNTER);
/**
 * Starts the builder. It creates the transaction chain, writes the initial operational topology (no nodes,
 * no links) and registers the route listener on the Loc-RIB table.
 *
 * @throws IllegalStateException if a transaction chain or a listener registration already exists
 */
public final synchronized void start() {
    LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", locRibReference, topology,
    initTransactionChain();
    initOperationalTopology();
    registerDataChangeListener();
113 * Register to data tree change listener.
115 private synchronized void registerDataChangeListener() {
116 Preconditions.checkState(listenerRegistration == null,
117 "Topology Listener on topology %s has been registered before.",
118 this.getInstanceIdentifier());
119 final InstanceIdentifier<Tables> tablesId = locRibReference.getInstanceIdentifier()
120 .child(LocRib.class).child(Tables.class, new TablesKey(afi, safi));
121 final DataTreeIdentifier<T> id = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
122 getRouteWildcard(tablesId));
124 listenerRegistration = dataProvider.registerDataTreeChangeListener(id, this);
125 LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(),
126 listenerScheduledRestartTime);
130 * Unregister to data tree change listener.
132 private synchronized void unregisterDataChangeListener() {
133 if (listenerRegistration != null) {
134 LOG.debug("Unregistered listener {} on topology {}", this, this.getInstanceIdentifier());
135 listenerRegistration.close();
136 listenerRegistration = null;
/**
 * Returns the wildcarded identifier of the routes to listen on within the given Loc-RIB table.
 *
 * @param tablesId identifier of the Loc-RIB table selected by this builder's AFI/SAFI
 * @return wildcarded route identifier used for the data tree change listener registration
 */
protected abstract InstanceIdentifier<T> getRouteWildcard(InstanceIdentifier<Tables> tablesId);

/**
 * Adds the topology elements derived from a route which was created or modified.
 *
 * @param trans transaction to write into; {@code null} when {@link #networkTopologyTransaction} is false
 * @param id identifier of the route
 * @param value route data after the change
 */
protected abstract void createObject(ReadWriteTransaction trans, InstanceIdentifier<T> id, T value);

/**
 * Removes the topology elements derived from a route which was removed or replaced.
 *
 * @param trans transaction to write into; {@code null} when {@link #networkTopologyTransaction} is false
 * @param id identifier of the route
 * @param value route data before the change
 */
protected abstract void removeObject(ReadWriteTransaction trans, InstanceIdentifier<T> id, T value);

/**
 * Hook for subclasses to drop the topology state they track. Presumably invoked when the operational topology
 * is torn down; confirm against callers.
 */
protected abstract void clearTopology();
/**
 * Returns the identifier of the topology maintained by this builder.
 */
public final InstanceIdentifier<Topology> getInstanceIdentifier() {
/**
 * Shuts the builder down. It unregisters the route listener, deletes the operational topology and abandons the
 * transaction chain. Every call after the first one only logs and returns an already-completed future.
 *
 * <p>NOTE(review): deleting the topology requires an initialized transaction chain, so calling this before
 * {@link #start()} appears to fail with a NullPointerException. Confirm whether that is intended.
 *
 * @return presumably the commit future of the topology deletion (verify), or an already-completed future when
 *         the builder was closed before
 */
public final synchronized FluentFuture<? extends CommitInfo> close() {
    // closed guards against double shutdown; only the first caller proceeds
    if (closed.getAndSet(true)) {
        LOG.trace("Transaction chain was already closed.");
        return CommitInfo.emptyFluentFuture();
    LOG.info("Shutting down builder for {}", getInstanceIdentifier());
    // stop incoming route changes before tearing down the data they would be written into
    unregisterDataChangeListener();
    final FluentFuture<? extends CommitInfo> future = destroyOperationalTopology();
    destroyTransactionChain();
/**
 * Propagates route changes from the Loc-RIB table into the operational topology.
 *
 * <p>When {@link #networkTopologyTransaction} is set:
 * <ul>
 *   <li>The update is skipped, with a trace log, if the transaction chain has already been closed.</li>
 *   <li>The update is disregarded if {@link #restartTransactionChainOnDemand()} reports a listener restart. The
 *       re-registered listener delivers the complete data again.</li>
 *   <li>Otherwise all changes of this notification are written through one read-write transaction. A change
 *       whose processing throws a {@link RuntimeException} is logged and skipped. The transaction is committed
 *       anyway and, if that commit succeeds, a listener restart is scheduled so the skipped data can be
 *       recovered later.</li>
 * </ul>
 *
 * <p>When {@link #networkTopologyTransaction} is cleared, every change is handed to
 * {@link #routeChanged(DataTreeModification, ReadWriteTransaction)} with a {@code null} transaction.
 *
 * @param changes the route modifications delivered by the data broker
 */
@SuppressWarnings("checkstyle:IllegalCatch")
public synchronized void onDataTreeChanged(final List<DataTreeModification<T>> changes) {
    if (networkTopologyTransaction) {
            LOG.trace("Transaction chain was already closed, skipping update.");
        // check if the transaction chain needs to be restarted due to a previous error
        if (restartTransactionChainOnDemand()) {
            LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
        final ReadWriteTransaction trans = chain.newReadWriteTransaction();
        LOG.trace("Received data change {} event with transaction {}", changes, trans.getIdentifier());
        // Set when any change fails to propagate; read by the commit callback below.
        final AtomicBoolean transactionInError = new AtomicBoolean(false);
        for (final DataTreeModification<T> change : changes) {
                routeChanged(change, trans);
            } catch (final RuntimeException exc) {
                LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change,
                    trans.getIdentifier(), this, exc);
                // trans.cancel() is not supported by PingPongTransactionChain, so the problematic data change
                // is skipped instead.
                // trans.commit() must still be called to unlock the current transaction chain and keep it
                // closable, so we cannot exit #onDataTreeChanged() yet.
                transactionInError.set(true);
        trans.commit().addCallback(new FutureCallback<CommitInfo>() {
            public void onSuccess(final CommitInfo result) {
                // Because trans.commit() is enforced, the transaction may succeed even though an exception was
                // caught above. In that case the chain failure callback is never invoked. The chain remains
                // usable, but the skipped data would never be recovered, so a listener restart is scheduled.
                if (transactionInError.get()) {
                    LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a"
                        + " restart of listener {}", trans
                        .getIdentifier(), AbstractTopologyBuilder.this);
                    scheduleListenerRestart();
                    LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
            public void onFailure(final Throwable throwable) {
                // Only log here. The transaction chain restart is handled by the chain failure callback,
                // i.e. onFailure(Throwable) of the enclosing builder.
                LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(),
                    AbstractTopologyBuilder.this, throwable);
        }, MoreExecutors.directExecutor());
        for (final DataTreeModification<T> change : changes) {
            routeChanged(change, null);
/**
 * Applies a single route modification to the topology.
 *
 * <p>The first case (presumably DELETE) removes the route using its data before the change. Update cases
 * (SUBTREE_MODIFIED and presumably WRITE) first remove the previous version, if there is one, and then create
 * the new version.
 *
 * @param change the route modification
 * @param trans transaction to write into; {@code null} when {@link #networkTopologyTransaction} is false
 * @throws IllegalArgumentException for a modification type this method does not handle
 */
protected void routeChanged(final DataTreeModification<T> change, final ReadWriteTransaction trans) {
    final DataObjectModification<T> root = change.getRootNode();
    switch (root.getModificationType()) {
            removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
        case SUBTREE_MODIFIED:
            // replace: drop whatever the previous version contributed, then add the new version
            if (root.getDataBefore() != null) {
                removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
            createObject(trans, change.getRootPath().getRootIdentifier(), root.getDataAfter());
            throw new IllegalArgumentException("Unhandled modification type " + root.getModificationType());
246 private synchronized void initOperationalTopology() {
247 requireNonNull(chain, "A valid transaction chain must be provided.");
248 final WriteTransaction trans = chain.newWriteOnlyTransaction();
249 trans.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, topology,
250 new TopologyBuilder().withKey(topologyKey).setServerProvided(Boolean.TRUE)
251 .setTopologyTypes(topologyTypes)
252 .setLink(Map.of()).setNode(Map.of()).build());
253 trans.commit().addCallback(new FutureCallback<CommitInfo>() {
255 public void onSuccess(final CommitInfo result) {
256 LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
260 public void onFailure(final Throwable throwable) {
261 LOG.error("Failed to initialize topology {} (transaction {}) by listener {}",
262 AbstractTopologyBuilder.this.topology,
263 trans.getIdentifier(), AbstractTopologyBuilder.this, throwable);
265 }, MoreExecutors.directExecutor());
/**
 * Deletes the operational topology through the current transaction chain. The commit outcome is logged.
 *
 * @return presumably the commit future of the delete transaction (confirm against the full method body)
 * @throws NullPointerException if no transaction chain has been initialized
 */
private synchronized FluentFuture<? extends CommitInfo> destroyOperationalTopology() {
    requireNonNull(chain, "A valid transaction chain must be provided.");
    final WriteTransaction trans = chain.newWriteOnlyTransaction();
    trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
    final FluentFuture<? extends CommitInfo> future = trans.commit();
    future.addCallback(new FutureCallback<CommitInfo>() {
        public void onSuccess(final CommitInfo result) {
            LOG.trace("Operational topology removed {}", AbstractTopologyBuilder.this.topology);
        public void onFailure(final Throwable throwable) {
            LOG.error("Unable to reset operational topology {} (transaction {})",
                AbstractTopologyBuilder.this.topology, trans.getIdentifier(), throwable);
    }, MoreExecutors.directExecutor());
293 * Reset a transaction chain by closing the current chain and starting a new one.
295 private synchronized void initTransactionChain() {
296 LOG.debug("Initializing transaction chain for topology {}", this);
297 Preconditions.checkState(chain == null,
298 "Transaction chain has to be closed before being initialized");
299 chain = dataProvider.createMergingTransactionChain();
300 chain.addCallback(this);
/**
 * Abandons the current transaction chain without closing it. The comment in the body explains why the chain
 * is not closed.
 */
private synchronized void destroyTransactionChain() {
        LOG.debug("Destroy transaction chain for topology {}", this);
        // We cannot close the transaction chain: that would close the AbstractDOMForwardedTransactionFactory,
        // and the transaction factory cannot be reopened even if the transaction chain is recreated.
        // So we abandon the chain directly.
        // FIXME we want to close the transaction chain gracefully once the PingPongTransactionChain gets
        // improved and the above problem is resolved.
        //    this.chain.close();
        //} catch (Exception e) {
        //    // the close() may not succeed when the transaction chain is locked
        //    LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain,
        //        getInstanceIdentifier());
326 * Reset the data change listener to its initial status.
327 * By resetting the listener we will be able to recover all the data lost before
330 protected synchronized void resetListener() {
331 requireNonNull(listenerRegistration, "Listener on topology " + this + " hasn't been initialized.");
332 LOG.debug("Resetting data change listener for topology builder {}", getInstanceIdentifier());
333 // unregister current listener to prevent incoming data tree change first
334 unregisterDataChangeListener();
335 // create new transaction chain to reset the chain status
336 resetTransactionChain();
337 // reset the operational topology data so that we can have clean status
338 destroyOperationalTopology();
339 initOperationalTopology();
340 // re-register the data change listener to reset the operational topology
341 // we are expecting to receive all the pre-exist route change on the next onDataTreeChanged() call
342 registerDataChangeListener();
346 * Reset the transaction chain only so that the PingPong transaction chain will become usable again.
347 * However, there will be data loss if we do not apply the previous failed transaction again
350 protected synchronized void resetTransactionChain() {
351 LOG.debug("Resetting transaction chain for topology builder {}", getInstanceIdentifier());
352 destroyTransactionChain();
353 initTransactionChain();
/**
 * Performs a pending transaction chain reset or listener restart, if one has been scheduled.
 *
 * <p>There are a few reasons to schedule a listener restart in a delayed manner:
 * <ol>
 *   <li>Restarting the listener rebuilds the whole topology, which carries a huge overhead when the topology is
 *       big, so it should not be done again and again.</li>
 *   <li>The transaction chain failure callback ({@link #onFailure(Throwable)}) is normally invoked after a
 *       delay. In that time gap more data changes may still reach {@link #onDataTreeChanged(List)}. Because no
 *       restart has happened yet, they fail as well and may cause another failure callback later, which would
 *       restart the listener again. Restarting only the transaction chain introduces little overhead, so it is
 *       acceptable for that to happen repeatedly within a small time window.</li>
 * </ol>
 *
 * <p>When the listener is restarted, incoming data changes can be disregarded until the restart is done. After
 * the listener is unregistered and re-registered, the first {@link #onDataTreeChanged(List)} call contains the
 * complete set of existing data.
 *
 * @return {@code true} if the listener was restarted, {@code false} otherwise
 */
protected synchronized boolean restartTransactionChainOnDemand() {
    if (listenerScheduledRestartTime > 0) {
        // once the listenerScheduledRestartTime timer has expired we can reset the listener;
        // otherwise we only reset the transaction chain
        if (System.currentTimeMillis() > listenerScheduledRestartTime) {
            // reset the restart timer
            listenerScheduledRestartTime = 0;
            listenerScheduledRestartEnforceCounter = 0;
        // restart not due yet: replacing the transaction chain is enough for now
        resetTransactionChain();
391 protected synchronized void scheduleListenerRestart() {
392 if (0 == listenerScheduledRestartTime) {
393 listenerScheduledRestartTime = System.currentTimeMillis() + listenerResetLimitInMillsec;
394 } else if (System.currentTimeMillis() > listenerScheduledRestartTime
395 && ++listenerScheduledRestartEnforceCounter < listenerResetEnforceCounter) {
396 // if the transaction failure happens again, we will delay the listener restart up to
397 // #LISTENER_RESET_LIMIT_IN_MILLSEC times
398 listenerScheduledRestartTime += listenerResetLimitInMillsec;
400 LOG.debug("A listener restart was scheduled at {} (current system time is {})",
401 listenerScheduledRestartTime, System.currentTimeMillis());
405 public final synchronized void onFailure(final Throwable cause) {
406 LOG.error("Topology builder for {} failed", getInstanceIdentifier(), cause);
407 scheduleListenerRestart();
408 restartTransactionChainOnDemand();
412 public final void onSuccess(final Empty value) {
413 LOG.info("Topology builder for {} shut down", getInstanceIdentifier());