9f6326e027796ea304e56b112e2f69928757e053
[bgpcep.git] / bgp / topology-provider / src / main / java / org / opendaylight / bgpcep / bgp / topology / provider / AbstractTopologyBuilder.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.bgp.topology.provider;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.Collection;
18 import java.util.Map;
19 import java.util.concurrent.atomic.AtomicBoolean;
20 import org.checkerframework.checker.lock.qual.GuardedBy;
21 import org.opendaylight.bgpcep.topology.TopologyReference;
22 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
23 import org.opendaylight.mdsal.binding.api.DataBroker;
24 import org.opendaylight.mdsal.binding.api.DataObjectModification;
25 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
26 import org.opendaylight.mdsal.binding.api.DataTreeModification;
27 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.mdsal.binding.api.Transaction;
29 import org.opendaylight.mdsal.binding.api.TransactionChain;
30 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
31 import org.opendaylight.mdsal.binding.api.WriteTransaction;
32 import org.opendaylight.mdsal.common.api.CommitInfo;
33 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
34 import org.opendaylight.protocol.bgp.rib.RibReference;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.Route;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.LocRib;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.Tables;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.AddressFamily;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.SubsequentAddressFamily;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
47 import org.opendaylight.yangtools.concepts.ListenerRegistration;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 public abstract class AbstractTopologyBuilder<T extends Route> implements ClusteredDataTreeChangeListener<T>,
53     TopologyReference, TransactionChainListener {
54     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologyBuilder.class);
55     // we limit the listener reset interval to be 5 min at most
56     private static final long LISTENER_RESET_LIMIT_IN_MILLSEC = 5 * 60 * 1000;
57     private static final int LISTENER_RESET_ENFORCE_COUNTER = 3;
58     private final InstanceIdentifier<Topology> topology;
59     private final RibReference locRibReference;
60     private final DataBroker dataProvider;
61     private final AddressFamily afi;
62     private final SubsequentAddressFamily safi;
63     private final TopologyKey topologyKey;
64     private final TopologyTypes topologyTypes;
65     private final long listenerResetLimitInMillsec;
66     private final int listenerResetEnforceCounter;
67
68     @GuardedBy("this")
69     private ListenerRegistration<AbstractTopologyBuilder<T>> listenerRegistration = null;
70     @GuardedBy("this")
71     private TransactionChain chain = null;
72     private final AtomicBoolean closed = new AtomicBoolean(false);
73     @GuardedBy("this")
74     @VisibleForTesting
75     protected long listenerScheduledRestartTime = 0;
76     @GuardedBy("this")
77     @VisibleForTesting
78     protected int listenerScheduledRestartEnforceCounter = 0;
79     protected boolean networkTopologyTransaction = true;
80
81     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
82             final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
83             final SubsequentAddressFamily safi, final long listenerResetLimitInMillsec,
84             final int listenerResetEnforceCounter) {
85         this.dataProvider = dataProvider;
86         this.locRibReference = requireNonNull(locRibReference);
87         this.topologyKey = new TopologyKey(requireNonNull(topologyId));
88         this.topologyTypes = types;
89         this.afi = afi;
90         this.safi = safi;
91         this.listenerResetLimitInMillsec = listenerResetLimitInMillsec;
92         this.listenerResetEnforceCounter = listenerResetEnforceCounter;
93         this.topology = InstanceIdentifier.builder(NetworkTopology.class)
94                 .child(Topology.class, this.topologyKey).build();
95     }
96
97     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
98         final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
99         final SubsequentAddressFamily safi) {
100         this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC,
101                 LISTENER_RESET_ENFORCE_COUNTER);
102     }
103
104     public final synchronized void start() {
105         LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", this.locRibReference, this.topology,
106                 this.afi, this.safi);
107         initTransactionChain();
108         initOperationalTopology();
109         registerDataChangeListener();
110     }
111
112     /**
113      * Register to data tree change listener.
114      */
115     private synchronized void registerDataChangeListener() {
116         Preconditions.checkState(this.listenerRegistration == null,
117                 "Topology Listener on topology %s has been registered before.",
118                 this.getInstanceIdentifier());
119         final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier()
120                 .child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
121         final DataTreeIdentifier<T> id = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
122                 getRouteWildcard(tablesId));
123
124         this.listenerRegistration = this.dataProvider.registerDataTreeChangeListener(id, this);
125         LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(),
126                 this.listenerScheduledRestartTime);
127     }
128
129     /**
130      * Unregister to data tree change listener.
131      */
132     private synchronized void unregisterDataChangeListener() {
133         if (this.listenerRegistration != null) {
134             LOG.debug("Unregistered listener {} on topology {}", this, this.getInstanceIdentifier());
135             this.listenerRegistration.close();
136             this.listenerRegistration = null;
137         }
138     }
139
140     protected abstract InstanceIdentifier<T> getRouteWildcard(InstanceIdentifier<Tables> tablesId);
141
142     protected abstract void createObject(ReadWriteTransaction trans, InstanceIdentifier<T> id, T value);
143
144     protected abstract void removeObject(ReadWriteTransaction trans, InstanceIdentifier<T> id, T value);
145
146     protected abstract void clearTopology();
147
148     @Override
149     public final InstanceIdentifier<Topology> getInstanceIdentifier() {
150         return this.topology;
151     }
152
153     public final synchronized FluentFuture<? extends CommitInfo> close() {
154         if (this.closed.getAndSet(true)) {
155             LOG.trace("Transaction chain was already closed.");
156             return CommitInfo.emptyFluentFuture();
157         }
158         LOG.info("Shutting down builder for {}", getInstanceIdentifier());
159         unregisterDataChangeListener();
160         final FluentFuture<? extends CommitInfo> future = destroyOperationalTopology();
161         destroyTransactionChain();
162         return future;
163     }
164
165     @Override
166     @SuppressWarnings("checkstyle:IllegalCatch")
167     public synchronized void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
168         if (networkTopologyTransaction) {
169             if (this.closed.get()) {
170                 LOG.trace("Transaction chain was already closed, skipping update.");
171                 return;
172             }
173             // check if the transaction chain needed to be restarted due to a previous error
174             if (restartTransactionChainOnDemand()) {
175                 LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
176                 return;
177             }
178             final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
179             LOG.trace("Received data change {} event with transaction {}", changes, trans.getIdentifier());
180             final AtomicBoolean transactionInError = new AtomicBoolean(false);
181             for (final DataTreeModification<T> change : changes) {
182                 try {
183                     routeChanged(change, trans);
184                 } catch (final RuntimeException exc) {
185                     LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change,
186                             trans.getIdentifier(), this, exc);
187                     // trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic
188                     // change.
189                     // trans.commit() must be called first to unlock the current transaction chain, to make the chain
190                     // closable so we cannot exit the #onDataTreeChanged() yet
191                     transactionInError.set(true);
192                     break;
193                 }
194             }
195             trans.commit().addCallback(new FutureCallback<CommitInfo>() {
196                 @Override
197                 public void onSuccess(final CommitInfo result) {
198                     // as we are enforcing trans.commit(), in some cases the transaction execution actually could be
199                     // successfully even when an exception is captured, thus #onTransactionChainFailed() never get
200                     // invoked. Though the transaction chain remains usable,
201                     // the data loss will not be able to be recovered. Thus we schedule a listener restart here
202                     if (transactionInError.get()) {
203                         LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a"
204                                 + " restart of listener {}", trans
205                             .getIdentifier(), AbstractTopologyBuilder.this);
206                         scheduleListenerRestart();
207                     } else {
208                         LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
209                     }
210                 }
211
212                 @Override
213                 public void onFailure(final Throwable throwable) {
214                     // we do nothing but print out the log. Transaction chain restart will be done in
215                     // #onTransactionChainFailed()
216                     LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(),
217                             AbstractTopologyBuilder.this, throwable);
218                 }
219             }, MoreExecutors.directExecutor());
220         } else {
221             for (final DataTreeModification<T> change : changes) {
222                 routeChanged(change, null);
223             }
224         }
225     }
226
227     @VisibleForTesting
228     protected void routeChanged(final DataTreeModification<T> change, final ReadWriteTransaction trans) {
229         final DataObjectModification<T> root = change.getRootNode();
230         switch (root.getModificationType()) {
231             case DELETE:
232                 removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
233                 break;
234             case SUBTREE_MODIFIED:
235             case WRITE:
236                 if (root.getDataBefore() != null) {
237                     removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
238                 }
239                 createObject(trans, change.getRootPath().getRootIdentifier(), root.getDataAfter());
240                 break;
241             default:
242                 throw new IllegalArgumentException("Unhandled modification type " + root.getModificationType());
243         }
244     }
245
246     private synchronized void initOperationalTopology() {
247         requireNonNull(this.chain, "A valid transaction chain must be provided.");
248         final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
249         trans.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, this.topology,
250                 new TopologyBuilder().withKey(this.topologyKey).setServerProvided(Boolean.TRUE)
251                         .setTopologyTypes(this.topologyTypes)
252                         .setLink(Map.of()).setNode(Map.of()).build());
253         trans.commit().addCallback(new FutureCallback<CommitInfo>() {
254             @Override
255             public void onSuccess(final CommitInfo result) {
256                 LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
257             }
258
259             @Override
260             public void onFailure(final Throwable throwable) {
261                 LOG.error("Failed to initialize topology {} (transaction {}) by listener {}",
262                         AbstractTopologyBuilder.this.topology,
263                         trans.getIdentifier(), AbstractTopologyBuilder.this, throwable);
264             }
265         }, MoreExecutors.directExecutor());
266     }
267
268     /**
269      * Destroy the current operational topology data. Note a valid transaction must be provided.
270      */
271     private synchronized FluentFuture<? extends CommitInfo> destroyOperationalTopology() {
272         requireNonNull(this.chain, "A valid transaction chain must be provided.");
273         final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
274         trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
275         final FluentFuture<? extends CommitInfo> future = trans.commit();
276         future.addCallback(new FutureCallback<CommitInfo>() {
277             @Override
278             public void onSuccess(final CommitInfo result) {
279                 LOG.trace("Operational topology removed {}", AbstractTopologyBuilder.this.topology);
280             }
281
282             @Override
283             public void onFailure(final Throwable throwable) {
284                 LOG.error("Unable to reset operational topology {} (transaction {})",
285                     AbstractTopologyBuilder.this.topology, trans.getIdentifier(), throwable);
286             }
287         }, MoreExecutors.directExecutor());
288         clearTopology();
289         return future;
290     }
291
292     /**
293      * Reset a transaction chain by closing the current chain and starting a new one.
294      */
295     private synchronized void initTransactionChain() {
296         LOG.debug("Initializing transaction chain for topology {}", this);
297         Preconditions.checkState(this.chain == null,
298                 "Transaction chain has to be closed before being initialized");
299         this.chain = this.dataProvider.createMergingTransactionChain(this);
300     }
301
302     /**
303      * Destroy the current transaction chain.
304      */
305     private synchronized void destroyTransactionChain() {
306         if (this.chain != null) {
307             LOG.debug("Destroy transaction chain for topology {}", this);
308             // we cannot close the transaction chain, as it will close the AbstractDOMForwardedTransactionFactory
309             // and the transaction factory cannot be reopen even if we recreate the transaction chain
310             // so we abandon the chain directly
311             // FIXME we want to close the transaction chain gracefully once the PingPongTransactionChain get improved
312             // and the above problem get resolved.
313 //            try {
314 //                this.chain.close();
315 //            } catch (Exception e) {
316 //                // the close() may not succeed when the transaction chain is locked
317 //                LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain,
318 //                  getInstanceIdentifier());
319 //            }
320             this.chain = null;
321         }
322     }
323
324     /**
325      * Reset the data change listener to its initial status.
326      * By resetting the listener we will be able to recover all the data lost before
327      */
328     @VisibleForTesting
329     protected synchronized void resetListener() {
330         requireNonNull(this.listenerRegistration, "Listener on topology " + this + " hasn't been initialized.");
331         LOG.debug("Resetting data change listener for topology builder {}", getInstanceIdentifier());
332         // unregister current listener to prevent incoming data tree change first
333         unregisterDataChangeListener();
334         // create new transaction chain to reset the chain status
335         resetTransactionChain();
336         // reset the operational topology data so that we can have clean status
337         destroyOperationalTopology();
338         initOperationalTopology();
339         // re-register the data change listener to reset the operational topology
340         // we are expecting to receive all the pre-exist route change on the next onDataTreeChanged() call
341         registerDataChangeListener();
342     }
343
344     /**
345      * Reset the transaction chain only so that the PingPong transaction chain will become usable again.
346      * However, there will be data loss if we do not apply the previous failed transaction again
347      */
348     @VisibleForTesting
349     protected synchronized void resetTransactionChain() {
350         LOG.debug("Resetting transaction chain for topology builder {}", getInstanceIdentifier());
351         destroyTransactionChain();
352         initTransactionChain();
353     }
354
355     /**
356      * There are a few reasons we want to schedule a listener restart in a delayed manner:
357      * 1. we should avoid restarting the listener as when the topology is big, there might be huge overhead
358      * rebuilding the whole linkstate topology again and again
359      * 2. the #onTransactionChainFailed() normally get invoked after a delay. During that time gap, more
360      * data changes might still be pushed to #onDataTreeChanged(). And because #onTransactionChainFailed()
361      * is not invoked yet, listener restart/transaction chain restart is not done. Thus the new changes
362      * will still cause error and another #onTransactionChainFailed() might be invoked later. The listener
363      * will be restarted again in that case, which is unexpected. Restarting of transaction chain only introduce
364      * little overhead and it's okay to be restarted within a small time window.
365      * Note: when the listener is restarted, we can disregard all the incoming data changes before the restart is
366      * done, as after the listener unregister/reregister, the first #onDataTreeChanged() call will contain the a
367      * complete set of existing changes
368      *
369      * @return if the listener get restarted, return true; otherwise false
370      */
371     @VisibleForTesting
372     protected synchronized boolean restartTransactionChainOnDemand() {
373         if (this.listenerScheduledRestartTime > 0) {
374             // when the #this.listenerScheduledRestartTime timer timed out we can reset the listener,
375             // otherwise we should only reset the transaction chain
376             if (System.currentTimeMillis() > this.listenerScheduledRestartTime) {
377                 // reset the the restart timer
378                 this.listenerScheduledRestartTime = 0;
379                 this.listenerScheduledRestartEnforceCounter = 0;
380                 resetListener();
381                 return true;
382             }
383
384             resetTransactionChain();
385         }
386         return false;
387     }
388
389     @VisibleForTesting
390     protected synchronized void scheduleListenerRestart() {
391         if (0 == this.listenerScheduledRestartTime) {
392             this.listenerScheduledRestartTime = System.currentTimeMillis() + this.listenerResetLimitInMillsec;
393         } else if (System.currentTimeMillis() > this.listenerScheduledRestartTime
394             && ++this.listenerScheduledRestartEnforceCounter < this.listenerResetEnforceCounter) {
395             // if the transaction failure happens again, we will delay the listener restart up to
396             // #LISTENER_RESET_LIMIT_IN_MILLSEC times
397             this.listenerScheduledRestartTime += this.listenerResetLimitInMillsec;
398         }
399         LOG.debug("A listener restart was scheduled at {} (current system time is {})",
400                 this.listenerScheduledRestartTime, System.currentTimeMillis());
401     }
402
403     @Override
404     public final synchronized void onTransactionChainFailed(final TransactionChain transactionChain,
405             final Transaction transaction, final Throwable cause) {
406         LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(),
407                 transaction != null ? transaction.getIdentifier() : null, cause);
408         scheduleListenerRestart();
409         restartTransactionChainOnDemand();
410     }
411
412     @Override
413     public final void onTransactionChainSuccessful(final TransactionChain transactionChain) {
414         LOG.info("Topology builder for {} shut down", getInstanceIdentifier());
415     }
416 }