import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
-import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.concurrent.GuardedBy;
+import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.bgpcep.topology.TopologyReference;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.DataObjectModification;
+import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
+import org.opendaylight.mdsal.binding.api.DataTreeModification;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.protocol.bgp.rib.RibReference;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.Route;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.bgp.rib.rib.LocRib;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.Tables;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev180329.rib.TablesKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.AddressFamily;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev180329.SubsequentAddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.AddressFamily;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.types.rev200120.SubsequentAddressFamily;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
private final InstanceIdentifier<Topology> topology;
private final RibReference locRibReference;
private final DataBroker dataProvider;
- private final Class<? extends AddressFamily> afi;
- private final Class<? extends SubsequentAddressFamily> safi;
+ private final AddressFamily afi;
+ private final SubsequentAddressFamily safi;
private final TopologyKey topologyKey;
private final TopologyTypes topologyTypes;
private final long listenerResetLimitInMillsec;
@GuardedBy("this")
private ListenerRegistration<AbstractTopologyBuilder<T>> listenerRegistration = null;
@GuardedBy("this")
- private BindingTransactionChain chain = null;
- private AtomicBoolean closed = new AtomicBoolean(false);
+ private TransactionChain chain = null;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
@GuardedBy("this")
@VisibleForTesting
protected long listenerScheduledRestartTime = 0;
@GuardedBy("this")
@VisibleForTesting
protected int listenerScheduledRestartEnforceCounter = 0;
+ protected boolean networkTopologyTransaction = true;
protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
- final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
- final Class<? extends SubsequentAddressFamily> safi, final long listenerResetLimitInMillsec,
- final int listenerResetEnforceCounter) {
+ final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
+ final SubsequentAddressFamily safi, final long listenerResetLimitInMillsec,
+ final int listenerResetEnforceCounter) {
this.dataProvider = dataProvider;
this.locRibReference = requireNonNull(locRibReference);
this.topologyKey = new TopologyKey(requireNonNull(topologyId));
}
protected AbstractTopologyBuilder(final DataBroker dataProvider, final RibReference locRibReference,
- final TopologyId topologyId, final TopologyTypes types, final Class<? extends AddressFamily> afi,
- final Class<? extends SubsequentAddressFamily> safi) {
+ final TopologyId topologyId, final TopologyTypes types, final AddressFamily afi,
+ final SubsequentAddressFamily safi) {
this(dataProvider, locRibReference, topologyId, types, afi, safi, LISTENER_RESET_LIMIT_IN_MILLSEC,
LISTENER_RESET_ENFORCE_COUNTER);
}
this.getInstanceIdentifier());
final InstanceIdentifier<Tables> tablesId = this.locRibReference.getInstanceIdentifier()
.child(LocRib.class).child(Tables.class, new TablesKey(this.afi, this.safi));
- final DataTreeIdentifier<T> id = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL,
+ final DataTreeIdentifier<T> id = DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL,
getRouteWildcard(tablesId));
this.listenerRegistration = this.dataProvider.registerDataTreeChangeListener(id, this);
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public synchronized void onDataTreeChanged(final Collection<DataTreeModification<T>> changes) {
- if (this.closed.get()) {
- LOG.trace("Transaction chain was already closed, skipping update.");
- return;
- }
- // check if the transaction chain needed to be restarted due to a previous error
- if (restartTransactionChainOnDemand()) {
- LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
- return;
- }
- final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
- LOG.trace("Received data change {} event with transaction {}", changes, trans.getIdentifier());
- final AtomicBoolean transactionInError = new AtomicBoolean(false);
- for (final DataTreeModification<T> change : changes) {
- try {
- routeChanged(change, trans);
- } catch (final RuntimeException exc) {
- LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change,
- trans.getIdentifier(), this, exc);
- // trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic change
- // trans.commit() must be called first to unlock the current transaction chain, to make the chain
- // closable so we cannot exit the #onDataTreeChanged() yet
- transactionInError.set(true);
- break;
+ if (networkTopologyTransaction) {
+ if (this.closed.get()) {
+ LOG.trace("Transaction chain was already closed, skipping update.");
+ return;
}
- }
- trans.commit().addCallback(new FutureCallback<CommitInfo>() {
- @Override
- public void onSuccess(final CommitInfo result) {
- // as we are enforcing trans.commit(), in some cases the transaction execution actually could be
- // successfully even when an exception is captured, thus #onTransactionChainFailed() never get invoked.
- // Though the transaction chain remains usable,
- // the data loss will not be able to be recovered. Thus we schedule a listener restart here
- if (transactionInError.get()) {
- LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a restart"
- + " of listener {}", trans
- .getIdentifier(), AbstractTopologyBuilder.this);
- scheduleListenerRestart();
- } else {
- LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
+ // check if the transaction chain needed to be restarted due to a previous error
+ if (restartTransactionChainOnDemand()) {
+ LOG.debug("The data change {} is disregarded due to restart of listener {}", changes, this);
+ return;
+ }
+ final ReadWriteTransaction trans = this.chain.newReadWriteTransaction();
+ LOG.trace("Received data change {} event with transaction {}", changes, trans.getIdentifier());
+ final AtomicBoolean transactionInError = new AtomicBoolean(false);
+ for (final DataTreeModification<T> change : changes) {
+ try {
+ routeChanged(change, trans);
+ } catch (final RuntimeException exc) {
+ LOG.warn("Data change {} (transaction {}) was not completely propagated to listener {}", change,
+ trans.getIdentifier(), this, exc);
+ // trans.cancel() is not supported by PingPongTransactionChain, so we just skip the problematic
+ // change.
+ // trans.commit() must be called first to unlock the current transaction chain, to make the chain
+ // closable so we cannot exit the #onDataTreeChanged() yet
+ transactionInError.set(true);
+ break;
}
}
+ trans.commit().addCallback(new FutureCallback<CommitInfo>() {
+ @Override
+ public void onSuccess(final CommitInfo result) {
+ // as we are enforcing trans.commit(), in some cases the transaction execution actually could be
+ // successfully even when an exception is captured, thus #onTransactionChainFailed() never get
+ // invoked. Though the transaction chain remains usable,
+ // the data loss will not be able to be recovered. Thus we schedule a listener restart here
+ if (transactionInError.get()) {
+ LOG.warn("Transaction {} committed successfully while exception captured. Rescheduling a"
+ + " restart of listener {}", trans
+ .getIdentifier(), AbstractTopologyBuilder.this);
+ scheduleListenerRestart();
+ } else {
+ LOG.trace("Transaction {} committed successfully", trans.getIdentifier());
+ }
+ }
- @Override
- public void onFailure(final Throwable throwable) {
- // we do nothing but print out the log. Transaction chain restart will be done in
- // #onTransactionChainFailed()
- LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(),
- AbstractTopologyBuilder.this, throwable);
+ @Override
+ public void onFailure(final Throwable throwable) {
+ // we do nothing but print out the log. Transaction chain restart will be done in
+ // #onTransactionChainFailed()
+ LOG.error("Failed to propagate change (transaction {}) by listener {}", trans.getIdentifier(),
+ AbstractTopologyBuilder.this, throwable);
+ }
+ }, MoreExecutors.directExecutor());
+ } else {
+ for (final DataTreeModification<T> change : changes) {
+ routeChanged(change, null);
}
- }, MoreExecutors.directExecutor());
+ }
}
@VisibleForTesting
private synchronized void initOperationalTopology() {
requireNonNull(this.chain, "A valid transaction chain must be provided.");
final WriteTransaction trans = this.chain.newWriteOnlyTransaction();
- trans.put(LogicalDatastoreType.OPERATIONAL, this.topology,
- new TopologyBuilder().setKey(this.topologyKey).setServerProvided(Boolean.TRUE)
+ trans.mergeParentStructurePut(LogicalDatastoreType.OPERATIONAL, this.topology,
+ new TopologyBuilder().withKey(this.topologyKey).setServerProvided(Boolean.TRUE)
.setTopologyTypes(this.topologyTypes)
- .setLink(Collections.emptyList()).setNode(Collections.emptyList()).build(), true);
+ .setLink(Map.of()).setNode(Map.of()).build());
trans.commit().addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
LOG.debug("Initializing transaction chain for topology {}", this);
Preconditions.checkState(this.chain == null,
"Transaction chain has to be closed before being initialized");
- this.chain = this.dataProvider.createTransactionChain(this);
+ this.chain = this.dataProvider.createMergingTransactionChain(this);
}
/**
}
@Override
- public final synchronized void onTransactionChainFailed(final TransactionChain<?, ?> transactionChain,
- final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+ public final synchronized void onTransactionChainFailed(final TransactionChain transactionChain,
+ final Transaction transaction, final Throwable cause) {
LOG.error("Topology builder for {} failed in transaction {}.", getInstanceIdentifier(),
transaction != null ? transaction.getIdentifier() : null, cause);
scheduleListenerRestart();
}
@Override
- public final void onTransactionChainSuccessful(final TransactionChain<?, ?> transactionChain) {
+ public final void onTransactionChainSuccessful(final TransactionChain transactionChain) {
LOG.info("Topology builder for {} shut down", getInstanceIdentifier());
}
}