import static org.junit.Assert.fail;
import com.google.common.base.Charsets;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
import java.net.InetSocketAddress;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javassist.ClassPool;
+import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
private static final int PORT = 12345;
private static final String LOCAL_ADDRESS = "127.0.0.10";
- private static final InetSocketAddress CLIENT_REMOTE = new InetSocketAddress("127.0.0.10", PORT);
+ private static final String LOCAL_ADDRESS_2 = "127.0.0.11";
+ private static final InetSocketAddress CLIENT_REMOTE = new InetSocketAddress(LOCAL_ADDRESS, PORT);
private static final InetSocketAddress CLIENT_LOCAL = new InetSocketAddress(LOCAL_ADDRESS, 0);
private static final Ipv4Address PEER1 = new Ipv4Address("20.20.20.20");
private static final MonitorId MONITOR_ID = new MonitorId("monitor");
MONITOR_ID, new InetSocketAddress(InetAddresses.forString("127.0.0.10"), PORT), Optional.of(keys),
this.mappingService.getCodecFactory(), moduleInfoBackedContext.getSchemaContext(), this.mrs);
- final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
- Assert.assertEquals(1, monitor.getMonitor().size());
- final Monitor bmpMonitor = monitor.getMonitor().get(0);
- Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
- Assert.assertEquals(0, bmpMonitor.getRouter().size());
+ readData(InstanceIdentifier.create(BmpMonitor.class), new Function<BmpMonitor, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final BmpMonitor monitor) {
+ Assert.assertEquals(1, monitor.getMonitor().size());
+ final Monitor bmpMonitor = monitor.getMonitor().get(0);
+ Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
+ Assert.assertEquals(0, bmpMonitor.getRouter().size());
+ return monitor;
+ }
+ });
}
@After
this.dispatcher.close();
this.bmpApp.close();
this.mappingService.close();
- final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
- assertTrue(monitor.getMonitor().isEmpty());
+
+ readData(InstanceIdentifier.create(BmpMonitor.class), new Function<BmpMonitor, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final BmpMonitor monitor) {
+ assertTrue(monitor.getMonitor().isEmpty());
+ return monitor;
+ }
+ });
}
@Test
Thread.sleep(500);
final KeyedInstanceIdentifier<Monitor, MonitorKey> monitorIId = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
- final Monitor monitor = getBmpData(monitorIId).get();
- assertEquals(1, monitor.getRouter().size());
- final Router router = monitor.getRouter().get(0);
- assertEquals(ROUTER_ID, router.getRouterId());
- assertEquals(Status.Down, router.getStatus());
- assertTrue(router.getPeer().isEmpty());
+ readData(monitorIId, new Function<Monitor, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final Monitor monitor) {
+ assertEquals(1, monitor.getRouter().size());
+ final Router router = monitor.getRouter().get(0);
+ assertEquals(ROUTER_ID, router.getRouterId());
+ assertEquals(Status.Down, router.getStatus());
+ assertTrue(router.getPeer().isEmpty());
+ return monitor;
+ }
+ });
+ waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
+ readData(monitorIId, new Function<Monitor, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final Monitor monitor) {
+ assertEquals(1, monitor.getRouter().size());
+ final Router routerInit = monitor.getRouter().get(0);
+ assertEquals("some info;", routerInit.getInfo());
+ assertEquals("name", routerInit.getName());
+ assertEquals("description", routerInit.getDescription());
+ assertEquals(ROUTER_ID, routerInit.getRouterId());
+ assertTrue(routerInit.getPeer().isEmpty());
+ assertEquals(Status.Up, routerInit.getStatus());
+ return monitor;
+ }
+ });
+
+ waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
- channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info"));
- Thread.sleep(500);
- final Monitor monitorInit = getBmpData(monitorIId).get();
- assertEquals(1, monitorInit.getRouter().size());
- final Router routerInit = monitorInit.getRouter().get(0);
- assertEquals("some info;", routerInit.getInfo());
- assertEquals("name", routerInit.getName());
- assertEquals("description", routerInit.getDescription());
- assertEquals(ROUTER_ID, routerInit.getRouterId());
- assertTrue(routerInit.getPeer().isEmpty());
- assertEquals(Status.Up, routerInit.getStatus());
-
- channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true));
- Thread.sleep(500);
final KeyedInstanceIdentifier<Router, RouterKey> routerIId = monitorIId.child(Router.class, new RouterKey(ROUTER_ID));
- final List<Peer> peers = getBmpData(routerIId).get().getPeer();
- assertEquals(1, peers.size());
- final Peer peer = peers.get(0);
- assertEquals(PeerType.Global, peer.getType());
- assertEquals(PEER_ID, peer.getPeerId());
- assertEquals(PEER1, peer.getBgpId());
- assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
- assertEquals(TestUtil.PEER_AS, peer.getAs());
- assertNull(peer.getDistinguisher());
- assertNull(peer.getStats());
-
- assertNotNull(peer.getPrePolicyRib());
- assertEquals(1, peer.getPrePolicyRib().getTables().size());
- final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
- assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
- assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
- assertFalse(prePolicyTable.getAttributes().isUptodate());
- assertNotNull(prePolicyTable.getRoutes());
-
- assertNotNull(peer.getPostPolicyRib());
- assertEquals(1, peer.getPostPolicyRib().getTables().size());
- final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
- assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
- assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
- assertFalse(postPolicyTable.getAttributes().isUptodate());
- assertNotNull(postPolicyTable.getRoutes());
-
- assertNotNull(peer.getPeerSession());
- final PeerSession peerSession = peer.getPeerSession();
- assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
- assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
- assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
- assertEquals(Status.Up, peerSession.getStatus());
+ readData(routerIId, new Function<Router, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final Router router) {
+ final List<Peer> peers = router.getPeer();
+ assertEquals(1, peers.size());
+ final Peer peer = peers.get(0);
+ assertEquals(PeerType.Global, peer.getType());
+ assertEquals(PEER_ID, peer.getPeerId());
+ assertEquals(PEER1, peer.getBgpId());
+ assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
+ assertEquals(TestUtil.PEER_AS, peer.getAs());
+ assertNull(peer.getDistinguisher());
+ assertNull(peer.getStats());
+
+ assertNotNull(peer.getPrePolicyRib());
+ assertEquals(1, peer.getPrePolicyRib().getTables().size());
+ final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
+ assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
+ assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
+ assertFalse(prePolicyTable.getAttributes().isUptodate());
+ assertNotNull(prePolicyTable.getRoutes());
+
+ assertNotNull(peer.getPostPolicyRib());
+ assertEquals(1, peer.getPostPolicyRib().getTables().size());
+ final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
+ assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
+ assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
+ assertFalse(postPolicyTable.getAttributes().isUptodate());
+ assertNotNull(postPolicyTable.getRoutes());
+
+ assertNotNull(peer.getPeerSession());
+ final PeerSession peerSession = peer.getPeerSession();
+ assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
+ assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
+ assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
+ assertEquals(Status.Up, peerSession.getStatus());
+ return router;
+ }
+ });
+
final StatsReportsMessage statsMsg = TestUtil.createStatsReportMsg(PEER1);
- channel.writeAndFlush(statsMsg);
- Thread.sleep(500);
+ waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
- final Stats peerStats = getBmpData(peerIId.child(Stats.class)).get();
- assertNotNull(peerStats.getTimestampSec());
- final Tlvs tlvs = statsMsg.getTlvs();
- assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
- assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(), peerStats.getDuplicatePrefixAdvertisements());
- assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
- assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
- assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
- assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(), peerStats.getInvalidatedClusterListLoop());
- assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
- assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
- assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
- assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(), peerStats.getPerAfiSafiAdjRibInRoutes().getAfiSafi().get(0).getCount().toString());
- assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(), peerStats.getPerAfiSafiLocRibRoutes().getAfiSafi().get(0).getCount().toString());
-
- // route mirror message test
+ readData(peerIId.child(Stats.class), new Function<Stats, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final Stats peerStats) {
+ assertNotNull(peerStats.getTimestampSec());
+ final Tlvs tlvs = statsMsg.getTlvs();
+ assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
+ assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(), peerStats.getDuplicatePrefixAdvertisements());
+ assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
+ assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
+ assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
+ assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(), peerStats.getInvalidatedClusterListLoop());
+ assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
+ assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
+ assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
+ assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(), peerStats.getPerAfiSafiAdjRibInRoutes().getAfiSafi().get(0).getCount().toString());
+ assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(), peerStats.getPerAfiSafiLocRibRoutes().getAfiSafi().get(0).getCount().toString());
+ return peerStats;
+ }
+ });
+ // route mirror message test
final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
- channel.writeAndFlush(routeMirrorMsg);
- Thread.sleep(500);
- final Mirrors routeMirrors = getBmpData(peerIId.child(Mirrors.class)).get();
- assertNotNull(routeMirrors.getTimestampSec());
-
- channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy));
- channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy));
- Thread.sleep(500);
- final Tables prePolicyRib = getBmpData(peerIId.child(PrePolicyRib.class)).get().getTables().get(0);
- assertTrue(prePolicyRib.getAttributes().isUptodate());
- assertEquals(3, ((Ipv4RoutesCase) prePolicyRib.getRoutes()).getIpv4Routes().getIpv4Route().size());
-
- channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy));
- channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy));
- Thread.sleep(500);
- final Tables postPolicyRib = getBmpData(peerIId.child(PostPolicyRib.class)).get().getTables().get(0);
- assertTrue(postPolicyRib.getAttributes().isUptodate());
- assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase) postPolicyRib.getRoutes()).getIpv4Routes().getIpv4Route().size());
-
- channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1));
- Thread.sleep(500);
- final List<Peer> peersAfterDown = getBmpData(routerIId).get().getPeer();
- assertTrue(peersAfterDown.isEmpty());
+ waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
+ readData(peerIId.child(Mirrors.class), new Function<Mirrors, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final Mirrors routeMirrors) {
+ assertNotNull(routeMirrors.getTimestampSec());
+ return routeMirrors;
+ }
+ });
+
+ waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy)));
+ waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy)));
+ readData(peerIId.child(PrePolicyRib.class), new Function<PrePolicyRib, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final PrePolicyRib prePolicyRib) {
+ final Tables table = prePolicyRib.getTables().get(0);
+ assertTrue(table.getAttributes().isUptodate());
+ assertEquals(3, ((Ipv4RoutesCase) table.getRoutes()).getIpv4Routes().getIpv4Route().size());
+ return prePolicyRib;
+ }
+ });
+
+
+ waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy)));
+ waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy)));
+ readData(peerIId.child(PostPolicyRib.class), new Function<PostPolicyRib, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final PostPolicyRib postPolicyRib) {
+ final Tables tables = postPolicyRib.getTables().get(0);
+ assertTrue(tables.getAttributes().isUptodate());
+ assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase) tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
+ return postPolicyRib;
+ }
+ });
+ waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
+ readData(routerIId, new Function<Router, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final Router peersAfterDown) {
+ assertTrue(peersAfterDown.getPeer().isEmpty());
+ return peersAfterDown;
+ }
+ });
channel.close().await();
- Thread.sleep(500);
- final Monitor monitorAfterClose = getBmpData(monitorIId).get();
- assertTrue(monitorAfterClose.getRouter().isEmpty());
+ readData(monitorIId, new Function<Monitor, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final Monitor monitorAfterClose) {
+ assertTrue(monitorAfterClose.getRouter().isEmpty());
+ return monitorAfterClose;
+ }
+ });
+
} catch (final Exception e) {
final StringBuffer ex = new StringBuffer();
ex.append(e.getMessage() + "\n");
for (final StackTraceElement element: e.getStackTrace()) {
ex.append(element.toString() + "\n");
- };
+ }
fail(ex.toString());
}
}
@Test
public void deploySecondInstance() throws Exception {
final BmpMonitoringStation monitoringStation2 = BmpMonitoringStationImpl.createBmpMonitorInstance(new SimpleRIBExtensionProviderContext(), this.dispatcher, getDomBroker(),
- new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString("127.0.0.11"), PORT), Optional.of(new KeyMapping()),
- this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), this.mrs);
- final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
- Assert.assertEquals(2, monitor.getMonitor().size());
+ new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(LOCAL_ADDRESS_2), PORT), Optional.of(new KeyMapping()),
+ this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
+
+ readData(InstanceIdentifier.create(BmpMonitor.class), new Function<BmpMonitor, Object>() {
+ @Nullable
+ @Override
+ public Object apply(@Nullable final BmpMonitor monitor) {
+ Assert.assertEquals(2, monitor.getMonitor().size());
+ return monitor;
+ }
+ });
+
monitoringStation2.close();
}
return tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
}
}
+
+ private <R, T extends DataObject> R readData(final InstanceIdentifier<T> iid, final Function<T, R> function)
+ throws ReadFailedException {
+ AssertionError lastError = null;
+ final Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ try (final ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
+ final Optional<T> data = tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
+ if(data.isPresent()) {
+ try {
+ return function.apply(data.get());
+ } catch (final AssertionError e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }
+
+ throw lastError;
+ }
+
+ private static <T extends Future> void waitFutureSuccess(final T future) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ future.addListener(new FutureListener<T>() {
+ @Override
+ public void operationComplete(final Future future) throws Exception {
+ latch.countDown();
+ }
+ });
+ Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
+ }
+
+ private void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
+ waitFutureSuccess(channelFuture);
+ }
}