Remove deprecated class AbstractDataBrokerTest under BGP
[bgpcep.git] / bgp / bmp-impl / src / test / java / org / opendaylight / protocol / bmp / impl / app / BmpMonitorImplTest.java
index 9226678600d8cbac2b638d2741ed2f02b1974a69..176ff8a3412a5159a8315f28a6b8a5840f69c293 100644 (file)
@@ -14,6 +14,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.opendaylight.protocol.util.CheckUtil.readData;
+import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess;
 
 import com.google.common.base.Optional;
 import com.google.common.net.InetAddresses;
@@ -22,6 +24,10 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
@@ -32,35 +38,36 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
-import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
+import org.opendaylight.mdsal.binding.generator.impl.GeneratedClassLoadingStrategy;
+import org.opendaylight.mdsal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.mdsal.binding.generator.util.JavassistUtils;
+import org.opendaylight.protocol.bgp.inet.RIBActivator;
 import org.opendaylight.protocol.bgp.parser.impl.BGPActivator;
 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
 import org.opendaylight.protocol.bgp.parser.spi.pojo.SimpleBGPExtensionProviderContext;
-import org.opendaylight.protocol.bgp.rib.impl.RIBActivator;
 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionProviderContext;
 import org.opendaylight.protocol.bgp.rib.spi.SimpleRIBExtensionProviderContext;
 import org.opendaylight.protocol.bmp.api.BmpDispatcher;
-import org.opendaylight.protocol.bmp.impl.BmpActivator;
 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
 import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
 import org.opendaylight.protocol.bmp.impl.spi.BmpMonitoringStation;
-import org.opendaylight.protocol.bmp.impl.test.TestUtil;
+import org.opendaylight.protocol.bmp.parser.BmpActivator;
+import org.opendaylight.protocol.bmp.parser.message.TestUtil;
 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
 import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
-import org.opendaylight.tcpmd5.api.KeyMapping;
-import org.opendaylight.tcpmd5.netty.MD5ServerChannelFactory;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.IpAddress;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
+import org.opendaylight.protocol.concepts.KeyMapping;
+import org.opendaylight.protocol.util.CheckUtil;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.pre.policy.rib.tables.routes.Ipv4RoutesCase;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.update.attributes.mp.reach.nlri.advertized.routes.destination.type.DestinationIpv4Case;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.open.message.BgpParameters;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.CParameters1;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.open.bgp.parameters.optional.capabilities.c.parameters.MultiprotocolCapability;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.mp.capabilities.MultiprotocolCapability;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.multiprotocol.rev130919.update.attributes.mp.reach.nlri.AdvertizedRoutes;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.PeerId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.rib.rev130925.rib.Tables;
@@ -69,6 +76,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.type
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.AdjRibInType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.InitiationMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.PeerType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.RouteMirroringMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.StatsReportsMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.peer.up.ReceivedOpen;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.message.rev150512.peer.up.SentOpen;
@@ -81,6 +89,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.moni
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.bmp.monitor.MonitorKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.Peer;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.PeerKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.Mirrors;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PeerSession;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PostPolicyRib;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.peers.peer.PrePolicyRib;
@@ -89,25 +98,24 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.moni
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.routers.RouterKey;
 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
-import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
-import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
-import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
 
