import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
-import java.util.Arrays;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.mdsal.binding.api.TransactionChain;
-import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
import org.opendaylight.protocol.bgp.mode.impl.BGPRouteEntryExportParametersImpl;
import org.opendaylight.protocol.bgp.rib.impl.spi.PeerTransactionChain;
import org.opendaylight.protocol.bgp.rib.impl.spi.RIB;
import org.opendaylight.yangtools.yang.binding.ChildOf;
import org.opendaylight.yangtools.yang.binding.ChoiceIn;
import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract class AbstractPeer extends BGPPeerStateImpl implements BGPRouteEntryImportParameters, TransactionChainListener,
- DOMTransactionChainListener, Peer, PeerTransactionChain {
+abstract class AbstractPeer extends BGPPeerStateImpl implements BGPRouteEntryImportParameters, Peer,
+ PeerTransactionChain, FutureCallback<Empty> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractPeer.class);
- protected final RIB rib;
- final String name;
- final PeerRole peerRole;
+
+ final RTCClientRouteCache rtCache = new RTCClientRouteCache();
+ final RIB rib;
+
private final ClusterIdentifier clusterId;
+ private final PeerRole peerRole;
private final AsNumber localAs;
+ private final String name;
+
+ // FIXME: Revisit locking here to improve concurrency:
+ // -- identifiers, peerId are a shared resource
+ // -- domChain seems to really be 'ribInChain', accessed from netty thread
+ // -- ribOutChain is accessed from LocRibWriter
+ // hence we want to use the two chains concurrently. The problem is their lifecycle in response to errors,
+ // which needs figuring out.
@GuardedBy("this")
private DOMTransactionChain domChain;
+ // FIXME: This is an invariant once the peer is 'resolved' -- which happens instantaneously for ApplicationPeer.
+ // There are also a number YangInstanceIdentifiers which are tied to it. We want to keep all of them in one
+ // structure for isolation. This could be a separate DTO (JDK16 record) or isolated into an abstract behavior
+ // class.
+ @GuardedBy("this")
+ PeerId peerId;
+
+ // These seem to be separate
@GuardedBy("this")
@VisibleForTesting
- TransactionChain bindingChain;
- byte[] rawIdentifier;
+ DOMTransactionChain ribOutChain;
@GuardedBy("this")
- PeerId peerId;
private FluentFuture<? extends CommitInfo> submitted;
- RTCClientRouteCache rtCache = new RTCClientRouteCache();
+ @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
+ justification = "False positive on synchronized createDomChain()")
AbstractPeer(
final RIB rib,
final String peerName,
final Map<TablesKey, Integer> afiSafisLlGracefulAdvertized) {
super(rib.getInstanceIdentifier(), groupId, neighborAddress, afiSafisAdvertized, afiSafisGracefulAdvertized,
afiSafisLlGracefulAdvertized);
- this.name = peerName;
- this.peerRole = role;
+ name = peerName;
+ peerRole = role;
this.clusterId = clusterId;
this.localAs = localAs;
this.rib = rib;
return CommitInfo.emptyFluentFuture();
}
LOG.info("Closed per Peer {} removed", peerPath);
- final DOMDataTreeWriteTransaction tx = this.domChain.newWriteOnlyTransaction();
+ final DOMDataTreeWriteTransaction tx = domChain.newWriteOnlyTransaction();
tx.delete(LogicalDatastoreType.OPERATIONAL, peerPath);
final FluentFuture<? extends CommitInfo> future = tx.commit();
future.addCallback(new FutureCallback<CommitInfo>() {
return future;
}
- synchronized YangInstanceIdentifier createPeerPath() {
- return this.rib.getYangRibId().node(PEER_NID).node(IdentifierUtils.domPeerId(this.peerId));
+ final YangInstanceIdentifier createPeerPath(final PeerId newPeerId) {
+ return rib.getYangRibId().node(PEER_NID).node(IdentifierUtils.domPeerId(newPeerId));
}
@Override
public final synchronized PeerId getPeerId() {
- return this.peerId;
+ return peerId;
}
@Override
public final PeerRole getRole() {
- return this.peerRole;
- }
-
- @Override
- public final synchronized byte[] getRawIdentifier() {
- return Arrays.copyOf(this.rawIdentifier, this.rawIdentifier.length);
+ return peerRole;
}
@Override
}
@Override
- public final void onTransactionChainSuccessful(final DOMTransactionChain chain) {
- LOG.debug("Transaction chain {} successful.", chain);
- }
-
- @Override
- public final void onTransactionChainSuccessful(final TransactionChain chain) {
- LOG.debug("Transaction chain {} successful.", chain);
+ public final void onSuccess(final Empty value) {
+ LOG.debug("Transaction chain successful");
}
@Override
@Override
public final String getName() {
- return this.name;
+ return name;
}
@Override
public final ClusterIdentifier getClusterId() {
- return this.clusterId;
+ return clusterId;
}
@Override
public final AsNumber getLocalAs() {
- return this.localAs;
+ return localAs;
}
@Override
public synchronized DOMTransactionChain getDomChain() {
- return this.domChain;
+ return domChain;
}
/**
public final synchronized <C extends Routes & DataObject & ChoiceIn<Tables>, S extends ChildOf<? super C>>
void initializeRibOut(final RouteEntryDependenciesContainer entryDep,
final List<ActualBestPathRoutes<C, S>> routesToStore) {
- if (this.bindingChain == null) {
+ if (ribOutChain == null) {
LOG.debug("Session closed, skip changes to peer AdjRibsOut {}", getPeerId());
return;
}
final YangInstanceIdentifier tableRibout = getRibOutIId(ribSupport.tablesKey());
final boolean addPathSupported = supportsAddPathSupported(ribSupport.getTablesKey());
- final DOMDataTreeWriteTransaction tx = this.domChain.newWriteOnlyTransaction();
+ final DOMDataTreeWriteTransaction tx = ribOutChain.newWriteOnlyTransaction();
for (final ActualBestPathRoutes<C, S> initRoute : routesToStore) {
if (!supportsLLGR() && initRoute.isDepreferenced()) {
// Stale Long-lived Graceful Restart routes should not be propagated
}
final FluentFuture<? extends CommitInfo> future = tx.commit();
- this.submitted = future;
+ submitted = future;
future.addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
public final synchronized <C extends Routes & DataObject & ChoiceIn<Tables>, S extends ChildOf<? super C>>
void refreshRibOut(final RouteEntryDependenciesContainer entryDep,
final List<StaleBestPathRoute> staleRoutes, final List<AdvertizedRoute<C, S>> newRoutes) {
- if (this.bindingChain == null) {
+ if (ribOutChain == null) {
LOG.debug("Session closed, skip changes to peer AdjRibsOut {}", getPeerId());
return;
}
- final DOMDataTreeWriteTransaction tx = this.domChain.newWriteOnlyTransaction();
+ final DOMDataTreeWriteTransaction tx = ribOutChain.newWriteOnlyTransaction();
final RIBSupport<C, S> ribSupport = entryDep.getRIBSupport();
deleteRouteRibOut(ribSupport, staleRoutes, tx);
installRouteRibOut(entryDep, newRoutes, tx);
final FluentFuture<? extends CommitInfo> future = tx.commit();
- this.submitted = future;
+ submitted = future;
future.addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
public final synchronized <C extends Routes & DataObject & ChoiceIn<Tables>, S extends ChildOf<? super C>>
void reEvaluateAdvertizement(final RouteEntryDependenciesContainer entryDep,
final List<ActualBestPathRoutes<C, S>> routesToStore) {
- if (this.bindingChain == null) {
+ if (ribOutChain == null) {
LOG.debug("Session closed, skip changes to peer AdjRibsOut {}", getPeerId());
return;
}
final NodeIdentifierWithPredicates tk = ribSupport.tablesKey();
final boolean addPathSupported = supportsAddPathSupported(ribSupport.getTablesKey());
- final DOMDataTreeWriteTransaction tx = this.domChain.newWriteOnlyTransaction();
+ final DOMDataTreeWriteTransaction tx = ribOutChain.newWriteOnlyTransaction();
for (final ActualBestPathRoutes<C, S> actualBestRoute : routesToStore) {
final PeerId fromPeerId = actualBestRoute.getFromPeerId();
if (!filterRoutes(fromPeerId, ribSupport.getTablesKey())) {
final Optional<ContainerNode> effAttr = applyExportPolicy(entryDep, fromPeerId, route, routePath,
actualBestRoute.getAttributes());
if (effAttr.isPresent()) {
- storeRoute(ribSupport, actualBestRoute, route, routePath, effAttr.get(), tx);
+ storeRoute(ribSupport, actualBestRoute, route, routePath, effAttr.orElseThrow(), tx);
continue;
}
}
}
final FluentFuture<? extends CommitInfo> future = tx.commit();
- this.submitted = future;
+ submitted = future;
future.addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
final Peer fromPeer = entryDep.getPeerTracker().getPeer(fromPeerId);
final RIBSupport<?, ?> ribSupport = entryDep.getRIBSupport();
final BGPRouteEntryExportParameters routeEntry = new BGPRouteEntryExportParametersImpl(fromPeer, this,
- ribSupport.extractRouteKey(route.getIdentifier()), this.rtCache);
+ ribSupport.extractRouteKey(route.name()), rtCache);
final Attributes bindingAttrs = ribSupport.attributeFromContainerNode(attrs);
final Optional<Attributes> optExportAttrs = entryDep.getRoutingPolicies().applyExportPolicies(routeEntry,
tx.delete(LogicalDatastoreType.OPERATIONAL, ribOutTarget);
}
- final synchronized void releaseBindingChain(final boolean isWaitForSubmitted) {
+ // FIXME: make this asynchronous?
+ final synchronized void releaseRibOutChain(final boolean isWaitForSubmitted) {
if (isWaitForSubmitted) {
- if (this.submitted != null) {
+ if (submitted != null) {
try {
- this.submitted.get();
+ submitted.get();
} catch (final InterruptedException | ExecutionException throwable) {
LOG.error("Write routes failed", throwable);
}
}
}
- closeBindingChain();
- }
- private synchronized void closeBindingChain() {
- if (this.bindingChain != null) {
+ if (ribOutChain != null) {
LOG.info("Closing peer chain {}", getPeerId());
- this.bindingChain.close();
- this.bindingChain = null;
+ ribOutChain.close();
+ ribOutChain = null;
}
}
final synchronized void createDomChain() {
- if (this.domChain == null) {
+ if (domChain == null) {
LOG.info("Creating DOM peer chain {}", getPeerId());
- this.domChain = this.rib.createPeerDOMChain(this);
+ domChain = rib.createPeerDOMChain();
+ domChain.addCallback(this);
}
}
final synchronized void closeDomChain() {
- if (this.domChain != null) {
+ if (domChain != null) {
LOG.info("Closing DOM peer chain {}", getPeerId());
- this.domChain.close();
- this.domChain = null;
+ domChain.close();
+ domChain = null;
}
}