1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import com.google.common.base.Optional;
6 import com.google.common.util.concurrent.CheckedFuture;
7 import com.google.common.util.concurrent.ListenableFuture;
8 import com.google.common.util.concurrent.ListeningExecutorService;
9 import com.google.common.util.concurrent.MoreExecutors;
10 import junit.framework.Assert;
11 import org.junit.After;
12 import org.junit.Before;
13 import org.junit.Test;
14 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
15 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
18 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
19 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
20 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
21 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
22 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
24 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
25 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
26 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
27 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
28 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
29 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
30 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
31 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
32 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
33 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
37 import scala.concurrent.duration.FiniteDuration;
39 import java.util.List;
40 import java.util.concurrent.Executors;
42 import static junit.framework.Assert.fail;
43 import static org.mockito.Matchers.any;
44 import static org.mockito.Matchers.anyString;
45 import static org.mockito.Mockito.mock;
46 import static org.mockito.Mockito.when;
48 public class TransactionProxyTest extends AbstractActorTest {
// Mock configuration; passed to testContext below and to ShardStrategyFactory.setConfiguration.
50 private final Configuration configuration = new MockConfiguration();
// ActorContext built on the test actor system, with a DoNothingActor as its second
// constructor argument. The message-collecting tests use it to ask an actor for "messages".
52 private final ActorContext testContext =
53 new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );
// Single-thread listening executor handed to every TransactionProxy created in this class;
// tearDown() calls shutdownNow() on it.
55 private final ListeningExecutorService transactionExecutor =
56 MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
// Hands this test's configuration to ShardStrategyFactory.
// NOTE(review): the enclosing method declaration is not visible in this excerpt —
// presumably a per-test setup hook; confirm.
60 ShardStrategyFactory.setConfiguration(configuration);
/**
 * Stops the shared transaction executor with {@code shutdownNow()}.
 * NOTE(review): no JUnit lifecycle annotation is visible on this method in this
 * excerpt — confirm it is registered to run after each test.
 */
