Implement finding a primary based on the shard name and do basic wiring of Distribute...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / test / java / org / opendaylight / controller / sal / connector / remoterpc / ServerRequestHandlerTest.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  * This program and the accompanying materials are made available under the
4  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
5  * and is available at http://www.eclipse.org/legal/epl-v10.html
6  */
7
8 package org.opendaylight.controller.sal.connector.remoterpc;
9
10 import static org.mockito.Mockito.mock;
11
12 import java.util.ArrayList;
13 import java.util.List;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16
17 import junit.framework.Assert;
18
19 import org.junit.After;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
23 import org.opendaylight.controller.sal.core.api.Broker;
24 import org.zeromq.ZMQ;
25
26 public class ServerRequestHandlerTest {
27
28   ServerRequestHandler handler;
29   ZMQ.Context context;
30   ExecutorService executorService = Executors.newCachedThreadPool();
31   private final int workerCount = 2;
32   private final String mockDealerAddress = "inproc://rpc-request-handler";
33   private final String mockServerIp = "localhost";
34   private final int mockServerPort = 5554;
35
36   @Before
37   public void setUp() throws Exception {
38     context = ZMQ.context(1);
39     String mockServerAddress = mockServerIp + ":" + mockServerPort;
40     Broker.ProviderSession mockSession = mock(Broker.ProviderSession.class);
41     handler = new ServerRequestHandler(context, mockSession, workerCount, mockDealerAddress, mockServerAddress);
42     handler.start();
43   }
44
45   @After
46   public void tearDown() throws Exception {
47     executorService.shutdown();
48     MessagingUtil.closeZmqContext(context);
49     handler.close();
50   }
51
52   @Test
53   public void testStart() throws Exception {
54     //should start workers == workerCount
55     Assert.assertEquals(workerCount, handler.getWorkerPool().getPoolSize());
56
57     //killing a thread should recreate another one
58
59     //start router-dealer bridge
60     executorService.execute(MessagingUtil.createRouterDealerBridge(context, mockDealerAddress, mockServerPort));
61     Thread.sleep(1000); //give sometime for socket initialization
62
63     //this will kill the thread
64     final String WORKER_THREAD_NAME = "remote-rpc-worker";
65     interruptAThreadWithName(WORKER_THREAD_NAME);
66
67     //send 4 message to router
68     for (int i = 0; i < 4; i++)
69       executorService.execute(MessagingUtil.sendAnEmptyMessage(context, "tcp://" + mockServerIp + ":" + mockServerPort));
70
71     //worker pool size should not change.
72     Assert.assertEquals(workerCount, handler.getWorkerPool().getPoolSize());
73
74     Thread.sleep(10000); //wait for processing to complete
75   }
76
77   @Test
78   public void testClose() throws Exception {
79
80   }
81
82   /**
83    * Interrupts the first thread found whose name starts with the provided name
84    *
85    * @param name
86    */
87   private void interruptAThreadWithName(String name) {
88     List<Thread> workerThreads = findThreadsWithName(name);
89     if (workerThreads.size() > 0) workerThreads.get(0).interrupt();
90   }
91
92   /**
93    * Find all threads that start with the given name
94    *
95    * @param name
96    * @return
97    */
98   private List<Thread> findThreadsWithName(String name) {
99     Thread[] threads = new Thread[Thread.activeCount()];
100     Thread.enumerate(threads);
101
102     List<Thread> foundThreads = new ArrayList<Thread>();
103     for (Thread t : threads) {
104       if (t.getName().startsWith(name))
105         foundThreads.add(t);
106     }
107
108     return foundThreads;
109   }
110 }