1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import com.google.common.base.Optional;
6 import com.google.common.util.concurrent.ListenableFuture;
7 import junit.framework.Assert;
9 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
10 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
11 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
12 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
13 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
14 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
15 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
16 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
17 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
18 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
19 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
20 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
21 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
23 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
24 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
26 import java.util.List;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
30 public class TransactionProxyTest extends AbstractActorTest {
32 private final ActorContext testContext =
33 new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockConfiguration());
35 private ExecutorService transactionExecutor =
36 Executors.newSingleThreadExecutor();
39 public void testRead() throws Exception {
40 final Props props = Props.create(DoNothingActor.class);
41 final ActorRef actorRef = getSystem().actorOf(props);
43 final MockActorContext actorContext = new MockActorContext(this.getSystem());
44 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
45 actorContext.setExecuteRemoteOperationResponse("message");
48 TransactionProxy transactionProxy =
49 new TransactionProxy(actorContext,
50 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
53 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
54 transactionProxy.read(TestModel.TEST_PATH);
56 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
58 Assert.assertFalse(normalizedNodeOptional.isPresent());
60 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
61 TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
63 read = transactionProxy.read(TestModel.TEST_PATH);
65 normalizedNodeOptional = read.get();
67 Assert.assertTrue(normalizedNodeOptional.isPresent());
71 public void testReadWhenANullIsReturned() throws Exception {
72 final Props props = Props.create(DoNothingActor.class);
73 final ActorRef actorRef = getSystem().actorOf(props);
75 final MockActorContext actorContext = new MockActorContext(this.getSystem());
76 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
77 actorContext.setExecuteRemoteOperationResponse("message");
79 TransactionProxy transactionProxy =
80 new TransactionProxy(actorContext,
81 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
84 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
85 transactionProxy.read(TestModel.TEST_PATH);
87 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
89 Assert.assertFalse(normalizedNodeOptional.isPresent());
91 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
92 TestModel.createTestContext(), null).toSerializable());
94 read = transactionProxy.read(TestModel.TEST_PATH);
96 normalizedNodeOptional = read.get();
98 Assert.assertFalse(normalizedNodeOptional.isPresent());
102 public void testWrite() throws Exception {
103 final Props props = Props.create(MessageCollectorActor.class);
104 final ActorRef actorRef = getSystem().actorOf(props);
106 final MockActorContext actorContext = new MockActorContext(this.getSystem());
107 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
108 actorContext.setExecuteRemoteOperationResponse("message");
110 TransactionProxy transactionProxy =
111 new TransactionProxy(actorContext,
112 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
114 transactionProxy.write(TestModel.TEST_PATH,
115 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
117 Object messages = testContext
118 .executeLocalOperation(actorRef, "messages",
119 ActorContext.ASK_DURATION);
121 Assert.assertNotNull(messages);
123 Assert.assertTrue(messages instanceof List);
125 List<Object> listMessages = (List<Object>) messages;
127 Assert.assertEquals(1, listMessages.size());
129 Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
133 public void testMerge() throws Exception {
134 final Props props = Props.create(MessageCollectorActor.class);
135 final ActorRef actorRef = getSystem().actorOf(props);
137 final MockActorContext actorContext = new MockActorContext(this.getSystem());
138 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
139 actorContext.setExecuteRemoteOperationResponse("message");
141 TransactionProxy transactionProxy =
142 new TransactionProxy(actorContext,
143 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
145 transactionProxy.merge(TestModel.TEST_PATH,
146 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
148 Object messages = testContext
149 .executeLocalOperation(actorRef, "messages",
150 ActorContext.ASK_DURATION);
152 Assert.assertNotNull(messages);
154 Assert.assertTrue(messages instanceof List);
156 List<Object> listMessages = (List<Object>) messages;
158 Assert.assertEquals(1, listMessages.size());
160 Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
164 public void testDelete() throws Exception {
165 final Props props = Props.create(MessageCollectorActor.class);
166 final ActorRef actorRef = getSystem().actorOf(props);
168 final MockActorContext actorContext = new MockActorContext(this.getSystem());
169 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
170 actorContext.setExecuteRemoteOperationResponse("message");
172 TransactionProxy transactionProxy =
173 new TransactionProxy(actorContext,
174 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
176 transactionProxy.delete(TestModel.TEST_PATH);
178 Object messages = testContext
179 .executeLocalOperation(actorRef, "messages",
180 ActorContext.ASK_DURATION);
182 Assert.assertNotNull(messages);
184 Assert.assertTrue(messages instanceof List);
186 List<Object> listMessages = (List<Object>) messages;
188 Assert.assertEquals(1, listMessages.size());
190 Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
194 public void testReady() throws Exception {
195 final Props props = Props.create(DoNothingActor.class);
196 final ActorRef doNothingActorRef = getSystem().actorOf(props);
198 final MockActorContext actorContext = new MockActorContext(this.getSystem());
199 actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
200 actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
202 TransactionProxy transactionProxy =
203 new TransactionProxy(actorContext,
204 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
207 transactionProxy.read(TestModel.TEST_PATH);
209 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
211 Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
213 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
215 Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
220 public void testGetIdentifier(){
221 final Props props = Props.create(DoNothingActor.class);
222 final ActorRef doNothingActorRef = getSystem().actorOf(props);
224 final MockActorContext actorContext = new MockActorContext(this.getSystem());
225 actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
227 TransactionProxy transactionProxy =
228 new TransactionProxy(actorContext,
229 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
231 Assert.assertNotNull(transactionProxy.getIdentifier());
235 public void testClose(){
236 final Props props = Props.create(MessageCollectorActor.class);
237 final ActorRef actorRef = getSystem().actorOf(props);
239 final MockActorContext actorContext = new MockActorContext(this.getSystem());
240 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
241 actorContext.setExecuteRemoteOperationResponse("message");
243 TransactionProxy transactionProxy =
244 new TransactionProxy(actorContext,
245 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
247 transactionProxy.read(TestModel.TEST_PATH);
249 transactionProxy.close();
251 Object messages = testContext
252 .executeLocalOperation(actorRef, "messages",
253 ActorContext.ASK_DURATION);
255 Assert.assertNotNull(messages);
257 Assert.assertTrue(messages instanceof List);
259 List<Object> listMessages = (List<Object>) messages;
261 Assert.assertEquals(1, listMessages.size());
263 Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
266 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
267 return CreateTransactionReply.newBuilder()
268 .setTransactionActorPath(actorRef.path().toString())
269 .setTransactionId("txn-1")