1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
6 import com.google.common.base.Optional;
7 import com.google.common.util.concurrent.ListenableFuture;
8 import com.google.common.util.concurrent.ListeningExecutorService;
9 import com.google.common.util.concurrent.MoreExecutors;
11 import junit.framework.Assert;
13 import org.junit.After;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
17 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
18 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
20 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
21 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
22 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
23 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
25 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
28 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
29 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
30 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
32 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
33 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
37 import scala.concurrent.duration.FiniteDuration;
39 import java.util.List;
40 import java.util.concurrent.Executors;
42 import static junit.framework.Assert.fail;
43 import static org.mockito.Matchers.any;
44 import static org.mockito.Matchers.anyString;
45 import static org.mockito.Mockito.mock;
46 import static org.mockito.Mockito.when;
48 public class TransactionProxyTest extends AbstractActorTest {
// Mock configuration shared by the fixtures below; it is also passed to
// ShardStrategyFactory.setConfiguration(...) further down in this class.
private final Configuration configuration = new MockConfiguration();

// ActorContext built on the test ActorSystem from getSystem(), a freshly created
// DoNothingActor, a MockClusterWrapper and the mock configuration. The write/merge/
// delete/close tests use it to send "messages" to the collector actor and obtain
// what that actor recorded.
// NOTE(review): the role of the DoNothingActor argument is not visible here —
// presumably it stands in for the shard manager; confirm against ActorContext.
private final ActorContext testContext =
    new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );

// Single-thread executor wrapped as a ListeningExecutorService; handed to every
// TransactionProxy constructed in the tests and shut down in tearDown().
private final ListeningExecutorService transactionExecutor =
    MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