-public class BmpMonitorImplTest extends AbstractDataBrokerTest {
-
-    private static final int PORT = 12345;
-    private static final String LOCAL_ADDRESS = "127.0.0.11";
-    private static final InetSocketAddress CLIENT_REMOTE = new InetSocketAddress("127.0.0.10", PORT);
-    private static final InetSocketAddress CLIENT_LOCAL = new InetSocketAddress(LOCAL_ADDRESS, 0);
+public class BmpMonitorImplTest extends AbstractConcurrentDataBrokerTest {
+    // the local port and address where the monitor (ODL) will listen for incoming BMP request
+    private static final int MONITOR_LOCAL_PORT = 12345;
+    private static final String MONITOR_LOCAL_ADDRESS = "127.0.0.10";
+    private static final String MONITOR_LOCAL_ADDRESS_2 = "127.0.0.11";
+    // the router (monitee) address where we are going to simulate a BMP request from
+    private static final String REMOTE_ROUTER_ADDRESS_1 = "127.0.0.12";
+    private static final String REMOTE_ROUTER_ADDRESS_2 = "127.0.0.13";
     private static final Ipv4Address PEER1 = new Ipv4Address("20.20.20.20");
     private static final MonitorId MONITOR_ID = new MonitorId("monitor");
-    private static final RouterId ROUTER_ID = new RouterId(new IpAddress(new Ipv4Address(LOCAL_ADDRESS)));
+    private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
     private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
-
+    private static final String MD5_PASSWORD = "abcdef";
+    private static final InstanceIdentifier<BmpMonitor> BMP_II = InstanceIdentifier.create(BmpMonitor.class);
     private BindingToNormalizedNodeCodec mappingService;
     private RIBActivator ribActivator;
     private BGPActivator bgpActivator;
@@ -115,22 +123,26 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
     private BmpDispatcher dispatcher;
     private BmpMonitoringStation bmpApp;
     private BmpMessageRegistry msgRegistry;
+    private ModuleInfoBackedContext moduleInfoBackedContext;
+
 
     @Before
     public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
         this.mappingService = new BindingToNormalizedNodeCodec(GeneratedClassLoadingStrategy.getTCCLClassLoadingStrategy(),
                 new BindingNormalizedNodeCodecRegistry(StreamWriterGenerator.create(JavassistUtils.forClassPool(ClassPool.getDefault()))));
-        final ModuleInfoBackedContext moduleInfoBackedContext = ModuleInfoBackedContext.create();
-        moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(InitiationMessage.class));
-        moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(CParameters1.class));
-        moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(BgpParameters.class));
-        moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(MultiprotocolCapability.class));
-        moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(DestinationIpv4Case.class));
-        moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(AdvertizedRoutes.class));
-        moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(SentOpen.class));
-        moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(ReceivedOpen.class));
-        this.mappingService.onGlobalContextUpdated(moduleInfoBackedContext.tryToCreateSchemaContext().get());
+        this.moduleInfoBackedContext = ModuleInfoBackedContext.create();
+        this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(InitiationMessage.class));
+        this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(CParameters1.class));
+        this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(BgpParameters.class));
+        this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(MultiprotocolCapability.class));
+        this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(DestinationIpv4Case.class));
+        this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(AdvertizedRoutes.class));
+        this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(SentOpen.class));
+        this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(ReceivedOpen.class));
+        this.mappingService.onGlobalContextUpdated(this.moduleInfoBackedContext.tryToCreateSchemaContext().get());
 
+        final KeyMapping keys = KeyMapping.getKeyMapping(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MD5_PASSWORD);
         this.ribActivator = new RIBActivator();
         final RIBExtensionProviderContext ribExtension = new SimpleRIBExtensionProviderContext();
         this.ribActivator.startRIBExtensionProvider(ribExtension);
@@ -139,22 +151,26 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
         final BGPExtensionProviderContext context = new SimpleBGPExtensionProviderContext();
         this.bgpActivator.start(context);
         final SimpleBmpExtensionProviderContext ctx = new SimpleBmpExtensionProviderContext();
-        this.bmpActivator = new BmpActivator(context.getMessageRegistry());
+        this.bmpActivator = new BmpActivator(context);
         this.bmpActivator.start(ctx);
         this.msgRegistry = ctx.getBmpMessageRegistry();
 
         this.dispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
-                ctx.getBmpMessageRegistry(), new DefaultBmpSessionFactory(), Optional.<MD5ServerChannelFactory<?>>absent());
+                ctx.getBmpMessageRegistry(), new DefaultBmpSessionFactory());
 
         this.bmpApp = BmpMonitoringStationImpl.createBmpMonitorInstance(ribExtension, this.dispatcher, getDomBroker(),
-                MONITOR_ID, new InetSocketAddress(InetAddresses.forString("0.0.0.0"), PORT), Optional.<KeyMapping>absent(),
-                this.mappingService.getCodecFactory(), moduleInfoBackedContext.getSchemaContext());
-
-        final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
-        Assert.assertEquals(1, monitor.getMonitor().size());
-        final Monitor bmpMonitor = monitor.getMonitor().get(0);
-        Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
-        Assert.assertEquals(0, bmpMonitor.getRouter().size());
+                MONITOR_ID, new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MONITOR_LOCAL_PORT), Optional.of(keys),
+                this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
+
+        readData(getDataBroker(), BMP_II, monitor -> {
+            Assert.assertEquals(1, monitor.getMonitor().size());
+            final Monitor bmpMonitor = monitor.getMonitor().get(0);
+            Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
+            Assert.assertEquals(0, bmpMonitor.getRouter().size());
+            Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
+            Assert.assertEquals(0, bmpMonitor.getRouter().size());
+            return monitor;
+        });
     }
 
     @After
