Remove deprecated class AbstractDataBrokerTest under BGP
[bgpcep.git] / bgp / bmp-impl / src / test / java / org / opendaylight / protocol / bmp / impl / app / BmpMonitorImplTest.java
index 2ff7506d7b8e89fca39522417ac28c42ed606c05..176ff8a3412a5159a8315f28a6b8a5840f69c293 100644 (file)
@@ -14,60 +14,53 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import com.google.common.base.Charsets;
+import static org.opendaylight.protocol.util.CheckUtil.readData;
+import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess;
+
 import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
 import com.google.common.net.InetAddresses;
-import com.google.common.util.concurrent.Uninterruptibles;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.Future;
+import io.netty.channel.socket.nio.NioSocketChannel;
 import java.net.InetSocketAddress;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import javassist.ClassPool;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.config.yang.bmp.impl.MonitoredRouter;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
-import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
+import org.opendaylight.mdsal.binding.generator.impl.GeneratedClassLoadingStrategy;
+import org.opendaylight.mdsal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.mdsal.binding.generator.util.JavassistUtils;
+import org.opendaylight.protocol.bgp.inet.RIBActivator;
 import org.opendaylight.protocol.bgp.parser.impl.BGPActivator;
 import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
 import org.opendaylight.protocol.bgp.parser.spi.pojo.SimpleBGPExtensionProviderContext;
-import org.opendaylight.protocol.bgp.rib.impl.RIBActivator;
 import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionProviderContext;
 import org.opendaylight.protocol.bgp.rib.spi.SimpleRIBExtensionProviderContext;
 import org.opendaylight.protocol.bmp.api.BmpDispatcher;
-import org.opendaylight.protocol.bmp.impl.BmpActivator;
 import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
 import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
 import org.opendaylight.protocol.bmp.impl.spi.BmpMonitoringStation;
-import org.opendaylight.protocol.bmp.impl.test.TestUtil;
+import org.opendaylight.protocol.bmp.parser.BmpActivator;
+import org.opendaylight.protocol.bmp.parser.message.TestUtil;
 import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
 import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
-import org.opendaylight.tcpmd5.api.KeyAccess;
-import org.opendaylight.tcpmd5.api.KeyAccessFactory;
-import org.opendaylight.tcpmd5.api.KeyMapping;
-import org.opendaylight.tcpmd5.netty.MD5ChannelFactory;
-import org.opendaylight.tcpmd5.netty.MD5NioServerSocketChannelFactory;
-import org.opendaylight.tcpmd5.netty.MD5NioSocketChannelFactory;
-import org.opendaylight.tcpmd5.netty.MD5ServerChannelFactory;
+import org.opendaylight.protocol.concepts.KeyMapping;
+import org.opendaylight.protocol.util.CheckUtil;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.pre.policy.rib.tables.routes.Ipv4RoutesCase;
@@ -105,15 +98,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.moni
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.routers.RouterKey;
 import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
 import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
-import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
-import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
-import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
 
-public class BmpMonitorImplTest extends AbstractDataBrokerTest {
+public class BmpMonitorImplTest extends AbstractConcurrentDataBrokerTest {
     // the local port and address where the monitor (ODL) will listen for incoming BMP request
     private static final int MONITOR_LOCAL_PORT = 12345;
     private static final String MONITOR_LOCAL_ADDRESS = "127.0.0.10";
@@ -126,7 +115,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
     private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
     private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
     private static final String MD5_PASSWORD = "abcdef";
-
+    private static final InstanceIdentifier<BmpMonitor> BMP_II = InstanceIdentifier.create(BmpMonitor.class);
     private BindingToNormalizedNodeCodec mappingService;
     private RIBActivator ribActivator;
     private BGPActivator bgpActivator;
@@ -134,12 +123,8 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
     private BmpDispatcher dispatcher;
     private BmpMonitoringStation bmpApp;
     private BmpMessageRegistry msgRegistry;
-    private MD5NioSocketChannelFactory scfMd5;
-    private List<MonitoredRouter> mrs;
     private ModuleInfoBackedContext moduleInfoBackedContext;
 
-    @Mock private KeyAccess mockKeyAccess;
-    @Mock private KeyAccessFactory kaf;
 
     @Before
     public void setUp() throws Exception {
@@ -157,16 +142,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
         this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(ReceivedOpen.class));
         this.mappingService.onGlobalContextUpdated(this.moduleInfoBackedContext.tryToCreateSchemaContext().get());
 
-        final KeyMapping keys = new KeyMapping();
-        keys.put(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MD5_PASSWORD.getBytes(Charsets.US_ASCII));
-
-        Mockito.doReturn(this.mockKeyAccess).when(this.kaf).getKeyAccess(Mockito.any(java.nio.channels.Channel.class));
-        Mockito.doReturn(keys).when(this.mockKeyAccess).getKeys();
-        Mockito.doNothing().when(this.mockKeyAccess).setKeys(Mockito.any(KeyMapping.class));
-
-        final MD5NioServerSocketChannelFactory scfServerMd5 = new MD5NioServerSocketChannelFactory(this.kaf);
-        this.scfMd5 = new MD5NioSocketChannelFactory(this.kaf);
-
+        final KeyMapping keys = KeyMapping.getKeyMapping(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MD5_PASSWORD);
         this.ribActivator = new RIBActivator();
         final RIBExtensionProviderContext ribExtension = new SimpleRIBExtensionProviderContext();
         this.ribActivator.startRIBExtensionProvider(ribExtension);
@@ -180,14 +156,13 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
         this.msgRegistry = ctx.getBmpMessageRegistry();
 
         this.dispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
-                ctx.getBmpMessageRegistry(), new DefaultBmpSessionFactory(), Optional.<MD5ChannelFactory<?>>of(this.scfMd5),
-                Optional.<MD5ServerChannelFactory<?>>of(scfServerMd5));
+                ctx.getBmpMessageRegistry(), new DefaultBmpSessionFactory());
 
         this.bmpApp = BmpMonitoringStationImpl.createBmpMonitorInstance(ribExtension, this.dispatcher, getDomBroker(),
                 MONITOR_ID, new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MONITOR_LOCAL_PORT), Optional.of(keys),