// Hands the mock configuration to ShardStrategyFactory.
// NOTE(review): the header of the method enclosing this statement is not visible
// in this view — presumably a JUnit @Before setup method; confirm.
ShardStrategyFactory.setConfiguration(configuration);
64 public void tearDown() {
65 transactionExecutor.shutdownNow();
69 public void testRead() throws Exception {
70 final Props props = Props.create(DoNothingActor.class);
71 final ActorRef actorRef = getSystem().actorOf(props);
73 final MockActorContext actorContext = new MockActorContext(this.getSystem());
74 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
75 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
76 actorContext.setExecuteRemoteOperationResponse("message");
79 TransactionProxy transactionProxy =
80 new TransactionProxy(actorContext,
81 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
84 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
85 transactionProxy.read(TestModel.TEST_PATH);
87 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
89 Assert.assertFalse(normalizedNodeOptional.isPresent());
91 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
92 TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
94 read = transactionProxy.read(TestModel.TEST_PATH);
96 normalizedNodeOptional = read.get();
98 Assert.assertTrue(normalizedNodeOptional.isPresent());
102 public void testReadWhenANullIsReturned() throws Exception {
103 final Props props = Props.create(DoNothingActor.class);
104 final ActorRef actorRef = getSystem().actorOf(props);
106 final MockActorContext actorContext = new MockActorContext(this.getSystem());
107 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
108 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
109 actorContext.setExecuteRemoteOperationResponse("message");
111 TransactionProxy transactionProxy =
112 new TransactionProxy(actorContext,
113 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
116 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
117 transactionProxy.read(TestModel.TEST_PATH);
119 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
121 Assert.assertFalse(normalizedNodeOptional.isPresent());
123 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
124 TestModel.createTestContext(), null).toSerializable());
126 read = transactionProxy.read(TestModel.TEST_PATH);
128 normalizedNodeOptional = read.get();
130 Assert.assertFalse(normalizedNodeOptional.isPresent());
134 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
135 final ActorContext actorContext = mock(ActorContext.class);
137 when(actorContext.executeShardOperation(anyString(), any(), any(
138 FiniteDuration.class))).thenThrow(new PrimaryNotFoundException("test"));
140 TransactionProxy transactionProxy =
141 new TransactionProxy(actorContext,
142 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
145 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
146 transactionProxy.read(TestModel.TEST_PATH);
148 Assert.assertFalse(read.get().isPresent());
154 public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
155 final ActorContext actorContext = mock(ActorContext.class);
157 when(actorContext.executeShardOperation(anyString(), any(), any(
158 FiniteDuration.class))).thenThrow(new TimeoutException("test", new Exception("reason")));
160 TransactionProxy transactionProxy =
161 new TransactionProxy(actorContext,
162 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
165 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
166 transactionProxy.read(TestModel.TEST_PATH);
168 Assert.assertFalse(read.get().isPresent());
173 public void testReadWhenAAnyOtherExceptionIsThrown() throws Exception {
174 final ActorContext actorContext = mock(ActorContext.class);
176 when(actorContext.executeShardOperation(anyString(), any(), any(
177 FiniteDuration.class))).thenThrow(new NullPointerException());
179 TransactionProxy transactionProxy =
180 new TransactionProxy(actorContext,
181 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
185 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
186 transactionProxy.read(TestModel.TEST_PATH);
187 fail("A null pointer exception was expected");
188 } catch(NullPointerException e){
196 public void testWrite() throws Exception {
197 final Props props = Props.create(MessageCollectorActor.class);
198 final ActorRef actorRef = getSystem().actorOf(props);
200 final MockActorContext actorContext = new MockActorContext(this.getSystem());
201 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
202 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
203 actorContext.setExecuteRemoteOperationResponse("message");
205 TransactionProxy transactionProxy =
206 new TransactionProxy(actorContext,
207 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
209 transactionProxy.write(TestModel.TEST_PATH,
210 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
212 Object messages = testContext
213 .executeLocalOperation(actorRef, "messages",
214 ActorContext.ASK_DURATION);
216 Assert.assertNotNull(messages);
218 Assert.assertTrue(messages instanceof List);
220 List<Object> listMessages = (List<Object>) messages;
222 Assert.assertEquals(1, listMessages.size());
224 Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
227 private Object createPrimaryFound(ActorRef actorRef) {
228 return new PrimaryFound(actorRef.path().toString()).toSerializable();
232 public void testMerge() throws Exception {
233 final Props props = Props.create(MessageCollectorActor.class);
234 final ActorRef actorRef = getSystem().actorOf(props);
236 final MockActorContext actorContext = new MockActorContext(this.getSystem());
237 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
238 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
239 actorContext.setExecuteRemoteOperationResponse("message");
241 TransactionProxy transactionProxy =
242 new TransactionProxy(actorContext,
243 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
245 transactionProxy.merge(TestModel.TEST_PATH,
246 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
248 Object messages = testContext
249 .executeLocalOperation(actorRef, "messages",
250 ActorContext.ASK_DURATION);
252 Assert.assertNotNull(messages);
254 Assert.assertTrue(messages instanceof List);
256 List<Object> listMessages = (List<Object>) messages;
258 Assert.assertEquals(1, listMessages.size());
260 Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
264 public void testDelete() throws Exception {
265 final Props props = Props.create(MessageCollectorActor.class);
266 final ActorRef actorRef = getSystem().actorOf(props);
268 final MockActorContext actorContext = new MockActorContext(this.getSystem());
269 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
270 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
271 actorContext.setExecuteRemoteOperationResponse("message");
273 TransactionProxy transactionProxy =
274 new TransactionProxy(actorContext,
275 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
277 transactionProxy.delete(TestModel.TEST_PATH);
279 Object messages = testContext
280 .executeLocalOperation(actorRef, "messages",
281 ActorContext.ASK_DURATION);
283 Assert.assertNotNull(messages);
285 Assert.assertTrue(messages instanceof List);
287 List<Object> listMessages = (List<Object>) messages;
289 Assert.assertEquals(1, listMessages.size());
291 Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
295 public void testReady() throws Exception {
296 final Props props = Props.create(DoNothingActor.class);
297 final ActorRef doNothingActorRef = getSystem().actorOf(props);
299 final MockActorContext actorContext = new MockActorContext(this.getSystem());
300 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef));
301 actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
302 actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable());
304 TransactionProxy transactionProxy =
305 new TransactionProxy(actorContext,
306 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
309 transactionProxy.read(TestModel.TEST_PATH);
311 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
313 Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
315 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
317 Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
322 public void testGetIdentifier(){
323 final Props props = Props.create(DoNothingActor.class);
324 final ActorRef doNothingActorRef = getSystem().actorOf(props);
326 final MockActorContext actorContext = new MockActorContext(this.getSystem());
327 actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
329 TransactionProxy transactionProxy =
330 new TransactionProxy(actorContext,
331 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
333 Assert.assertNotNull(transactionProxy.getIdentifier());
337 public void testClose(){
338 final Props props = Props.create(MessageCollectorActor.class);
339 final ActorRef actorRef = getSystem().actorOf(props);
341 final MockActorContext actorContext = new MockActorContext(this.getSystem());
342 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
343 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
344 actorContext.setExecuteRemoteOperationResponse("message");
346 TransactionProxy transactionProxy =
347 new TransactionProxy(actorContext,
348 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
350 transactionProxy.read(TestModel.TEST_PATH);
352 transactionProxy.close();
354 Object messages = testContext
355 .executeLocalOperation(actorRef, "messages",
356 ActorContext.ASK_DURATION);
358 Assert.assertNotNull(messages);
360 Assert.assertTrue(messages instanceof List);
362 List<Object> listMessages = (List<Object>) messages;
364 Assert.assertEquals(1, listMessages.size());
366 Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS));
369 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
370 return CreateTransactionReply.newBuilder()
371 .setTransactionActorPath(actorRef.path().toString())
372 .setTransactionId("txn-1")