Resolve Bug:593. Persister should communicate via OSGi SR instead of TCP.
[controller.git] / opendaylight / netconf / netconf-impl / src / test / java / org / opendaylight / controller / netconf / impl / ConcurrentClientsTest.java
index 54a3482e3443b4e06bddd32e3a3c39ffd04ea48a..c1a7b1478b3edde41ea5f13004afc48df540c6bf 100644 (file)
@@ -11,31 +11,33 @@ package org.opendaylight.controller.netconf.impl;
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import org.apache.commons.io.IOUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.config.util.ConfigRegistryJMXClient;
-import org.opendaylight.controller.config.util.ConfigTransactionJMXClient;
-import org.opendaylight.controller.config.yang.store.api.YangStoreService;
-import org.opendaylight.controller.config.yang.store.api.YangStoreSnapshot;
 import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.api.NetconfOperationRouter;
 import org.opendaylight.controller.netconf.client.NetconfClient;
+import org.opendaylight.controller.netconf.client.NetconfClientDispatcher;
 import org.opendaylight.controller.netconf.impl.osgi.NetconfOperationServiceFactoryListenerImpl;
-import org.opendaylight.controller.netconf.mapping.api.*;
+import org.opendaylight.controller.netconf.impl.osgi.SessionMonitoringService;
+import org.opendaylight.controller.netconf.mapping.api.Capability;
+import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperation;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationService;
+import org.opendaylight.controller.netconf.mapping.api.NetconfOperationServiceFactory;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 
-import javax.management.ObjectName;
-import javax.net.ssl.SSLContext;
 import java.io.DataOutputStream;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -46,21 +48,18 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.MockitoAnnotations.initMocks;
 
 public class ConcurrentClientsTest {
 
     private static final int CONCURRENCY = 16;
-    @Mock
-    private YangStoreService yangStoreService;
-    @Mock
-    private ConfigRegistryJMXClient jmxClient;
+    private EventLoopGroup nettyGroup;
+    private NetconfClientDispatcher netconfClientDispatcher;
 
     private final InetSocketAddress netconfAddress = new InetSocketAddress("127.0.0.1", 8303);
 
@@ -69,42 +68,50 @@ public class ConcurrentClientsTest {
     private DefaultCommitNotificationProducer commitNot;
     private NetconfServerDispatcher dispatch;
 
-    @Before
-    public void setUp() throws Exception {
-        { // init mocks
-            MockitoAnnotations.initMocks(this);
-            final YangStoreSnapshot yStore = mock(YangStoreSnapshot.class);
-            doReturn(yStore).when(this.yangStoreService).getYangStoreSnapshot();
-            doReturn(Collections.emptyMap()).when(yStore).getModuleMXBeanEntryMap();
-            doReturn(Collections.emptyMap()).when(yStore).getModuleMap();
+    @Mock
+    private SessionMonitoringService monitoring;
 
-            final ConfigTransactionJMXClient mockedTCl = mock(ConfigTransactionJMXClient.class);
-            doReturn(mockedTCl).when(this.jmxClient).getConfigTransactionClient(any(ObjectName.class));
+    HashedWheelTimer hashedWheelTimer;
 
-            doReturn(Collections.emptySet()).when(jmxClient).lookupConfigBeans();
-        }
+    @Before
+    public void setUp() throws Exception {
+        initMocks(this);
+        nettyGroup = new NioEventLoopGroup();
+        NetconfHelloMessageAdditionalHeader additionalHeader = new NetconfHelloMessageAdditionalHeader("uname", "10.10.10.1", "830", "tcp", "client");
+        netconfClientDispatcher = new NetconfClientDispatcher( nettyGroup, nettyGroup, additionalHeader, 5000);
 
         NetconfOperationServiceFactoryListenerImpl factoriesListener = new NetconfOperationServiceFactoryListenerImpl();
         factoriesListener.onAddNetconfOperationServiceFactory(mockOpF());
 
         SessionIdProvider idProvider = new SessionIdProvider();
+        hashedWheelTimer = new HashedWheelTimer();
         NetconfServerSessionNegotiatorFactory serverNegotiatorFactory = new NetconfServerSessionNegotiatorFactory(
-                new HashedWheelTimer(5000, TimeUnit.MILLISECONDS), factoriesListener, idProvider);
+                hashedWheelTimer, factoriesListener, idProvider, 5000);
 
         commitNot = new DefaultCommitNotificationProducer(ManagementFactory.getPlatformMBeanServer());
 
+        doNothing().when(monitoring).onSessionUp(any(NetconfServerSession.class));
+        doNothing().when(monitoring).onSessionDown(any(NetconfServerSession.class));
+
         NetconfServerSessionListenerFactory listenerFactory = new NetconfServerSessionListenerFactory(
-                factoriesListener, commitNot, idProvider);
-        dispatch = new NetconfServerDispatcher(Optional.<SSLContext> absent(), serverNegotiatorFactory, listenerFactory);
+                factoriesListener, commitNot, idProvider, monitoring);
+        NetconfServerDispatcher.ServerChannelInitializer serverChannelInitializer = new NetconfServerDispatcher.ServerChannelInitializer(serverNegotiatorFactory, listenerFactory);
+        dispatch = new NetconfServerDispatcher(serverChannelInitializer, nettyGroup, nettyGroup);
 
         ChannelFuture s = dispatch.createServer(netconfAddress);
         s.await();
     }
 
+    @After
+    public void tearDown(){
+        hashedWheelTimer.stop();
+        nettyGroup.shutdownGracefully();
+    }
+
     private NetconfOperationServiceFactory mockOpF() {
         return new NetconfOperationServiceFactory() {
             @Override
-            public NetconfOperationService createService(long netconfSessionId, String netconfSessionIdForReporting) {
+            public NetconfOperationService createService(String netconfSessionIdForReporting) {
                 return new NetconfOperationService() {
                     @Override
                     public Set<Capability> getCapabilities() {
@@ -120,8 +127,7 @@ public class ConcurrentClientsTest {
                             }
 
                             @Override
-                            public Document handle(Document message, NetconfOperationRouter operationRouter)
-                                    throws NetconfDocumentedException {
+                            public Document handle(Document requestMessage, NetconfOperationChainedExecution subsequentOperation) throws NetconfDocumentedException {
                                 try {
                                     return XmlUtil.readXmlToDocument("<test/>");
                                 } catch (Exception e) {
@@ -131,11 +137,6 @@ public class ConcurrentClientsTest {
                         });
                     }
 
-                    @Override
-                    public Set<NetconfOperationFilter> getFilters() {
-                        return Collections.emptySet();
-                    }
-
                     @Override
                     public void close() {
                     }
@@ -147,7 +148,6 @@ public class ConcurrentClientsTest {
     @After
     public void cleanUp() throws Exception {
         commitNot.close();
-        dispatch.close();
     }
 
     @Test
@@ -163,7 +163,11 @@ public class ConcurrentClientsTest {
 
         for (TestingThread thread : threads) {
             thread.join();
-            assertTrue(thread.success);
+            if(thread.thrownException.isPresent()) {
+                Exception exception = thread.thrownException.get();
+                logger.error("Thread for testing client failed", exception);
+                fail("Client thread " + thread + " failed: " + exception.getMessage());
+            }
         }
     }
 
@@ -183,12 +187,16 @@ public class ConcurrentClientsTest {
 
         for (BlockingThread thread : threads) {
             thread.join();
-            assertTrue(thread.success);
+            if(thread.thrownException.isPresent()) {
+                Exception exception = thread.thrownException.get();
+                logger.error("Thread for testing client failed", exception);
+                fail("Client thread " + thread + " failed: " + exception.getMessage());
+            }
         }
     }
 
     class BlockingThread extends Thread {
-        Boolean success;
+        private Optional<Exception> thrownException;
 
         public BlockingThread(String name) {
             super("client-" + name);
@@ -198,10 +206,9 @@ public class ConcurrentClientsTest {
         public void run() {
             try {
                 run2();
-                success = true;
+                thrownException = Optional.absent();
             } catch (Exception e) {
-                success = false;
-                throw new RuntimeException(e);
+                thrownException = Optional.of(e);
             }
         }
 
@@ -241,7 +248,7 @@ public class ConcurrentClientsTest {
 
         private final String clientId;
         private final int attempts;
-        private Boolean success;
+        private Optional<Exception> thrownException;
 
         TestingThread(String clientId, int attempts) {
             this.clientId = clientId;
@@ -252,7 +259,7 @@ public class ConcurrentClientsTest {
         @Override
         public void run() {
             try {
-                final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress);
+                final NetconfClient netconfClient = new NetconfClient(clientId, netconfAddress, netconfClientDispatcher);
                 long sessionId = netconfClient.getSessionId();
                 logger.info("Client with sessionid {} hello exchanged", sessionId);
 
@@ -262,10 +269,9 @@ public class ConcurrentClientsTest {
                 logger.info("Client with sessionid {} got result {}", sessionId, result);
                 netconfClient.close();
                 logger.info("Client with session id {} ended", sessionId);
-                success = true;
+                thrownException = Optional.absent();
             } catch (final Exception e) {
-                success = false;
-                throw new RuntimeException(e);
+                thrownException = Optional.of(e);
             }
         }
     }