import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
import com.google.common.net.InetAddresses;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.bootstrap.Bootstrap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import javassist.ClassPool;
import org.junit.After;
import org.junit.Assert;
MONITOR_ID, new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MONITOR_LOCAL_PORT), Optional.of(keys),
this.mappingService.getCodecFactory(), moduleInfoBackedContext.getSchemaContext(), this.mrs);
- final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
- Assert.assertEquals(1, monitor.getMonitor().size());
- final Monitor bmpMonitor = monitor.getMonitor().get(0);
- Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
- Assert.assertEquals(0, bmpMonitor.getRouter().size());
+ readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+ Assert.assertEquals(1, monitor.getMonitor().size());
+ final Monitor bmpMonitor = monitor.getMonitor().get(0);
+ Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
+ Assert.assertEquals(0, bmpMonitor.getRouter().size());
+ Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
+ Assert.assertEquals(0, bmpMonitor.getRouter().size());
+ return monitor;
+ });
}
@After
this.dispatcher.close();
this.bmpApp.close();
this.mappingService.close();
- final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
- assertTrue(monitor.getMonitor().isEmpty());
+
+ readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+ assertTrue(monitor.getMonitor().isEmpty());
+ return monitor;
+ });
}
@Test
public void testRouterMonitoring() throws Exception {
// first test if a single router monitoring is working
final Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
- assertEquals(1, getBmpData(MONITOR_IID).get().getRouter().size());
+ readData(MONITOR_IID, monitor -> {
+ assertEquals(1, monitor.getRouter().size());
+ return monitor;
+ });
final Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
- assertEquals(2, getBmpData(MONITOR_IID).get().getRouter().size());
+ readData(MONITOR_IID, monitor -> {
+ assertEquals(2, monitor.getRouter().size());
+ return monitor;
+ });
// initiate another BMP request from router 1, create a redundant connection
// we expect the connection to be closed
final Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, this.msgRegistry);
+ Thread.sleep(500);
+
// channel 1 should still be open, while channel3 should be closed
assertTrue(channel1.isOpen());
assertFalse(channel3.isOpen());
+
// now if we close the channel 1 and try it again, it should succeed
waitFutureSuccess(channel1.close());
+
// channel 2 is still open
- assertEquals(1, getBmpData(MONITOR_IID).get().getRouter().size());
+ readData(MONITOR_IID, monitor -> {
+ assertEquals(1, monitor.getRouter().size());
+ return monitor;
+ });
Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
- assertEquals(2, getBmpData(MONITOR_IID).get().getRouter().size());
+ readData(MONITOR_IID, monitor -> {
+ assertEquals(2, monitor.getRouter().size());
+ return monitor;
+ });
// close all channel altogether
waitFutureSuccess(channel2.close());
+ Thread.sleep(500);
+
// sleep for a while to avoid intermittent InMemoryDataTree modification conflict
waitFutureSuccess(channel4.close());
- assertEquals(0, getBmpData(MONITOR_IID).get().getRouter().size());
+
+ readData(MONITOR_IID, monitor -> {
+ assertEquals(0, monitor.getRouter().size());
+ return monitor;
+ });
}
private static <T extends Future> void waitFutureSuccess(final T future) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
future.addListener(future1 -> latch.countDown());
Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
- Thread.sleep(500);
}
private void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
waitFutureSuccess(channelFuture);
- Thread.sleep(500);
}
private Channel testMonitoringStation(String remoteRouterIpAddr) throws InterruptedException {
final Channel channel = connectTestClient(remoteRouterIpAddr, this.msgRegistry);
final RouterId routerId = getRouterId(remoteRouterIpAddr);
try {
- Thread.sleep(500);
-
- final Monitor monitor = getBmpData(MONITOR_IID).get();
- assertFalse(monitor.getRouter().isEmpty());
- // now find the current router instance
- Router router = null;
- for (Router r : monitor.getRouter()) {
- if (routerId.equals(r.getRouterId())) {
- router = r;
- break;
+ readData(MONITOR_IID, monitor -> {
+ assertFalse(monitor.getRouter().isEmpty());
+ // now find the current router instance
+ Router router = null;
+ for (Router r : monitor.getRouter()) {
+ if (routerId.equals(r.getRouterId())) {
+ router = r;
+ break;
+ }
}
- }
- assertNotNull(router);
- assertEquals(Status.Down, router.getStatus());
- assertTrue(router.getPeer().isEmpty());
+ assertNotNull(router);
+ assertEquals(Status.Down, router.getStatus());
+ assertTrue(router.getPeer().isEmpty());
+ return router;
+ });
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
- final Monitor monitorInit = getBmpData(MONITOR_IID).get();
- assertFalse(monitorInit.getRouter().isEmpty());
- Router routerInit = null;
- for (Router r : monitorInit.getRouter()) {
- if (routerId.equals(r.getRouterId())) {
- routerInit = r;
- break;
+
+ readData(MONITOR_IID, monitor -> {
+ assertFalse(monitor.getRouter().isEmpty());
+ Router retRouter = null;
+ for (Router r : monitor.getRouter()) {
+ if (routerId.equals(r.getRouterId())) {
+ retRouter = r;
+ break;
+ }
}
- }
- assertNotNull(routerInit);
- assertEquals("some info;", routerInit.getInfo());
- assertEquals("name", routerInit.getName());
- assertEquals("description", routerInit.getDescription());
- assertEquals(routerId, routerInit.getRouterId());
- assertTrue(routerInit.getPeer().isEmpty());
- assertEquals(Status.Up, routerInit.getStatus());
+
+ assertEquals("some info;", retRouter.getInfo());
+ assertEquals("name", retRouter.getName());
+ assertEquals("description", retRouter.getDescription());
+ assertEquals(routerId, retRouter.getRouterId());
+ assertTrue(retRouter.getPeer().isEmpty());
+ assertEquals(Status.Up, retRouter.getStatus());
+ return retRouter;
+ });
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
final KeyedInstanceIdentifier<Router, RouterKey> routerIId = MONITOR_IID.child(Router.class, new RouterKey(routerId));
- final List<Peer> peers = getBmpData(routerIId).get().getPeer();
- assertEquals(1, peers.size());
- final Peer peer = peers.get(0);
- assertEquals(PeerType.Global, peer.getType());
- assertEquals(PEER_ID, peer.getPeerId());
- assertEquals(PEER1, peer.getBgpId());
- assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
- assertEquals(TestUtil.PEER_AS, peer.getAs());
- assertNull(peer.getDistinguisher());
- assertNull(peer.getStats());
-
- assertNotNull(peer.getPrePolicyRib());
- assertEquals(1, peer.getPrePolicyRib().getTables().size());
- final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
- assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
- assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
- assertFalse(prePolicyTable.getAttributes().isUptodate());
- assertNotNull(prePolicyTable.getRoutes());
-
- assertNotNull(peer.getPostPolicyRib());
- assertEquals(1, peer.getPostPolicyRib().getTables().size());
- final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
- assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
- assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
- assertFalse(postPolicyTable.getAttributes().isUptodate());
- assertNotNull(postPolicyTable.getRoutes());
-
- assertNotNull(peer.getPeerSession());
- final PeerSession peerSession = peer.getPeerSession();
- assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
- assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
- assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
- assertEquals(Status.Up, peerSession.getStatus());
- assertNotNull(peerSession.getReceivedOpen());
- assertNotNull(peerSession.getSentOpen());
+
+ readData(routerIId, router -> {
+ final List<Peer> peers = router.getPeer();
+ assertEquals(1, peers.size());
+ final Peer peer = peers.get(0);
+ assertEquals(PeerType.Global, peer.getType());
+ assertEquals(PEER_ID, peer.getPeerId());
+ assertEquals(PEER1, peer.getBgpId());
+ assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
+ assertEquals(TestUtil.PEER_AS, peer.getAs());
+ assertNull(peer.getDistinguisher());
+ assertNull(peer.getStats());
+
+ assertNotNull(peer.getPrePolicyRib());
+ assertEquals(1, peer.getPrePolicyRib().getTables().size());
+ final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
+ assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
+ assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
+ assertFalse(prePolicyTable.getAttributes().isUptodate());
+ assertNotNull(prePolicyTable.getRoutes());
+
+ assertNotNull(peer.getPostPolicyRib());
+ assertEquals(1, peer.getPostPolicyRib().getTables().size());
+ final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
+ assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
+ assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
+ assertFalse(postPolicyTable.getAttributes().isUptodate());
+ assertNotNull(postPolicyTable.getRoutes());
+
+ assertNotNull(peer.getPeerSession());
+ final PeerSession peerSession = peer.getPeerSession();
+ assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
+ assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
+ assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
+ assertEquals(Status.Up, peerSession.getStatus());
+ assertNotNull(peerSession.getReceivedOpen());
+ assertNotNull(peerSession.getSentOpen());
+ return router;
+ });
+
final StatsReportsMessage statsMsg = TestUtil.createStatsReportMsg(PEER1);
waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
- final Stats peerStats = getBmpData(peerIId.child(Stats.class)).get();
- assertNotNull(peerStats.getTimestampSec());
- final Tlvs tlvs = statsMsg.getTlvs();
- assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
- assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(), peerStats.getDuplicatePrefixAdvertisements());
- assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
- assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
- assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
- assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(), peerStats.getInvalidatedClusterListLoop());
- assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
- assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
- assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
- assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(), peerStats.getPerAfiSafiAdjRibInRoutes().getAfiSafi().get(0).getCount().toString());
- assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(), peerStats.getPerAfiSafiLocRibRoutes().getAfiSafi().get(0).getCount().toString());
+
+ readData(peerIId.child(Stats.class), peerStats -> {
+ assertNotNull(peerStats.getTimestampSec());
+ final Tlvs tlvs = statsMsg.getTlvs();
+ assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
+ assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(), peerStats.getDuplicatePrefixAdvertisements());
+ assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
+ assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
+ assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
+ assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(), peerStats.getInvalidatedClusterListLoop());
+ assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
+ assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
+ assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
+ assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(), peerStats.getPerAfiSafiAdjRibInRoutes().getAfiSafi().get(0).getCount().toString());
+ assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(), peerStats.getPerAfiSafiLocRibRoutes().getAfiSafi().get(0).getCount().toString());
+ return peerStats;
+ });
// route mirror message test
final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
- final Mirrors routeMirrors = getBmpData(peerIId.child(Mirrors.class)).get();
- assertNotNull(routeMirrors.getTimestampSec());
+
+ readData(peerIId.child(Mirrors.class), routeMirrors -> {
+ assertNotNull(routeMirrors.getTimestampSec());
+ return routeMirrors;
+ });
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy)));
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy)));
- final Tables prePolicyRib = getBmpData(peerIId.child(PrePolicyRib.class)).get().getTables().get(0);
- assertTrue(prePolicyRib.getAttributes().isUptodate());
- assertEquals(3, ((Ipv4RoutesCase) prePolicyRib.getRoutes()).getIpv4Routes().getIpv4Route().size());
+
+ readData(peerIId.child(PrePolicyRib.class), prePolicyRib -> {
+ assertTrue(!prePolicyRib.getTables().isEmpty());
+ final Tables tables = prePolicyRib.getTables().get(0);
+ assertTrue(tables.getAttributes().isUptodate());
+ assertEquals(3, ((Ipv4RoutesCase) tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
+ return tables;
+ });
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy)));
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy)));
- final Tables postPolicyRib = getBmpData(peerIId.child(PostPolicyRib.class)).get().getTables().get(0);
- assertTrue(postPolicyRib.getAttributes().isUptodate());
- assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase) postPolicyRib.getRoutes()).getIpv4Routes().getIpv4Route().size());
+
+ readData(peerIId.child(PostPolicyRib.class), postPolicyRib -> {
+ assertTrue(!postPolicyRib.getTables().isEmpty());
+ final Tables tables = postPolicyRib.getTables().get(0);
+ assertTrue(tables.getAttributes().isUptodate());
+ assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase)
+ tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
+ return tables;
+ });
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
- final List<Peer> peersAfterDown = getBmpData(routerIId).get().getPeer();
- assertTrue(peersAfterDown.isEmpty());
+
+ readData(routerIId, router -> {
+ final List<Peer> peersAfterDown = router.getPeer();
+ assertTrue(peersAfterDown.isEmpty());
+ return router;
+ });
} catch (final Exception e) {
final StringBuffer ex = new StringBuffer();
ex.append(e.getMessage()).append("\n");
final BmpMonitoringStation monitoringStation2 = BmpMonitoringStationImpl.createBmpMonitorInstance(new SimpleRIBExtensionProviderContext(), this.dispatcher, getDomBroker(),
new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), Optional.of(new KeyMapping()),
this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), this.mrs);
- final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
- Assert.assertEquals(2, monitor.getMonitor().size());
+
+ readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+ Assert.assertEquals(2, monitor.getMonitor().size());
+ return monitor;
+ });
+
monitoringStation2.close();
}
return future.channel();
}
- private <T extends DataObject> Optional<T> getBmpData(final InstanceIdentifier<T> iid) throws ReadFailedException {
- try (final ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
- return tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
+ private <R, T extends DataObject> R readData(final InstanceIdentifier<T> iid, Function<T, R> function)
+ throws ReadFailedException {
+ AssertionError lastError = null;
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ try (final ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
+ Optional<T> data = tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
+ if(data.isPresent()) {
+ try {
+ return function.apply(data.get());
+ } catch (AssertionError e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
}
+
+ throw lastError;
}
private RouterId getRouterId(String routerIp) {