import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
import org.opendaylight.protocol.bgp.mode.impl.BGPRouteEntryExportParametersImpl;
import org.opendaylight.protocol.bgp.rib.impl.spi.PeerTransactionChain;
import org.opendaylight.protocol.bgp.rib.impl.spi.RIB;
import org.opendaylight.yangtools.yang.binding.ChildOf;
import org.opendaylight.yangtools.yang.binding.ChoiceIn;
import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.slf4j.LoggerFactory;
abstract class AbstractPeer extends BGPPeerStateImpl implements BGPRouteEntryImportParameters, Peer,
- PeerTransactionChain, DOMTransactionChainListener {
+ PeerTransactionChain, FutureCallback<Empty> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractPeer.class);
final RTCClientRouteCache rtCache = new RTCClientRouteCache();
private final AsNumber localAs;
private final String name;
- // FIXME: revisit locking here to improve concurrency:
+ // FIXME: Revisit locking here to improve concurrency:
// -- identifiers, peerId are a shared resource
// -- domChain seems to really be 'ribInChain', accessed from netty thread
// -- ribOutChain is accessed from LocRibWriter
// which needs figuring out.
@GuardedBy("this")
private DOMTransactionChain domChain;
+ // FIXME: This is an invariant once the peer is 'resolved' -- which happens instantaneously for ApplicationPeer.
+ // There are also a number YangInstanceIdentifiers which are tied to it. We want to keep all of them in one
+ // structure for isolation. This could be a separate DTO (JDK16 record) or isolated into an abstract behavior
+ // class.
@GuardedBy("this")
PeerId peerId;
@GuardedBy("this")
private FluentFuture<? extends CommitInfo> submitted;
+ @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
+ justification = "False positive on synchronized createDomChain()")
AbstractPeer(
final RIB rib,
final String peerName,
final Map<TablesKey, Integer> afiSafisLlGracefulAdvertized) {
super(rib.getInstanceIdentifier(), groupId, neighborAddress, afiSafisAdvertized, afiSafisGracefulAdvertized,
afiSafisLlGracefulAdvertized);
- this.name = peerName;
- this.peerRole = role;
+ name = peerName;
+ peerRole = role;
this.clusterId = clusterId;
this.localAs = localAs;
this.rib = rib;
return CommitInfo.emptyFluentFuture();
}
LOG.info("Closed per Peer {} removed", peerPath);
- final DOMDataTreeWriteTransaction tx = this.domChain.newWriteOnlyTransaction();
+ final DOMDataTreeWriteTransaction tx = domChain.newWriteOnlyTransaction();
tx.delete(LogicalDatastoreType.OPERATIONAL, peerPath);
final FluentFuture<? extends CommitInfo> future = tx.commit();
future.addCallback(new FutureCallback<CommitInfo>() {
return future;
}
- synchronized YangInstanceIdentifier createPeerPath() {
- return this.rib.getYangRibId().node(PEER_NID).node(IdentifierUtils.domPeerId(this.peerId));
+ final YangInstanceIdentifier createPeerPath(final PeerId newPeerId) {
+ return rib.getYangRibId().node(PEER_NID).node(IdentifierUtils.domPeerId(newPeerId));
}
@Override
public final synchronized PeerId getPeerId() {
- return this.peerId;
+ return peerId;
}
@Override
public final PeerRole getRole() {
- return this.peerRole;
+ return peerRole;
}
@Override
}
@Override
- public final void onTransactionChainSuccessful(final DOMTransactionChain chain) {
- LOG.debug("Transaction chain {} successful.", chain);
+ public final void onSuccess(final Empty value) {
+ LOG.debug("Transaction chain successful");
}
@Override
@Override
public final String getName() {
- return this.name;
+ return name;
}
@Override
public final ClusterIdentifier getClusterId() {
- return this.clusterId;
+ return clusterId;
}
@Override
public final AsNumber getLocalAs() {
- return this.localAs;
+ return localAs;
}
@Override
public synchronized DOMTransactionChain getDomChain() {
- return this.domChain;
+ return domChain;
}
/**
public final synchronized <C extends Routes & DataObject & ChoiceIn<Tables>, S extends ChildOf<? super C>>
void initializeRibOut(final RouteEntryDependenciesContainer entryDep,
final List<ActualBestPathRoutes<C, S>> routesToStore) {
- if (this.ribOutChain == null) {
+ if (ribOutChain == null) {
LOG.debug("Session closed, skip changes to peer AdjRibsOut {}", getPeerId());
return;
}
final YangInstanceIdentifier tableRibout = getRibOutIId(ribSupport.tablesKey());
final boolean addPathSupported = supportsAddPathSupported(ribSupport.getTablesKey());
- final DOMDataTreeWriteTransaction tx = this.ribOutChain.newWriteOnlyTransaction();
+ final DOMDataTreeWriteTransaction tx = ribOutChain.newWriteOnlyTransaction();
for (final ActualBestPathRoutes<C, S> initRoute : routesToStore) {
if (!supportsLLGR() && initRoute.isDepreferenced()) {
// Stale Long-lived Graceful Restart routes should not be propagated
}
final FluentFuture<? extends CommitInfo> future = tx.commit();
- this.submitted = future;
+ submitted = future;
future.addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
public final synchronized <C extends Routes & DataObject & ChoiceIn<Tables>, S extends ChildOf<? super C>>
void refreshRibOut(final RouteEntryDependenciesContainer entryDep,
final List<StaleBestPathRoute> staleRoutes, final List<AdvertizedRoute<C, S>> newRoutes) {
- if (this.ribOutChain == null) {
+ if (ribOutChain == null) {
LOG.debug("Session closed, skip changes to peer AdjRibsOut {}", getPeerId());
return;
}
- final DOMDataTreeWriteTransaction tx = this.ribOutChain.newWriteOnlyTransaction();
+ final DOMDataTreeWriteTransaction tx = ribOutChain.newWriteOnlyTransaction();
final RIBSupport<C, S> ribSupport = entryDep.getRIBSupport();
deleteRouteRibOut(ribSupport, staleRoutes, tx);
installRouteRibOut(entryDep, newRoutes, tx);
final FluentFuture<? extends CommitInfo> future = tx.commit();
- this.submitted = future;
+ submitted = future;
future.addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
public final synchronized <C extends Routes & DataObject & ChoiceIn<Tables>, S extends ChildOf<? super C>>
void reEvaluateAdvertizement(final RouteEntryDependenciesContainer entryDep,
final List<ActualBestPathRoutes<C, S>> routesToStore) {
- if (this.ribOutChain == null) {
+ if (ribOutChain == null) {
LOG.debug("Session closed, skip changes to peer AdjRibsOut {}", getPeerId());
return;
}
final NodeIdentifierWithPredicates tk = ribSupport.tablesKey();
final boolean addPathSupported = supportsAddPathSupported(ribSupport.getTablesKey());
- final DOMDataTreeWriteTransaction tx = this.ribOutChain.newWriteOnlyTransaction();
+ final DOMDataTreeWriteTransaction tx = ribOutChain.newWriteOnlyTransaction();
for (final ActualBestPathRoutes<C, S> actualBestRoute : routesToStore) {
final PeerId fromPeerId = actualBestRoute.getFromPeerId();
if (!filterRoutes(fromPeerId, ribSupport.getTablesKey())) {
final Optional<ContainerNode> effAttr = applyExportPolicy(entryDep, fromPeerId, route, routePath,
actualBestRoute.getAttributes());
if (effAttr.isPresent()) {
- storeRoute(ribSupport, actualBestRoute, route, routePath, effAttr.get(), tx);
+ storeRoute(ribSupport, actualBestRoute, route, routePath, effAttr.orElseThrow(), tx);
continue;
}
}
}
final FluentFuture<? extends CommitInfo> future = tx.commit();
- this.submitted = future;
+ submitted = future;
future.addCallback(new FutureCallback<CommitInfo>() {
@Override
public void onSuccess(final CommitInfo result) {
final Peer fromPeer = entryDep.getPeerTracker().getPeer(fromPeerId);
final RIBSupport<?, ?> ribSupport = entryDep.getRIBSupport();
final BGPRouteEntryExportParameters routeEntry = new BGPRouteEntryExportParametersImpl(fromPeer, this,
- ribSupport.extractRouteKey(route.getIdentifier()), this.rtCache);
+ ribSupport.extractRouteKey(route.name()), rtCache);
final Attributes bindingAttrs = ribSupport.attributeFromContainerNode(attrs);
final Optional<Attributes> optExportAttrs = entryDep.getRoutingPolicies().applyExportPolicies(routeEntry,
tx.delete(LogicalDatastoreType.OPERATIONAL, ribOutTarget);
}
+ // FIXME: make this asynchronous?
final synchronized void releaseRibOutChain(final boolean isWaitForSubmitted) {
if (isWaitForSubmitted) {
- if (this.submitted != null) {
+ if (submitted != null) {
try {
- this.submitted.get();
+ submitted.get();
} catch (final InterruptedException | ExecutionException throwable) {
LOG.error("Write routes failed", throwable);
}
}
}
- if (this.ribOutChain != null) {
+ if (ribOutChain != null) {
LOG.info("Closing peer chain {}", getPeerId());
- this.ribOutChain.close();
- this.ribOutChain = null;
+ ribOutChain.close();
+ ribOutChain = null;
}
}
final synchronized void createDomChain() {
- if (this.domChain == null) {
+ if (domChain == null) {
LOG.info("Creating DOM peer chain {}", getPeerId());
- this.domChain = this.rib.createPeerDOMChain(this);
+ domChain = rib.createPeerDOMChain();
+ domChain.addCallback(this);
}
}
final synchronized void closeDomChain() {
- if (this.domChain != null) {
+ if (domChain != null) {
LOG.info("Closing DOM peer chain {}", getPeerId());
- this.domChain.close();
- this.domChain = null;
+ domChain.close();
+ domChain = null;
}
}