From 89ab65719fdcca5827997a4f4f0586674de43e5f Mon Sep 17 00:00:00 2001 From: Dana Kutenicsova Date: Fri, 27 Mar 2015 11:13:08 +0100 Subject: [PATCH] BUG-2383 : wire AdjRibOutListener to BGPPeer With RIBSupport in place, we can start emitting routes to our peers. Change-Id: Idcec38e59a2c4b60c291400b6baff6b67f38e0f3 Signed-off-by: Dana Kutenicsova Signed-off-by: Robert Varga --- .../bgp/rib/impl/AdjRibOutListener.java | 86 +++++++++++++++++-- .../bgp/rib/impl/ApplicationPeer.java | 3 +- .../protocol/bgp/rib/impl/BGPPeer.java | 7 +- .../protocol/bgp/rib/impl/BGPSessionImpl.java | 9 +- .../protocol/bgp/rib/impl/RIBImpl.java | 11 ++- .../bgp/rib/impl/RIBSupportContextImpl.java | 8 +- .../bgp/rib/impl/ApplicationPeerTest.java | 7 ++ .../bgp/rib/impl/BGPSessionImplTest.java | 1 + .../protocol/bgp/rib/impl/FSMTest.java | 1 + 9 files changed, 113 insertions(+), 20 deletions(-) diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AdjRibOutListener.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AdjRibOutListener.java index 374633ffef..d7fdcf7a75 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AdjRibOutListener.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/AdjRibOutListener.java @@ -9,11 +9,30 @@ package org.opendaylight.protocol.bgp.rib.impl; import com.google.common.base.Preconditions; import java.util.Collection; +import java.util.Collections; +import javax.annotation.Nonnull; import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.protocol.bgp.rib.impl.spi.RIBSupportContextRegistry; import org.opendaylight.protocol.bgp.rib.spi.RIBSupport; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.UpdateBuilder; +import org.opendaylight.protocol.bgp.rib.spi.RibSupportUtils; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Update; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.path.attributes.Attributes; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.rib.Peer; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.bgp.rib.rib.peer.AdjRibOut; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.TablesKey; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNodes; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Instantiated for each peer and table, listens on a particular peer's adj-rib-out, @@ -21,25 +40,74 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; */ @NotThreadSafe final class AdjRibOutListener implements DOMDataTreeChangeListener { + + private static final Logger LOG = LoggerFactory.getLogger(AdjRibOutListener.class); + private final ChannelOutputLimiter session; - private final RIBSupport ribSupport; + private final RIBSupportContextRegistry registry; + private final RIBSupportContextImpl context; + private final RIBSupport support; - AdjRibOutListener(final RIBSupport ribSupport, final ChannelOutputLimiter session) { - this.ribSupport = Preconditions.checkNotNull(ribSupport); + private AdjRibOutListener(final TablesKey tablesKey, final YangInstanceIdentifier ribId, final DOMDataTreeChangeService service, final RIBSupportContextRegistry registry, final ChannelOutputLimiter session) { + this.registry = Preconditions.checkNotNull(registry); this.session = Preconditions.checkNotNull(session); + this.context = (RIBSupportContextImpl) this.registry.getRIBSupportContext(tablesKey); + this.support = this.context.getRibSupport(); + final YangInstanceIdentifier adjRibOutId = ribId.node(Peer.QNAME).node(Peer.QNAME).node(AdjRibOut.QNAME).node(Tables.QNAME).node(RibSupportUtils.toYangTablesKey(tablesKey)); + service.registerDataTreeChangeListener(new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, adjRibOutId), this); + } + + static AdjRibOutListener create(@Nonnull final TablesKey tablesKey, @Nonnull final YangInstanceIdentifier ribId, @Nonnull final DOMDataTreeChangeService service, @Nonnull final RIBSupportContextRegistry registry, @Nonnull final ChannelOutputLimiter session) { + return new AdjRibOutListener(tablesKey, ribId, service, registry, session); } @Override public void onDataTreeChanged(final Collection changes) { - for (DataTreeCandidate tc : changes) { - final UpdateBuilder ub = new UpdateBuilder(); + LOG.debug("Data change received for AdjRibOut {}", changes); + for (final DataTreeCandidate tc : changes) { + for (final DataTreeCandidateNode child : tc.getRootNode().getChildNodes()) { + for (final DataTreeCandidateNode route : this.context.getRibSupport().changedRoutes(child)) { + final Update update; + + switch (route.getModificationType()) { + case UNMODIFIED: + LOG.debug("Skipping unmodified route {}", route.getIdentifier()); + continue; + case DELETE: + // FIXME: we can batch deletions into a single batch + update = withdraw((MapEntryNode) route.getDataBefore().get()); + break; + case SUBTREE_MODIFIED: + case WRITE: + update = advertise((MapEntryNode) route.getDataAfter().get()); + break; + default: + LOG.warn("Ignoring unhandled modification type {}", route.getModificationType()); + continue; + } - // FIXME: fill the structure + LOG.debug("Writing update {}", update); + this.session.write(update); + } + } + } + this.session.flush(); + } - session.write(ub.build()); + private Attributes routeAttributes(final MapEntryNode route) { + if (LOG.isDebugEnabled()) { + LOG.debug("AdjRibOut parsing route {}", NormalizedNodes.toStringTree(route)); } - session.flush(); + final ContainerNode advertisedAttrs = (ContainerNode) NormalizedNodes.findNode(route, this.support.routeAttributesIdentifier()).orNull(); + return this.context.deserializeAttributes(advertisedAttrs); } + private Update withdraw(final MapEntryNode route) { + return this.support.buildUpdate(Collections.singleton(route), Collections.emptyList(), routeAttributes(route)); + } + + private Update advertise(final MapEntryNode route) { + return this.support.buildUpdate(Collections.emptyList(), Collections.singleton(route), routeAttributes(route)); + } } diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/ApplicationPeer.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/ApplicationPeer.java index 289b312c27..bb6b4905b5 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/ApplicationPeer.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/ApplicationPeer.java @@ -53,7 +53,8 @@ public class ApplicationPeer implements AutoCloseable, org.opendaylight.protocol this.adjRibsInId = this.targetRib.getYangRibId().node(Peer.QNAME).node(peerId).node(AdjRibIn.QNAME).node(Tables.QNAME); this.chain = this.targetRib.createPeerChain(this); this.writer = AdjRibInWriter.create(this.targetRib.getYangRibId(), PeerRole.Ibgp, this.targetRib.createPeerChain(this)); - this.writer = this.writer.transform(RouterIds.createPeerId(ipAddress), this.targetRib.getRibSupportContext(), this.targetRib.getLocalTablesKeys(), true); + // FIXME: set to true, once it's fixed how to skip advertising routes back to AppPeer + this.writer = this.writer.transform(RouterIds.createPeerId(ipAddress), this.targetRib.getRibSupportContext(), this.targetRib.getLocalTablesKeys(), false); } @Override diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPPeer.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPPeer.java index 30fa136b94..f02f5503d4 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPPeer.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPPeer.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.protocol.bgp.rib.impl; import com.google.common.base.MoreObjects; @@ -60,7 +59,6 @@ import org.opendaylight.yangtools.yang.binding.Notification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Class representing a peer. We have a single instance for each peer, which provides translation from BGP events into * RIB actions. @@ -187,6 +185,11 @@ public class BGPPeer implements ReusableBGPPeer, Peer, AutoCloseable, BGPPeerRun this.tables.add(key); this.rib.initTable(this, key); + + // not particularly nice + if (session instanceof BGPSessionImpl) { + AdjRibOutListener.create(key, this.rib.getYangRibId(), ((RIBImpl)this.rib).getService(), this.rib.getRibSupportContext(), ((BGPSessionImpl) session).getLimiter()); + } } this.ribWriter = this.ribWriter.transform(RouterIds.createPeerId(session.getBgpId()), this.rib.getRibSupportContext(), this.tables, false); diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java index 1603cabbce..5464c871fd 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java @@ -112,21 +112,22 @@ public class BGPSessionImpl extends AbstractProtocolSession implem private BGPSessionStats sessionStats; public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen, final BGPSessionPreferences localPreferences, - final BGPPeerRegistry peerRegitry) { - this(listener, channel, remoteOpen, localPreferences.getHoldTime(), peerRegitry); + final BGPPeerRegistry peerRegistry) { + this(listener, channel, remoteOpen, localPreferences.getHoldTime(), peerRegistry); this.sessionStats = new BGPSessionStats(remoteOpen, this.holdTimerValue, this.keepAlive, channel, Optional.of(localPreferences), this.tableTypes); } public BGPSessionImpl(final BGPSessionListener listener, final Channel channel, final Open remoteOpen, final int localHoldTimer, - final BGPPeerRegistry peerRegitry) { + final BGPPeerRegistry peerRegistry) { this.listener = Preconditions.checkNotNull(listener); this.channel = Preconditions.checkNotNull(channel); this.limiter = new ChannelOutputLimiter(this); + this.channel.pipeline().addLast(this.limiter); this.holdTimerValue = (remoteOpen.getHoldTimer() < localHoldTimer) ? remoteOpen.getHoldTimer() : localHoldTimer; LOG.info("BGP HoldTimer new value: {}", this.holdTimerValue); this.keepAlive = this.holdTimerValue / KA_TO_DEADTIMER_RATIO; this.asNumber = AsNumberUtil.advertizedAsNumber(remoteOpen); - this.peerRegistry = peerRegitry; + this.peerRegistry = peerRegistry; final Set tts = Sets.newHashSet(); final Set tats = Sets.newHashSet(); diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/RIBImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/RIBImpl.java index 8b5f361daa..d92df8cdab 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/RIBImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/RIBImpl.java @@ -139,6 +139,7 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable, private final YangInstanceIdentifier yangRibId; private final RIBSupportContextRegistryImpl ribContextRegistry; private final EffectiveRibInWriter efWriter; + private final DOMDataBrokerExtension service; private final Runnable scheduler = new Runnable() { @Override @@ -225,14 +226,14 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable, final PolicyDatabase pd = new PolicyDatabase(localAs.getValue(), localBgpId, this.clusterId); final DOMDataBrokerExtension service = this.domDataBroker.getSupportedExtensions().get(DOMDataTreeChangeService.class); - final DOMTransactionChain domChain = this.createPeerChain(this); - this.efWriter = EffectiveRibInWriter.create((DOMDataTreeChangeService) service, this.createPeerChain(this), getYangRibId(), pd, this.ribContextRegistry); + this.service = service; + this.efWriter = EffectiveRibInWriter.create(getService(), this.createPeerChain(this), getYangRibId(), pd, this.ribContextRegistry); LOG.debug("Effective RIB created."); for (final BgpTableType t : localTables) { final TablesKey key = new TablesKey(t.getAfi(), t.getSafi()); // create locRibWriter for each table - LocRibWriter.create(this.ribContextRegistry, key, this.createPeerChain(this), getYangRibId(), localAs, (DOMDataTreeChangeService) service, pd); + LocRibWriter.create(this.ribContextRegistry, key, this.createPeerChain(this), getYangRibId(), localAs, getService(), pd); } } @@ -493,6 +494,10 @@ public final class RIBImpl extends DefaultRibReference implements AutoCloseable, return this.localTablesKeys; } + public DOMDataTreeChangeService getService() { + return (DOMDataTreeChangeService) this.service; + } + @Override public YangInstanceIdentifier getYangRibId() { return this.yangRibId; diff --git a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/RIBSupportContextImpl.java b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/RIBSupportContextImpl.java index acb65186ce..6e9f75fc5e 100644 --- a/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/RIBSupportContextImpl.java +++ b/bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/RIBSupportContextImpl.java @@ -53,6 +53,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdent import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeBuilder; import org.slf4j.Logger; @@ -171,8 +172,13 @@ class RIBSupportContextImpl extends RIBSupportContext { return (ContainerNode) this.reachNlriCodec.serialize(nlri); } + public Attributes deserializeAttributes(final NormalizedNode attributes) { + Preconditions.checkState(this.attributesCodec != null, "Attributes codec not available"); + return this.attributesCodec.deserialize(attributes); + } + private ContainerNode serializeAttributes(final Attributes pathAttr) { - Preconditions.checkState(this.attributesCodec != null, "MpReachNlri codec not available"); + Preconditions.checkState(this.attributesCodec != null, "Attributes codec not available"); final AttributesBuilder a = new AttributesBuilder(pathAttr); a.addAugmentation(Attributes1.class, null); a.addAugmentation(Attributes2.class, null); diff --git a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/ApplicationPeerTest.java b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/ApplicationPeerTest.java index cbdb63fc38..297e2d4d1b 100644 --- a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/ApplicationPeerTest.java +++ b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/ApplicationPeerTest.java @@ -16,6 +16,8 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelPipeline; import io.netty.channel.DefaultChannelPromise; import io.netty.channel.EventLoop; import java.math.BigInteger; @@ -189,6 +191,9 @@ public class ApplicationPeerTest { @Mock Channel channel; + @Mock + ChannelPipeline pipeline; + @Mock private EventLoop eventLoop; @@ -247,6 +252,8 @@ public class ApplicationPeerTest { Mockito.doReturn(this.domTransWrite).when(this.domChain).newWriteOnlyTransaction(); Mockito.doReturn(this.eventLoop).when(this.channel).eventLoop(); Mockito.doReturn("channel").when(this.channel).toString(); + Mockito.doReturn(this.pipeline).when(this.channel).pipeline(); + Mockito.doReturn(this.pipeline).when(this.pipeline).addLast(Mockito.any(ChannelHandler.class)); Mockito.doAnswer(new Answer() { @Override diff --git a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImplTest.java b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImplTest.java index 4cb6336210..0c275238c6 100644 --- a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImplTest.java +++ b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImplTest.java @@ -137,6 +137,7 @@ public class BGPSessionImplTest { doReturn(new InetSocketAddress(InetAddress.getByName(LOCAL_IP), LOCAL_PORT)).when(this.speakerListener).localAddress(); doReturn(this.pipeline).when(this.speakerListener).pipeline(); doReturn(this.pipeline).when(this.pipeline).replace(any(ChannelHandler.class), any(String.class), any(ChannelHandler.class)); + doReturn(this.pipeline).when(this.pipeline).addLast(any(ChannelHandler.class)); doReturn(mock(ChannelFuture.class)).when(this.speakerListener).close(); this.listener = new SimpleSessionListener(); this.bgpSession = new BGPSessionImpl(this.listener, this.speakerListener, this.classicOpen, this.classicOpen.getHoldTimer(), null); diff --git a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/FSMTest.java b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/FSMTest.java index 6858007546..9ecbc613f0 100644 --- a/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/FSMTest.java +++ b/bgp/rib-impl/src/test/java/org/opendaylight/protocol/bgp/rib/impl/FSMTest.java @@ -131,6 +131,7 @@ public class FSMTest { doReturn(new InetSocketAddress(peerAddress, 179)).when(this.speakerListener).localAddress(); doReturn(this.pipeline).when(this.speakerListener).pipeline(); doReturn(this.pipeline).when(this.pipeline).replace(any(ChannelHandler.class), any(String.class), any(ChannelHandler.class)); + doReturn(this.pipeline).when(this.pipeline).addLast(any(ChannelHandler.class)); doReturn(mock(ChannelFuture.class)).when(this.speakerListener).close(); this.classicOpen = new OpenBuilder().setMyAsNumber(30).setHoldTimer(3).setVersion(new ProtocolVersion((short) 4)).setBgpParameters( tlvs).setBgpIdentifier(new Ipv4Address("1.1.1.2")).build(); -- 2.36.6