64 public void tearDown() {
65 transactionExecutor.shutdownNow();
/**
 * Reads {@code TestModel.TEST_PATH} twice through a READ_ONLY TransactionProxy that is
 * backed by a MockActorContext with stubbed responses. The first read follows a
 * ReadDataReply built with a {@code null} node and is asserted absent; the second
 * follows a ReadDataReply built with a container node and is asserted present.
 *
 * @throws Exception if either future's {@code get()} fails
 */
69 public void testRead() throws Exception {
70 final Props props = Props.create(DoNothingActor.class);
71 final ActorRef actorRef = getSystem().actorOf(props);
// Stub the three response slots of the mock context; presumably the mock returns
// these values for the matching execute* calls — verify against MockActorContext.
73 final MockActorContext actorContext = new MockActorContext(this.getSystem());
74 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
75 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
76 actorContext.setExecuteRemoteOperationResponse("message");
79 TransactionProxy transactionProxy =
80 new TransactionProxy(actorContext,
81 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
// Case 1: reply constructed with a null node.
// NOTE(review): the tail of this statement is not visible in this excerpt.
84 actorContext.setExecuteRemoteOperationResponse(
85 new ReadDataReply(TestModel.createTestContext(), null)
88 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
89 transactionProxy.read(TestModel.TEST_PATH);
91 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
93 Assert.assertFalse(normalizedNodeOptional.isPresent());
// Case 2: serializable reply carrying a container node for TEST_QNAME.
95 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
96 TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
98 read = transactionProxy.read(TestModel.TEST_PATH);
100 normalizedNodeOptional = read.get();
102 Assert.assertTrue(normalizedNodeOptional.isPresent());
/**
 * Calls {@code exists(TestModel.TEST_PATH)} on a READ_ONLY TransactionProxy three times,
 * changing the stubbed remote response each time: DataExistsReply(false) is asserted
 * false, DataExistsReply(true) is asserted true, and a plain String reply is followed
 * by a catch of ReadFailedException.
 *
 * @throws Exception if {@code checkedGet()} fails in the first two cases
 */
106 public void testExists() throws Exception {
107 final Props props = Props.create(DoNothingActor.class);
108 final ActorRef actorRef = getSystem().actorOf(props);
110 final MockActorContext actorContext = new MockActorContext(this.getSystem());
111 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
112 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
113 actorContext.setExecuteRemoteOperationResponse("message");
116 TransactionProxy transactionProxy =
117 new TransactionProxy(actorContext,
118 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
// Case 1: reply says the data does not exist.
121 actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(false).toSerializable());
123 CheckedFuture<Boolean, ReadFailedException> exists =
124 transactionProxy.exists(TestModel.TEST_PATH);
126 Assert.assertFalse(exists.checkedGet());
// Case 2: reply says the data exists.
128 actorContext.setExecuteRemoteOperationResponse(new DataExistsReply(true).toSerializable());
130 exists = transactionProxy.exists(TestModel.TEST_PATH);
132 Assert.assertTrue(exists.checkedGet());
// Case 3: the reply is a String rather than a DataExistsReply.
134 actorContext.setExecuteRemoteOperationResponse("bad message");
136 exists = transactionProxy.exists(TestModel.TEST_PATH);
// NOTE(review): the try body that precedes this catch is not visible in this excerpt;
// presumably it calls checkedGet() and fails the test if nothing is thrown — confirm.
141 } catch(ReadFailedException e){
/**
 * Declared to pass only when a ReadFailedException is thrown. The stubbed remote
 * response is the String "message" rather than a read reply, and the test then
 * issues a read of {@code TestModel.TEST_PATH} through a READ_ONLY TransactionProxy.
 *
 * @throws Exception the expected ReadFailedException
 */
146 @Test(expected = ReadFailedException.class)
147 public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception {
148 final Props props = Props.create(DoNothingActor.class);
149 final ActorRef actorRef = getSystem().actorOf(props);
151 final MockActorContext actorContext = new MockActorContext(this.getSystem());
152 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
153 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
// The "invalid message": a bare String stubbed as the remote-operation response.
154 actorContext.setExecuteRemoteOperationResponse("message");
156 TransactionProxy transactionProxy =
157 new TransactionProxy(actorContext,
158 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
// NOTE(review): whatever consumes this future is not visible in this excerpt;
// presumably a checkedGet() call surfaces the ReadFailedException — confirm.
162 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>
163 read = transactionProxy.read(TestModel.TEST_PATH);
/**
 * Uses a Mockito mock of ActorContext whose {@code executeShardOperation} throws
 * PrimaryNotFoundException, then asserts that a read of {@code TestModel.TEST_PATH}
 * completes with an absent Optional.
 *
 * @throws Exception if the read future's {@code get()} fails
 */
169 public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Exception {
170 final ActorContext actorContext = mock(ActorContext.class);
// Any shard operation on the mock throws PrimaryNotFoundException("test").
172 when(actorContext.executeShardOperation(anyString(), any(), any(
173 FiniteDuration.class))).thenThrow(new PrimaryNotFoundException("test"));
175 TransactionProxy transactionProxy =
176 new TransactionProxy(actorContext,
177 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
180 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
181 transactionProxy.read(TestModel.TEST_PATH);
// The read is expected to resolve to "absent" rather than raise.
183 Assert.assertFalse(read.get().isPresent());
/**
 * Same shape as the primary-not-found case, but the mocked
 * {@code executeShardOperation} throws the datastore TimeoutException (with a nested
 * cause). Asserts that a read of {@code TestModel.TEST_PATH} completes with an absent
 * Optional.
 *
 * @throws Exception if the read future's {@code get()} fails
 */
189 public void testReadWhenATimeoutExceptionIsThrown() throws Exception {
190 final ActorContext actorContext = mock(ActorContext.class);
// Any shard operation on the mock throws TimeoutException("test", cause).
192 when(actorContext.executeShardOperation(anyString(), any(), any(
193 FiniteDuration.class))).thenThrow(new TimeoutException("test", new Exception("reason")));
195 TransactionProxy transactionProxy =
196 new TransactionProxy(actorContext,
197 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
200 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
201 transactionProxy.read(TestModel.TEST_PATH);
// The read is expected to resolve to "absent" rather than raise.
203 Assert.assertFalse(read.get().isPresent());
/**
 * Uses a Mockito mock of ActorContext whose {@code executeShardOperation} throws
 * NullPointerException. The test calls {@code read}, fails if execution continues
 * past that call, and catches NullPointerException as the expected outcome.
 *
 * @throws Exception declared by the signature; none is expected to escape
 */
208 public void testReadWhenAAnyOtherExceptionIsThrown() throws Exception {
209 final ActorContext actorContext = mock(ActorContext.class);
// Any shard operation on the mock throws NullPointerException.
211 when(actorContext.executeShardOperation(anyString(), any(), any(
212 FiniteDuration.class))).thenThrow(new NullPointerException());
214 TransactionProxy transactionProxy =
215 new TransactionProxy(actorContext,
216 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
// NOTE(review): the opening of the try block that matches the catch below is not
// visible in this excerpt.
220 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
221 transactionProxy.read(TestModel.TEST_PATH);
// Reaching this line means read() returned normally, which the test treats as a failure.
222 fail("A null pointer exception was expected");
223 } catch(NullPointerException e){
/**
 * Issues a write through a TransactionProxy whose stubbed create-transaction reply
 * points at a MessageCollectorActor. It then asks that actor for "messages" and
 * asserts that the answer is a one-element List whose element's class is
 * {@code WriteData.SERIALIZABLE_CLASS}.
 * NOTE(review): the proxy is created as READ_ONLY although the test performs a
 * write — confirm this is intentional.
 *
 * @throws Exception declared by the signature
 */
231 public void testWrite() throws Exception {
232 final Props props = Props.create(MessageCollectorActor.class);
233 final ActorRef actorRef = getSystem().actorOf(props);
235 final MockActorContext actorContext = new MockActorContext(this.getSystem());
236 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
237 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
238 actorContext.setExecuteRemoteOperationResponse("message");
240 TransactionProxy transactionProxy =
241 new TransactionProxy(actorContext,
242 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
244 transactionProxy.write(TestModel.TEST_PATH,
245 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
// Uses the real testContext, not the mock, to query the collector actor.
// Presumably the reply is the list of messages the actor has received — verify
// against MessageCollectorActor.
247 Object messages = testContext
248 .executeLocalOperation(actorRef, "messages",
249 ActorContext.ASK_DURATION);
251 Assert.assertNotNull(messages);
253 Assert.assertTrue(messages instanceof List);
// Unchecked cast; only the raw List type was verified by the assertion above.
255 List<Object> listMessages = (List<Object>) messages;
257 Assert.assertEquals(1, listMessages.size());
259 Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
/**
 * Builds a PrimaryFound message from the string form of the given actor's path and
 * returns its serializable representation.
 *
 * @param actorRef actor whose path is embedded in the message
 * @return the result of {@code PrimaryFound.toSerializable()}
 */
262 private Object createPrimaryFound(ActorRef actorRef) {
263 return new PrimaryFound(actorRef.path().toString()).toSerializable();
/**
 * Issues a merge through a TransactionProxy whose stubbed create-transaction reply
 * points at a MessageCollectorActor. It then asks that actor for "messages" and
 * asserts that the answer is a one-element List whose element's class is
 * {@code MergeData.SERIALIZABLE_CLASS}.
 * NOTE(review): the proxy is created as READ_ONLY although the test performs a
 * merge — confirm this is intentional.
 *
 * @throws Exception declared by the signature
 */
267 public void testMerge() throws Exception {
268 final Props props = Props.create(MessageCollectorActor.class);
269 final ActorRef actorRef = getSystem().actorOf(props);
271 final MockActorContext actorContext = new MockActorContext(this.getSystem());
272 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
273 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
274 actorContext.setExecuteRemoteOperationResponse("message");
276 TransactionProxy transactionProxy =
277 new TransactionProxy(actorContext,
278 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
280 transactionProxy.merge(TestModel.TEST_PATH,
281 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
// Uses the real testContext, not the mock, to query the collector actor.
283 Object messages = testContext
284 .executeLocalOperation(actorRef, "messages",
285 ActorContext.ASK_DURATION);
287 Assert.assertNotNull(messages);
289 Assert.assertTrue(messages instanceof List);
// Unchecked cast; only the raw List type was verified by the assertion above.
291 List<Object> listMessages = (List<Object>) messages;
293 Assert.assertEquals(1, listMessages.size());
295 Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
/**
 * Issues a delete through a TransactionProxy whose stubbed create-transaction reply
 * points at a MessageCollectorActor. It then asks that actor for "messages" and
 * asserts that the answer is a one-element List whose element's class is
 * {@code DeleteData.SERIALIZABLE_CLASS}.
 * NOTE(review): the proxy is created as READ_ONLY although the test performs a
 * delete — confirm this is intentional.
 *
 * @throws Exception declared by the signature
 */
299 public void testDelete() throws Exception {
300 final Props props = Props.create(MessageCollectorActor.class);
301 final ActorRef actorRef = getSystem().actorOf(props);
303 final MockActorContext actorContext = new MockActorContext(this.getSystem());
304 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
305 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
306 actorContext.setExecuteRemoteOperationResponse("message");
308 TransactionProxy transactionProxy =
309 new TransactionProxy(actorContext,
310 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
312 transactionProxy.delete(TestModel.TEST_PATH);
// Uses the real testContext, not the mock, to query the collector actor.
314 Object messages = testContext
315 .executeLocalOperation(actorRef, "messages",
316 ActorContext.ASK_DURATION);
318 Assert.assertNotNull(messages);
320 Assert.assertTrue(messages instanceof List);
// Unchecked cast; only the raw List type was verified by the assertion above.
322 List<Object> listMessages = (List<Object>) messages;
324 Assert.assertEquals(1, listMessages.size());
326 Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
/**
 * Stubs a ReadyTransactionReply as the remote response, performs one read so the
 * proxy has touched a path, then calls {@code ready()}. Asserts that the returned
 * cohort is a ThreePhaseCommitCohortProxy with at least one cohort path.
 *
 * @throws Exception declared by the signature
 */
330 public void testReady() throws Exception {
331 final Props props = Props.create(DoNothingActor.class);
332 final ActorRef doNothingActorRef = getSystem().actorOf(props);
334 final MockActorContext actorContext = new MockActorContext(this.getSystem());
335 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef));
336 actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
// The ready reply carries the do-nothing actor's path.
337 actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable());
339 TransactionProxy transactionProxy =
340 new TransactionProxy(actorContext,
341 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
// The result of this read is ignored.
// NOTE(review): presumably it exists only so that the proxy creates a remote
// transaction before ready() is called — confirm against TransactionProxy.
344 transactionProxy.read(TestModel.TEST_PATH);
346 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
348 Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
350 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
352 Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
/**
 * Asserts that a newly constructed TransactionProxy returns a non-null identifier.
 * Only the shard-operation response is stubbed on the mock context.
 */
357 public void testGetIdentifier(){
358 final Props props = Props.create(DoNothingActor.class);
359 final ActorRef doNothingActorRef = getSystem().actorOf(props);
361 final MockActorContext actorContext = new MockActorContext(this.getSystem());
362 actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
364 TransactionProxy transactionProxy =
365 new TransactionProxy(actorContext,
366 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
368 Assert.assertNotNull(transactionProxy.getIdentifier());
/**
 * Performs one read and then closes a TransactionProxy whose stubbed
 * create-transaction reply points at a MessageCollectorActor. It then asks that actor
 * for "messages" and asserts that the answer is a one-element List whose element's
 * class equals {@code CloseTransaction.SERIALIZABLE_CLASS}.
 */
372 public void testClose(){
373 final Props props = Props.create(MessageCollectorActor.class);
374 final ActorRef actorRef = getSystem().actorOf(props);
376 final MockActorContext actorContext = new MockActorContext(this.getSystem());
377 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
378 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
379 actorContext.setExecuteRemoteOperationResponse("message");
381 TransactionProxy transactionProxy =
382 new TransactionProxy(actorContext,
383 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
// The result of this read is ignored; the single-message assertion below implies
// that the read does not reach the collector actor.
385 transactionProxy.read(TestModel.TEST_PATH);
387 transactionProxy.close();
// Uses the real testContext, not the mock, to query the collector actor.
389 Object messages = testContext
390 .executeLocalOperation(actorRef, "messages",
391 ActorContext.ASK_DURATION);
393 Assert.assertNotNull(messages);
395 Assert.assertTrue(messages instanceof List);
// Unchecked cast; only the raw List type was verified by the assertion above.
397 List<Object> listMessages = (List<Object>) messages;
399 Assert.assertEquals(1, listMessages.size());
// NOTE(review): sibling tests use assertEquals(expectedClass, actualClass) for this
// check, which gives a clearer failure message than assertTrue(equals).
401 Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS));
404 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
405 return CreateTransactionReply.newBuilder()
406 .setTransactionActorPath(actorRef.path().toString())
407 .setTransactionId("txn-1")