public BmpRouterImpl(final RouterSessionManager sessionManager) {
this.sessionManager = Preconditions.checkNotNull(sessionManager);
this.domDataBroker = sessionManager.getDomDataBroker();
- this.domTxChain = sessionManager.getDomDataBroker().createTransactionChain(this);
+ this.domTxChain = this.domDataBroker.createTransactionChain(this);
this.extensions = sessionManager.getExtensions();
this.tree = sessionManager.getCodecTree();
}
@Override
public void onSessionUp(final BmpSession session) {
this.session = session;
- this.routerIp = InetAddresses.toAddrString(session.getRemoteAddress());
- this.routerId = new RouterId(Ipv4Util.getIpAddress(session.getRemoteAddress()));
- this.routerYangIId = YangInstanceIdentifier.builder(this.sessionManager.getRoutersYangIId()).nodeWithKey(Router.QNAME,
+ this.routerIp = InetAddresses.toAddrString(this.session.getRemoteAddress());
+ this.routerId = new RouterId(Ipv4Util.getIpAddress(this.session.getRemoteAddress()));
+ // check if this session is redundant
+ if (!this.sessionManager.addSessionListener(this)) {
+ LOG.warn("Redundant BMP session with remote router {} ({}) detected. This BMP session will be abandoned.", this.routerIp, this.session);
+ this.close();
+ } else {
+ this.routerYangIId = YangInstanceIdentifier.builder(this.sessionManager.getRoutersYangIId()).nodeWithKey(Router.QNAME,
ROUTER_ID_QNAME, this.routerIp).build();
- this.peersYangIId = YangInstanceIdentifier.builder(routerYangIId).node(Peer.QNAME).build();
- createRouterEntry();
- this.sessionManager.addSessionListener(this);
+ this.peersYangIId = YangInstanceIdentifier.builder(routerYangIId).node(Peer.QNAME).build();
+ createRouterEntry();
+ LOG.info("BMP session with remote router {} ({}) is up now.", this.routerIp, this.session);
+ }
}
@Override
- public void onSessionDown(final BmpSession session, final Exception e) {
- LOG.info("Session {} went down.", session);
+ public void onSessionDown(final Exception e) {
+ // we want to tear down as we want to do clean up like closing the transaction chain, etc.
+ // even when datastore is not writable (routerYangIId == null / redundant session)
tearDown();
}
@Override
- public void onMessage(final BmpSession session, final Notification message) {
+ public void onMessage(final Notification message) {
if (message instanceof InitiationMessage) {
onInitiate((InitiationMessage) message);
} else if (message instanceof PeerUpNotification) {
}
@Override
- public synchronized void close() throws Exception {
+ public synchronized void close() {
if (this.session != null) {
- this.session.close();
+ try {
+ this.session.close();
+ } catch (Exception e) {
+ LOG.error("Fail to close session.", e);
+ }
}
}
@GuardedBy("this")
private synchronized void tearDown() {
+ if (this.session == null) { // the session has been teared down before
+ return;
+ }
+ // we want to display remote router's IP here, as sometimes this.session.close() is already
+ // invoked before tearDown(), and session channel is null in this case, which leads to unuseful
+ // log information
+ LOG.info("BMP Session with remote router {} ({}) went down.", this.routerIp, this.session);
this.session = null;
final Iterator<BmpRouterPeer> it = this.peers.values().iterator();
try {
} catch(final Exception e) {
LOG.error("Failed to properly close BMP application.", e);
} finally {
- try {
- final DOMDataWriteTransaction wTx = this.domDataBroker.newWriteOnlyTransaction();
- wTx.delete(LogicalDatastoreType.OPERATIONAL, this.routerYangIId);
- wTx.submit().checkedGet();
- } catch (final TransactionCommitFailedException e) {
- LOG.error("Failed to remove BMP router data from DS.", e);
+ // remove session only when session is valid, otherwise
+ // we would remove the original valid session when a redundant connection happens
+ // as the routerId is the same for both connection
+ if (isDatastoreWritable()) {
+ try {
+ // it means the session was closed before it was written to datastore
+ final DOMDataWriteTransaction wTx = this.domDataBroker.newWriteOnlyTransaction();
+ wTx.delete(LogicalDatastoreType.OPERATIONAL, this.routerYangIId);
+ wTx.submit().checkedGet();
+ } catch (final TransactionCommitFailedException e) {
+ LOG.error("Failed to remove BMP router data from DS.", e);
+ }
+ this.sessionManager.removeSessionListener(this);
}
}
- this.sessionManager.removeSessionListener(this);
}
@Override
@Override
public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
- LOG.debug("Transaction chain {} successfull.", chain);
+ LOG.debug("Transaction chain {} successfully.", chain);
+ }
+
+ private boolean isDatastoreWritable() {
+ return (this.routerYangIId != null);
}
private void createRouterEntry() {
+ Preconditions.checkState(isDatastoreWritable());
final DOMDataWriteTransaction wTx = this.domTxChain.newWriteOnlyTransaction();
wTx.put(LogicalDatastoreType.OPERATIONAL, this.routerYangIId,
Builders.mapEntryBuilder()
}
private void onInitiate(final InitiationMessage initiation) {
+ Preconditions.checkState(isDatastoreWritable());
final DOMDataWriteTransaction wTx = this.domTxChain.newWriteOnlyTransaction();
wTx.merge(LogicalDatastoreType.OPERATIONAL, this.routerYangIId,
Builders.mapEntryBuilder()
package org.opendaylight.protocol.bmp.impl.app;
+import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
return new BmpRouterImpl(this);
}
- synchronized void addSessionListener(final BmpRouter sessionListener) {
- if (this.sessionListeners.containsKey(sessionListener.getRouterId())) {
- LOG.warn("Session listener for router {} already added.", sessionListener.getRouterId());
- return;
+ private synchronized boolean isSessionExist(final BmpRouter sessionListener) {
+ Preconditions.checkNotNull(sessionListener);
+ return sessionListeners.containsKey(Preconditions.checkNotNull(sessionListener.getRouterId()));
+ }
+
+ synchronized boolean addSessionListener(final BmpRouter sessionListener) {
+ if (isSessionExist(sessionListener)) {
+ LOG.warn("Session listener for router {} was already added.", sessionListener.getRouterId());
+ return false;
}
this.sessionListeners.put(sessionListener.getRouterId(), sessionListener);
+ return true;
}
synchronized void removeSessionListener(final BmpRouter sessionListener) {
- if (!this.sessionListeners.containsKey(sessionListener.getRouterId())) {
+ if (!isSessionExist(sessionListener)) {
LOG.warn("Session listener for router {} was already removed.", sessionListener.getRouterId());
return;
}
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
this.channel = ctx.channel();
+ LOG.info("Starting session {} <-> {}.", channel.localAddress(), channel.remoteAddress());
sessionUp();
- LOG.info("Session {} <-> {} started.", channel.localAddress(), channel.remoteAddress());
}
@Override
@Override
public InetAddress getRemoteAddress() {
- Preconditions.checkState(this.state != State.IDLE, "BMP Session %s is not active.", this);
+ Preconditions.checkNotNull(this.channel.remoteAddress(), "BMP Channel doesn't have a valid remote address.");
return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
}
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
LOG.error("Exception caught in BMP Session.", cause);
close();
- this.listener.onSessionDown(this, new IllegalStateException(cause));
+ this.listener.onSessionDown(new IllegalStateException(cause));
}
@Override
case UP:
if (msg instanceof InitiationMessage) {
this.state = State.INITIATED;
- this.listener.onMessage(this, msg);
+ this.listener.onMessage(msg);
} else {
- LOG.warn("Unexpected message recieved {}, expected was BMP Initiation Message. Closing session.", msg);
+ LOG.warn("Unexpected message received {}, expected was BMP Initiation Message. Closing session.", msg);
close();
}
break;
LOG.info("Session {} terminated by remote with reason: {}", this, getTerminationReason((TerminationMessage) msg));
close();
} else {
- this.listener.onMessage(this, msg);
+ this.listener.onMessage(msg);
}
break;
case IDLE:
- new IllegalStateException("Recieved message" + msg + "while BMP Session" + this + "was not active.");
+ new IllegalStateException("Received message " + msg + " while BMP Session " + this + " was not active.");
break;
default:
break;
}
private void endOfInput() {
- this.listener.onSessionDown(this, new IOException("End of input detected. Closing the session."));
+ this.listener.onSessionDown(new IOException("End of input detected. Closing the session."));
}
private void sessionUp() {
- this.state = State.UP;
+ Preconditions.checkArgument(State.IDLE == state);
this.listener.onSessionUp(this);
+ this.state = State.UP;
}
protected enum State {
import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
public class BmpMonitorImplTest extends AbstractDataBrokerTest {
-
- private static final int PORT = 12345;
- private static final String LOCAL_ADDRESS = "127.0.0.10";
- private static final InetSocketAddress CLIENT_REMOTE = new InetSocketAddress("127.0.0.10", PORT);
- private static final InetSocketAddress CLIENT_LOCAL = new InetSocketAddress(LOCAL_ADDRESS, 0);
+ // the local port and address where the monitor (ODL) will listen for incoming BMP request
+ private static final int MONITOR_LOCAL_PORT = 12345;
+ private static final String MONITOR_LOCAL_ADDRESS = "127.0.0.10";
+ private static final String MONITOR_LOCAL_ADDRESS_2 = "127.0.0.11";
+ // the router (monitee) address where we are going to simulate a BMP request from
+ private static final String REMOTE_ROUTER_ADDRESS_1 = "127.0.0.12";
+ private static final String REMOTE_ROUTER_ADDRESS_2 = "127.0.0.13";
private static final Ipv4Address PEER1 = new Ipv4Address("20.20.20.20");
private static final MonitorId MONITOR_ID = new MonitorId("monitor");
- private static final RouterId ROUTER_ID = new RouterId(new IpAddress(new Ipv4Address(LOCAL_ADDRESS)));
+ private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
private static final String MD5_PASSWORD = "abcdef";
this.mappingService.onGlobalContextUpdated(this.moduleInfoBackedContext.tryToCreateSchemaContext().get());
final KeyMapping keys = new KeyMapping();
- keys.put(InetAddresses.forString(LOCAL_ADDRESS), MD5_PASSWORD.getBytes(Charsets.US_ASCII));
+ keys.put(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MD5_PASSWORD.getBytes(Charsets.US_ASCII));
Mockito.doReturn(this.mockKeyAccess).when(this.kaf).getKeyAccess(Mockito.any(java.nio.channels.Channel.class));
Mockito.doReturn(keys).when(this.mockKeyAccess).getKeys();
Optional.<MD5ServerChannelFactory<?>>of(this.scfServerMd5));
this.bmpApp = BmpMonitoringStationImpl.createBmpMonitorInstance(ribExtension, this.dispatcher, getDomBroker(),
- MONITOR_ID, new InetSocketAddress(InetAddresses.forString("127.0.0.10"), PORT), Optional.of(keys),
+ MONITOR_ID, new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MONITOR_LOCAL_PORT), Optional.of(keys),
this.mappingService.getCodecFactory(), moduleInfoBackedContext.getSchemaContext(), this.mrs);
final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
}
@Test
- public void testMonitoringStation() throws InterruptedException {
- final Channel channel = connectTestClient(this.msgRegistry).channel();
+ public void testRouterMonitoring() throws Exception {
+ // first test if a single router monitoring is working
+ Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
+ assertEquals(1, getBmpData(MONITOR_IID).get().getRouter().size());
+
+ Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
+ assertEquals(2, getBmpData(MONITOR_IID).get().getRouter().size());
+
+ // initiate another BMP request from router 1, create a redundant connection
+ // we expect the connection to be closed
+ Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, this.msgRegistry).channel();
+
+ Thread.sleep(500);
+ // channel 1 should still be open, while channel3 should be closed
+ assertTrue(channel1.isOpen());
+ assertFalse(channel3.isOpen());
+ // now if we close the channel 1 and try it again, it should succeed
+ channel1.close().await();
+ Thread.sleep(500);
+
+ // channel 2 is still open
+ assertEquals(1, getBmpData(MONITOR_IID).get().getRouter().size());
+
+ Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
+ assertEquals(2, getBmpData(MONITOR_IID).get().getRouter().size());
+
+ // close all channel altogether
+ channel2.close().await();
+ // sleep for a while to avoid intermittent InMemoryDataTree modification conflict
+ Thread.sleep(500);
+ channel4.close().await();
+
+ Thread.sleep(500);
+ assertEquals(0, getBmpData(MONITOR_IID).get().getRouter().size());
+ }
+
+ private Channel testMonitoringStation(String remoteRouterIpAddr) throws InterruptedException {
+ final Channel channel = connectTestClient(remoteRouterIpAddr, this.msgRegistry).channel();
+ final RouterId routerId = getRouterId(remoteRouterIpAddr);
try {
Thread.sleep(500);
- final KeyedInstanceIdentifier<Monitor, MonitorKey> monitorIId = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
- final Monitor monitor = getBmpData(monitorIId).get();
- assertEquals(1, monitor.getRouter().size());
- final Router router = monitor.getRouter().get(0);
- assertEquals(ROUTER_ID, router.getRouterId());
+ final Monitor monitor = getBmpData(MONITOR_IID).get();
+ assertFalse(monitor.getRouter().isEmpty());
+ // now find the current router instance
+ Router router = null;
+ for (Router r : monitor.getRouter()) {
+ if (routerId.equals(r.getRouterId())) {
+ router = r;
+ break;
+ }
+ }
+ assertNotNull(router);
assertEquals(Status.Down, router.getStatus());
assertTrue(router.getPeer().isEmpty());
channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info"));
Thread.sleep(500);
- final Monitor monitorInit = getBmpData(monitorIId).get();
- assertEquals(1, monitorInit.getRouter().size());
- final Router routerInit = monitorInit.getRouter().get(0);
+ final Monitor monitorInit = getBmpData(MONITOR_IID).get();
+ assertFalse(monitorInit.getRouter().isEmpty());
+ Router routerInit = null;
+ for (Router r : monitorInit.getRouter()) {
+ if (routerId.equals(r.getRouterId())) {
+ routerInit = r;
+ break;
+ }
+ }
+ assertNotNull(routerInit);
assertEquals("some info;", routerInit.getInfo());
assertEquals("name", routerInit.getName());
assertEquals("description", routerInit.getDescription());
- assertEquals(ROUTER_ID, routerInit.getRouterId());
+ assertEquals(routerId, routerInit.getRouterId());
assertTrue(routerInit.getPeer().isEmpty());
assertEquals(Status.Up, routerInit.getStatus());
channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true));
Thread.sleep(500);
- final KeyedInstanceIdentifier<Router, RouterKey> routerIId = monitorIId.child(Router.class, new RouterKey(ROUTER_ID));
+ final KeyedInstanceIdentifier<Router, RouterKey> routerIId = MONITOR_IID.child(Router.class, new RouterKey(routerId));
final List<Peer> peers = getBmpData(routerIId).get().getPeer();
assertEquals(1, peers.size());
final Peer peer = peers.get(0);
Thread.sleep(500);
final List<Peer> peersAfterDown = getBmpData(routerIId).get().getPeer();
assertTrue(peersAfterDown.isEmpty());
-
- channel.close().await();
- Thread.sleep(500);
- final Monitor monitorAfterClose = getBmpData(monitorIId).get();
- assertTrue(monitorAfterClose.getRouter().isEmpty());
} catch (final Exception e) {
final StringBuffer ex = new StringBuffer();
ex.append(e.getMessage() + "\n");
- for (final StackTraceElement element: e.getStackTrace()) {
+ for (final StackTraceElement element : e.getStackTrace()) {
ex.append(element.toString() + "\n");
- };
+ }
fail(ex.toString());
}
+ return channel;
}
@Test
public void deploySecondInstance() throws Exception {
final BmpMonitoringStation monitoringStation2 = BmpMonitoringStationImpl.createBmpMonitorInstance(new SimpleRIBExtensionProviderContext(), this.dispatcher, getDomBroker(),
- new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString("127.0.0.11"), PORT), Optional.of(new KeyMapping()),
+ new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), Optional.of(new KeyMapping()),
this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), this.mrs);
final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
Assert.assertEquals(2, monitor.getMonitor().size());
monitoringStation2.close();
}
- private ChannelFuture connectTestClient(final BmpMessageRegistry msgRegistry) throws InterruptedException {
+ private ChannelFuture connectTestClient(final String routerIp, final BmpMessageRegistry msgRegistry) throws InterruptedException {
final BmpHandlerFactory hf = new BmpHandlerFactory(msgRegistry);
final Bootstrap b = new Bootstrap();
b.group(new NioEventLoopGroup());
ch.pipeline().addLast(hf.getEncoders());
}
});
- b.localAddress(CLIENT_LOCAL);
+ b.localAddress(new InetSocketAddress(routerIp, 0));
b.option(ChannelOption.SO_REUSEADDR, true);
- return b.connect(CLIENT_REMOTE).sync();
+ return b.connect(new InetSocketAddress(MONITOR_LOCAL_ADDRESS, MONITOR_LOCAL_PORT)).sync();
}
private <T extends DataObject> Optional<T> getBmpData(final InstanceIdentifier<T> iid) throws ReadFailedException {
return tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
}
}
+
+ private RouterId getRouterId(String routerIp) {
+ return new RouterId(new IpAddress(new Ipv4Address(routerIp)));
+ }
}
}
@Override
- public void onMessage(final BmpSession session, final Notification message) {
+ public void onMessage(final Notification message) {
LOG.debug("Received message: {} {}", message.getClass(), message);
this.messages.add(message);
}
}
@Override
- public void onSessionDown(final BmpSession session, final Exception e) {
+ public void onSessionDown(final Exception e) {
LOG.debug("Session down.", e);
this.up = false;
}
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
+ <!-- to solve the exception: "Invalid signature file digest for Manifest main attributes" -->
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
</configuration>
<executions>
<execution>
final BmpMockArguments arguments = BmpMockArguments.parseArguments(args);
initiateLogger(arguments);
final BmpMockDispatcher dispatcher = initiateMock(arguments);
- deployClients(dispatcher, arguments);
-
+ // now start the server / client
+ if (arguments.isOnPassiveMode()) {
+ deployServers(dispatcher, arguments);
+ } else {
+ deployClients(dispatcher, arguments);
+ }
}
private static void initiateLogger(final BmpMockArguments arguments) {
}
}
+ private static void deployServers(final BmpMockDispatcher dispatcher, final BmpMockArguments arguments) {
+ final InetSocketAddress localAddress = arguments.getLocalAddress();
+ InetAddress currentLocal = localAddress.getAddress();
+ final int port = localAddress.getPort();
+ for (int i = 0; i < arguments.getRoutersCount(); i++) {
+ dispatcher.createServer(new InetSocketAddress(currentLocal, port));
+ currentLocal = InetAddresses.increment(currentLocal);
+ }
+ }
+
private static ch.qos.logback.classic.Logger getRootLogger(final LoggerContext lc) {
return Iterables.find(lc.getLoggerList(), new Predicate<Logger>() {
@Override
import java.net.InetAddress;
import java.net.InetSocketAddress;
import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.Argument;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
private static final String LOCAL_ADDRESS_DST = "local_address";
private static final String REMOTE_ADDRESS_DST = "remote_address";
private static final String LOG_LEVEL_DST = "log_level";
+ // when set to true, the mock will operate as a server listening for incoming active monitoring request
+ private static final String PASSIVE_MODE_DST = "passive";
private static final ArgumentParser ARGUMENT_PARSER = initializeArgumentParser();
return this.parseArgs.get(LOG_LEVEL_DST);
}
+ public boolean isOnPassiveMode() {
+ return this.parseArgs.get(PASSIVE_MODE_DST);
+ }
+
private static ArgumentParser initializeArgumentParser() {
final ArgumentParser parser = ArgumentParsers.newArgumentParser(PROGRAM_NAME);
parser.addArgument(toArgName(ROUTERS_COUNT_DST)).type(Integer.class).setDefault(1);
parser.addArgument(toArgName(PEERS_COUNT_DST)).type(Integer.class).setDefault(0);
parser.addArgument(toArgName(PRE_POLICY_ROUTES_COUNT_DST)).type(Integer.class).setDefault(0);
parser.addArgument(toArgName(POST_POLICY_ROUTES_COUNT_DST)).type(Integer.class).setDefault(0);
+ parser.addArgument(toArgName(PASSIVE_MODE_DST)).action(Arguments.storeTrue());
parser.addArgument(toArgName(LOCAL_ADDRESS_DST)).type(new ArgumentType<InetSocketAddress>() {
@Override
public InetSocketAddress convert(final ArgumentParser parser, final Argument arg, final String value)
@Override
public InetSocketAddress convert(final ArgumentParser parser, final Argument arg, final String value)
throws ArgumentParserException {
- return getInetSocketAddress(value, DEFAULT_LOCAL_PORT);
+ return getInetSocketAddress(value, DEFAULT_REMOTE_PORT);
}
}).setDefault(REMOTE_ADDRESS);
parser.addArgument(toArgName(LOG_LEVEL_DST)).type(new ArgumentType<Level>(){
import com.google.common.base.Preconditions;
import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
private static final Logger LOG = LoggerFactory.getLogger(BmpMockDispatcher.class);
private static final int CONNECT_TIMEOUT = 2000;
+ private static final int MAX_CONNECTIONS_COUNT = 128;
final BmpHandlerFactory hf;
private final BmpSessionFactory sessionFactory;
+ private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+ private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+
public BmpMockDispatcher(final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
this.sessionFactory = Preconditions.checkNotNull(sessionFactory);
Preconditions.checkNotNull(registry);
this.hf = new BmpHandlerFactory(registry);
}
- public ChannelFuture createClient(final SocketAddress localAddress, final SocketAddress remoteAddress) {
+ private Bootstrap createClientInstance(final SocketAddress localAddress) {
final NioEventLoopGroup workergroup = new NioEventLoopGroup();
final Bootstrap b = new Bootstrap();
}
});
b.localAddress(localAddress);
- b.remoteAddress(remoteAddress);
- LOG.debug("BMP client {} <--> {} deployed", localAddress, remoteAddress);
- return b.connect();
+ return b;
+ }
+
+ public ChannelFuture createClient(final SocketAddress localAddress, final SocketAddress remoteAddress) {
+ Preconditions.checkNotNull(localAddress);
+ Preconditions.checkNotNull(remoteAddress);
+
+ // ideally we should use Bootstrap clones here
+ Bootstrap b = createClientInstance(localAddress);
+ final ChannelFuture f = b.connect(remoteAddress);
+ LOG.info("BMP client {} <--> {} deployed", localAddress, remoteAddress);
+ return f;
+ }
+
+ private ServerBootstrap createServerInstance() {
+ final ServerBootstrap b = new ServerBootstrap();
+ b.childHandler(new ChannelInitializer<Channel>() {
+ @Override
+ protected void initChannel(final Channel ch) throws Exception {
+ ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
+ ch.pipeline().addLast(BmpMockDispatcher.this.hf.getEncoders());
+ }
+ });
+
+ b.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
+ b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+ b.channel(NioServerSocketChannel.class);
+ b.group(bossGroup, workerGroup);
+ return b;
+ }
+
+ public ChannelFuture createServer(final InetSocketAddress localAddress) {
+ Preconditions.checkNotNull(localAddress);
+
+ ServerBootstrap b = createServerInstance();
+ final ChannelFuture f = b.bind(localAddress);
+ LOG.info("Initiated BMP server at {}.", localAddress);
+ return f;
}
}
@Override
public void close() throws InterruptedException {
+ LOG.info("BMP session {} is closed.", BmpMockSession.this.channel);
this.channel.close().sync();
}
LOG.info("BMP session {} final successfully established.", BmpMockSession.this.channel);
}
});
- LOG.info("BMP session {} sucesfully established.", this.channel);
+ LOG.info("BMP session {} successfully established.", this.channel);
final InetSocketAddress localAddress = (InetSocketAddress) this.channel.localAddress();
this.remoteAddress = (InetSocketAddress) this.channel.remoteAddress();
advertizePeers(this.channel, localAddress);
final Channel channel = channelFuture.sync().channel();
Assert.assertTrue(channel.isActive());
+ channel.close();
+ serverDispatcher.close();
+ }
+
+ @Test
+ public void testCreateServer() throws InterruptedException {
+ final BmpMockDispatcher dispatcher = new BmpMockDispatcher(this.registry, this.sessionFactory);
+ final int port = getRandomPort();
+ final BmpDispatcherImpl serverDispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
+ this.registry, this.sessionFactory);
+ dispatcher.createServer(new InetSocketAddress(InetAddresses.forString("0.0.0.0"), port));
+
+ final ChannelFuture channelFuture = serverDispatcher.createClient(new InetSocketAddress(InetAddresses.forString("127.0.0.3"), port), this.slf, Optional.<KeyMapping>absent());
+ final Channel channel = channelFuture.sync().channel();
+
+ Assert.assertTrue(channel.isActive());
+ channel.close();
serverDispatcher.close();
}
private final BmpSessionListener sessionListener = Mockito.mock(BmpSessionListener.class);
private int serverPort;
- private Channel serverChannel;
private BmpExtensionProviderActivator bmpActivator;
private BmpDispatcher bmpDispatcher;
this.bmpActivator.start(ctx);
this.bmpDispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(), ctx.getBmpMessageRegistry(),
new DefaultBmpSessionFactory());
- final BmpSessionListenerFactory bmpSessionListenerFactory = new BmpSessionListenerFactory() {
- @Override
- public BmpSessionListener getSessionListener() {
- return BmpMockTest.this.sessionListener;
- }
- };
this.serverPort = BmpMockDispatcherTest.getRandomPort();
- this.serverChannel = this.bmpDispatcher.createServer(new InetSocketAddress("127.0.0.1", this.serverPort),
- bmpSessionListenerFactory, Optional.<KeyMapping>absent()).channel();
}
@After
public void tearDown() throws Exception {
- this.serverChannel.close().sync();
this.bmpActivator.stop();
this.bmpDispatcher.close();
}
@Test
public void testMain() throws Exception {
+ final BmpSessionListenerFactory bmpSessionListenerFactory = new BmpSessionListenerFactory() {
+ @Override
+ public BmpSessionListener getSessionListener() {
+ return BmpMockTest.this.sessionListener;
+ }
+ };
+ Channel serverChannel = bmpDispatcher.createServer(new InetSocketAddress("127.0.0.1", serverPort),
+ bmpSessionListenerFactory, Optional.<KeyMapping>absent()).channel();
+
BmpMock.main(new String[] {"--remote_address", "127.0.0.1:" + serverPort, "--peers_count", "3", "--pre_policy_routes", "3"});
Thread.sleep(1000);
Mockito.verify(this.sessionListener).onSessionUp(Mockito.any(BmpSession.class));
//1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
- Mockito.verify(this.sessionListener, Mockito.times(13)).onMessage(Mockito.any(BmpSession.class), Mockito.any(Notification.class));
+ Mockito.verify(this.sessionListener, Mockito.times(13)).onMessage(Mockito.any(Notification.class));
+
+ serverChannel.close().sync();
}
+ @Test
+ public void testMainInPassiveMode() throws Exception {
+ final BmpSessionListenerFactory bmpSessionListenerFactory = new BmpSessionListenerFactory() {
+ @Override
+ public BmpSessionListener getSessionListener() {
+ return BmpMockTest.this.sessionListener;
+ }
+ };
+
+ // create a local server in passive mode instead
+ BmpMock.main(new String[]{"--local_address", "127.0.0.1:" + serverPort, "--peers_count", "3", "--pre_policy_routes", "3", "--passive"});
+ Channel serverChannel = bmpDispatcher.createClient(new InetSocketAddress("127.0.0.1", serverPort),
+ bmpSessionListenerFactory, Optional.<KeyMapping>absent()).channel();
+
+ Thread.sleep(1000);
+ Mockito.verify(this.sessionListener).onSessionUp(Mockito.any(BmpSession.class));
+ //1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
+ Mockito.verify(this.sessionListener, Mockito.times(13)).onMessage(Mockito.any(Notification.class));
+
+ serverChannel.close().sync();
+ }
}
void onSessionUp(BmpSession session);
- void onSessionDown(BmpSession session, Exception e);
+ void onSessionDown(Exception e);
- void onMessage(BmpSession session, Notification message);
+ void onMessage(Notification message);
}