import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import com.google.common.base.Charsets;
+import static org.opendaylight.protocol.util.CheckUtil.readData;
+import static org.opendaylight.protocol.util.CheckUtil.waitFutureSuccess;
+
import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
import com.google.common.net.InetAddresses;
-import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
-import io.netty.util.concurrent.Future;
+import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import javassist.ClassPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.config.yang.bmp.impl.MonitoredRouter;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
-import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.binding.test.AbstractConcurrentDataBrokerTest;
+import org.opendaylight.mdsal.binding.generator.impl.GeneratedClassLoadingStrategy;
+import org.opendaylight.mdsal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.mdsal.binding.generator.util.JavassistUtils;
+import org.opendaylight.protocol.bgp.inet.RIBActivator;
import org.opendaylight.protocol.bgp.parser.impl.BGPActivator;
import org.opendaylight.protocol.bgp.parser.spi.BGPExtensionProviderContext;
import org.opendaylight.protocol.bgp.parser.spi.pojo.SimpleBGPExtensionProviderContext;
-import org.opendaylight.protocol.bgp.rib.impl.RIBActivator;
import org.opendaylight.protocol.bgp.rib.spi.RIBExtensionProviderContext;
import org.opendaylight.protocol.bgp.rib.spi.SimpleRIBExtensionProviderContext;
import org.opendaylight.protocol.bmp.api.BmpDispatcher;
-import org.opendaylight.protocol.bmp.impl.BmpActivator;
import org.opendaylight.protocol.bmp.impl.BmpDispatcherImpl;
import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
import org.opendaylight.protocol.bmp.impl.session.DefaultBmpSessionFactory;
import org.opendaylight.protocol.bmp.impl.spi.BmpMonitoringStation;
-import org.opendaylight.protocol.bmp.impl.test.TestUtil;
+import org.opendaylight.protocol.bmp.parser.BmpActivator;
+import org.opendaylight.protocol.bmp.parser.message.TestUtil;
import org.opendaylight.protocol.bmp.spi.registry.BmpMessageRegistry;
import org.opendaylight.protocol.bmp.spi.registry.SimpleBmpExtensionProviderContext;
-import org.opendaylight.tcpmd5.api.KeyAccess;
-import org.opendaylight.tcpmd5.api.KeyAccessFactory;
-import org.opendaylight.tcpmd5.api.KeyMapping;
-import org.opendaylight.tcpmd5.netty.MD5ChannelFactory;
-import org.opendaylight.tcpmd5.netty.MD5NioServerSocketChannelFactory;
-import org.opendaylight.tcpmd5.netty.MD5NioSocketChannelFactory;
-import org.opendaylight.tcpmd5.netty.MD5ServerChannelFactory;
+import org.opendaylight.protocol.concepts.KeyMapping;
+import org.opendaylight.protocol.util.CheckUtil;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.inet.rev150305.bmp.monitor.monitor.router.peer.pre.policy.rib.tables.routes.Ipv4RoutesCase;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bmp.monitor.rev150512.routers.RouterKey;
import org.opendaylight.yangtools.binding.data.codec.gen.impl.StreamWriterGenerator;
import org.opendaylight.yangtools.binding.data.codec.impl.BindingNormalizedNodeCodecRegistry;
-import org.opendaylight.yangtools.sal.binding.generator.impl.GeneratedClassLoadingStrategy;
-import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
-import org.opendaylight.yangtools.sal.binding.generator.util.JavassistUtils;
-import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
-public class BmpMonitorImplTest extends AbstractDataBrokerTest {
+public class BmpMonitorImplTest extends AbstractConcurrentDataBrokerTest {
// the local port and address where the monitor (ODL) will listen for incoming BMP request
private static final int MONITOR_LOCAL_PORT = 12345;
private static final String MONITOR_LOCAL_ADDRESS = "127.0.0.10";
private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
private static final String MD5_PASSWORD = "abcdef";
-
+ private static final InstanceIdentifier<BmpMonitor> BMP_II = InstanceIdentifier.create(BmpMonitor.class);
private BindingToNormalizedNodeCodec mappingService;
private RIBActivator ribActivator;
private BGPActivator bgpActivator;
private BmpDispatcher dispatcher;
private BmpMonitoringStation bmpApp;
private BmpMessageRegistry msgRegistry;
- private MD5NioSocketChannelFactory scfMd5;
- private List<MonitoredRouter> mrs;
private ModuleInfoBackedContext moduleInfoBackedContext;
- @Mock private KeyAccess mockKeyAccess;
- @Mock private KeyAccessFactory kaf;
@Before
public void setUp() throws Exception {
this.moduleInfoBackedContext.registerModuleInfo(BindingReflections.getModuleInfo(ReceivedOpen.class));
this.mappingService.onGlobalContextUpdated(this.moduleInfoBackedContext.tryToCreateSchemaContext().get());
- final KeyMapping keys = new KeyMapping();
- keys.put(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MD5_PASSWORD.getBytes(Charsets.US_ASCII));
-
- Mockito.doReturn(this.mockKeyAccess).when(this.kaf).getKeyAccess(Mockito.any(java.nio.channels.Channel.class));
- Mockito.doReturn(keys).when(this.mockKeyAccess).getKeys();
- Mockito.doNothing().when(this.mockKeyAccess).setKeys(Mockito.any(KeyMapping.class));
-
- final MD5NioServerSocketChannelFactory scfServerMd5 = new MD5NioServerSocketChannelFactory(this.kaf);
- this.scfMd5 = new MD5NioSocketChannelFactory(this.kaf);
-
+ final KeyMapping keys = KeyMapping.getKeyMapping(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MD5_PASSWORD);
this.ribActivator = new RIBActivator();
final RIBExtensionProviderContext ribExtension = new SimpleRIBExtensionProviderContext();
this.ribActivator.startRIBExtensionProvider(ribExtension);
this.msgRegistry = ctx.getBmpMessageRegistry();
this.dispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
- ctx.getBmpMessageRegistry(), new DefaultBmpSessionFactory(), Optional.<MD5ChannelFactory<?>>of(this.scfMd5),
- Optional.<MD5ServerChannelFactory<?>>of(scfServerMd5));
+ ctx.getBmpMessageRegistry(), new DefaultBmpSessionFactory());
this.bmpApp = BmpMonitoringStationImpl.createBmpMonitorInstance(ribExtension, this.dispatcher, getDomBroker(),
MONITOR_ID, new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MONITOR_LOCAL_PORT), Optional.of(keys),
- this.mappingService.getCodecFactory(), moduleInfoBackedContext.getSchemaContext(), this.mrs);
+ this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
- readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+ readData(getDataBroker(), BMP_II, monitor -> {
Assert.assertEquals(1, monitor.getMonitor().size());
final Monitor bmpMonitor = monitor.getMonitor().get(0);
Assert.assertEquals(MONITOR_ID, bmpMonitor.getMonitorId());
this.bmpApp.close();
this.mappingService.close();
- readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+ readData(getDataBroker(), BMP_II, monitor -> {
assertTrue(monitor.getMonitor().isEmpty());
return monitor;
});
public void testRouterMonitoring() throws Exception {
// first test if a single router monitoring is working
final Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertEquals(1, monitor.getRouter().size());
return monitor;
});
final Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertEquals(2, monitor.getRouter().size());
return monitor;
});
// we expect the connection to be closed
final Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, this.msgRegistry);
- Thread.sleep(500);
// channel 1 should still be open, while channel3 should be closed
- assertTrue(channel1.isOpen());
- assertFalse(channel3.isOpen());
+ CheckUtil.checkEquals(()-> assertTrue(channel1.isOpen()));
+ CheckUtil.checkEquals(()-> assertFalse(channel3.isOpen()));
// now if we close the channel 1 and try it again, it should succeed
waitFutureSuccess(channel1.close());
// channel 2 is still open
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertEquals(1, monitor.getRouter().size());
return monitor;
});
- Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
- readData(MONITOR_IID, monitor -> {
+ final Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertEquals(2, monitor.getRouter().size());
return monitor;
});
// close all channel altogether
waitFutureSuccess(channel2.close());
- Thread.sleep(500);
+ Thread.sleep(100);
// sleep for a while to avoid intermittent InMemoryDataTree modification conflict
waitFutureSuccess(channel4.close());
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertEquals(0, monitor.getRouter().size());
return monitor;
});
}
- private static <T extends Future> void waitFutureSuccess(final T future) throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
- future.addListener(future1 -> latch.countDown());
- Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
- }
-
private void waitWriteAndFlushSuccess(final ChannelFuture channelFuture) throws InterruptedException {
waitFutureSuccess(channelFuture);
}
- private Channel testMonitoringStation(String remoteRouterIpAddr) throws InterruptedException {
+ private Channel testMonitoringStation(final String remoteRouterIpAddr) throws InterruptedException {
final Channel channel = connectTestClient(remoteRouterIpAddr, this.msgRegistry);
final RouterId routerId = getRouterId(remoteRouterIpAddr);
try {
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertFalse(monitor.getRouter().isEmpty());
// now find the current router instance
Router router = null;
- for (Router r : monitor.getRouter()) {
+ for (final Router r : monitor.getRouter()) {
if (routerId.equals(r.getRouterId())) {
router = r;
break;
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info")));
- readData(MONITOR_IID, monitor -> {
+ readData(getDataBroker(), MONITOR_IID, monitor -> {
assertFalse(monitor.getRouter().isEmpty());
Router retRouter = null;
- for (Router r : monitor.getRouter()) {
+ for (final Router r : monitor.getRouter()) {
if (routerId.equals(r.getRouterId())) {
retRouter = r;
break;
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true)));
final KeyedInstanceIdentifier<Router, RouterKey> routerIId = MONITOR_IID.child(Router.class, new RouterKey(routerId));
- readData(routerIId, router -> {
+ readData(getDataBroker(), routerIId, router -> {
final List<Peer> peers = router.getPeer();
assertEquals(1, peers.size());
final Peer peer = peers.get(0);
waitWriteAndFlushSuccess(channel.writeAndFlush(statsMsg));
final KeyedInstanceIdentifier<Peer, PeerKey> peerIId = routerIId.child(Peer.class, new PeerKey(PEER_ID));
- readData(peerIId.child(Stats.class), peerStats -> {
+ readData(getDataBroker(), peerIId.child(Stats.class), peerStats -> {
assertNotNull(peerStats.getTimestampSec());
final Tlvs tlvs = statsMsg.getTlvs();
assertEquals(tlvs.getAdjRibsInRoutesTlv().getCount(), peerStats.getAdjRibsInRoutes());
final RouteMirroringMessage routeMirrorMsg = TestUtil.createRouteMirrorMsg(PEER1);
waitWriteAndFlushSuccess(channel.writeAndFlush(routeMirrorMsg));
- readData(peerIId.child(Mirrors.class), routeMirrors -> {
+ readData(getDataBroker(), peerIId.child(Mirrors.class), routeMirrors -> {
assertNotNull(routeMirrors.getTimestampSec());
return routeMirrors;
});
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PrePolicy)));
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PrePolicy)));
- readData(peerIId.child(PrePolicyRib.class), prePolicyRib -> {
+ readData(getDataBroker(), peerIId.child(PrePolicyRib.class), prePolicyRib -> {
assertTrue(!prePolicyRib.getTables().isEmpty());
final Tables tables = prePolicyRib.getTables().get(0);
assertTrue(tables.getAttributes().isUptodate());
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonitMsg(false, PEER1, AdjRibInType.PostPolicy)));
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createRouteMonMsgWithEndOfRibMarker(PEER1, AdjRibInType.PostPolicy)));
- readData(peerIId.child(PostPolicyRib.class), postPolicyRib -> {
+ readData(getDataBroker(), peerIId.child(PostPolicyRib.class), postPolicyRib -> {
assertTrue(!postPolicyRib.getTables().isEmpty());
final Tables tables = postPolicyRib.getTables().get(0);
assertTrue(tables.getAttributes().isUptodate());
waitWriteAndFlushSuccess(channel.writeAndFlush(TestUtil.createPeerDownNotification(PEER1)));
- readData(routerIId, router -> {
+ readData(getDataBroker(), routerIId, router -> {
final List<Peer> peersAfterDown = router.getPeer();
assertTrue(peersAfterDown.isEmpty());
return router;
@Test
public void deploySecondInstance() throws Exception {
final BmpMonitoringStation monitoringStation2 = BmpMonitoringStationImpl.createBmpMonitorInstance(new SimpleRIBExtensionProviderContext(), this.dispatcher, getDomBroker(),
- new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), Optional.of(new KeyMapping()),
- this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), this.mrs);
+ new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), Optional.of(KeyMapping.getKeyMapping()),
+ this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), null);
- readData(InstanceIdentifier.create(BmpMonitor.class), monitor -> {
+ readData(getDataBroker(), BMP_II, monitor -> {
Assert.assertEquals(2, monitor.getMonitor().size());
return monitor;
});
private Channel connectTestClient(final String routerIp, final BmpMessageRegistry msgRegistry) throws InterruptedException {
final BmpHandlerFactory hf = new BmpHandlerFactory(msgRegistry);
final Bootstrap b = new Bootstrap();
- b.group(new NioEventLoopGroup());
+ final EventLoopGroup workerGroup;
+ if(Epoll.isAvailable()){
+ b.channel(EpollSocketChannel.class);
+ workerGroup =new EpollEventLoopGroup();
+ } else {
+ b.channel(NioSocketChannel.class);
+ workerGroup = new NioEventLoopGroup();
+ }
+ b.group(workerGroup);
b.option(ChannelOption.SO_KEEPALIVE, true);
- b.channelFactory(this.scfMd5);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(final SocketChannel ch) throws Exception {
return future.channel();
}
- private <R, T extends DataObject> R readData(final InstanceIdentifier<T> iid, Function<T, R> function)
- throws ReadFailedException {
- AssertionError lastError = null;
- Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
- try (final ReadOnlyTransaction tx = getDataBroker().newReadOnlyTransaction()) {
- Optional<T> data = tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
- if(data.isPresent()) {
- try {
- return function.apply(data.get());
- } catch (AssertionError e) {
- lastError = e;
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- }
- }
- }
- }
-
- throw lastError;
- }
-
- private RouterId getRouterId(String routerIp) {
+ private RouterId getRouterId(final String routerIp) {
return new RouterId(new IpAddress(new Ipv4Address(routerIp)));
}
}