de1070ab8344a34c251a9c5103d62bccc8f8bbab
[bgpcep.git] / bgp / topology-provider / src / main / java / org / opendaylight / bgpcep / bgp / topology / provider / AbstractTopologyBuilder.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.bgpcep.bgp.topology.provider;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.concurrent.atomic.AtomicBoolean;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.bgpcep.topology.TopologyReference;
19 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
20 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
23 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
24 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
25 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
26 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
32 import org.opendaylight.protocol.bgp.rib.RibReference;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.Route;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.rib.LocRib;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.AddressFamily;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.SubsequentAddressFamily;
39 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
40 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
41 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
42 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
43 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
44 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
45 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
46 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
47 import org.opendaylight.yangtools.concepts.ListenerRegistration;
48 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 public abstract class AbstractTopologyBuilder<T extends Route> implements AutoCloseable, ClusteredDataTreeChangeListener<T>, TopologyReference, TransactionChainListener {
53     private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologyBuilder.class);
54     // we limit the listener reset interval to be 5 min at most
55     private static final long LISTENER_RESET_LIMIT_IN_MILLSEC = 5 * 60 * 1000;
56     private static final int LISTENER_RESET_ENFORCE_COUNTER = 3;
57     private final InstanceIdentifier<Topology> topology;
58     private final RibReference locRibReference;
59     private final DataBroker dataProvider;
60     private final Class<? extends AddressFamily> afi;
61     private final Class<? extends SubsequentAddressFamily> safi;
62     private final TopologyKey topologyKey;
63     private final TopologyTypes topologyTypes;
64     private final long listenerResetLimitInMillsec;
65     private final int listenerResetEnforceCounter;
66
67     @GuardedBy("this")
68     private ListenerRegistration<AbstractTopologyBuilder<T>> listenerRegistration = null;
69     @GuardedBy("this")
70     private BindingTransactionChain chain = null;
71     @GuardedBy("this")
72     private boolean closed = false;
73     @GuardedBy("this")
74     @VisibleForTesting
75     protected long listenerScheduledRestartTime = 0;
76     @GuardedBy("this")
77     @VisibleForTesting
78     protected int listenerScheduledRestartEnforceCounter = 0;
79
80     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
81             final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
82         final Class<? extends SubsequentAddressFamily> safi, final long listenerResetLimitInMillsec,
83         final int listenerResetEnforceCounter) {
84         this.dataProvider = dataProvider;
85         this.locRibReference = Preconditions.checkNotNull(locRibReference);
86         this.topologyKey = new TopologyKey(Preconditions.checkNotNull(topologyId));
87         this.topologyTypes = types;
88         this.afi = afi;
89         this.safi = safi;
90         this.listenerResetLimitInMillsec = listenerResetLimitInMillsec;
91         this.listenerResetEnforceCounter = listenerResetEnforceCounter;
92         this.topology = InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, this.topologyKey).build();
93     }
94
95     protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
96         final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
97         final Class<? extends SubsequentAddressFamily> safi) {
98         this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC, LISTENER_RESET_ENFORCE_COUNTER);
99     }
100
101     public final synchronized void start() {
102         LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", this.locRibReference, this.topology, this.afi, this.safi);
103         initTransactionChain();
104         initOperationalTopology();
105         registerDataChangeListener();
106     }
107
108     @Deprecated
109     public final InstanceIdentifier<Tables> tableInstanceIdentifier(final Class<? extends AddressFamily> afi,
110             final Class<? extends SubsequentAddressFamily> safi) {
111         return this.locRibReference.getInstanceIdentifier().builder().child(LocRib.class).child(Tables.class, new TablesKey(afi, safi)).build();
112     }
113
114     /**
115      * Register to data tree change listener
116      */
117     private synchronized void registerDataChangeListener() {
118         Preconditions.checkState(this.listenerRegistration == null, "Topology Listener on topology %s has been registered before.", this.getInstanceIdentifier());
119         final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier().child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
120         final DataTreeIdentifier<T> id = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, getRouteWildcard(tablesId));
121
122         this.listenerRegistration = this.dataProvider.registerDataTreeChangeListener(id, this);
123         LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(), this.listenerScheduledRestartTime);
124     }
125
126     /**
127      * Unregister to data tree change listener
128      */
129     private final synchronized void unregisterDataChangeListener() {
130         if (this.listenerRegistration != null) {
131             LOG.debug("Unregistered listener {} on topology {}", this, this.getInstanceIdentifier());
132             this.listenerRegistration.close();
133             this.listenerRegistration = null;
134         }
135     }
136
137     protected abstract InstanceIdentifier<T> getRouteWildcard(InstanceIdentifier<Tables> tablesId);
138
139     protected abstract void createObject(ReadWriteTransaction trans, InstanceIdentifier<T> id, T value);
140
141     protected abstract void removeObject(ReadWriteTransaction trans, InstanceIdentifier<T> id, T value);
142
143     protected abstract void clearTopology();
144
145     @Override
146     public final InstanceIdentifier<Topology> getInstanceIdentifier() {
147         return this.topology;
148     }
149
150     @Override
151     public final synchronized void close() {
152         if (this.closed) {
153             LOG.trace("Transaction chain was already closed.");
154             return;
155         }
156         this.closed = true;
157         LOG.info("Shutting down builder for {}", getInstanceIdentifier());
158         unregisterDataChangeListener();
159         destroyOperationalTopology();
160         destroyTransactionChain();
161     }
162
163     @Override
164     public synchronized void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
165         if (this.closed) {
166             LOG.trace("Transaction chain was already closed, skipping update.");
167             return;
168         }
169         // check if the transaction chain needed to be restarted due to a previous error
170         if (restartTransactionChainOnDemand()) {
171             LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
172             return;
173         }
174         final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
175         LOG.debug("Received data change {} event with transaction {}", changes, trans.getIdentifier());
176         final AtomicBoolean transactionInError = new AtomicBoolean(false);
177         for (final DataTreeModification<T> change : changes) {
178             try {
179                 routeChanged(change, trans);
180             } catch (final RuntimeException e) {
181                 LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change, trans.getIdentifier(), this, e);
182                 // trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic change
183                 // trans.submit() must be called first to unlock the current transaction chain, to make the chain closable
184                 // so we cannot exit the #onDataTreeChanged() yet
185                 transactionInError.set(true);
186                 break;
187             }
188         }
189         Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
190             @Override
191             public void onSuccess(final Void result) {
192                 // as we are enforcing trans.submit(), in some cases the transaction execution actually could be successfully even when an
193                 // exception is captured, thus #onTransactionChainFailed() never get invoked. Though the transaction chain remains usable,
194                 // the data loss will not be able to be recovered. Thus we schedule a listener restart here
195                 if (transactionInError.get()) {
196                     LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a restart of listener {}", trans
197                         .getIdentifier(), AbstractTopologyBuilder.this);
198                     scheduleListenerRestart();
199                 } else {
200                     LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
201                 }
202             }
203
204             @Override
205             public void onFailure(final Throwable t) {
206                 // we do nothing but print out the log. Transaction chain restart will be done in #onTransactionChainFailed()
207                 LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(), AbstractTopologyBuilder.this, t);
208             }
209         });
210     }
211
212     @VisibleForTesting
213     protected void routeChanged(final DataTreeModification<T> change, final ReadWriteTransaction trans) {
214         final DataObjectModification<T> root = change.getRootNode();
215         switch (root.getModificationType()) {
216         case DELETE:
217             removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
218             break;
219         case SUBTREE_MODIFIED:
220         case WRITE:
221             if (root.getDataBefore() != null) {
222                 removeObject(trans, change.getRootPath().getRootIdentifier(), root.getDataBefore());
223             }
224             createObject(trans, change.getRootPath().getRootIdentifier(), root.getDataAfter());
225             break;
226         default:
227             throw new IllegalArgumentException("Unhandled modification type " + root.getModificationType());
228         }
229     }
230
231     private synchronized void initOperationalTopology() {
232         Preconditions.checkNotNull(this.chain, "A valid transaction chain must be provided.");
233         final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
234         trans.put(LogicalDatastoreType.OPERATIONAL, this.topology,
235             new TopologyBuilder().setKey(this.topologyKey).setServerProvided(Boolean.TRUE).setTopologyTypes(this.topologyTypes)
236                                  .setLink(Collections.<Link>emptyList()).setNode(Collections.<Node>emptyList()).build(), true);
237         Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
238             @Override
239             public void onSuccess(final Void result) {
240                 LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
241             }
242
243             @Override
244             public void onFailure(final Throwable t) {
245                 LOG.error("Failed to initialize topology {} (transaction {}) by listener {}", AbstractTopologyBuilder.this.topology,
246                     trans.getIdentifier(), AbstractTopologyBuilder.this, t);
247             }
248         });
249     }
250
251     /**
252      * Destroy the current operational topology data. Note a valid transaction must be provided
253      * @throws TransactionCommitFailedException
254      */
255     private synchronized void destroyOperationalTopology() {
256         Preconditions.checkNotNull(this.chain, "A valid transaction chain must be provided.");
257         final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
258         trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
259         try {
260             trans.submit().checkedGet();
261         } catch (final TransactionCommitFailedException e) {
262             LOG.error("Unable to reset operational topology {} (transaction {})", this.topology, trans.getIdentifier(), e);
263         }
264         clearTopology();
265     }
266
267     /**
268      * Reset a transaction chain by closing the current chain and starting a new one
269      */
270     private synchronized void initTransactionChain() {
271         LOG.debug("Initializing transaction chain for topology {}", this);
272         Preconditions.checkState(this.chain == null, "Transaction chain has to be closed before being initialized");
273         this.chain = this.dataProvider.createTransactionChain(this);
274     }
275
276     /**
277      * Destroy the current transaction chain
278      */
279     private synchronized void destroyTransactionChain() {
280         if (this.chain != null) {
281             LOG.debug("Destroy transaction chain for topology {}", this);
282             // we cannot close the transaction chain, as it will close the AbstractDOMForwardedTransactionFactory
283             // and the transaction factory cannot be reopen even if we recreate the transaction chain
284             // so we abandon the chain directly
285             // FIXME we want to close the transaction chain gracefully once the PingPongTransactionChain get improved
286             // and the above problem get resolved.
287 //            try {
288 //                this.chain.close();
289 //            } catch (Exception e) {
290 //                // the close() may not succeed when the transaction chain is locked
291 //                LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain, getInstanceIdentifier());
292 //            }
293             this.chain = null;
294         }
295     }
296
297     /**
298      * Reset the data change listener to its initial status
299      * By resetting the listener we will be able to recover all the data lost before
300      */
301     @VisibleForTesting
302     protected synchronized void resetListener() {
303         Preconditions.checkNotNull(this.listenerRegistration, "Listener on topology %s hasn't been initialized.", this);
304         LOG.debug("Resetting data change listener for topology builder {}", getInstanceIdentifier());
305         // unregister current listener to prevent incoming data tree change first
306         unregisterDataChangeListener();
307         // create new transaction chain to reset the chain status
308         resetTransactionChain();
309         // reset the operational topology data so that we can have clean status
310         destroyOperationalTopology();
311         initOperationalTopology();
312         // re-register the data change listener to reset the operational topology
313         // we are expecting to receive all the pre-exist route change on the next onDataTreeChanged() call
314         registerDataChangeListener();
315     }
316
317     /**
318      * Reset the transaction chain only so that the PingPong transaction chain will become usable again.
319      * However, there will be data loss if we do not apply the previous failed transaction again
320      */
321     @VisibleForTesting
322     protected synchronized void resetTransactionChain() {
323         LOG.debug("Resetting transaction chain for topology builder {}", getInstanceIdentifier());
324         destroyTransactionChain();
325         initTransactionChain();
326     }
327
328     /**
329      * There are a few reasons we want to schedule a listener restart in a delayed manner:
330      * 1. we should avoid restarting the listener as when the topology is big, there might be huge overhead
331      *    rebuilding the whole linkstate topology again and again
332      * 2. the #onTransactionChainFailed() normally get invoked after a delay. During that time gap, more
333      *    data changes might still be pushed to #onDataTreeChanged(). And because #onTransactionChainFailed()
334      *    is not invoked yet, listener restart/transaction chain restart is not done. Thus the new changes
335      *    will still cause error and another #onTransactionChainFailed() might be invoked later. The listener
336      *    will be restarted again in that case, which is unexpected. Restarting of transaction chain only introduce
337      *    little overhead and it's okay to be restarted within a small time window
338      *
339      * Note: when the listener is restarted, we can disregard all the incoming data changes before the restart is
340      * done, as after the listener unregister/reregister, the first #onDataTreeChanged() call will contain the a
341      * complete set of existing changes
342      *
343      * @return if the listener get restarted, return true; otherwise false
344      */
345     @VisibleForTesting
346     protected synchronized boolean restartTransactionChainOnDemand() {
347         if (this.listenerScheduledRestartTime > 0) {
348             // when the #this.listenerScheduledRestartTime timer timed out we can reset the listener, otherwise we should only reset the transaction chain
349             if (System.currentTimeMillis() > this.listenerScheduledRestartTime) {
350                 // reset the the restart timer
351                 this.listenerScheduledRestartTime = 0;
352                 this.listenerScheduledRestartEnforceCounter = 0;
353                 resetListener();
354                 return true;
355             } else {
356                 resetTransactionChain();
357             }
358         }
359         return false;
360     }
361
362     @VisibleForTesting
363     protected synchronized void scheduleListenerRestart() {
364         if (0 == this.listenerScheduledRestartTime) {
365             this.listenerScheduledRestartTime = System.currentTimeMillis() + this.listenerResetLimitInMillsec;
366         } else if (System.currentTimeMillis() > this.listenerScheduledRestartTime
367             && ++this.listenerScheduledRestartEnforceCounter < this.listenerResetEnforceCounter) {
368             // if the transaction failure happens again, we will delay the listener restart up to #LISTENER_RESET_LIMIT_IN_MILLSEC times
369             this.listenerScheduledRestartTime += this.listenerResetLimitInMillsec;
370         }
371         LOG.debug("A listener restart was scheduled at {} (current system time is {})", this.listenerScheduledRestartTime, System.currentTimeMillis());
372     }
373
374     @Override
375     public final synchronized void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
376         LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(), transaction != null ? transaction.getIdentifier() : null, cause);
377         scheduleListenerRestart();
378         restartTransactionChainOnDemand();
379     }
380
381     @Override
382     public final void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
383         LOG.info("Topology builder for {} shut down", getInstanceIdentifier());
384     }
385 }