1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import com.google.common.base.Optional;
6 import com.google.common.util.concurrent.ListenableFuture;
7 import junit.framework.Assert;
9 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
10 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
11 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
12 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
13 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
14 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
15 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
16 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
17 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
18 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
19 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
20 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
23 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
29 public class TransactionProxyTest extends AbstractActorTest {
31 private ExecutorService transactionExecutor =
32 Executors.newSingleThreadExecutor();
35 public void testRead() throws Exception {
36 final Props props = Props.create(DoNothingActor.class);
37 final ActorRef actorRef = getSystem().actorOf(props);
39 final MockActorContext actorContext = new MockActorContext(this.getSystem());
40 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
41 actorContext.setExecuteRemoteOperationResponse("message");
43 TransactionProxy transactionProxy =
44 new TransactionProxy(actorContext,
45 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
48 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
49 transactionProxy.read(TestModel.TEST_PATH);
51 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
53 Assert.assertFalse(normalizedNodeOptional.isPresent());
55 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
56 ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
58 read = transactionProxy.read(TestModel.TEST_PATH);
60 normalizedNodeOptional = read.get();
62 Assert.assertTrue(normalizedNodeOptional.isPresent());
66 public void testReadWhenANullIsReturned() throws Exception {
67 final Props props = Props.create(DoNothingActor.class);
68 final ActorRef actorRef = getSystem().actorOf(props);
70 final MockActorContext actorContext = new MockActorContext(this.getSystem());
71 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
72 actorContext.setExecuteRemoteOperationResponse("message");
74 TransactionProxy transactionProxy =
75 new TransactionProxy(actorContext,
76 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
79 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
80 transactionProxy.read(TestModel.TEST_PATH);
82 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
84 Assert.assertFalse(normalizedNodeOptional.isPresent());
86 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
89 read = transactionProxy.read(TestModel.TEST_PATH);
91 normalizedNodeOptional = read.get();
93 Assert.assertFalse(normalizedNodeOptional.isPresent());
97 public void testWrite() throws Exception {
98 final Props props = Props.create(MessageCollectorActor.class);
99 final ActorRef actorRef = getSystem().actorOf(props);
101 final MockActorContext actorContext = new MockActorContext(this.getSystem());
102 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
103 actorContext.setExecuteRemoteOperationResponse("message");
105 TransactionProxy transactionProxy =
106 new TransactionProxy(actorContext,
107 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
109 transactionProxy.write(TestModel.TEST_PATH,
110 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
112 ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
113 Object messages = testContext
114 .executeLocalOperation(actorRef, "messages",
115 ActorContext.ASK_DURATION);
117 Assert.assertNotNull(messages);
119 Assert.assertTrue(messages instanceof List);
121 List<Object> listMessages = (List<Object>) messages;
123 Assert.assertEquals(1, listMessages.size());
125 Assert.assertTrue(listMessages.get(0) instanceof WriteData);
129 public void testMerge() throws Exception {
130 final Props props = Props.create(MessageCollectorActor.class);
131 final ActorRef actorRef = getSystem().actorOf(props);
133 final MockActorContext actorContext = new MockActorContext(this.getSystem());
134 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
135 actorContext.setExecuteRemoteOperationResponse("message");
137 TransactionProxy transactionProxy =
138 new TransactionProxy(actorContext,
139 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
141 transactionProxy.merge(TestModel.TEST_PATH,
142 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
144 ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
145 Object messages = testContext
146 .executeLocalOperation(actorRef, "messages",
147 ActorContext.ASK_DURATION);
149 Assert.assertNotNull(messages);
151 Assert.assertTrue(messages instanceof List);
153 List<Object> listMessages = (List<Object>) messages;
155 Assert.assertEquals(1, listMessages.size());
157 Assert.assertTrue(listMessages.get(0) instanceof MergeData);
161 public void testDelete() throws Exception {
162 final Props props = Props.create(MessageCollectorActor.class);
163 final ActorRef actorRef = getSystem().actorOf(props);
165 final MockActorContext actorContext = new MockActorContext(this.getSystem());
166 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
167 actorContext.setExecuteRemoteOperationResponse("message");
169 TransactionProxy transactionProxy =
170 new TransactionProxy(actorContext,
171 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
173 transactionProxy.delete(TestModel.TEST_PATH);
175 ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
176 Object messages = testContext
177 .executeLocalOperation(actorRef, "messages",
178 ActorContext.ASK_DURATION);
180 Assert.assertNotNull(messages);
182 Assert.assertTrue(messages instanceof List);
184 List<Object> listMessages = (List<Object>) messages;
186 Assert.assertEquals(1, listMessages.size());
188 Assert.assertTrue(listMessages.get(0) instanceof DeleteData);
192 public void testReady() throws Exception {
193 final Props props = Props.create(DoNothingActor.class);
194 final ActorRef doNothingActorRef = getSystem().actorOf(props);
196 final MockActorContext actorContext = new MockActorContext(this.getSystem());
197 actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
198 actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
200 TransactionProxy transactionProxy =
201 new TransactionProxy(actorContext,
202 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
205 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
207 Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
209 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
211 Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
216 public void testGetIdentifier(){
217 final Props props = Props.create(DoNothingActor.class);
218 final ActorRef doNothingActorRef = getSystem().actorOf(props);
220 final MockActorContext actorContext = new MockActorContext(this.getSystem());
221 actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
223 TransactionProxy transactionProxy =
224 new TransactionProxy(actorContext,
225 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
227 Assert.assertNotNull(transactionProxy.getIdentifier());
231 public void testClose(){
232 final Props props = Props.create(MessageCollectorActor.class);
233 final ActorRef actorRef = getSystem().actorOf(props);
235 final MockActorContext actorContext = new MockActorContext(this.getSystem());
236 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
237 actorContext.setExecuteRemoteOperationResponse("message");
239 TransactionProxy transactionProxy =
240 new TransactionProxy(actorContext,
241 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
243 transactionProxy.close();
245 ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
246 Object messages = testContext
247 .executeLocalOperation(actorRef, "messages",
248 ActorContext.ASK_DURATION);
250 Assert.assertNotNull(messages);
252 Assert.assertTrue(messages instanceof List);
254 List<Object> listMessages = (List<Object>) messages;
256 Assert.assertEquals(1, listMessages.size());
258 Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
261 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
262 return new CreateTransactionReply(actorRef.path(), "txn-1");