@@ -165,124 +181,251 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
         this.dispatcher.close();
         this.bmpApp.close();
         this.mappingService.close();
-        final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
-        assertTrue(monitor.getMonitor().isEmpty());
+
+        readData(getDataBroker(), BMP_II, monitor -> {
+            assertTrue(monitor.getMonitor().isEmpty());
+            return monitor;
+        });
     }
 
     @Test
-    public void testMonitoringStation() throws InterruptedException {
-        final Channel channel = connectTestClient(this.msgRegistry).channel();
-        try {
-            Thread.sleep(500);
-            final KeyedInstanceIdentifier<Monitor, MonitorKey> monitorIId = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
-            final Monitor monitor = getBmpData(monitorIId).get();
+    public void testRouterMonitoring() throws Exception {
+        // first test if a single router monitoring is working
+        final Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
+        readData(getDataBroker(), MONITOR_IID, monitor -> {
+            assertEquals(1, monitor.getRouter().size());
+            return monitor;
+        });
+
+        final Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
+        readData(getDataBroker(), MONITOR_IID, monitor -> {
+            assertEquals(2, monitor.getRouter().size());
+            return monitor;
+        });
+
+        // initiate another BMP request from router 1, create a redundant connection
+        // we expect the connection to be closed
+        final Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, this.msgRegistry);
+
+
+        // channel 1 should still be open, while channel3 should be closed
+        CheckUtil.checkEquals(()-> assertTrue(channel1.isOpen()));
+        CheckUtil.checkEquals(()-> assertFalse(channel3.isOpen()));
+
+        // now if we close the channel 1 and try it again, it should succeed
+        waitFutureSuccess(channel1.close());
+
+        // channel 2 is still open
+        readData(getDataBroker(), MONITOR_IID, monitor -> {
             assertEquals(1, monitor.getRouter().size());
-            final Router router = monitor.getRouter().get(0);
-            assertEquals(ROUTER_ID, router.getRouterId());
-            assertEquals(Status.Down, router.getStatus());
-            assertTrue(router.getPeer().isEmpty());
-
-            channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info"));
-            Thread.sleep(500);
-            final Monitor monitorInit = getBmpData(monitorIId).get();
-            assertEquals(1, monitorInit.getRouter().size());
-            final Router routerInit = monitorInit.getRouter().get(0);
-            assertEquals("some info;", routerInit.getInfo());
-            assertEquals("name", routerInit.getName());
-            assertEquals("description", routerInit.getDescription());
-            assertEquals(ROUTER_ID, routerInit.getRouterId());
-            assertTrue(routerInit.getPeer().isEmpty());
-            assertEquals(Status.Up, routerInit.getStatus());
-
-            channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true));
-            Thread.sleep(500);
-            final KeyedInstanceIdentifier<Router, RouterKey> routerIId = monitorIId.child(Router.class, new RouterKey(ROUTER_ID));
-            final List<Peer> peers = getBmpData(routerIId).get().getPeer();
-            assertEquals(1, peers.size());
-            final Peer peer = peers.get(0);
-            assertEquals(PeerType.Global, peer.getType());
-            assertEquals(PEER_ID, peer.getPeerId());
-            assertEquals(PEER1, peer.getBgpId());
-            assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
-            assertEquals(TestUtil.PEER_AS, peer.getAs());
-            assertNull(peer.getDistinguisher());
-            assertNull(peer.getStats());
-
-            assertNotNull(peer.getPrePolicyRib());
-            assertEquals(1, peer.getPrePolicyRib().getTables().size());
-            final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
-            assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
-            assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
-            assertFalse(prePolicyTable.getAttributes().isUptodate());
-            assertNotNull(prePolicyTable.getRoutes());
-
-            assertNotNull(peer.getPostPolicyRib());
-            assertEquals(1, peer.getPostPolicyRib().getTables().size());
-            final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
-            assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
-            assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
-            assertFalse(postPolicyTable.getAttributes().isUptodate());
-            assertNotNull(postPolicyTable.getRoutes());
-
-            assertNotNull(peer.getPeerSession());
-            final PeerSession peerSession = peer.getPeerSession();
-            assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
-            assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
-            assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
-            assertEquals(Status.Up, peerSession.getStatus());
+            return monitor;
+        });
+
+        final Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
+        readData(getDataBroker(), MONITOR_IID, monitor -> {
+            assertEquals(2, monitor.getRouter().size());
+            return monitor;
+        });
+
+        // close all channel altogether
+        waitFutureSuccess(channel2.close());
+        Thread.sleep(100);
+
+        // sleep for a while to avoid intermittent InMemoryDataTree modification conflict
+        waitFutureSuccess(channel4.close());
+
+        readData(getDataBroker(), MONITOR_IID, monitor -> {
+            assertEquals(0, monitor.getRouter().size());
+            return monitor;
+        });
+    }
+
+    private void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
+        waitFutureSuccess(channelFuture);
+    }
+
+    private Channel testMonitoringStation(final String remoteRouterIpAddr) throws InterruptedException {
+        final Channel channel = connectTestClient(remoteRouterIpAddr, this.msgRegistry);
+        final RouterId routerId = getRouterId(remoteRouterIpAddr);
+        try {
+            readData(getDataBroker(), MONITOR_IID, monitor -> {
+                assertFalse(monitor.getRouter().isEmpty());
+                // now find the current router instance
+                Router router = null;
+                for (final Router r : monitor.getRouter()) {
+                    if (routerId.equals(r.getRouterId())) {
+                        router = r;
+                        break;
+                    }
+                }
+                assertNotNull(router);
+                assertEquals(Status.Down, router.getStatus());
+                assertTrue(router.getPeer().isEmpty());
+                return router;
+            });
+
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
+
+            readData(getDataBroker(), MONITOR_IID, monitor -> {
+                assertFalse(monitor.getRouter().isEmpty());
+                Router retRouter = null;
+                for (final Router r : monitor.getRouter()) {
+                    if (routerId.equals(r.getRouterId())) {
+                        retRouter = r;
+                        break;
+                    }
+                }
+
+                assertEquals("some info;", retRouter.getInfo());
+                assertEquals("name", retRouter.getName());
+                assertEquals("description", retRouter.getDescription());
+                assertEquals(routerId, retRouter.getRouterId());
+                assertTrue(retRouter.getPeer().isEmpty());
+                assertEquals(Status.Up, retRouter.getStatus());
+                return retRouter;
+            });
+
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
+            final KeyedInstanceIdentifier<Router, RouterKey> routerIId = MONITOR_IID.child(Router.class, new RouterKey(routerId));
+
+            readData(getDataBroker(), routerIId, router -> {
+                final List<Peer> peers = router.getPeer();
+                assertEquals(1, peers.size());
+                final Peer peer = peers.get(0);
+                assertEquals(PeerType.Global, peer.getType());
+                assertEquals(PEER_ID, peer.getPeerId());
+                assertEquals(PEER1, peer.getBgpId());
+                assertEquals(TestUtil.IPV4_ADDRESS_10, peer.getAddress().getIpv4Address());
+                assertEquals(TestUtil.PEER_AS, peer.getAs());
+                assertNull(peer.getDistinguisher());
+                assertNull(peer.getStats());
+
+                assertNotNull(peer.getPrePolicyRib());
+                assertEquals(1, peer.getPrePolicyRib().getTables().size());
+                final Tables prePolicyTable = peer.getPrePolicyRib().getTables().get(0);
+                assertEquals(Ipv4AddressFamily.class, prePolicyTable.getAfi());
+                assertEquals(UnicastSubsequentAddressFamily.class, prePolicyTable.getSafi());
+                assertFalse(prePolicyTable.getAttributes().isUptodate());
+                assertNotNull(prePolicyTable.getRoutes());
+
+                assertNotNull(peer.getPostPolicyRib());
+                assertEquals(1, peer.getPostPolicyRib().getTables().size());
+                final Tables postPolicyTable = peer.getPrePolicyRib().getTables().get(0);
+                assertEquals(Ipv4AddressFamily.class, postPolicyTable.getAfi());
+                assertEquals(UnicastSubsequentAddressFamily.class, postPolicyTable.getSafi());
+                assertFalse(postPolicyTable.getAttributes().isUptodate());
+                assertNotNull(postPolicyTable.getRoutes());
+
+                assertNotNull(peer.getPeerSession());
+                final PeerSession peerSession = peer.getPeerSession();
+                assertEquals(TestUtil.IPV4_ADDRESS_10, peerSession.getLocalAddress().getIpv4Address());
+                assertEquals(TestUtil.PEER_LOCAL_PORT, peerSession.getLocalPort());
+                assertEquals(TestUtil.PEER_REMOTE_PORT, peerSession.getRemotePort());
+                assertEquals(Status.Up, peerSession.getStatus());
+                assertNotNull(peerSession.getReceivedOpen());
+                assertNotNull(peerSession.getSentOpen());
+                return router;
+            });
+
 
             final StatsReportsMessage statsMsg = TestUtil.createStatsReportMsg(PEER1);
-            channel.writeAndFlush(statsMsg);
-            Thread.sleep(500);
+            waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
             final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
-            final Stats peerStats = getBmpData(peerIId.child(Stats.class)).get();
-            assertNotNull(peerStats.getTimestampSec());
-            final Tlvs tlvs = statsMsg.getTlvs();
-            assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
-            assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(), peerStats.getDuplicatePrefixAdvertisements());
-            assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
-            assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
-            assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
-            assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(), peerStats.getInvalidatedClusterListLoop());
-            assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
-            assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
-            assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
-
-            channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy));
-            channel.writeAndFlush(TestUtil.creaetRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy));
-            Thread.sleep(500);
-            final Tables prePolicyRib = getBmpData(peerIId.child(PrePolicyRib.class)).get().getTables().get(0);
-            assertTrue(prePolicyRib.getAttributes().isUptodate());
-            assertEquals(3, ((Ipv4RoutesCase) prePolicyRib.getRoutes()).getIpv4Routes().getIpv4Route().size());
-
-            channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy));
-            channel.writeAndFlush(TestUtil.creaetRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy));
-            Thread.sleep(500);
-            final Tables postPolicyRib = getBmpData(peerIId.child(PostPolicyRib.class)).get().getTables().get(0);
-            assertTrue(postPolicyRib.getAttributes().isUptodate());
-            assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase) postPolicyRib.getRoutes()).getIpv4Routes().getIpv4Route().size());
-
-            channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1));
-            Thread.sleep(500);
-            final List<Peer> peersAfterDown = getBmpData(routerIId).get().getPeer();
-            assertTrue(peersAfterDown.isEmpty());
-
-            channel.close().await();
-            Thread.sleep(500);
-            final Monitor monitorAfterClose = getBmpData(monitorIId).get();
-            assertTrue(monitorAfterClose.getRouter().isEmpty());
+
+            readData(getDataBroker(), peerIId.child(Stats.class), peerStats -> {
+                assertNotNull(peerStats.getTimestampSec());
+                final Tlvs tlvs = statsMsg.getTlvs();
+                assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
+                assertEquals(tlvs.getDuplicatePrefixAdvertisementsTlv().getCount(), peerStats.getDuplicatePrefixAdvertisements());
+                assertEquals(tlvs.getDuplicateWithdrawsTlv().getCount(), peerStats.getDuplicateWithdraws());
+                assertEquals(tlvs.getInvalidatedAsConfedLoopTlv().getCount(), peerStats.getInvalidatedAsConfedLoop());
+                assertEquals(tlvs.getInvalidatedAsPathLoopTlv().getCount(), peerStats.getInvalidatedAsPathLoop());
+                assertEquals(tlvs.getInvalidatedClusterListLoopTlv().getCount(), peerStats.getInvalidatedClusterListLoop());
+                assertEquals(tlvs.getInvalidatedOriginatorIdTlv().getCount(), peerStats.getInvalidatedOriginatorId());
+                assertEquals(tlvs.getLocRibRoutesTlv().getCount(), peerStats.getLocRibRoutes());
+                assertEquals(tlvs.getRejectedPrefixesTlv().getCount(), peerStats.getRejectedPrefixes());
+                assertEquals(tlvs.getPerAfiSafiAdjRibInTlv().getCount().toString(), peerStats.getPerAfiSafiAdjRibInRoutes().getAfiSafi().get(0).getCount().toString());
+                assertEquals(tlvs.getPerAfiSafiLocRibTlv().getCount().toString(), peerStats.getPerAfiSafiLocRibRoutes().getAfiSafi().get(0).getCount().toString());
+                return peerStats;
+            });
+
+            // route mirror message test
+            final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
+            waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
+
+            readData(getDataBroker(), peerIId.child(Mirrors.class), routeMirrors -> {
+                assertNotNull(routeMirrors.getTimestampSec());
+                return routeMirrors;
+            });
+
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy)));
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy)));
+
+            readData(getDataBroker(), peerIId.child(PrePolicyRib.class), prePolicyRib -> {
+                assertTrue(!prePolicyRib.getTables().isEmpty());
+                final Tables tables = prePolicyRib.getTables().get(0);
+                assertTrue(tables.getAttributes().isUptodate());
+                assertEquals(3, ((Ipv4RoutesCase) tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
+                return tables;
+            });
+
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy)));
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy)));
+
+            readData(getDataBroker(), peerIId.child(PostPolicyRib.class), postPolicyRib -> {
+                assertTrue(!postPolicyRib.getTables().isEmpty());
+                final Tables tables = postPolicyRib.getTables().get(0);
+                assertTrue(tables.getAttributes().isUptodate());
+                assertEquals(3, ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.post.policy.rib.tables.routes.Ipv4RoutesCase)
+                        tables.getRoutes()).getIpv4Routes().getIpv4Route().size());
+                return tables;
+            });
+
+            waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
+
+            readData(getDataBroker(), routerIId, router -> {
+                final List<Peer> peersAfterDown = router.getPeer();
+                assertTrue(peersAfterDown.isEmpty());
+                return router;
+            });
         } catch (final Exception e) {
-            fail(e.getMessage());
+            final StringBuffer ex = new StringBuffer();
+            ex.append(e.getMessage()).append("\n");
+            for (final StackTraceElement element : e.getStackTrace()) {
+                ex.append(element.toString() + "\n");
+            }
+            fail(ex.toString());
         }
