2 * Copyright (c) 2014-2015 Hewlett-Packard Development Company, L.P. and others.
\r
3 * All rights reserved.
\r
5 * This program and the accompanying materials are made available under the
\r
6 * terms of the Eclipse License v1.0 which accompanies this distribution,
\r
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
\r
9 package org.opendaylight.aaa;
\r
11 import static org.junit.Assert.assertEquals;
\r
12 import static org.junit.Assert.assertNull;
\r
13 import static org.junit.Assert.assertTrue;
\r
15 import java.util.Arrays;
\r
16 import java.util.Iterator;
\r
17 import java.util.LinkedList;
\r
18 import java.util.List;
\r
19 import java.util.concurrent.ArrayBlockingQueue;
\r
20 import java.util.concurrent.BlockingQueue;
\r
21 import java.util.concurrent.Callable;
\r
22 import java.util.concurrent.ExecutionException;
\r
23 import java.util.concurrent.ExecutorService;
\r
24 import java.util.concurrent.Executors;
\r
25 import java.util.concurrent.ThreadPoolExecutor;
\r
26 import java.util.concurrent.TimeUnit;
\r
28 import org.junit.Before;
\r
29 import org.junit.Test;
\r
30 import org.opendaylight.aaa.SecureBlockingQueue.SecureData;
\r
31 import org.opendaylight.aaa.api.Authentication;
\r
33 public class SecureBlockingQueueTest {
\r
34 private final int MAX_TASKS = 100;
\r
37 public void setup() {
\r
38 AuthenticationManager.instance().clear();
\r
42 public void testSecureThreadPoolExecutor() throws InterruptedException,
\r
43 ExecutionException {
\r
44 BlockingQueue<Runnable> queue = new SecureBlockingQueue<>(
\r
45 new ArrayBlockingQueue<SecureData<Runnable>>(10));
\r
46 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 500,
\r
47 TimeUnit.MILLISECONDS, queue);
\r
48 executor.prestartAllCoreThreads();
\r
49 for (int cnt = 1; cnt <= MAX_TASKS; cnt++) {
\r
50 assertEquals(Integer.toString(cnt),
\r
51 executor.submit(new Task(Integer.toString(cnt), "1111", "user")).get()
\r
54 executor.shutdown();
\r
58 public void testNormalThreadPoolExecutor() throws InterruptedException,
\r
59 ExecutionException {
\r
60 BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);
\r
61 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 500,
\r
62 TimeUnit.MILLISECONDS, queue);
\r
63 executor.prestartAllCoreThreads();
\r
64 for (int cnt = 1; cnt <= MAX_TASKS; cnt++) {
\r
65 assertNull(executor.submit(new Task(Integer.toString(cnt), "1111", "user")).get());
\r
67 executor.shutdown();
\r
71 public void testQueueOps() throws InterruptedException, ExecutionException {
\r
72 BlockingQueue<String> queue = new SecureBlockingQueue<>(
\r
73 new ArrayBlockingQueue<SecureData<String>>(3));
\r
74 ExecutorService es = Executors.newFixedThreadPool(3);
\r
75 es.submit(new Producer("foo", "1111", "user", queue)).get();
\r
76 assertEquals(1, queue.size());
\r
77 assertEquals("foo", es.submit(new Consumer(queue)).get());
\r
78 es.submit(new Producer("bar", "2222", "user", queue)).get();
\r
79 assertEquals("bar", queue.peek());
\r
80 assertEquals("bar", queue.element());
\r
81 assertEquals(1, queue.size());
\r
82 assertEquals("bar", queue.poll());
\r
83 assertTrue(queue.isEmpty());
\r
88 public void testCollectionOps() throws InterruptedException, ExecutionException {
\r
89 BlockingQueue<String> queue = new SecureBlockingQueue<>(
\r
90 new ArrayBlockingQueue<SecureData<String>>(6));
\r
91 for (int i = 1; i <= 3; i++)
\r
92 queue.add("User" + i);
\r
93 Iterator<String> it = queue.iterator();
\r
94 while (it.hasNext())
\r
95 assertTrue(it.next().startsWith("User"));
\r
96 assertEquals(3, queue.toArray().length);
\r
97 List<String> actual = Arrays.asList(queue.toArray(new String[0]));
\r
98 assertEquals("User1", actual.iterator().next());
\r
99 assertTrue(queue.containsAll(actual));
\r
100 queue.addAll(actual);
\r
101 assertEquals(6, queue.size());
\r
102 queue.retainAll(Arrays.asList(new String[] {"User2"}));
\r
103 assertEquals(2, queue.size());
\r
104 assertEquals("User2", queue.iterator().next());
\r
105 queue.removeAll(actual);
\r
106 assertTrue(queue.isEmpty());
\r
107 queue.add("hello");
\r
108 assertEquals(1, queue.size());
\r
110 assertTrue(queue.isEmpty());
\r
114 public void testBlockingQueueOps() throws InterruptedException {
\r
115 BlockingQueue<String> queue = new SecureBlockingQueue<>(
\r
116 new ArrayBlockingQueue<SecureData<String>>(3));
\r
117 queue.offer("foo");
\r
118 assertEquals(1, queue.size());
\r
119 queue.offer("bar", 500, TimeUnit.MILLISECONDS);
\r
120 assertEquals(2, queue.size());
\r
121 assertEquals("foo", queue.poll());
\r
122 assertTrue(queue.contains("bar"));
\r
123 queue.remove("bar");
\r
124 assertEquals(3, queue.remainingCapacity());
\r
125 queue.addAll(Arrays.asList(new String[] {"foo", "bar", "tom"}));
\r
126 assertEquals(3, queue.size());
\r
127 assertEquals("foo", queue.poll(500, TimeUnit.MILLISECONDS));
\r
128 assertEquals(2, queue.size());
\r
129 List<String> drain = new LinkedList<>();
\r
130 queue.drainTo(drain);
\r
131 assertTrue(queue.isEmpty());
\r
132 assertEquals(2, drain.size());
\r
133 queue.addAll(Arrays.asList(new String[] {"foo", "bar", "tom"}));
\r
135 queue.drainTo(drain, 1);
\r
136 assertEquals(2, queue.size());
\r
137 assertEquals(1, drain.size());
\r
140 // Task to run in a ThreadPoolExecutor
\r
141 private class Task implements Callable<Authentication> {
\r
142 Task(String name, String userId, String role) {
\r
143 // Mock that each task has its original authentication context
\r
144 AuthenticationManager.instance().set(new AuthenticationBuilder(new ClaimBuilder().setUser(name)
\r
145 .setUserId(userId).addRole(role).build()).build());
\r
149 public Authentication call() throws Exception {
\r
150 return AuthenticationManager.instance().get();
\r
154 // Producer sets auth context
\r
155 private class Producer implements Callable<String> {
\r
156 private final String name;
\r
157 private final String userId;
\r
158 private final String role;
\r
159 private final BlockingQueue<String> queue;
\r
161 Producer(String name, String userId, String role, BlockingQueue<String> queue) {
\r
163 this.userId = userId;
\r
165 this.queue = queue;
\r
169 public String call() throws InterruptedException {
\r
170 AuthenticationManager.instance().set(new AuthenticationBuilder(new ClaimBuilder().setUser(name)
\r
171 .setUserId(userId).addRole(role).build()).build());
\r
177 // Consumer gets producer's auth context via data element in queue
\r
178 private class Consumer implements Callable<String> {
\r
179 private final BlockingQueue<String> queue;
\r
181 Consumer(BlockingQueue<String> queue) {
\r
182 this.queue = queue;
\r
186 public String call() {
\r
188 Authentication auth = AuthenticationManager.instance().get();
\r
189 return (auth == null) ? null : auth.user();
\r