ClusterSingletonService cleaning FRM/FRS
[openflowplugin.git] / applications / forwardingrules-sync / src / test / java / org / opendaylight / openflowplugin / applications / frsync / util / SemaphoreKeeperTest.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.util;
10
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.ConcurrentMap;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.LinkedBlockingQueue;
15 import java.util.concurrent.Semaphore;
16 import java.util.concurrent.ThreadPoolExecutor;
17 import java.util.concurrent.TimeUnit;
18 import org.junit.Assert;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * Test for {@link SemaphoreKeeperGuavaImpl}.
27  */
28 public class SemaphoreKeeperTest {
29     private static final Logger LOG = LoggerFactory.getLogger(SemaphoreKeeperTest.class);
30     private SemaphoreKeeperGuavaImpl<String> semaphoreKeeper;
31     private final String key = "11";
32
33     @Before
34     public void setUp() throws Exception {
35         semaphoreKeeper = new SemaphoreKeeperGuavaImpl(1, true);
36     }
37
38     @Test
39     public void testSummonGuard() throws Exception {
40         Semaphore semaphore1 = semaphoreKeeper.summonGuard(key);
41         final int g1FingerPrint = semaphore1.hashCode();
42         Semaphore semaphore2 = semaphoreKeeper.summonGuard(key);
43         final int g2FingerPrint = semaphore2.hashCode();
44
45         Assert.assertSame(semaphore1, semaphore2);
46         Assert.assertEquals(1, semaphore1.availablePermits());
47
48         semaphore1.acquire();
49         semaphore1.release();
50         Assert.assertEquals(1, semaphore1.availablePermits());
51         semaphore1 = null;
52         System.gc();
53
54         semaphore2.acquire();
55         semaphore2.release();
56         Assert.assertEquals(1, semaphore2.availablePermits());
57         semaphore2 = null;
58         Assert.assertEquals(g1FingerPrint, g2FingerPrint);
59
60         System.gc();
61         final Semaphore semaphore3 = semaphoreKeeper.summonGuard(key);
62         Assert.assertNotEquals(g1FingerPrint, semaphore3.hashCode());
63     }
64
65     @Test
66     public void testReleaseGuard() throws Exception {
67         for (int total = 1; total <= 10; total++) {
68             LOG.info("test run: {}", total);
69             final Worker task = new Worker(semaphoreKeeper, key);
70
71             final ExecutorService executorService = new ThreadPoolExecutor(5, 5,
72                     0L, TimeUnit.MILLISECONDS,
73                     new LinkedBlockingQueue<Runnable>()) {
74                 @Override
75                 protected void afterExecute(final Runnable r, final Throwable t) {
76                     super.afterExecute(r, t);
77                     if (t != null) {
78                         LOG.error("pool thread crashed", t);
79                     }
80                 }
81             };
82
83             final int steps = 10;
84             for (int i = 0; i < steps; i++) {
85                 executorService.submit(task);
86             }
87             Thread.sleep(50L);
88             LOG.info("STARTING new serie");
89             System.gc();
90
91             for (int i = 0; i < steps; i++) {
92                 executorService.submit(task);
93             }
94             Thread.sleep(100L);
95             System.gc();
96
97             executorService.shutdown();
98             final boolean terminated = executorService.awaitTermination(10, TimeUnit.SECONDS);
99             if (!terminated) {
100                 LOG.warn("pool stuck, forcing termination");
101                 executorService.shutdownNow();
102                 Assert.fail("pool failed to finish gracefully");
103             }
104
105             final int counterSize = task.getCounterSize();
106             LOG.info("final counter = {}", counterSize);
107             Assert.assertEquals(20, counterSize);
108         }
109     }
110
111     private static class Worker implements Runnable {
112         private final SemaphoreKeeper<String> keeper;
113         private final String key;
114         private final ConcurrentMap<Integer, Integer> counter = new ConcurrentHashMap<>();
115         private volatile int index = 0;
116
117         public Worker(SemaphoreKeeper<String> keeper, final String key) {
118             this.keeper = keeper;
119             this.key = key;
120         }
121
122         @Override
123         public void run() {
124             try {
125                 final Semaphore guard = keeper.summonGuard(key);
126                 Thread.sleep(2L);
127                 guard.acquire();
128                 counter.putIfAbsent(index, 0);
129                 counter.put(index, counter.get(index) + 1);
130                 LOG.debug("queue: {} [{}] - {}", guard.getQueueLength(), guard.hashCode(), counter.size());
131                 index++;
132                 guard.release();
133             } catch (Exception e) {
134                 LOG.warn("acquiring failed.. ", e);
135             }
136         }
137
138         public int getCounterSize() {
139             return counter.size();
140         }
141     }
142 }