Add BMP redundant connection detection logic 94/37994/13
authorKevin Wang <kevixw@gmail.com>
Thu, 21 Apr 2016 23:42:12 +0000 (23:42 +0000)
committerKevin Wang <kevixw@gmail.com>
Tue, 3 May 2016 19:40:42 +0000 (19:40 +0000)
Change-Id: If0081a64fb9c6b2b0d6aa0dee8bfb927cda43010
Signed-off-by: Kevin Wang <kevixw@gmail.com>
13 files changed:
bgp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/app/BmpRouterImpl.java
bgp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/app/RouterSessionManager.java
bgp/bmp-impl/src/main/java/org/opendaylight/protocol/bmp/impl/session/BmpSessionImpl.java
bgp/bmp-impl/src/test/java/org/opendaylight/protocol/bmp/impl/app/BmpMonitorImplTest.java
bgp/bmp-impl/src/test/java/org/opendaylight/protocol/bmp/impl/session/BmpTestSessionListener.java
bgp/bmp-mock/pom.xml
bgp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMock.java
bgp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockArguments.java
bgp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockDispatcher.java
bgp/bmp-mock/src/main/java/org/opendaylight/protocol/bmp/mock/BmpMockSession.java
bgp/bmp-mock/src/test/java/org/opendaylight/protocol/bmp/mock/BmpMockDispatcherTest.java
bgp/bmp-mock/src/test/java/org/opendaylight/protocol/bmp/mock/BmpMockTest.java
bgp/bmp-spi/src/main/java/org/opendaylight/protocol/bmp/api/BmpSessionListener.java