-                this.mappingService.getCodecFactory(), moduleInfoBackedContext.getSchemaContext(), this.mrs);
+                this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
 
-        readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+        readData(getDataBroker(), BMP_II, monitor -> {
             Assert.assertEquals(1, monitor.getMonitor().size());
             final Monitor bmpMonitor = monitor.getMonitor().get(0);
             Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
@@ -207,7 +182,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
         this.bmpApp.close();
         this.mappingService.close();
 
-        readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+        readData(getDataBroker(), BMP_II, monitor -> {
             assertTrue(monitor.getMonitor().isEmpty());
             return monitor;
         });
@@ -217,13 +192,13 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
     public void testRouterMonitoring() throws Exception {
         // first test if a single router monitoring is working
         final Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
-        readData(MONITOR_IID, monitor -> {
+        readData(getDataBroker(), MONITOR_IID, monitor -> {
             assertEquals(1, monitor.getRouter().size());
             return monitor;
         });
 
         final Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
-        readData(MONITOR_IID, monitor -> {
+        readData(getDataBroker(), MONITOR_IID, monitor -> {
             assertEquals(2, monitor.getRouter().size());
             return monitor;
         });
@@ -232,59 +207,52 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
         // we expect the connection to be closed
         final Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, this.msgRegistry);
 
-        Thread.sleep(500);
 
         // channel 1 should still be open, while channel3 should be closed
-        assertTrue(channel1.isOpen());
-        assertFalse(channel3.isOpen());
+        CheckUtil.checkEquals(()-> assertTrue(channel1.isOpen()));
+        CheckUtil.checkEquals(()-> assertFalse(channel3.isOpen()));
 
         // now if we close the channel 1 and try it again, it should succeed
         waitFutureSuccess(channel1.close());
 
         // channel 2 is still open
-        readData(MONITOR_IID, monitor -> {
+        readData(getDataBroker(), MONITOR_IID, monitor -> {
             assertEquals(1, monitor.getRouter().size());
             return monitor;
         });
 
-        Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
-        readData(MONITOR_IID, monitor -> {
+        final Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
+        readData(getDataBroker(), MONITOR_IID, monitor -> {
             assertEquals(2, monitor.getRouter().size());
             return monitor;
         });
 
         // close all channel altogether
         waitFutureSuccess(channel2.close());
-        Thread.sleep(500);
+        Thread.sleep(100);
 
         // sleep for a while to avoid intermittent InMemoryDataTree modification conflict
         waitFutureSuccess(channel4.close());
 
-        readData(MONITOR_IID, monitor -> {
+        readData(getDataBroker(), MONITOR_IID, monitor -> {
             assertEquals(0, monitor.getRouter().size());
             return monitor;
         });
     }
 
-    private static <T extends Future> void waitFutureSuccess(final T future) throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(1);
-        future.addListener(future1 -> latch.countDown());
-        Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
-    }
-
     private void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
         waitFutureSuccess(channelFuture);
     }
 
-    private Channel testMonitoringStation(String remoteRouterIpAddr) throws InterruptedException {
+    private Channel testMonitoringStation(final String remoteRouterIpAddr) throws InterruptedException {
         final Channel channel = connectTestClient(remoteRouterIpAddr, this.msgRegistry);
         final RouterId routerId = getRouterId(remoteRouterIpAddr);
         try {
-            readData(MONITOR_IID, monitor -> {
+            readData(getDataBroker(), MONITOR_IID, monitor -> {
                 assertFalse(monitor.getRouter().isEmpty());
                 // now find the current router instance
                 Router router = null;
-                for (Router r : monitor.getRouter()) {
+                for (final Router r : monitor.getRouter()) {
                     if (routerId.equals(r.getRouterId())) {
                         router = r;
                         break;
@@ -298,10 +266,10 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
 
             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
 
-            readData(MONITOR_IID, monitor -> {
+            readData(getDataBroker(), MONITOR_IID, monitor -> {
                 assertFalse(monitor.getRouter().isEmpty());
                 Router retRouter = null;
-                for (Router r : monitor.getRouter()) {
+                for (final Router r : monitor.getRouter()) {
                     if (routerId.equals(r.getRouterId())) {
                         retRouter = r;
                         break;
@@ -320,7 +288,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
             final KeyedInstanceIdentifier<Router, RouterKey> routerIId = MONITOR_IID.child(Router.class, new RouterKey(routerId));
 
-            readData(routerIId, router -> {
+            readData(getDataBroker(), routerIId, router -> {
                 final List<Peer> peers = router.getPeer();
                 assertEquals(1, peers.size());
                 final Peer peer = peers.get(0);
@@ -364,7 +332,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
             waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
             final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
 
-            readData(peerIId.child(Stats.class), peerStats -> {
+            readData(getDataBroker(), peerIId.child(Stats.class), peerStats -> {
                 assertNotNull(peerStats.getTimestampSec());
                 final Tlvs tlvs = statsMsg.getTlvs();
                 assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
@@ -385,7 +353,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
             final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
             waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
 
-            readData(peerIId.child(Mirrors.class), routeMirrors -> {
+            readData(getDataBroker(), peerIId.child(Mirrors.class), routeMirrors -> {
                 assertNotNull(routeMirrors.getTimestampSec());
                 return routeMirrors;
             });
@@ -393,7 +361,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy)));
             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy)));
 
-            readData(peerIId.child(PrePolicyRib.class), prePolicyRib -> {
+            readData(getDataBroker(), peerIId.child(PrePolicyRib.class), prePolicyRib -> {
                 assertTrue(!prePolicyRib.getTables().isEmpty());
                 final Tables tables = prePolicyRib.getTables().get(0);
                 assertTrue(tables.getAttributes().isUptodate());
@@ -404,7 +372,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy)));
             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy)));
 
-            readData(peerIId.child(PostPolicyRib.class), postPolicyRib -> {
+            readData(getDataBroker(), peerIId.child(PostPolicyRib.class), postPolicyRib -> {
                 assertTrue(!postPolicyRib.getTables().isEmpty());
                 final Tables tables = postPolicyRib.getTables().get(0);
                 assertTrue(tables.getAttributes().isUptodate());
@@ -415,7 +383,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
 
             waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
 
-            readData(routerIId, router -> {
+            readData(getDataBroker(), routerIId, router -> {
                 final List<Peer> peersAfterDown = router.getPeer();
                 assertTrue(peersAfterDown.isEmpty());
                 return router;
@@ -434,10 +402,10 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
     @Test
     public void deploySecondInstance() throws Exception {
         final BmpMonitoringStation monitoringStation2 = BmpMonitoringStationImpl.createBmpMonitorInstance(new SimpleRIBExtensionProviderContext(), this.dispatcher, getDomBroker(),
-                new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), Optional.of(new KeyMapping()),
-                this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), this.mrs);
+                new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), Optional.of(KeyMapping.getKeyMapping()),
+                this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
 
-        readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+        readData(getDataBroker(), BMP_II, monitor -> {
             Assert.assertEquals(2, monitor.getMonitor().size());
             return monitor;
         });
@@ -448,9 +416,16 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
     private Channel connectTestClient(final String routerIp, final BmpMessageRegistry msgRegistry) throws InterruptedException {
         final BmpHandlerFactory hf = new BmpHandlerFactory(msgRegistry);
         final Bootstrap b = new Bootstrap();
-        b.group(new NioEventLoopGroup());
+        final EventLoopGroup workerGroup;
+        if(Epoll.isAvailable()){
+            b.channel(EpollSocketChannel.class);
+            workerGroup =new EpollEventLoopGroup();
+        } else {
+            b.channel(NioSocketChannel.class);
+            workerGroup = new NioEventLoopGroup();
+        }
+        b.group(workerGroup);
         b.option(ChannelOption.SO_KEEPALIVE, true);
-        b.channelFactory(this.scfMd5);
         b.handler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(final SocketChannel ch) throws Exception {
@@ -465,28 +440,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
         return future.channel();
     }
 
-    private <R, T extends DataObject> R readData(final InstanceIdentifier<T> iid, Function<T, R> function)
-            throws ReadFailedException {
-        AssertionError lastError = null;
-        Stopwatch sw = Stopwatch.createStarted();
-        while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
-            try (final ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
-                Optional<T> data = tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
-                if(data.isPresent()) {
-                    try {
-                        return function.apply(data.get());
-                    } catch (AssertionError e) {
-                        lastError = e;
-                        Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-                    }
-                }
-            }
-        }
-
-        throw lastError;
-    }
-
-    private RouterId getRouterId(String routerIp) {
+    private RouterId getRouterId(final String routerIp) {
         return new RouterId(new IpAddress(new Ipv4Address(routerIp)));
     }
 }