import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpPrefix;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.path.attributes.Attributes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.Route;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.AddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.SubsequentAddressFamily;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.next.hop.CNextHop;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.next.hop.c.next.hop.Ipv4NextHopCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.next.hop.c.next.hop.Ipv6NextHopCase;
}
protected AbstractReachabilityTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
- final TopologyId topologyId, final TopologyTypes topologyTypes) {
- super(dataProvider, locRibReference, topologyId, topologyTypes);
+ final TopologyId topologyId, final TopologyTypes topologyTypes, final Class<? extends AddressFamily> afi,
+ final Class<? extends SubsequentAddressFamily> safi) {
+ super(dataProvider, locRibReference, topologyId, topologyTypes, afi, safi);
}
private NodeId advertizingNode(final Attributes attrs) {
*/
package org.opendaylight.bgpcep.bgp.topology.provider;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.bgpcep.topology.TopologyReference;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeService;
import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
public abstract class AbstractTopologyBuilder<T extends Route> implements AutoCloseable, ClusteredDataTreeChangeListener<T>, TopologyReference, TransactionChainListener {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTopologyBuilder.class);
+ // we limit the listener reset interval to be 5 min at most
+ private static final long LISTENER_RESET_LIMIT_IN_MILLSEC = 5 * 60 * 1000;
+ private static final int LISTENER_RESET_ENFORCE_COUNTER = 3;
private final InstanceIdentifier<Topology> topology;
- private final BindingTransactionChain chain;
private final RibReference locRibReference;
+ private final DataBroker dataProvider;
+ private final Class<? extends AddressFamily> afi;
+ private final Class<? extends SubsequentAddressFamily> safi;
+ private final TopologyKey topologyKey;
+ private final TopologyTypes topologyTypes;
+ private final long listenerResetLimitInMillsec;
+ private final int listenerResetEnforceCounter;
+ @GuardedBy("this")
+ private ListenerRegistration<AbstractTopologyBuilder<T>> listenerRegistration = null;
+ @GuardedBy("this")
+ private BindingTransactionChain chain = null;
@GuardedBy("this")
private boolean closed = false;
+ @GuardedBy("this")
+ @VisibleForTesting
+ protected long listenerScheduledRestartTime = 0;
+ @GuardedBy("this")
+ @VisibleForTesting
+ protected int listenerScheduledRestartEnforceCounter = 0;
protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
- final TopologyId topologyId, final TopologyTypes types) {
+ final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
+ final Class<? extends SubsequentAddressFamily> safi, final long listenerResetLimitInMillsec,
+ final int listenerResetEnforceCounter) {
+ this.dataProvider = dataProvider;
this.locRibReference = Preconditions.checkNotNull(locRibReference);
- this.chain = dataProvider.createTransactionChain(this);
+ this.topologyKey = new TopologyKey(Preconditions.checkNotNull(topologyId));
+ this.topologyTypes = types;
+ this.afi = afi;
+ this.safi = safi;
+ this.listenerResetLimitInMillsec = listenerResetLimitInMillsec;
+ this.listenerResetEnforceCounter = listenerResetEnforceCounter;
+ this.topology = InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, this.topologyKey).build();
- final TopologyKey tk = new TopologyKey(Preconditions.checkNotNull(topologyId));
- this.topology = InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, tk).build();
-
- LOG.debug("Initiating topology builder from {} at {}", locRibReference, this.topology);
-
- final WriteTransaction t = this.chain.newWriteOnlyTransaction();
-
- t.put(LogicalDatastoreType.OPERATIONAL, this.topology,
- new TopologyBuilder().setKey(tk).setServerProvided(Boolean.TRUE).setTopologyTypes(types)
- .setLink(Collections.<Link>emptyList()).setNode(Collections.<Node>emptyList()).build(), true);
- Futures.addCallback(t.submit(), new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- LOG.trace("Transaction {} committed successfully", t.getIdentifier());
- }
+ LOG.debug("Initiating topology builder from {} at {}. AFI={}, SAFI={}", locRibReference, this.topology, this.afi, this.safi);
+ initTransactionChain();
+ initOperationalTopology();
+ registerDataChangeListener();
+ }
- @Override
- public void onFailure(final Throwable t) {
- LOG.error("Failed to initiate topology {} by listener {}", AbstractTopologyBuilder.this.topology,
- AbstractTopologyBuilder.this, t);
- }
- });
+ protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
+ final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
+ final Class<? extends SubsequentAddressFamily> safi) {
+ this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC, LISTENER_RESET_ENFORCE_COUNTER);
}
@Deprecated
return this.locRibReference.getInstanceIdentifier().builder().child(LocRib.class).child(Tables.class, new TablesKey(afi, safi)).build();
}
- public final ListenerRegistration<AbstractTopologyBuilder<T>> start(final DataTreeChangeService service, final Class<? extends AddressFamily> afi,
- final Class<? extends SubsequentAddressFamily> safi) {
- final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier().child(LocRib.class).child(Tables.class, new TablesKey(afi, safi));
+ /**
+ * Register to data tree change listener
+ */
+ private synchronized void registerDataChangeListener() {
+ Preconditions.checkState(this.listenerRegistration == null, "Topology Listener on topology %s has been registered before.", this.getInstanceIdentifier());
+ final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier().child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
final DataTreeIdentifier<T> id = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, getRouteWildcard(tablesId));
- return service.registerDataTreeChangeListener(id, this);
+ this.listenerRegistration = this.dataProvider.registerDataTreeChangeListener(id, this);
+ LOG.debug("Registered listener {} on topology {}. Timestamp={}", this, this.getInstanceIdentifier(), this.listenerScheduledRestartTime);
+ }
+
+ /**
+ * Unregister to data tree change listener
+ */
+ private final synchronized void unregisterDataChangeListener() {
+ if (this.listenerRegistration != null) {
+ LOG.debug("Unregistered listener {} on topology {}", this, this.getInstanceIdentifier());
+ this.listenerRegistration.close();
+ this.listenerRegistration = null;
+ }
}
protected abstract InstanceIdentifier<T> getRouteWildcard(InstanceIdentifier<Tables> tablesId);
@Override
public final synchronized void close() throws TransactionCommitFailedException {
- LOG.info("Shutting down builder for {}", getInstanceIdentifier());
- final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
- trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
- trans.submit().checkedGet();
- this.chain.close();
+ if (this.closed) {
+ LOG.trace("Transaction chain was already closed.");
+ return;
+ }
this.closed = true;
+ LOG.info("Shutting down builder for {}", getInstanceIdentifier());
+ unregisterDataChangeListener();
+ destroyOperationalTopology();
+ destroyTransactionChain();
}
@Override
LOG.trace("Transaction chain was already closed, skipping update.");
return;
}
+ // check if the transaction chain needed to be restarted due to a previous error
+ if (restartTransactionChainOnDemand()) {
+ LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
+ return;
+ }
final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
LOG.debug("Received data change {} event with transaction {}", changes, trans.getIdentifier());
+ final AtomicBoolean transactionInError = new AtomicBoolean(false);
for (final DataTreeModification<T> change : changes) {
try {
routeChanged(change, trans);
} catch (final RuntimeException e) {
- LOG.warn("Data change {} was not completely propagated to listener {}, aborting", change, this, e);
- trans.cancel();
- return;
+ LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change, trans.getIdentifier(), this, e);
+ // trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic change
+ // trans.submit() must be called first to unlock the current transaction chain, to make the chain closable
+ // so we cannot exit the #onDataTreeChanged() yet
+ transactionInError.set(true);
+ break;
}
}
Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
+ // as we are enforcing trans.submit(), in some cases the transaction execution actually could be successfully even when an
+ // exception is captured, thus #onTransactionChainFailed() never get invoked. Though the transaction chain remains usable,
+ // the data loss will not be able to be recovered. Thus we schedule a listener restart here
+ if (transactionInError.get()) {
+ LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a restart of listener {}", trans
+ .getIdentifier(), AbstractTopologyBuilder.this);
+ scheduleListenerRestart();
+ } else {
+ LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
+ }
}
@Override
public void onFailure(final Throwable t) {
- LOG.error("Failed to propagate change by listener {}", AbstractTopologyBuilder.this, t);
+ // we do nothing but print out the log. Transaction chain restart will be done in #onTransactionChainFailed()
+ LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(), AbstractTopologyBuilder.this, t);
}
});
}
- private void routeChanged(final DataTreeModification<T> change, final ReadWriteTransaction trans) {
+ @VisibleForTesting
+ protected void routeChanged(final DataTreeModification<T> change, final ReadWriteTransaction trans) {
final DataObjectModification<T> root = change.getRootNode();
switch (root.getModificationType()) {
case DELETE:
}
}
+ private synchronized void initOperationalTopology() {
+ Preconditions.checkNotNull(this.chain, "A valid transaction chain must be provided.");
+ final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
+ trans.put(LogicalDatastoreType.OPERATIONAL, this.topology,
+ new TopologyBuilder().setKey(this.topologyKey).setServerProvided(Boolean.TRUE).setTopologyTypes(this.topologyTypes)
+ .setLink(Collections.<Link>emptyList()).setNode(Collections.<Node>emptyList()).build(), true);
+ Futures.addCallback(trans.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("Failed to initialize topology {} (transaction {}) by listener {}", AbstractTopologyBuilder.this.topology,
+ trans.getIdentifier(), AbstractTopologyBuilder.this, t);
+ }
+ });
+ }
+
+ /**
+ * Destroy the current operational topology data. Note a valid transaction must be provided
+ * @throws TransactionCommitFailedException
+ */
+ private synchronized void destroyOperationalTopology() {
+ Preconditions.checkNotNull(this.chain, "A valid transaction chain must be provided.");
+ final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
+ trans.delete(LogicalDatastoreType.OPERATIONAL, getInstanceIdentifier());
+ try {
+ trans.submit().checkedGet();
+ } catch (TransactionCommitFailedException e) {
+ LOG.error("Unable to reset operational topology {} (transaction {})", this.topology, trans.getIdentifier(), e);
+ }
+ }
+
+ /**
+ * Reset a transaction chain by closing the current chain and starting a new one
+ */
+ private synchronized void initTransactionChain() {
+ LOG.debug("Initializing transaction chain for topology {}", this);
+ Preconditions.checkState(this.chain == null, "Transaction chain has to be closed before being initialized");
+ this.chain = this.dataProvider.createTransactionChain(this);
+ }
+
+ /**
+ * Destroy the current transaction chain
+ */
+ private synchronized void destroyTransactionChain() {
+ if (this.chain != null) {
+ LOG.debug("Destroy transaction chain for topology {}", this);
+ // we cannot close the transaction chain, as it will close the AbstractDOMForwardedTransactionFactory
+ // and the transaction factory cannot be reopen even if we recreate the transaction chain
+ // so we abandon the chain directly
+ // FIXME we want to close the transaction chain gracefully once the PingPongTransactionChain get improved
+ // and the above problem get resolved.
+// try {
+// this.chain.close();
+// } catch (Exception e) {
+// // the close() may not succeed when the transaction chain is locked
+// LOG.error("Unable to close transaction chain {} for topology builder {}", this.chain, getInstanceIdentifier());
+// }
+ this.chain = null;
+ }
+ }
+
+ /**
+ * Reset the data change listener to its initial status
+ * By resetting the listener we will be able to recover all the data lost before
+ */
+ @VisibleForTesting
+ protected synchronized void resetListener() {
+ Preconditions.checkNotNull(this.listenerRegistration, "Listener on topology %s hasn't been initialized.", this);
+ LOG.debug("Resetting data change listener for topology builder {}", getInstanceIdentifier());
+ // unregister current listener to prevent incoming data tree change first
+ unregisterDataChangeListener();
+ // create new transaction chain to reset the chain status
+ resetTransactionChain();
+ // reset the operational topology data so that we can have clean status
+ destroyOperationalTopology();
+ initOperationalTopology();
+ // re-register the data change listener to reset the operational topology
+ // we are expecting to receive all the pre-exist route change on the next onDataTreeChanged() call
+ registerDataChangeListener();
+ }
+
+ /**
+ * Reset the transaction chain only so that the PingPong transaction chain will become usable again.
+ * However, there will be data loss if we do not apply the previous failed transaction again
+ */
+ @VisibleForTesting
+ protected synchronized void resetTransactionChain() {
+ LOG.debug("Resetting transaction chain for topology builder {}", getInstanceIdentifier());
+ destroyTransactionChain();
+ initTransactionChain();
+ }
+
+ /**
+ * There are a few reasons we want to schedule a listener restart in a delayed manner:
+ * 1. we should avoid restarting the listener as when the topology is big, there might be huge overhead
+ * rebuilding the whole linkstate topology again and again
+ * 2. the #onTransactionChainFailed() normally get invoked after a delay. During that time gap, more
+ * data changes might still be pushed to #onDataTreeChanged(). And because #onTransactionChainFailed()
+ * is not invoked yet, listener restart/transaction chain restart is not done. Thus the new changes
+ * will still cause error and another #onTransactionChainFailed() might be invoked later. The listener
+ * will be restarted again in that case, which is unexpected. Restarting of transaction chain only introduce
+ * little overhead and it's okay to be restarted within a small time window
+ *
+ * Note: when the listener is restarted, we can disregard all the incoming data changes before the restart is
+ * done, as after the listener unregister/reregister, the first #onDataTreeChanged() call will contain the a
+ * complete set of existing changes
+ *
+ * @return if the listener get restarted, return true; otherwise false
+ */
+ @VisibleForTesting
+ protected synchronized boolean restartTransactionChainOnDemand() {
+ if (this.listenerScheduledRestartTime > 0) {
+ // when the #this.listenerScheduledRestartTime timer timed out we can reset the listener, otherwise we should only reset the transaction chain
+ if (System.currentTimeMillis() > this.listenerScheduledRestartTime) {
+ // reset the the restart timer
+ this.listenerScheduledRestartTime = 0;
+ this.listenerScheduledRestartEnforceCounter = 0;
+ resetListener();
+ return true;
+ } else {
+ resetTransactionChain();
+ }
+ }
+ return false;
+ }
+
+ @VisibleForTesting
+ protected synchronized void scheduleListenerRestart() {
+ if (0 == this.listenerScheduledRestartTime) {
+ this.listenerScheduledRestartTime = System.currentTimeMillis() + this.listenerResetLimitInMillsec;
+ } else if (System.currentTimeMillis() > this.listenerScheduledRestartTime
+ && ++this.listenerScheduledRestartEnforceCounter < this.listenerResetEnforceCounter) {
+ // if the transaction failure happens again, we will delay the listener restart up to #LISTENER_RESET_LIMIT_IN_MILLSEC times
+ this.listenerScheduledRestartTime += this.listenerResetLimitInMillsec;
+ }
+ LOG.debug("A listener restart was scheduled at {} (current system time is {})", this.listenerScheduledRestartTime, System.currentTimeMillis());
+ }
+
@Override
- public final void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
- // TODO: restart?
- LOG.error("Topology builder for {} failed in transaction {}", getInstanceIdentifier(), transaction != null ? transaction.getIdentifier() : null, cause);
+ public final synchronized void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+ LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(), transaction != null ? transaction.getIdentifier() : null, cause);
+ scheduleListenerRestart();
+ restartTransactionChainOnDemand();
}
@Override
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.ipv4.routes.ipv4.routes.Ipv4Route;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.path.attributes.Attributes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.Ipv4AddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.bgp.topology.types.rev160524.TopologyTypes1;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.bgp.topology.types.rev160524.TopologyTypes1Builder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.bgp.topology.types.rev160524.bgp.ipv4.reachability.topology.type.BgpIpv4ReachabilityTopologyBuilder;
public Ipv4ReachabilityTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
final TopologyId topologyId) {
- super(dataProvider, locRibReference, topologyId, IPV4_TOPOLOGY_TYPE);
+ super(dataProvider, locRibReference, topologyId, IPV4_TOPOLOGY_TYPE, Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
}
@Override
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.ipv6.routes.ipv6.routes.Ipv6Route;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.path.attributes.Attributes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.Ipv6AddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.bgp.topology.types.rev160524.TopologyTypes1;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.bgp.topology.types.rev160524.TopologyTypes1Builder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.bgp.topology.types.rev160524.bgp.ipv6.reachability.topology.type.BgpIpv6ReachabilityTopologyBuilder;
public Ipv6ReachabilityTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
final TopologyId topologyId) {
- super(dataProvider, locRibReference, topologyId, IPV6_TOPOLOGY_TYPE);
+ super(dataProvider, locRibReference, topologyId, IPV6_TOPOLOGY_TYPE, Ipv6AddressFamily.class, UnicastSubsequentAddressFamily.class);
}
@Override
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.linkstate.rev150210.Ipv4InterfaceIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.linkstate.rev150210.Ipv6InterfaceIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.linkstate.rev150210.IsisAreaIdentifier;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.linkstate.rev150210.LinkstateAddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.linkstate.rev150210.LinkstateSubsequentAddressFamily;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.linkstate.rev150210.NodeFlagBits;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.linkstate.rev150210.NodeIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.linkstate.rev150210.TopologyIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class LinkstateTopologyBuilder extends AbstractTopologyBuilder<LinkstateRoute> {
+public class LinkstateTopologyBuilder extends AbstractTopologyBuilder<LinkstateRoute> {
private static final TopologyTypes LINKSTATE_TOPOLOGY_TYPE = new TopologyTypesBuilder()
.addAugmentation(TopologyTypes1.class,
new TopologyTypes1Builder()
private final Map<NodeId, NodeHolder> nodes = new HashMap<>();
public LinkstateTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference, final TopologyId topologyId) {
- super(dataProvider, locRibReference, topologyId, LINKSTATE_TOPOLOGY_TYPE);
+ super(dataProvider, locRibReference, topologyId, LINKSTATE_TOPOLOGY_TYPE, LinkstateAddressFamily.class, LinkstateSubsequentAddressFamily.class);
+ }
+
+ public LinkstateTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
+ final TopologyId topologyId, final long listenerResetLimitInMillsec, final int listenerResetEnforceCounter) {
+ super(dataProvider, locRibReference, topologyId, LINKSTATE_TOPOLOGY_TYPE, LinkstateAddressFamily.class, LinkstateSubsequentAddressFamily.class,
+ listenerResetLimitInMillsec, listenerResetEnforceCounter);
}
private static LinkId buildLinkId(final UriBuilder base, final LinkCase link) {
import org.opendaylight.bgpcep.bgp.topology.provider.Ipv4ReachabilityTopologyBuilder;
import org.opendaylight.bgpcep.topology.DefaultTopologyReference;
import org.opendaylight.controller.config.api.JmxAttributeValidationException;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeService;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.Ipv4AddressFamily;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Override
public java.lang.AutoCloseable createInstance() {
- final Ipv4ReachabilityTopologyBuilder b = new Ipv4ReachabilityTopologyBuilder(getDataProviderDependency(), getLocalRibDependency(), getTopologyId());
- final ListenerRegistration<?> r = b.start((DataTreeChangeService) getDataProviderDependency(), Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
- LOG.debug("Registered listener {} on topology {}", b, b.getInstanceIdentifier());
-
- final class TopologyReferenceAutocloseable extends DefaultTopologyReference implements AutoCloseable {
- public TopologyReferenceAutocloseable(final InstanceIdentifier<Topology> instanceIdentifier) {
- super(instanceIdentifier);
- }
-
- @Override
- public void close() throws InterruptedException, ExecutionException, TransactionCommitFailedException {
- try {
- r.close();
- } finally {
- b.close();
- }
- }
- }
-
- return new TopologyReferenceAutocloseable(b.getInstanceIdentifier());
+ return new Ipv4ReachabilityTopologyBuilder(getDataProviderDependency(), getLocalRibDependency(), getTopologyId());
}
}
import org.opendaylight.bgpcep.bgp.topology.provider.Ipv6ReachabilityTopologyBuilder;
import org.opendaylight.bgpcep.topology.DefaultTopologyReference;
import org.opendaylight.controller.config.api.JmxAttributeValidationException;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeService;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.Ipv6AddressFamily;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev130919.UnicastSubsequentAddressFamily;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Override
public AutoCloseable createInstance() {
- final Ipv6ReachabilityTopologyBuilder b = new Ipv6ReachabilityTopologyBuilder(getDataProviderDependency(), getLocalRibDependency(), getTopologyId());
- final ListenerRegistration<?> r = b.start((DataTreeChangeService) getDataProviderDependency(), Ipv6AddressFamily.class, UnicastSubsequentAddressFamily.class);
- LOG.debug("Registered listener {} on topology {}", b, b.getInstanceIdentifier());
-
- final class TopologyReferenceAutocloseable extends DefaultTopologyReference implements AutoCloseable {
- public TopologyReferenceAutocloseable(final InstanceIdentifier<Topology> instanceIdentifier) {
- super(instanceIdentifier);
- }
-
- @Override
- public void close() throws InterruptedException, ExecutionException, TransactionCommitFailedException {
- try {
- r.close();
- } finally {
- b.close();
- }
- }
- }
-
- return new TopologyReferenceAutocloseable(b.getInstanceIdentifier());
+ return new Ipv6ReachabilityTopologyBuilder(getDataProviderDependency(), getLocalRibDependency(), getTopologyId());
}
}
import org.opendaylight.bgpcep.bgp.topology.provider.LinkstateTopologyBuilder;
import org.opendaylight.bgpcep.topology.DefaultTopologyReference;
import org.opendaylight.controller.config.api.JmxAttributeValidationException;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeService;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.linkstate.rev150210.LinkstateAddressFamily;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.linkstate.rev150210.LinkstateSubsequentAddressFamily;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Override
public java.lang.AutoCloseable createInstance() {
- final LinkstateTopologyBuilder b = new LinkstateTopologyBuilder(getDataProviderDependency(), getLocalRibDependency(), getTopologyId());
- final ListenerRegistration<?> r = b.start((DataTreeChangeService) getDataProviderDependency(), LinkstateAddressFamily.class, LinkstateSubsequentAddressFamily.class);
- LOG.debug("Registered listener {} on topology {}", b, b.getInstanceIdentifier());
-
- final class TopologyReferenceAutocloseable extends DefaultTopologyReference implements AutoCloseable {
- public TopologyReferenceAutocloseable(final InstanceIdentifier<Topology> instanceIdentifier) {
- super(instanceIdentifier);
- }
-
- @Override
- public void close() throws InterruptedException, ExecutionException, TransactionCommitFailedException {
- try {
- r.close();
- } finally {
- b.close();
- }
- }
- }
-
- return new TopologyReferenceAutocloseable(b.getInstanceIdentifier());
+ return new LinkstateTopologyBuilder(getDataProviderDependency(), getLocalRibDependency(), getTopologyId());
}
}
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.ReadTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopologyBuilder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
public abstract class AbstractTopologyBuilderTest extends AbstractDataBrokerTest {
protected static final TopologyId TEST_TOPOLOGY_ID = new TopologyId("test-topo");
protected static final RibReference LOC_RIB_REF = new DefaultRibReference(InstanceIdentifier.create(BgpRib.class).child(Rib.class, new RibKey(Preconditions.checkNotNull(new RibId("test-rib")))));
- protected ListenerRegistration<DataTreeChangeListener<?>> reg;
-
@Override
protected void setupWithDataBroker(final DataBroker dataBroker) {
super.setupWithDataBroker(dataBroker);
wTx.submit();
}
- public void tearDown() {
- this.reg.close();
- }
-
protected Optional<Topology> getTopology(final InstanceIdentifier<Topology> topoIID) {
final ReadTransaction rTx = getDataBroker().newReadOnlyTransaction();
try {
protected void setupWithDataBroker(final DataBroker dataBroker) {
super.setupWithDataBroker(dataBroker);
this.ipv4TopoBuilder = new Ipv4ReachabilityTopologyBuilder(dataBroker, LOC_RIB_REF, TEST_TOPOLOGY_ID);
- this.ipv4TopoBuilder.start(dataBroker, Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
final InstanceIdentifier<Tables> path = this.ipv4TopoBuilder.tableInstanceIdentifier(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
this.ipv4RouteIID = path.builder().child((Class) Ipv4Routes.class).child(Ipv4Route.class, new Ipv4RouteKey(new PathId(PATH_ID),
new Ipv4Prefix(ROUTE_IP4PREFIX))).build();
protected void setupWithDataBroker(final DataBroker dataBroker) {
super.setupWithDataBroker(dataBroker);
this.ipv6TopoBuilder = new Ipv6ReachabilityTopologyBuilder(dataBroker, LOC_RIB_REF, TEST_TOPOLOGY_ID);
- this.ipv6TopoBuilder.start(dataBroker, Ipv6AddressFamily.class, UnicastSubsequentAddressFamily.class);
final InstanceIdentifier<Tables> path = this.ipv6TopoBuilder.tableInstanceIdentifier(Ipv6AddressFamily.class, UnicastSubsequentAddressFamily.class);
this.ipv6RouteIID = path.builder().child((Class) Ipv6Routes.class).child(Ipv6Route.class, new Ipv6RouteKey(new PathId(PATH_ID),
new Ipv6Prefix(ROUTE_IP6PREFIX))).build();
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.RETURNS_SMART_NULLS;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import io.netty.buffer.Unpooled;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
private static final String NODE_1_OSPF_ID = "bgpls://Ospf:1/type=node&as=1&router=0000.0102.0304";
private static final String NODE_2_OSPF_ID = "bgpls://Ospf:1/type=node&as=2";
private static final Identifier IDENTIFIER = new Identifier(new BigInteger("1"));
+ private static final long LISTENER_RESTART_TIME = 20000;
+ private static final int LISTENER_ENFORCE_COUNTER = 2;
private LinkstateTopologyBuilder linkstateTopoBuilder;
private InstanceIdentifier<LinkstateRoute> linkstateRouteIID;
-
@Override
protected void setupWithDataBroker(final DataBroker dataBroker) {
super.setupWithDataBroker(dataBroker);
- this.linkstateTopoBuilder = new LinkstateTopologyBuilder(dataBroker, LOC_RIB_REF, TEST_TOPOLOGY_ID);
- this.linkstateTopoBuilder.start(dataBroker, LinkstateAddressFamily.class, LinkstateSubsequentAddressFamily.class);
+ this.linkstateTopoBuilder = new LinkstateTopologyBuilder(dataBroker, LOC_RIB_REF, TEST_TOPOLOGY_ID, LISTENER_RESTART_TIME, LISTENER_ENFORCE_COUNTER);
final InstanceIdentifier<Tables> path = this.linkstateTopoBuilder.tableInstanceIdentifier(LinkstateAddressFamily.class, LinkstateSubsequentAddressFamily.class);
this.linkstateRouteIID = path.builder().child((Class)LinkstateRoutes.class).child(LinkstateRoute.class, new LinkstateRouteKey(LINKSTATE_ROUTE_KEY)).build();
}
+ @After
+ public void tearDown() throws Exception {
+ this.linkstateTopoBuilder.close();
+ assertFalse(getTopology(this.linkstateTopoBuilder.getInstanceIdentifier()).isPresent());
+ }
+
@Test
public void testLinkstateTopologyBuilderTopologyTypes() {
final Optional<Topology> topologyMaybe = getTopology(this.linkstateTopoBuilder.getInstanceIdentifier());
assertNull(igpLink1.getAugmentation(IgpLinkAttributes1.class));
assertEquals((short) 1, igpLink1.getAugmentation(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.ospf.topology.rev131021.IgpLinkAttributes1.class).getOspfLinkAttributes().getMultiTopologyId().shortValue());
assertEquals(2, igpLink1.getAugmentation(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.ospf.topology.rev131021.IgpLinkAttributes1.class).getOspfLinkAttributes().getTed().getSrlg().getSrlgValues().size());
+ }
- this.linkstateTopoBuilder.close();
- assertFalse(getTopology(this.linkstateTopoBuilder.getInstanceIdentifier()).isPresent());
+ /**
+ * This test is to verify if the AbstractTopologyBuilder/LinkstateTopologyBuilder is handling exception correctly
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRouteChangedError() throws Exception {
+ LinkstateTopologyBuilder spiedLinkstateTopologyBuilder = spy(this.linkstateTopoBuilder);
+ doThrow(RuntimeException.class).when(spiedLinkstateTopologyBuilder).routeChanged(any(), any());
+ try {
+ spiedLinkstateTopologyBuilder.routeChanged(null, null);
+ fail("Mockito failed to spy routeChanged() method");
+ } catch (Exception e) {
+ assertTrue(e instanceof RuntimeException);
+ }
+ assertEquals(0L, spiedLinkstateTopologyBuilder.listenerScheduledRestartTime);
+ assertEquals(0L, spiedLinkstateTopologyBuilder.listenerScheduledRestartEnforceCounter);
+ // first we examine if the chain is being reset when no exception is thrown
+ spiedLinkstateTopologyBuilder.onDataTreeChanged(new ArrayList<>());
+ verify(spiedLinkstateTopologyBuilder, times(1)).restartTransactionChainOnDemand();
+ verify(spiedLinkstateTopologyBuilder, never()).scheduleListenerRestart();
+ verify(spiedLinkstateTopologyBuilder, never()).resetTransactionChain();
+ assertEquals(0L, spiedLinkstateTopologyBuilder.listenerScheduledRestartTime);
+ assertEquals(0L, spiedLinkstateTopologyBuilder.listenerScheduledRestartEnforceCounter);
+ // now pass some invalid data to cause onDataTreeChanged fail
+ DataTreeModification<LinkstateRoute> modification = (DataTreeModification<LinkstateRoute>) mock(DataTreeModification.class, RETURNS_SMART_NULLS);
+ final List<DataTreeModification<LinkstateRoute>> changes = new ArrayList<>();
+ changes.add(modification);
+ spiedLinkstateTopologyBuilder.onDataTreeChanged(changes);
+ // one restart transaction chain check in onDataTreeChanged()
+ // we are introducing some timeout here as transaction may be executed in a delay manner
+ verify(spiedLinkstateTopologyBuilder, timeout(5000).times(1)).scheduleListenerRestart();
+ verify(spiedLinkstateTopologyBuilder, times(2)).restartTransactionChainOnDemand();
+ assertNotEquals(0L, spiedLinkstateTopologyBuilder.listenerScheduledRestartTime);
+ assertEquals(0, spiedLinkstateTopologyBuilder.listenerScheduledRestartEnforceCounter);
+ final long listenerScheduledRestartTime = spiedLinkstateTopologyBuilder.listenerScheduledRestartTime;
+ // call again with empty change to invoke restartTransactionChainOnDemand()
+ spiedLinkstateTopologyBuilder.onDataTreeChanged(new ArrayList<>());
+ verify(spiedLinkstateTopologyBuilder, times(3)).restartTransactionChainOnDemand();
+ // transaction chain should be reset while listener should not
+ verify(spiedLinkstateTopologyBuilder, times(1)).resetTransactionChain();
+ verify(spiedLinkstateTopologyBuilder, never()).resetListener();
+ // now apply a change with bad modification again
+ spiedLinkstateTopologyBuilder.onDataTreeChanged(changes);
+ verify(spiedLinkstateTopologyBuilder, times(4)).restartTransactionChainOnDemand();
+ // listener scheduled again
+ verify(spiedLinkstateTopologyBuilder, timeout(5000).times(2)).scheduleListenerRestart();
+ // listener timer shouldn't have changed
+ assertEquals(listenerScheduledRestartTime, spiedLinkstateTopologyBuilder.listenerScheduledRestartTime);
+ assertEquals(0, spiedLinkstateTopologyBuilder.listenerScheduledRestartEnforceCounter);
+ verify(spiedLinkstateTopologyBuilder, times(2)).resetTransactionChain();
+ verify(spiedLinkstateTopologyBuilder, never()).resetListener();
+ Thread.sleep(LISTENER_RESTART_TIME);
+ // manually invoke onTransactionChainFailed() to have the listener restart scheduled again
+ spiedLinkstateTopologyBuilder.onTransactionChainFailed(null, null, null);
+ assertTrue(spiedLinkstateTopologyBuilder.listenerScheduledRestartTime == listenerScheduledRestartTime + LISTENER_RESTART_TIME);
+ verify(spiedLinkstateTopologyBuilder, times(5)).restartTransactionChainOnDemand();
+ verify(spiedLinkstateTopologyBuilder, times(3)).scheduleListenerRestart();
+ // enforce counter get increased
+ assertEquals(1, spiedLinkstateTopologyBuilder.listenerScheduledRestartEnforceCounter);
+ verify(spiedLinkstateTopologyBuilder, times(3)).resetTransactionChain();
+ verify(spiedLinkstateTopologyBuilder, never()).resetListener();
+ // sleep to let the listener restart timer times out
+ Thread.sleep(LISTENER_RESTART_TIME);
+ // apply a good modification (empty change)
+ spiedLinkstateTopologyBuilder.onDataTreeChanged(new ArrayList<>());
+ assertEquals(0, spiedLinkstateTopologyBuilder.listenerScheduledRestartTime);
+ assertEquals(0, spiedLinkstateTopologyBuilder.listenerScheduledRestartEnforceCounter);
+ verify(spiedLinkstateTopologyBuilder, times(6)).restartTransactionChainOnDemand();
+ // listener restarted didn't get rescheduled again
+ verify(spiedLinkstateTopologyBuilder, times(3)).scheduleListenerRestart();
+ verify(spiedLinkstateTopologyBuilder, times(4)).resetTransactionChain();
+ verify(spiedLinkstateTopologyBuilder, times(1)).resetListener();
}
private void updateLinkstateRoute(final LinkstateRoute data) {
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
import java.util.List;
import javax.management.ObjectName;
import org.junit.Test;