index b97a970c26db7663c22f1194b09a898cdb95a388..0594bef2c5fdcbbaded1ab472989ca2bf9f2fb34 100644 (file)
@@ -77,7 +77,7 @@ public class BmpRouterImpl implements BmpRouter, TransactionChainListener {
     public BmpRouterImpl(final RouterSessionManager sessionManager) {
         this.sessionManager = Preconditions.checkNotNull(sessionManager);
         this.domDataBroker = sessionManager.getDomDataBroker();
-        this.domTxChain = sessionManager.getDomDataBroker().createTransactionChain(this);
+        this.domTxChain = this.domDataBroker.createTransactionChain(this);
         this.extensions = sessionManager.getExtensions();
         this.tree = sessionManager.getCodecTree();
     }
@@ -85,23 +85,30 @@ public class BmpRouterImpl implements BmpRouter, TransactionChainListener {
     @Override
     public void onSessionUp(final BmpSession session) {
         this.session = session;
-        this.routerIp = InetAddresses.toAddrString(session.getRemoteAddress());
-        this.routerId = new RouterId(Ipv4Util.getIpAddress(session.getRemoteAddress()));
-        this.routerYangIId = YangInstanceIdentifier.builder(this.sessionManager.getRoutersYangIId()).nodeWithKey(Router.QNAME,
+        this.routerIp = InetAddresses.toAddrString(this.session.getRemoteAddress());
+        this.routerId = new RouterId(Ipv4Util.getIpAddress(this.session.getRemoteAddress()));
+        // check if this session is redundant
+        if (!this.sessionManager.addSessionListener(this)) {
+            LOG.warn("Redundant BMP session with remote router {} ({}) detected. This BMP session will be abandoned.", this.routerIp, this.session);
+            this.close();
+        } else {
+            this.routerYangIId = YangInstanceIdentifier.builder(this.sessionManager.getRoutersYangIId()).nodeWithKey(Router.QNAME,
                 ROUTER_ID_QNAME, this.routerIp).build();
-        this.peersYangIId = YangInstanceIdentifier.builder(routerYangIId).node(Peer.QNAME).build();
-        createRouterEntry();
-        this.sessionManager.addSessionListener(this);
+            this.peersYangIId = YangInstanceIdentifier.builder(routerYangIId).node(Peer.QNAME).build();
+            createRouterEntry();
+            LOG.info("BMP session with remote router {} ({}) is up now.", this.routerIp, this.session);
+        }
     }
 
     @Override
-    public void onSessionDown(final BmpSession session, final Exception e) {
-        LOG.info("Session {} went down.", session);
+    public void onSessionDown(final Exception e) {
+        // we want to tear down as we want to do clean up like closing the transaction chain, etc.
+        // even when datastore is not writable (routerYangIId == null / redundant session)
         tearDown();
     }
 
     @Override
-    public void onMessage(final BmpSession session, final Notification message) {
+    public void onMessage(final Notification message) {
         if (message instanceof InitiationMessage) {
             onInitiate((InitiationMessage) message);
         } else if (message instanceof PeerUpNotification) {
@@ -117,14 +124,25 @@ public class BmpRouterImpl implements BmpRouter, TransactionChainListener {
     }
 
     @Override
-    public synchronized void close() throws Exception {
+    public synchronized void close() {
         if (this.session != null) {
-            this.session.close();
+            try {
+                this.session.close();
+            } catch (Exception e) {
+                LOG.error("Fail to close session.", e);
+            }
         }
     }
 
     @GuardedBy("this")
     private synchronized void tearDown() {
+        if (this.session == null) {   // the session has been teared down before
+            return;
+        }
+        // we want to display remote router's IP here, as sometimes this.session.close() is already
+        // invoked before tearDown(), and session channel is null in this case, which leads to unuseful
+        // log information
+        LOG.info("BMP Session with remote router {} ({}) went down.", this.routerIp, this.session);
         this.session = null;
         final Iterator<BmpRouterPeer> it = this.peers.values().iterator();
         try {
@@ -136,15 +154,21 @@ public class BmpRouterImpl implements BmpRouter, TransactionChainListener {
         } catch(final Exception e) {
             LOG.error("Failed to properly close BMP application.", e);
         } finally {
-            try {
-                final DOMDataWriteTransaction wTx = this.domDataBroker.newWriteOnlyTransaction();
-                wTx.delete(LogicalDatastoreType.OPERATIONAL, this.routerYangIId);
-                wTx.submit().checkedGet();
-            } catch (final TransactionCommitFailedException e) {
-                LOG.error("Failed to remove BMP router data from DS.", e);
+            // remove session only when session is valid, otherwise
+            // we would remove the original valid session when a redundant connection happens
+            // as the routerId is the same for both connection
+            if (isDatastoreWritable()) {
+                try {
+                    // it means the session was closed before it was written to datastore
+                    final DOMDataWriteTransaction wTx = this.domDataBroker.newWriteOnlyTransaction();
+                    wTx.delete(LogicalDatastoreType.OPERATIONAL, this.routerYangIId);
+                    wTx.submit().checkedGet();
+                } catch (final TransactionCommitFailedException e) {
+                    LOG.error("Failed to remove BMP router data from DS.", e);
+                }
+                this.sessionManager.removeSessionListener(this);
             }
         }
-        this.sessionManager.removeSessionListener(this);
     }
 
     @Override
@@ -154,10 +178,15 @@ public class BmpRouterImpl implements BmpRouter, TransactionChainListener {
 
     @Override
     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
-        LOG.debug("Transaction chain {} successfull.", chain);
+        LOG.debug("Transaction chain {} successfully.", chain);
+    }
+
+    private boolean isDatastoreWritable() {
+        return (this.routerYangIId != null);
     }
 
     private void createRouterEntry() {
+        Preconditions.checkState(isDatastoreWritable());
         final DOMDataWriteTransaction wTx = this.domTxChain.newWriteOnlyTransaction();
         wTx.put(LogicalDatastoreType.OPERATIONAL, this.routerYangIId,
                 Builders.mapEntryBuilder()
@@ -169,6 +198,7 @@ public class BmpRouterImpl implements BmpRouter, TransactionChainListener {
     }
 
     private void onInitiate(final InitiationMessage initiation) {
+        Preconditions.checkState(isDatastoreWritable());
         final DOMDataWriteTransaction wTx = this.domTxChain.newWriteOnlyTransaction();
         wTx.merge(LogicalDatastoreType.OPERATIONAL, this.routerYangIId,
                 Builders.mapEntryBuilder()
index 09a17ffc30cc37dce04692f3e08c80293fc271e8..cc8793233eba97917059fe6de5c4fd9e5dc1b484 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.protocol.bmp.impl.app;
 
+import com.google.common.base.Preconditions;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
@@ -46,16 +47,22 @@ final class RouterSessionManager implements BmpSessionListenerFactory, AutoClose
         return new BmpRouterImpl(this);
     }
 
-    synchronized void addSessionListener(final BmpRouter sessionListener) {
-        if (this.sessionListeners.containsKey(sessionListener.getRouterId())) {
-            LOG.warn("Session listener for router {} already added.", sessionListener.getRouterId());
-            return;
+    private synchronized boolean isSessionExist(final BmpRouter sessionListener) {
+        Preconditions.checkNotNull(sessionListener);
+        return sessionListeners.containsKey(Preconditions.checkNotNull(sessionListener.getRouterId()));
+    }
+
+    synchronized boolean addSessionListener(final BmpRouter sessionListener) {
+        if (isSessionExist(sessionListener)) {
+            LOG.warn("Session listener for router {} was already added.", sessionListener.getRouterId());
+            return false;
         }
         this.sessionListeners.put(sessionListener.getRouterId(), sessionListener);
+        return true;
     }
 
     synchronized void removeSessionListener(final BmpRouter sessionListener) {
-        if (!this.sessionListeners.containsKey(sessionListener.getRouterId())) {
+        if (!isSessionExist(sessionListener)) {
             LOG.warn("Session listener for router {} was already removed.", sessionListener.getRouterId());
             return;
         }
index bdf6b77a130901be5e028b15004b191ff398ea00..652be4b4fb0173726279125fb78f6c000ffdc178 100644 (file)
@@ -63,8 +63,8 @@ public final class BmpSessionImpl extends SimpleChannelInboundHandler<Notificati
     @Override
     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
         this.channel = ctx.channel();
+        LOG.info("Starting session {} <-> {}.", channel.localAddress(), channel.remoteAddress());
         sessionUp();
-        LOG.info("Session {} <-> {} started.", channel.localAddress(), channel.remoteAddress());
     }
 
     @Override
@@ -79,7 +79,7 @@ public final class BmpSessionImpl extends SimpleChannelInboundHandler<Notificati
 
     @Override
     public InetAddress getRemoteAddress() {
-        Preconditions.checkState(this.state != State.IDLE, "BMP Session %s is not active.", this);
+        Preconditions.checkNotNull(this.channel.remoteAddress(), "BMP Channel doesn't have a valid remote address.");
         return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
     }
 
@@ -87,7 +87,7 @@ public final class BmpSessionImpl extends SimpleChannelInboundHandler<Notificati
     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
         LOG.error("Exception caught in BMP Session.", cause);
         close();
-        this.listener.onSessionDown(this, new IllegalStateException(cause));
+        this.listener.onSessionDown(new IllegalStateException(cause));
     }
 
     @Override
@@ -105,9 +105,9 @@ public final class BmpSessionImpl extends SimpleChannelInboundHandler<Notificati
         case UP:
             if (msg instanceof InitiationMessage) {
                 this.state = State.INITIATED;
-                this.listener.onMessage(this, msg);
+                this.listener.onMessage(msg);
             } else {
-                LOG.warn("Unexpected message recieved {}, expected was BMP Initiation Message. Closing session.", msg);
+                LOG.warn("Unexpected message received {}, expected was BMP Initiation Message. Closing session.", msg);
                 close();
             }
             break;
@@ -116,11 +116,11 @@ public final class BmpSessionImpl extends SimpleChannelInboundHandler<Notificati
                 LOG.info("Session {} terminated by remote with reason: {}", this, getTerminationReason((TerminationMessage) msg));
                 close();
             } else {
-                this.listener.onMessage(this, msg);
+                this.listener.onMessage(msg);
             }
             break;
         case IDLE:
-            new IllegalStateException("Recieved message" + msg + "while BMP Session" + this + "was not active.");
+            new IllegalStateException("Received message " + msg + " while BMP Session " + this + " was not active.");
             break;
         default:
             break;
@@ -136,12 +136,13 @@ public final class BmpSessionImpl extends SimpleChannelInboundHandler<Notificati
     }
 
     private void endOfInput() {
-        this.listener.onSessionDown(this, new IOException("End of input detected. Closing the session."));
+        this.listener.onSessionDown(new IOException("End of input detected. Closing the session."));
     }
 
     private void sessionUp() {
-        this.state = State.UP;
+        Preconditions.checkArgument(State.IDLE == state);
         this.listener.onSessionUp(this);
+        this.state = State.UP;
     }
 
     protected enum State {
index b0c34bc979ba11b155b2e63c867067a3647af2f4..5c46b568b365caa16d6f8e8c9879552576caf013 100644 (file)
@@ -109,14 +109,16 @@ import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
 
 public class BmpMonitorImplTest extends AbstractDataBrokerTest {
-
-    private static final int PORT = 12345;
-    private static final String LOCAL_ADDRESS = "127.0.0.10";
-    private static final InetSocketAddress CLIENT_REMOTE = new InetSocketAddress("127.0.0.10", PORT);
-    private static final InetSocketAddress CLIENT_LOCAL = new InetSocketAddress(LOCAL_ADDRESS, 0);
+    // the local port and address where the monitor (ODL) will listen for incoming BMP request
+    private static final int MONITOR_LOCAL_PORT = 12345;
+    private static final String MONITOR_LOCAL_ADDRESS = "127.0.0.10";
+    private static final String MONITOR_LOCAL_ADDRESS_2 = "127.0.0.11";
+    // the router (monitee) address where we are going to simulate a BMP request from
+    private static final String REMOTE_ROUTER_ADDRESS_1 = "127.0.0.12";
+    private static final String REMOTE_ROUTER_ADDRESS_2 = "127.0.0.13";
     private static final Ipv4Address PEER1 = new Ipv4Address("20.20.20.20");
     private static final MonitorId MONITOR_ID = new MonitorId("monitor");
-    private static final RouterId ROUTER_ID = new RouterId(new IpAddress(new Ipv4Address(LOCAL_ADDRESS)));
+    private static final KeyedInstanceIdentifier<Monitor, MonitorKey> MONITOR_IID = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
     private static final PeerId PEER_ID = new PeerId(PEER1.getValue());
     private static final String MD5_PASSWORD = "abcdef";
 
@@ -152,7 +154,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
         this.mappingService.onGlobalContextUpdated(this.moduleInfoBackedContext.tryToCreateSchemaContext().get());
 
         final KeyMapping keys = new KeyMapping();
-        keys.put(InetAddresses.forString(LOCAL_ADDRESS), MD5_PASSWORD.getBytes(Charsets.US_ASCII));
+        keys.put(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MD5_PASSWORD.getBytes(Charsets.US_ASCII));
 
         Mockito.doReturn(this.mockKeyAccess).when(this.kaf).getKeyAccess(Mockito.any(java.nio.channels.Channel.class));
         Mockito.doReturn(keys).when(this.mockKeyAccess).getKeys();
@@ -178,7 +180,7 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
                 Optional.<MD5ServerChannelFactory<?>>of(this.scfServerMd5));
 
         this.bmpApp = BmpMonitoringStationImpl.createBmpMonitorInstance(ribExtension, this.dispatcher, getDomBroker(),
-                MONITOR_ID, new InetSocketAddress(InetAddresses.forString("127.0.0.10"), PORT), Optional.of(keys),
+                MONITOR_ID, new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS), MONITOR_LOCAL_PORT), Optional.of(keys),
                 this.mappingService.getCodecFactory(), moduleInfoBackedContext.getSchemaContext(), this.mrs);
 
         final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
@@ -201,34 +203,84 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
     }
 
     @Test
-    public void testMonitoringStation() throws InterruptedException {
-        final Channel channel = connectTestClient(this.msgRegistry).channel();
+    public void testRouterMonitoring() throws Exception {
+        // first test if a single router monitoring is working
+        Channel channel1 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
+        assertEquals(1, getBmpData(MONITOR_IID).get().getRouter().size());
+
+        Channel channel2 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_2);
+        assertEquals(2, getBmpData(MONITOR_IID).get().getRouter().size());
+
+        // initiate another BMP request from router 1, create a redundant connection
+        // we expect the connection to be closed
+        Channel channel3 = connectTestClient(REMOTE_ROUTER_ADDRESS_1, this.msgRegistry).channel();
+
+        Thread.sleep(500);
+        // channel 1 should still be open, while channel3 should be closed
+        assertTrue(channel1.isOpen());
+        assertFalse(channel3.isOpen());
+        // now if we close the channel 1 and try it again, it should succeed
+        channel1.close().await();
+        Thread.sleep(500);
+
+        // channel 2 is still open
+        assertEquals(1, getBmpData(MONITOR_IID).get().getRouter().size());
+
+        Channel channel4 = testMonitoringStation(REMOTE_ROUTER_ADDRESS_1);
+        assertEquals(2, getBmpData(MONITOR_IID).get().getRouter().size());
+
+        // close all channel altogether
+        channel2.close().await();
+        // sleep for a while to avoid intermittent InMemoryDataTree modification conflict
+        Thread.sleep(500);
+        channel4.close().await();
+
+        Thread.sleep(500);
+        assertEquals(0, getBmpData(MONITOR_IID).get().getRouter().size());
+    }
+
+    private Channel testMonitoringStation(String remoteRouterIpAddr) throws InterruptedException {
+        final Channel channel = connectTestClient(remoteRouterIpAddr, this.msgRegistry).channel();
+        final RouterId routerId = getRouterId(remoteRouterIpAddr);
         try {
             Thread.sleep(500);
-            final KeyedInstanceIdentifier<Monitor, MonitorKey> monitorIId = InstanceIdentifier.create(BmpMonitor.class).child(Monitor.class, new MonitorKey(MONITOR_ID));
 
-            final Monitor monitor = getBmpData(monitorIId).get();
-            assertEquals(1, monitor.getRouter().size());
-            final Router router = monitor.getRouter().get(0);
-            assertEquals(ROUTER_ID, router.getRouterId());
+            final Monitor monitor = getBmpData(MONITOR_IID).get();
+            assertFalse(monitor.getRouter().isEmpty());
+            // now find the current router instance
+            Router router = null;
+            for (Router r : monitor.getRouter()) {
+                if (routerId.equals(r.getRouterId())) {
+                    router = r;
+                    break;
+                }
+            }
+            assertNotNull(router);
             assertEquals(Status.Down, router.getStatus());
             assertTrue(router.getPeer().isEmpty());
 
             channel.writeAndFlush(TestUtil.createInitMsg("description", "name", "some info"));
             Thread.sleep(500);
-            final Monitor monitorInit = getBmpData(monitorIId).get();
-            assertEquals(1, monitorInit.getRouter().size());
-            final Router routerInit = monitorInit.getRouter().get(0);
+            final Monitor monitorInit = getBmpData(MONITOR_IID).get();
+            assertFalse(monitorInit.getRouter().isEmpty());
+            Router routerInit = null;
+            for (Router r : monitorInit.getRouter()) {
+                if (routerId.equals(r.getRouterId())) {
+                    routerInit = r;
+                    break;
+                }
+            }
+            assertNotNull(routerInit);
             assertEquals("some info;", routerInit.getInfo());
             assertEquals("name", routerInit.getName());
             assertEquals("description", routerInit.getDescription());
-            assertEquals(ROUTER_ID, routerInit.getRouterId());
+            assertEquals(routerId, routerInit.getRouterId());
             assertTrue(routerInit.getPeer().isEmpty());
             assertEquals(Status.Up, routerInit.getStatus());
 
             channel.writeAndFlush(TestUtil.createPeerUpNotification(PEER1, true));
             Thread.sleep(500);
-            final KeyedInstanceIdentifier<Router, RouterKey> routerIId = monitorIId.child(Router.class, new RouterKey(ROUTER_ID));
+            final KeyedInstanceIdentifier<Router, RouterKey> routerIId = MONITOR_IID.child(Router.class, new RouterKey(routerId));
             final List<Peer> peers = getBmpData(routerIId).get().getPeer();
             assertEquals(1, peers.size());
             final Peer peer = peers.get(0);
@@ -309,32 +361,28 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
             Thread.sleep(500);
             final List<Peer> peersAfterDown = getBmpData(routerIId).get().getPeer();
             assertTrue(peersAfterDown.isEmpty());
-
-            channel.close().await();
-            Thread.sleep(500);
-            final Monitor monitorAfterClose = getBmpData(monitorIId).get();
-            assertTrue(monitorAfterClose.getRouter().isEmpty());
         } catch (final Exception e) {
             final StringBuffer ex = new StringBuffer();
             ex.append(e.getMessage() + "\n");
-            for (final StackTraceElement element: e.getStackTrace()) {
+            for (final StackTraceElement element : e.getStackTrace()) {
                 ex.append(element.toString() + "\n");
-            };
+            }
             fail(ex.toString());
         }
+        return channel;
     }
 
     @Test
     public void deploySecondInstance() throws Exception {
         final BmpMonitoringStation monitoringStation2 = BmpMonitoringStationImpl.createBmpMonitorInstance(new SimpleRIBExtensionProviderContext(), this.dispatcher, getDomBroker(),
-                new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString("127.0.0.11"), PORT), Optional.of(new KeyMapping()),
+                new MonitorId("monitor2"), new InetSocketAddress(InetAddresses.forString(MONITOR_LOCAL_ADDRESS_2), MONITOR_LOCAL_PORT), Optional.of(new KeyMapping()),
                 this.mappingService.getCodecFactory(), this.moduleInfoBackedContext.getSchemaContext(), this.mrs);
         final BmpMonitor monitor = getBmpData(InstanceIdentifier.create(BmpMonitor.class)).get();
         Assert.assertEquals(2, monitor.getMonitor().size());
         monitoringStation2.close();
     }
 
-    private ChannelFuture connectTestClient(final BmpMessageRegistry msgRegistry) throws InterruptedException {
+    private ChannelFuture connectTestClient(final String routerIp, final BmpMessageRegistry msgRegistry) throws InterruptedException {
         final BmpHandlerFactory hf = new BmpHandlerFactory(msgRegistry);
         final Bootstrap b = new Bootstrap();
         b.group(new NioEventLoopGroup());
@@ -347,9 +395,9 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
                 ch.pipeline().addLast(hf.getEncoders());
             }
         });
-        b.localAddress(CLIENT_LOCAL);
+        b.localAddress(new InetSocketAddress(routerIp, 0));
         b.option(ChannelOption.SO_REUSEADDR, true);
-        return b.connect(CLIENT_REMOTE).sync();
+        return b.connect(new InetSocketAddress(MONITOR_LOCAL_ADDRESS, MONITOR_LOCAL_PORT)).sync();
     }
 
     private <T extends DataObject> Optional<T> getBmpData(final InstanceIdentifier<T> iid) throws ReadFailedException {
@@ -357,4 +405,8 @@ public class BmpMonitorImplTest extends AbstractDataBrokerTest {
             return tx.read(LogicalDatastoreType.OPERATIONAL, iid).checkedGet();
         }
     }
+
+    private RouterId getRouterId(String routerIp) {
+        return new RouterId(new IpAddress(new Ipv4Address(routerIp)));
+    }
 }
index 210e4d61d7d0e7fdbb8974821aacb25bd3a678e4..34d8501c8eb73d8e17ba4d22616519a3c43776bf 100644 (file)
@@ -34,7 +34,7 @@ public class BmpTestSessionListener implements BmpSessionListener {
     }
 
     @Override
-    public void onMessage(final BmpSession session, final Notification message) {
+    public void onMessage(final Notification message) {
         LOG.debug("Received message: {} {}", message.getClass(), message);
         this.messages.add(message);
     }
@@ -46,7 +46,7 @@ public class BmpTestSessionListener implements BmpSessionListener {
     }
 
     @Override
-    public void onSessionDown(final BmpSession session, final Exception e) {
+    public void onSessionDown(final Exception e) {
         LOG.debug("Session down.", e);
         this.up = false;
     }
index a5199e85c0246020f11aea6d5200b5d291e87ad2..06fa303e291acc3a8e5e67a36af5d3885e105fb9 100644 (file)
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <configuration>
+                    <!-- to solve the exception: "Invalid signature file digest for Manifest main attributes" -->
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
                 </configuration>
                 <executions>
                     <execution>
index 982afd374315d299bf3173c4ab68c40450035e02..fc0ec5d2271b4c112f782797ecd31f859be5179e 100644 (file)
@@ -38,8 +38,12 @@ public final class BmpMock {
         final BmpMockArguments arguments = BmpMockArguments.parseArguments(args);
         initiateLogger(arguments);
         final BmpMockDispatcher dispatcher = initiateMock(arguments);
-        deployClients(dispatcher, arguments);
-
+        // now start the server / client
+        if (arguments.isOnPassiveMode()) {
+            deployServers(dispatcher, arguments);
+        } else {
+            deployClients(dispatcher, arguments);
+        }
     }
 
     private static void initiateLogger(final BmpMockArguments arguments) {
@@ -73,6 +77,16 @@ public final class BmpMock {
         }
     }
 
+    private static void deployServers(final BmpMockDispatcher dispatcher, final BmpMockArguments arguments) {
+        final InetSocketAddress localAddress = arguments.getLocalAddress();
+        InetAddress currentLocal = localAddress.getAddress();
+        final int port = localAddress.getPort();
+        for (int i = 0; i < arguments.getRoutersCount(); i++) {
+            dispatcher.createServer(new InetSocketAddress(currentLocal, port));
+            currentLocal = InetAddresses.increment(currentLocal);
+        }
+    }
+
     private static ch.qos.logback.classic.Logger getRootLogger(final LoggerContext lc) {
         return Iterables.find(lc.getLoggerList(), new Predicate<Logger>() {
             @Override
index f7bc76cb31a8304c50df529d9921ee0de3ec76fd..56c8150473efbaaa4719833ea2a5bd1a92934062 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.net.InetAddresses;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.impl.Arguments;
 import net.sourceforge.argparse4j.inf.Argument;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
@@ -38,6 +39,8 @@ public final class BmpMockArguments {
     private static final String LOCAL_ADDRESS_DST = "local_address";
     private static final String REMOTE_ADDRESS_DST = "remote_address";
     private static final String LOG_LEVEL_DST = "log_level";
+    // when set to true, the mock will operate as a server listening for incoming active monitoring request
+    private static final String PASSIVE_MODE_DST = "passive";
 
     private static final ArgumentParser ARGUMENT_PARSER = initializeArgumentParser();
 
@@ -84,12 +87,17 @@ public final class BmpMockArguments {
         return this.parseArgs.get(LOG_LEVEL_DST);
     }
 
+    public boolean isOnPassiveMode() {
+        return this.parseArgs.get(PASSIVE_MODE_DST);
+    }
+
     private static ArgumentParser initializeArgumentParser() {
         final ArgumentParser parser = ArgumentParsers.newArgumentParser(PROGRAM_NAME);
         parser.addArgument(toArgName(ROUTERS_COUNT_DST)).type(Integer.class).setDefault(1);
         parser.addArgument(toArgName(PEERS_COUNT_DST)).type(Integer.class).setDefault(0);
         parser.addArgument(toArgName(PRE_POLICY_ROUTES_COUNT_DST)).type(Integer.class).setDefault(0);
         parser.addArgument(toArgName(POST_POLICY_ROUTES_COUNT_DST)).type(Integer.class).setDefault(0);
+        parser.addArgument(toArgName(PASSIVE_MODE_DST)).action(Arguments.storeTrue());
         parser.addArgument(toArgName(LOCAL_ADDRESS_DST)).type(new ArgumentType<InetSocketAddress>() {
             @Override
             public InetSocketAddress convert(final ArgumentParser parser, final Argument arg, final String value)
@@ -101,7 +109,7 @@ public final class BmpMockArguments {
             @Override
             public InetSocketAddress convert(final ArgumentParser parser, final Argument arg, final String value)
                     throws ArgumentParserException {
-                return getInetSocketAddress(value, DEFAULT_LOCAL_PORT);
+                return getInetSocketAddress(value, DEFAULT_REMOTE_PORT);
             }
         }).setDefault(REMOTE_ADDRESS);
         parser.addArgument(toArgName(LOG_LEVEL_DST)).type(new ArgumentType<Level>(){
index ee90382fa8db07231cc4f24b73c7a49525a841ea..2f1206b7c3195dfac52bbc07d6a7b5cf4c835114 100644 (file)
@@ -10,11 +10,17 @@ package org.opendaylight.protocol.bmp.mock;
 
 import com.google.common.base.Preconditions;
 import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import org.opendaylight.protocol.bmp.api.BmpSessionFactory;
 import org.opendaylight.protocol.bmp.impl.BmpHandlerFactory;
@@ -26,17 +32,21 @@ public final class BmpMockDispatcher {
 
     private static final Logger LOG = LoggerFactory.getLogger(BmpMockDispatcher.class);
     private static final int CONNECT_TIMEOUT = 2000;
+    private static final int MAX_CONNECTIONS_COUNT = 128;
 
     final BmpHandlerFactory hf;
     private final BmpSessionFactory sessionFactory;
 
+    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+
     public BmpMockDispatcher(final BmpMessageRegistry registry, final BmpSessionFactory sessionFactory) {
         this.sessionFactory = Preconditions.checkNotNull(sessionFactory);
         Preconditions.checkNotNull(registry);
         this.hf = new BmpHandlerFactory(registry);
     }
 
-    public ChannelFuture createClient(final SocketAddress localAddress, final SocketAddress remoteAddress) {
+    private Bootstrap createClientInstance(final SocketAddress localAddress) {
         final NioEventLoopGroup workergroup = new NioEventLoopGroup();
         final Bootstrap b = new Bootstrap();
 
@@ -53,8 +63,43 @@ public final class BmpMockDispatcher {
             }
         });
         b.localAddress(localAddress);
-        b.remoteAddress(remoteAddress);
-        LOG.debug("BMP client {} <--> {} deployed", localAddress, remoteAddress);
-        return b.connect();
+        return b;
+    }
+
+    public ChannelFuture createClient(final SocketAddress localAddress, final SocketAddress remoteAddress) {
+        Preconditions.checkNotNull(localAddress);
+        Preconditions.checkNotNull(remoteAddress);
+
+        // ideally we should use Bootstrap clones here
+        Bootstrap b = createClientInstance(localAddress);
+        final ChannelFuture f = b.connect(remoteAddress);
+        LOG.info("BMP client {} <--> {} deployed", localAddress, remoteAddress);
+        return f;
+    }
+
+    private ServerBootstrap createServerInstance() {
+        final ServerBootstrap b = new ServerBootstrap();
+        b.childHandler(new ChannelInitializer<Channel>() {
+            @Override
+            protected void initChannel(final Channel ch) throws Exception {
+                ch.pipeline().addLast(BmpMockDispatcher.this.sessionFactory.getSession(ch, null));
+                ch.pipeline().addLast(BmpMockDispatcher.this.hf.getEncoders());
+            }
+        });
+
+        b.option(ChannelOption.SO_BACKLOG, MAX_CONNECTIONS_COUNT);
+        b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        b.channel(NioServerSocketChannel.class);
+        b.group(bossGroup, workerGroup);
+        return b;
+    }
+
+    public ChannelFuture createServer(final InetSocketAddress localAddress) {
+        Preconditions.checkNotNull(localAddress);
+
+        ServerBootstrap b = createServerInstance();
+        final ChannelFuture f = b.bind(localAddress);
+        LOG.info("Initiated BMP server at {}.", localAddress);
+        return f;
     }
 }
index f0150dc1a5377d60fabf29518c9a5f94f4d458bb..3a4c79619f624bad0f0b076fc845ff45890d54ba 100644 (file)
@@ -46,6 +46,7 @@ public final class BmpMockSession extends SimpleChannelInboundHandler<Notificati
 
     @Override
     public void close() throws InterruptedException {
+        LOG.info("BMP session {} is closed.", BmpMockSession.this.channel);
         this.channel.close().sync();
     }
 
@@ -68,7 +69,7 @@ public final class BmpMockSession extends SimpleChannelInboundHandler<Notificati
                 LOG.info("BMP session {} final successfully established.", BmpMockSession.this.channel);
             }
         });
-        LOG.info("BMP session {} sucesfully established.", this.channel);
+        LOG.info("BMP session {} successfully established.", this.channel);
         final InetSocketAddress localAddress = (InetSocketAddress) this.channel.localAddress();
         this.remoteAddress = (InetSocketAddress) this.channel.remoteAddress();
         advertizePeers(this.channel, localAddress);
index 5c88113f401477ae0b787fb8eb1a3e2af5870544..ab975ef9dfc529c5c441d72489e8a0c155a2da7d 100644 (file)
@@ -42,6 +42,23 @@ public class BmpMockDispatcherTest {
         final Channel channel = channelFuture.sync().channel();
 
         Assert.assertTrue(channel.isActive());
+        channel.close();
+        serverDispatcher.close();
+    }
+
+    @Test
+    public void testCreateServer() throws InterruptedException {
+        final BmpMockDispatcher dispatcher = new BmpMockDispatcher(this.registry, this.sessionFactory);
+        final int port = getRandomPort();
+        final BmpDispatcherImpl serverDispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(),
+                this.registry, this.sessionFactory);
+        dispatcher.createServer(new InetSocketAddress(InetAddresses.forString("0.0.0.0"), port));
+
+        final ChannelFuture channelFuture = serverDispatcher.createClient(new InetSocketAddress(InetAddresses.forString("127.0.0.3"), port), this.slf, Optional.<KeyMapping>absent());
+        final Channel channel = channelFuture.sync().channel();
+
+        Assert.assertTrue(channel.isActive());
+        channel.close();
         serverDispatcher.close();
     }
 
index 53b2e887e92a7daf2c96ee67c3183bc207a8cb5a..08834468495a6a547318d0efe5c2768ecf944031 100644 (file)
@@ -34,7 +34,6 @@ public class BmpMockTest {
 
     private final BmpSessionListener sessionListener = Mockito.mock(BmpSessionListener.class);
     private int serverPort;
-    private Channel serverChannel;
     private BmpExtensionProviderActivator bmpActivator;
     private BmpDispatcher bmpDispatcher;
 
@@ -46,31 +45,54 @@ public class BmpMockTest {
         this.bmpActivator.start(ctx);
         this.bmpDispatcher = new BmpDispatcherImpl(new NioEventLoopGroup(), new NioEventLoopGroup(), ctx.getBmpMessageRegistry(),
                 new DefaultBmpSessionFactory());
-        final BmpSessionListenerFactory bmpSessionListenerFactory = new BmpSessionListenerFactory() {
-            @Override
-            public BmpSessionListener getSessionListener() {
-                return BmpMockTest.this.sessionListener;
-            }
-        };
         this.serverPort = BmpMockDispatcherTest.getRandomPort();
-        this.serverChannel = this.bmpDispatcher.createServer(new InetSocketAddress("127.0.0.1", this.serverPort),
-                bmpSessionListenerFactory, Optional.<KeyMapping>absent()).channel();
     }
 
     @After
     public void tearDown() throws Exception {
-        this.serverChannel.close().sync();
         this.bmpActivator.stop();
         this.bmpDispatcher.close();
     }
 
     @Test
     public void testMain() throws Exception {
+        final BmpSessionListenerFactory bmpSessionListenerFactory = new BmpSessionListenerFactory() {
+            @Override
+            public BmpSessionListener getSessionListener() {
+                return BmpMockTest.this.sessionListener;
+            }
+        };
+        Channel serverChannel = bmpDispatcher.createServer(new InetSocketAddress("127.0.0.1", serverPort),
+                bmpSessionListenerFactory, Optional.<KeyMapping>absent()).channel();
+
         BmpMock.main(new String[] {"--remote_address", "127.0.0.1:" + serverPort, "--peers_count", "3", "--pre_policy_routes", "3"});
         Thread.sleep(1000);
         Mockito.verify(this.sessionListener).onSessionUp(Mockito.any(BmpSession.class));
         //1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
-        Mockito.verify(this.sessionListener, Mockito.times(13)).onMessage(Mockito.any(BmpSession.class), Mockito.any(Notification.class));
+        Mockito.verify(this.sessionListener, Mockito.times(13)).onMessage(Mockito.any(Notification.class));
+
+        serverChannel.close().sync();
     }
 
+    @Test
+    public void testMainInPassiveMode() throws Exception {
+        final BmpSessionListenerFactory bmpSessionListenerFactory = new BmpSessionListenerFactory() {
+            @Override
+            public BmpSessionListener getSessionListener() {
+                return BmpMockTest.this.sessionListener;
+            }
+        };
+
+        // create a local server in passive mode instead
+        BmpMock.main(new String[]{"--local_address", "127.0.0.1:" + serverPort, "--peers_count", "3", "--pre_policy_routes", "3", "--passive"});
+        Channel serverChannel = bmpDispatcher.createClient(new InetSocketAddress("127.0.0.1", serverPort),
+                bmpSessionListenerFactory, Optional.<KeyMapping>absent()).channel();
+
+        Thread.sleep(1000);
+        Mockito.verify(this.sessionListener).onSessionUp(Mockito.any(BmpSession.class));
+        //1 * Initiate message + 3 * PeerUp Notification + 9 * Route Monitoring message
+        Mockito.verify(this.sessionListener, Mockito.times(13)).onMessage(Mockito.any(Notification.class));
+
+        serverChannel.close().sync();
+    }
 }
index 9ce763809a1bc00ef71150cdbf7f9370c218babe..aac296b20f17468606980d89a220cb80bfd89e31 100644 (file)
@@ -15,7 +15,7 @@ public interface BmpSessionListener extends EventListener {
 
     void onSessionUp(BmpSession session);
 
-    void onSessionDown(BmpSession session, Exception e);
+    void onSessionDown(Exception e);
 
-    void onMessage(BmpSession session, Notification message);
+    void onMessage(Notification message);
 }