+        return channel;
+    }
 
+    @Test
+    public void deploySecondInstance() throws Exception {
+        final BmpMonitoringStation monitoringStation2 = BmpMonitoringStationImpl.createBmpMonitorInstance(new SimpleRIBExtensionProviderContext(), this.dispatcher, getDomBroker(),
+                new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), Optional.of(KeyMapping.getKeyMapping()),
+                this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
+
+        readData(getDataBroker(), BMP_II, monitor -> {
+            Assert.assertEquals(2, monitor.getMonitor().size());
+            return monitor;
+        });
+
+        monitoringStation2.close();
     }
 
-    private ChannelFuture connectTestClient(final BmpMessageRegistry msgRegistry) throws InterruptedException {
+    private Channel connectTestClient(final String routerIp, final BmpMessageRegistry msgRegistry) throws InterruptedException {
         final BmpHandlerFactory hf = new BmpHandlerFactory(msgRegistry);
         final Bootstrap b = new Bootstrap();
-        b.group(new NioEventLoopGroup());
+        final EventLoopGroup workerGroup;
+        if(Epoll.isAvailable()){
+            b.channel(EpollSocketChannel.class);
+            workerGroup =new EpollEventLoopGroup();
+        } else {
+            b.channel(NioSocketChannel.class);
+            workerGroup = new NioEventLoopGroup();
+        }
+        b.group(workerGroup);
         b.option(ChannelOption.SO_KEEPALIVE, true);
-        b.channel(NioSocketChannel.class);
         b.handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(final SocketChannel ch) throws Exception {
@@ -290,13 +433,14 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
                 ch.pipeline().addLast(hf.getEncoders());
             }
         });
-        b.localAddress(CLIENT_LOCAL);
-        return b.connect(CLIENT_REMOTE).sync();
+        b.localAddress(new InetSocketAddress(routerIp, 0));
+        b.option(ChannelOption.SO_REUSEADDR, true);
+        final ChannelFuture future = b.connect(new InetSocketAddress(MONITOR_LOCAL_ADDRESS, MONITOR_LOCAL_PORT)).sync();
+        waitFutureSuccess(future);
+        return future.channel();
     }
 
-    private <T extends DataObject> Optional<T> getBmpData(final InstanceIdentifier<T> iid) throws ReadFailedException {
-        try (final ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
-            return tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
-        }
+    private RouterId getRouterId(final String routerIp) {
+        return new RouterId(new IpAddress(new Ipv4Address(routerIp)));
     }
 }