591d026cbddfb29b087617457941e1578b20032d
[bgpcep.git] / bgp / topology-provider / src / main / java / org / opendaylight / bgpcep / bgp / topology / provider / AbstractTopologyBuilder.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.bgp.topology.provider;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.bgpcep.topology.TopologyReference;
24 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
25 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
28 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
29 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
30 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
31 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
32 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
33 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
36 import org.opendaylight.protocol.bgp.rib.RibReference;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.Route;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.LocRib;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.Tables;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.AddressFamily;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.SubsequentAddressFamily;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
47 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
48 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
49 import org.opendaylight.yangtools.concepts.ListenerRegistration;
50 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 public abstract class AbstractTopologyBuilder<T extends Route> implements ClusteredDataTreeChangeListener<T>,
55     TopologyReference, TransactionChainListener {
56     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologyBuilder.class);
57     // we limit the listener reset interval to be 5 min at most
58     private static final long LISTENER_RESET_LIMIT_IN_MILLSEC = 5 * 60 * 1000;
59     private static final int LISTENER_RESET_ENFORCE_COUNTER = 3;
60     private final InstanceIdentifier<Topology> topology;
61     private final RibReference locRibReference;
62     private final DataBroker dataProvider;
63     private final Class<? extends AddressFamily> afi;
64     private final Class<? extends SubsequentAddressFamily> safi;
65     private final TopologyKey topologyKey;
66     private final TopologyTypes topologyTypes;
67     private final long listenerResetLimitInMillsec;
68     private final int listenerResetEnforceCounter;
69
70     @GuardedBy("this")
71     private ListenerRegistration<AbstractTopologyBuilder<T>> listenerRegistration = null;
72     @GuardedBy("this")
73     private BindingTransactionChain chain = null;
74     @GuardedBy("this")
75     private boolean closed = false;
76     @GuardedBy("this")
77     @VisibleForTesting
78     protected long listenerScheduledRestartTime = 0;
79     @GuardedBy("this")
80     @VisibleForTesting
81     protected int listenerScheduledRestartEnforceCounter = 0;
82
83     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
84             final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
85         final Class<? extends SubsequentAddressFamily> safi, final long listenerResetLimitInMillsec,
86         final int listenerResetEnforceCounter) {
87         this.dataProvider = dataProvider;
88         this.locRibReference = requireNonNull(locRibReference);
89         this.topologyKey = new TopologyKey(requireNonNull(topologyId));
90         this.topologyTypes = types;
91         this.afi = afi;
92         this.safi = safi;
93         this.listenerResetLimitInMillsec = listenerResetLimitInMillsec;
94         this.listenerResetEnforceCounter = listenerResetEnforceCounter;
95         this.topology = InstanceIdentifier.builder(NetworkTopology.class)
96                 .child(Topology.class, this.topologyKey).build();
97     }
98
99     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
100         final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
101         final Class<? extends SubsequentAddressFamily> safi) {
102         this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC,
103                 LISTENER_RESET_ENFORCE_COUNTER);
104     }
105
106     public final synchronized void start() {
107         LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", this.locRibReference, this.topology,
108                 this.afi, this.safi);
109         initTransactionChain();
110         initOperationalTopology();
111         registerDataChangeListener();
112     }
113
114     /**
115      * Register to data tree change listener.
116      */
117     private synchronized void registerDataChangeListener() {
118         Preconditions.checkState(this.listenerRegistration == null,
119                 "Topology Listener on topology %s has been registered before.",
120                 this.getInstanceIdentifier());
121         final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier()
122                 .child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
123         final DataTreeIdentifier<T> id = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
124                 getRouteWildcard(tablesId));
125
126         this.listenerRegistration = this.dataProvider.registerDataTreeChangeListener(id, this);
127         LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(),
128                 this.listenerScheduledRestartTime);
129     }
130
131     /**
132      * Unregister to data tree change listener.
133      */
134     private synchronized void unregisterDataChangeListener() {
135         if (this.listenerRegistration != null) {
136             LOG.debug("Unregistered listener {} on topology {}", this, this.getInstanceIdentifier());
137             this.listenerRegistration.close();
138             this.listenerRegistration = null;
139         }
140     }
141
142     protected abstract InstanceIdentifier<T> getRouteWildcard(InstanceIdentifier<Tables> tablesId);
143
144     protected abstract void createObject(ReadWriteTransaction trans, InstanceIdentifier<T> id, T value);
145
146     protected abstract void removeObject(ReadWriteTransaction trans, InstanceIdentifier<T> id, T value);
147
148     protected abstract void clearTopology();
149
150     @Override
151     public final InstanceIdentifier<Topology> getInstanceIdentifier() {
152         return this.topology;
153     }
154
155     @SuppressFBWarnings(value = "NP_NONNULL_PARAM_VIOLATION", justification = "Unrecognised NullableDecl")
156     public final synchronized ListenableFuture<Void> close() {
157         if (this.closed) {
158             LOG.trace("Transaction chain was already closed.");
159             Futures.immediateFuture(null);
160         }
161         this.closed = true;
162         LOG.info("Shutting down builder for {}", getInstanceIdentifier());
163         unregisterDataChangeListener();
164         final ListenableFuture<Void> future = destroyOperationalTopology();
165         destroyTransactionChain();
166         return future;
167     }
168
169     @Override
170     @SuppressWarnings("checkstyle:IllegalCatch")
171     public synchronized void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
172         if (this.closed) {
173             LOG.trace("Transaction chain was already closed, skipping update.");
174             return;
175         }
176         // check if the transaction chain needed to be restarted due to a previous error
177         if (restartTransactionChainOnDemand()) {
178             LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
179             return;
180         }
181         final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
182         LOG.trace("Received data change {} event with transaction {}", changes, trans.getIdentifier());
183         final AtomicBoolean transactionInError = new AtomicBoolean(false);
184         for (final DataTreeModification<T> change : changes) {
185             try {
186                 routeChanged(change, trans);
187             } catch (final RuntimeException exc) {
188                 LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change,
189                         trans.getIdentifier(), this, exc);
190                 // trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic change
191                 // trans.submit() must be called first to unlock the current transaction chain, to make the chain
192                 // closable so we cannot exit the #onDataTreeChanged() yet
193                 transactionInError.set(true);
194                 break;
195             }
196         }
197         Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
198             @Override
199             public void onSuccess(final Void result) {
200                 // as we are enforcing trans.submit(), in some cases the transaction execution actually could be
201                 // successfully even when an exception is captured, thus #onTransactionChainFailed() never get invoked.
202                 // Though the transaction chain remains usable,
203                 // the data loss will not be able to be recovered. Thus we schedule a listener restart here
204                 if (transactionInError.get()) {
205                     LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a restart"
206                             + " of listener {}", trans
207                         .getIdentifier(), AbstractTopologyBuilder.this);
208                     scheduleListenerRestart();
209                 } else {
210                     LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
211                 }
212             }
213
214             @Override
215             public void onFailure(final Throwable throwable) {
216                 // we do nothing but print out the log. Transaction chain restart will be done in
217                 // #onTransactionChainFailed()
218                 LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(),
219                         AbstractTopologyBuilder.this, throwable);
220             }
221         }, MoreExecutors.directExecutor());
222     }
223
224     @VisibleForTesting
225     protected void routeChanged(final DataTreeModification<T> change, final ReadWriteTransaction trans) {
226         final DataObjectModification<T> root = change.getRootNode();
227         switch (root.getModificationType()) {
228             case DELETE:
229                 removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
230                 break;
231             case SUBTREE_MODIFIED:
232             case WRITE:
233                 if (root.getDataBefore() != null) {
234                     removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
235                 }
236                 createObject(trans, change.getRootPath().getRootIdentifier(), root.getDataAfter());
237                 break;
238             default:
239                 throw new IllegalArgumentException("Unhandled modification type " + root.getModificationType());
240         }
241     }
242
243     private synchronized void initOperationalTopology() {
244         requireNonNull(this.chain, "A valid transaction chain must be provided.");
245         final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
246         trans.put(LogicalDatastoreType.OPERATIONAL, this.topology,
247                 new TopologyBuilder().setKey(this.topologyKey).setServerProvided(Boolean.TRUE)
248                         .setTopologyTypes(this.topologyTypes)
249                         .setLink(Collections.emptyList()).setNode(Collections.emptyList()).build(), true);
250         Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
251             @Override
252             public void onSuccess(final Void result) {
253                 LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
254             }
255
256             @Override
257             public void onFailure(final Throwable throwable) {
258                 LOG.error("Failed to initialize topology {} (transaction {}) by listener {}",
259                         AbstractTopologyBuilder.this.topology,
260                         trans.getIdentifier(), AbstractTopologyBuilder.this, throwable);
261             }
262         }, MoreExecutors.directExecutor());
263     }
264
265     /**
266      * Destroy the current operational topology data. Note a valid transaction must be provided.
267      */
268     private synchronized ListenableFuture<Void> destroyOperationalTopology() {
269         requireNonNull(this.chain, "A valid transaction chain must be provided.");
270         final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
271         trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
272         final ListenableFuture<Void> future = trans.submit();
273         Futures.addCallback(future, new FutureCallback<Void>() {
274             @Override
275             public void onSuccess(final Void result) {
276                 LOG.trace("Operational topology removed {}", AbstractTopologyBuilder.this.topology);
277             }
278
279             @Override
280             public void onFailure(final Throwable throwable) {
281                 LOG.error("Unable to reset operational topology {} (transaction {})",
282                     AbstractTopologyBuilder.this.topology, trans.getIdentifier(), throwable);
283             }
284         }, MoreExecutors.directExecutor());
285         clearTopology();
286         return future;
287     }
288
289     /**
290      * Reset a transaction chain by closing the current chain and starting a new one.
291      */
292     private synchronized void initTransactionChain() {
293         LOG.debug("Initializing transaction chain for topology {}", this);
294         Preconditions.checkState(this.chain == null,
295                 "Transaction chain has to be closed before being initialized");
296         this.chain = this.dataProvider.createTransactionChain(this);
297     }
298
299     /**
300      * Destroy the current transaction chain.
301      */
302     private synchronized void destroyTransactionChain() {
303         if (this.chain != null) {
304             LOG.debug("Destroy transaction chain for topology {}", this);
305             // we cannot close the transaction chain, as it will close the AbstractDOMForwardedTransactionFactory
306             // and the transaction factory cannot be reopen even if we recreate the transaction chain
307             // so we abandon the chain directly
308             // FIXME we want to close the transaction chain gracefully once the PingPongTransactionChain get improved
309             // and the above problem get resolved.
310 //            try {
311 //                this.chain.close();
312 //            } catch (Exception e) {
313 //                // the close() may not succeed when the transaction chain is locked
314 //                LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain,
315 //                  getInstanceIdentifier());
316 //            }
317             this.chain = null;
318         }
319     }
320
321     /**
322      * Reset the data change listener to its initial status.
323      * By resetting the listener we will be able to recover all the data lost before
324      */
325     @VisibleForTesting
326     protected synchronized void resetListener() {
327         requireNonNull(this.listenerRegistration, "Listener on topology " + this + " hasn't been initialized.");
328         LOG.debug("Resetting data change listener for topology builder {}", getInstanceIdentifier());
329         // unregister current listener to prevent incoming data tree change first
330         unregisterDataChangeListener();
331         // create new transaction chain to reset the chain status
332         resetTransactionChain();
333         // reset the operational topology data so that we can have clean status
334         destroyOperationalTopology();
335         initOperationalTopology();
336         // re-register the data change listener to reset the operational topology
337         // we are expecting to receive all the pre-exist route change on the next onDataTreeChanged() call
338         registerDataChangeListener();
339     }
340
341     /**
342      * Reset the transaction chain only so that the PingPong transaction chain will become usable again.
343      * However, there will be data loss if we do not apply the previous failed transaction again
344      */
345     @VisibleForTesting
346     protected synchronized void resetTransactionChain() {
347         LOG.debug("Resetting transaction chain for topology builder {}", getInstanceIdentifier());
348         destroyTransactionChain();
349         initTransactionChain();
350     }
351
352     /**
353      * There are a few reasons we want to schedule a listener restart in a delayed manner:
354      * 1. we should avoid restarting the listener as when the topology is big, there might be huge overhead
355      * rebuilding the whole linkstate topology again and again
356      * 2. the #onTransactionChainFailed() normally get invoked after a delay. During that time gap, more
357      * data changes might still be pushed to #onDataTreeChanged(). And because #onTransactionChainFailed()
358      * is not invoked yet, listener restart/transaction chain restart is not done. Thus the new changes
359      * will still cause error and another #onTransactionChainFailed() might be invoked later. The listener
360      * will be restarted again in that case, which is unexpected. Restarting of transaction chain only introduce
361      * little overhead and it's okay to be restarted within a small time window.
362      * Note: when the listener is restarted, we can disregard all the incoming data changes before the restart is
363      * done, as after the listener unregister/reregister, the first #onDataTreeChanged() call will contain the a
364      * complete set of existing changes
365      *
366      * @return if the listener get restarted, return true; otherwise false
367      */
368     @VisibleForTesting
369     protected synchronized boolean restartTransactionChainOnDemand() {
370         if (this.listenerScheduledRestartTime > 0) {
371             // when the #this.listenerScheduledRestartTime timer timed out we can reset the listener,
372             // otherwise we should only reset the transaction chain
373             if (System.currentTimeMillis() > this.listenerScheduledRestartTime) {
374                 // reset the the restart timer
375                 this.listenerScheduledRestartTime = 0;
376                 this.listenerScheduledRestartEnforceCounter = 0;
377                 resetListener();
378                 return true;
379             }
380
381             resetTransactionChain();
382         }
383         return false;
384     }
385
386     @VisibleForTesting
387     protected synchronized void scheduleListenerRestart() {
388         if (0 == this.listenerScheduledRestartTime) {
389             this.listenerScheduledRestartTime = System.currentTimeMillis() + this.listenerResetLimitInMillsec;
390         } else if (System.currentTimeMillis() > this.listenerScheduledRestartTime
391             && ++this.listenerScheduledRestartEnforceCounter < this.listenerResetEnforceCounter) {
392             // if the transaction failure happens again, we will delay the listener restart up to
393             // #LISTENER_RESET_LIMIT_IN_MILLSEC times
394             this.listenerScheduledRestartTime += this.listenerResetLimitInMillsec;
395         }
396         LOG.debug("A listener restart was scheduled at {} (current system time is {})",
397                 this.listenerScheduledRestartTime, System.currentTimeMillis());
398     }
399
400     @Override
401     public final synchronized void onTransactionChainFailed(final TransactionChain<?, ?> transactionChain,
402             final AsyncTransaction<?, ?> transaction, final Throwable cause) {
403         LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(),
404                 transaction != null ? transaction.getIdentifier() : null, cause);
405         scheduleListenerRestart();
406         restartTransactionChainOnDemand();
407     }
408
409     @Override
410     public final void onTransactionChainSuccessful(final TransactionChain<?, ?> transactionChain) {
411         LOG.info("Topology builder for {} shut down", getInstanceIdentifier());
412     }
413 }