2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.bgp.topology.provider;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import javax.annotation.concurrent.GuardedBy;
22 import org.opendaylight.bgpcep.topology.TopologyReference;
23 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
24 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
27 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
29 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
31 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.protocol.bgp.rib.RibReference;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.Route;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.rib.LocRib;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.AddressFamily;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.SubsequentAddressFamily;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
49 import org.opendaylight.yangtools.concepts.ListenerRegistration;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 public abstract class AbstractTopologyBuilder<T extends Route> implements ClusteredDataTreeChangeListener<T>,
55 TopologyReference, TransactionChainListener {
56 private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologyBuilder.class);
57 // we limit the listener reset interval to be 5 min at most
58 private static final long LISTENER_RESET_LIMIT_IN_MILLSEC = 5 * 60 * 1000;
59 private static final int LISTENER_RESET_ENFORCE_COUNTER = 3;
60 private final InstanceIdentifier<Topology> topology;
61 private final RibReference locRibReference;
62 private final DataBroker dataProvider;
63 private final Class<? extends AddressFamily> afi;
64 private final Class<? extends SubsequentAddressFamily> safi;
65 private final TopologyKey topologyKey;
66 private final TopologyTypes topologyTypes;
67 private final long listenerResetLimitInMillsec;
68 private final int listenerResetEnforceCounter;
71 private ListenerRegistration<AbstractTopologyBuilder<T>> listenerRegistration = null;
73 private BindingTransactionChain chain = null;
75 private boolean closed = false;
78 protected long listenerScheduledRestartTime = 0;
81 protected int listenerScheduledRestartEnforceCounter = 0;
/**
 * Creates a topology builder with explicit listener-restart tuning.
 *
 * @param dataProvider broker used for transaction chains and listener registration
 * @param locRibReference reference to the RIB whose Loc-RIB is observed; must not be {@code null}
 * @param topologyId identifier of the topology to maintain; must not be {@code null}
 * @param types topology types stored in the initial topology object
 * @param afi address family of the observed table
 * @param safi subsequent address family of the observed table
 * @param listenerResetLimitInMillsec delay in milliseconds applied when a listener restart is scheduled
 * @param listenerResetEnforceCounter bound for the number of times a scheduled restart may be postponed
 * @throws NullPointerException if {@code locRibReference} or {@code topologyId} is {@code null}
 */
protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
        final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
        final Class<? extends SubsequentAddressFamily> safi, final long listenerResetLimitInMillsec,
        final int listenerResetEnforceCounter) {
    this.dataProvider = dataProvider;
    this.locRibReference = requireNonNull(locRibReference);
    this.topologyKey = new TopologyKey(requireNonNull(topologyId));
    this.topologyTypes = types;
    // afi/safi are final and are read when logging in start() and when building the TablesKey,
    // so they have to be assigned here
    this.afi = afi;
    this.safi = safi;
    this.listenerResetLimitInMillsec = listenerResetLimitInMillsec;
    this.listenerResetEnforceCounter = listenerResetEnforceCounter;
    this.topology = InstanceIdentifier.builder(NetworkTopology.class)
        .child(Topology.class, this.topologyKey)
        .build();
}
/**
 * Creates a topology builder using the default listener-restart settings
 * ({@code LISTENER_RESET_LIMIT_IN_MILLSEC} and {@code LISTENER_RESET_ENFORCE_COUNTER}).
 *
 * @param dataProvider broker used for transaction chains and listener registration
 * @param locRibReference reference to the RIB whose Loc-RIB is observed
 * @param topologyId identifier of the topology to maintain
 * @param types topology types stored in the initial topology object
 * @param afi address family of the observed table
 * @param safi subsequent address family of the observed table
 */
protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
        final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
        final Class<? extends SubsequentAddressFamily> safi) {
    this(dataProvider, locRibReference, topologyId, types, afi, safi,
        LISTENER_RESET_LIMIT_IN_MILLSEC, LISTENER_RESET_ENFORCE_COUNTER);
}
104 public final synchronized void start() {
105 LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", this.locRibReference, this.topology, this.afi, this.safi);
106 initTransactionChain();
107 initOperationalTopology();
108 registerDataChangeListener();
112 * Register to data tree change listener
114 private synchronized void registerDataChangeListener() {
115 Preconditions.checkState(this.listenerRegistration == null, "Topology Listener on topology %s has been registered before.", this.getInstanceIdentifier());
116 final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier().child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
117 final DataTreeIdentifier<T> id = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, getRouteWildcard(tablesId));
119 this.listenerRegistration = this.dataProvider.registerDataTreeChangeListener(id, this);
120 LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(), this.listenerScheduledRestartTime);
124 * Unregister to data tree change listener
126 private final synchronized void unregisterDataChangeListener() {
127 if (this.listenerRegistration != null) {
128 LOG.debug("Unregistered listener {} on topology {}", this, this.getInstanceIdentifier());
129 this.listenerRegistration.close();
130 this.listenerRegistration = null;
134 protected abstract InstanceIdentifier<T> getRouteWildcard(InstanceIdentifier<Tables> tablesId);
136 protected abstract void createObject(ReadWriteTransaction trans, InstanceIdentifier<T> id, T value);
138 protected abstract void removeObject(ReadWriteTransaction trans, InstanceIdentifier<T> id, T value);
140 protected abstract void clearTopology();
143 public final InstanceIdentifier<Topology> getInstanceIdentifier() {
144 return this.topology;
147 public final synchronized ListenableFuture<Void> close() {
149 LOG.trace("Transaction chain was already closed.");
150 Futures.immediateFuture(null);
153 LOG.info("Shutting down builder for {}", getInstanceIdentifier());
154 unregisterDataChangeListener();
155 final ListenableFuture<Void> future = destroyOperationalTopology();
156 destroyTransactionChain();
161 public synchronized void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
163 LOG.trace("Transaction chain was already closed, skipping update.");
166 // check if the transaction chain needed to be restarted due to a previous error
167 if (restartTransactionChainOnDemand()) {
168 LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
171 final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
172 LOG.debug("Received data change {} event with transaction {}", changes, trans.getIdentifier());
173 final AtomicBoolean transactionInError = new AtomicBoolean(false);
174 for (final DataTreeModification<T> change : changes) {
176 routeChanged(change, trans);
177 } catch (final RuntimeException e) {
178 LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change, trans.getIdentifier(), this, e);
179 // trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic change
180 // trans.submit() must be called first to unlock the current transaction chain, to make the chain closable
181 // so we cannot exit the #onDataTreeChanged() yet
182 transactionInError.set(true);
186 Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
188 public void onSuccess(final Void result) {
189 // as we are enforcing trans.submit(), in some cases the transaction execution actually could be successfully even when an
190 // exception is captured, thus #onTransactionChainFailed() never get invoked. Though the transaction chain remains usable,
191 // the data loss will not be able to be recovered. Thus we schedule a listener restart here
192 if (transactionInError.get()) {
193 LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a restart of listener {}", trans
194 .getIdentifier(), AbstractTopologyBuilder.this);
195 scheduleListenerRestart();
197 LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
202 public void onFailure(final Throwable t) {
203 // we do nothing but print out the log. Transaction chain restart will be done in #onTransactionChainFailed()
204 LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(), AbstractTopologyBuilder.this, t);
206 }, MoreExecutors.directExecutor());
210 protected void routeChanged(final DataTreeModification<T> change, final ReadWriteTransaction trans) {
211 final DataObjectModification<T> root = change.getRootNode();
212 switch (root.getModificationType()) {
214 removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
216 case SUBTREE_MODIFIED:
218 if (root.getDataBefore() != null) {
219 removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
221 createObject(trans, change.getRootPath().getRootIdentifier(), root.getDataAfter());
224 throw new IllegalArgumentException("Unhandled modification type " + root.getModificationType());
228 private synchronized void initOperationalTopology() {
229 requireNonNull(this.chain, "A valid transaction chain must be provided.");
230 final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
231 trans.put(LogicalDatastoreType.OPERATIONAL, this.topology,
232 new TopologyBuilder().setKey(this.topologyKey).setServerProvided(Boolean.TRUE).setTopologyTypes(this.topologyTypes)
233 .setLink(Collections.emptyList()).setNode(Collections.emptyList()).build(), true);
234 Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
236 public void onSuccess(final Void result) {
237 LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
241 public void onFailure(final Throwable t) {
242 LOG.error("Failed to initialize topology {} (transaction {}) by listener {}", AbstractTopologyBuilder.this.topology,
243 trans.getIdentifier(), AbstractTopologyBuilder.this, t);
245 }, MoreExecutors.directExecutor());
249 * Destroy the current operational topology data. Note a valid transaction must be provided
250 * @throws TransactionCommitFailedException
252 private synchronized ListenableFuture<Void> destroyOperationalTopology() {
253 requireNonNull(this.chain, "A valid transaction chain must be provided.");
254 final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
255 trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
256 final ListenableFuture<Void> future = trans.submit();
257 Futures.addCallback(future, new FutureCallback<Void>() {
259 public void onSuccess(final Void result) {
260 LOG.trace("Operational topology removed {}", AbstractTopologyBuilder.this.topology);
264 public void onFailure(final Throwable t) {
265 LOG.error("Unable to reset operational topology {} (transaction {})",
266 AbstractTopologyBuilder.this.topology, trans.getIdentifier(), t);
268 }, MoreExecutors.directExecutor());
274 * Reset a transaction chain by closing the current chain and starting a new one
276 private synchronized void initTransactionChain() {
277 LOG.debug("Initializing transaction chain for topology {}", this);
278 Preconditions.checkState(this.chain == null, "Transaction chain has to be closed before being initialized");
279 this.chain = this.dataProvider.createTransactionChain(this);
283 * Destroy the current transaction chain
285 private synchronized void destroyTransactionChain() {
286 if (this.chain != null) {
287 LOG.debug("Destroy transaction chain for topology {}", this);
288 // we cannot close the transaction chain, as it will close the AbstractDOMForwardedTransactionFactory
289 // and the transaction factory cannot be reopen even if we recreate the transaction chain
290 // so we abandon the chain directly
291 // FIXME we want to close the transaction chain gracefully once the PingPongTransactionChain get improved
292 // and the above problem get resolved.
294 // this.chain.close();
295 // } catch (Exception e) {
296 // // the close() may not succeed when the transaction chain is locked
297 // LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain, getInstanceIdentifier());
304 * Reset the data change listener to its initial status
305 * By resetting the listener we will be able to recover all the data lost before
308 protected synchronized void resetListener() {
309 requireNonNull(this.listenerRegistration, "Listener on topology " + this + " hasn't been initialized.");
310 LOG.debug("Resetting data change listener for topology builder {}", getInstanceIdentifier());
311 // unregister current listener to prevent incoming data tree change first
312 unregisterDataChangeListener();
313 // create new transaction chain to reset the chain status
314 resetTransactionChain();
315 // reset the operational topology data so that we can have clean status
316 destroyOperationalTopology();
317 initOperationalTopology();
318 // re-register the data change listener to reset the operational topology
319 // we are expecting to receive all the pre-exist route change on the next onDataTreeChanged() call
320 registerDataChangeListener();
324 * Reset the transaction chain only so that the PingPong transaction chain will become usable again.
325 * However, there will be data loss if we do not apply the previous failed transaction again
328 protected synchronized void resetTransactionChain() {
329 LOG.debug("Resetting transaction chain for topology builder {}", getInstanceIdentifier());
330 destroyTransactionChain();
331 initTransactionChain();
335 * There are a few reasons we want to schedule a listener restart in a delayed manner:
336 * 1. we should avoid restarting the listener as when the topology is big, there might be huge overhead
337 * rebuilding the whole linkstate topology again and again
338 * 2. the #onTransactionChainFailed() normally get invoked after a delay. During that time gap, more
339 * data changes might still be pushed to #onDataTreeChanged(). And because #onTransactionChainFailed()
340 * is not invoked yet, listener restart/transaction chain restart is not done. Thus the new changes
341 * will still cause error and another #onTransactionChainFailed() might be invoked later. The listener
342 * will be restarted again in that case, which is unexpected. Restarting of transaction chain only introduce
343 * little overhead and it's okay to be restarted within a small time window
345 * Note: when the listener is restarted, we can disregard all the incoming data changes before the restart is
346 * done, as after the listener unregister/reregister, the first #onDataTreeChanged() call will contain the a
347 * complete set of existing changes
349 * @return if the listener get restarted, return true; otherwise false
352 protected synchronized boolean restartTransactionChainOnDemand() {
353 if (this.listenerScheduledRestartTime > 0) {
354 // when the #this.listenerScheduledRestartTime timer timed out we can reset the listener, otherwise we should only reset the transaction chain
355 if (System.currentTimeMillis() > this.listenerScheduledRestartTime) {
356 // reset the the restart timer
357 this.listenerScheduledRestartTime = 0;
358 this.listenerScheduledRestartEnforceCounter = 0;
363 resetTransactionChain();
369 protected synchronized void scheduleListenerRestart() {
370 if (0 == this.listenerScheduledRestartTime) {
371 this.listenerScheduledRestartTime = System.currentTimeMillis() + this.listenerResetLimitInMillsec;
372 } else if (System.currentTimeMillis() > this.listenerScheduledRestartTime
373 && ++this.listenerScheduledRestartEnforceCounter < this.listenerResetEnforceCounter) {
374 // if the transaction failure happens again, we will delay the listener restart up to #LISTENER_RESET_LIMIT_IN_MILLSEC times
375 this.listenerScheduledRestartTime += this.listenerResetLimitInMillsec;
377 LOG.debug("A listener restart was scheduled at {} (current system time is {})", this.listenerScheduledRestartTime, System.currentTimeMillis());
381 public final synchronized void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
382 LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(), transaction != null ? transaction.getIdentifier() : null, cause);
383 scheduleListenerRestart();
384 restartTransactionChainOnDemand();
388 public final void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
389 LOG.info("Topology builder for {} shut down", getInstanceIdentifier());