1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import com.google.common.base.Optional;
6 import com.google.common.util.concurrent.ListenableFuture;
7 import junit.framework.Assert;
9 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
10 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
11 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
12 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
13 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
14 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
15 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
16 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
17 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
18 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
19 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
20 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
23 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
29 public class TransactionProxyTest extends AbstractActorTest {
31 private ExecutorService transactionExecutor =
32 Executors.newSingleThreadExecutor();
35 public void testRead() throws Exception {
36 final Props props = Props.create(DoNothingActor.class);
37 final ActorRef actorRef = getSystem().actorOf(props);
39 final MockActorContext actorContext = new MockActorContext(this.getSystem());
40 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
41 actorContext.setExecuteRemoteOperationResponse("message");
44 TransactionProxy transactionProxy =
45 new TransactionProxy(actorContext,
46 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
49 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
50 transactionProxy.read(TestModel.TEST_PATH);
52 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
54 Assert.assertFalse(normalizedNodeOptional.isPresent());
56 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
57 TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
59 read = transactionProxy.read(TestModel.TEST_PATH);
61 normalizedNodeOptional = read.get();
63 Assert.assertTrue(normalizedNodeOptional.isPresent());
67 public void testReadWhenANullIsReturned() throws Exception {
68 final Props props = Props.create(DoNothingActor.class);
69 final ActorRef actorRef = getSystem().actorOf(props);
71 final MockActorContext actorContext = new MockActorContext(this.getSystem());
72 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
73 actorContext.setExecuteRemoteOperationResponse("message");
75 TransactionProxy transactionProxy =
76 new TransactionProxy(actorContext,
77 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
80 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
81 transactionProxy.read(TestModel.TEST_PATH);
83 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
85 Assert.assertFalse(normalizedNodeOptional.isPresent());
87 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
88 TestModel.createTestContext(), null).toSerializable());
90 read = transactionProxy.read(TestModel.TEST_PATH);
92 normalizedNodeOptional = read.get();
94 Assert.assertFalse(normalizedNodeOptional.isPresent());
98 public void testWrite() throws Exception {
99 final Props props = Props.create(MessageCollectorActor.class);
100 final ActorRef actorRef = getSystem().actorOf(props);
102 final MockActorContext actorContext = new MockActorContext(this.getSystem());
103 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
104 actorContext.setExecuteRemoteOperationResponse("message");
106 TransactionProxy transactionProxy =
107 new TransactionProxy(actorContext,
108 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
110 transactionProxy.write(TestModel.TEST_PATH,
111 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
113 ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
114 Object messages = testContext
115 .executeLocalOperation(actorRef, "messages",
116 ActorContext.ASK_DURATION);
118 Assert.assertNotNull(messages);
120 Assert.assertTrue(messages instanceof List);
122 List<Object> listMessages = (List<Object>) messages;
124 Assert.assertEquals(1, listMessages.size());
126 Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
130 public void testMerge() throws Exception {
131 final Props props = Props.create(MessageCollectorActor.class);
132 final ActorRef actorRef = getSystem().actorOf(props);
134 final MockActorContext actorContext = new MockActorContext(this.getSystem());
135 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
136 actorContext.setExecuteRemoteOperationResponse("message");
138 TransactionProxy transactionProxy =
139 new TransactionProxy(actorContext,
140 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
142 transactionProxy.merge(TestModel.TEST_PATH,
143 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
145 ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
146 Object messages = testContext
147 .executeLocalOperation(actorRef, "messages",
148 ActorContext.ASK_DURATION);
150 Assert.assertNotNull(messages);
152 Assert.assertTrue(messages instanceof List);
154 List<Object> listMessages = (List<Object>) messages;
156 Assert.assertEquals(1, listMessages.size());
158 Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
162 public void testDelete() throws Exception {
163 final Props props = Props.create(MessageCollectorActor.class);
164 final ActorRef actorRef = getSystem().actorOf(props);
166 final MockActorContext actorContext = new MockActorContext(this.getSystem());
167 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
168 actorContext.setExecuteRemoteOperationResponse("message");
170 TransactionProxy transactionProxy =
171 new TransactionProxy(actorContext,
172 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
174 transactionProxy.delete(TestModel.TEST_PATH);
176 ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
177 Object messages = testContext
178 .executeLocalOperation(actorRef, "messages",
179 ActorContext.ASK_DURATION);
181 Assert.assertNotNull(messages);
183 Assert.assertTrue(messages instanceof List);
185 List<Object> listMessages = (List<Object>) messages;
187 Assert.assertEquals(1, listMessages.size());
189 Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
193 public void testReady() throws Exception {
194 final Props props = Props.create(DoNothingActor.class);
195 final ActorRef doNothingActorRef = getSystem().actorOf(props);
197 final MockActorContext actorContext = new MockActorContext(this.getSystem());
198 actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
199 actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
201 TransactionProxy transactionProxy =
202 new TransactionProxy(actorContext,
203 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
206 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
208 Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
210 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
212 Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
217 public void testGetIdentifier(){
218 final Props props = Props.create(DoNothingActor.class);
219 final ActorRef doNothingActorRef = getSystem().actorOf(props);
221 final MockActorContext actorContext = new MockActorContext(this.getSystem());
222 actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
224 TransactionProxy transactionProxy =
225 new TransactionProxy(actorContext,
226 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
228 Assert.assertNotNull(transactionProxy.getIdentifier());
232 public void testClose(){
233 final Props props = Props.create(MessageCollectorActor.class);
234 final ActorRef actorRef = getSystem().actorOf(props);
236 final MockActorContext actorContext = new MockActorContext(this.getSystem());
237 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
238 actorContext.setExecuteRemoteOperationResponse("message");
240 TransactionProxy transactionProxy =
241 new TransactionProxy(actorContext,
242 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
244 transactionProxy.close();
246 ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
247 Object messages = testContext
248 .executeLocalOperation(actorRef, "messages",
249 ActorContext.ASK_DURATION);
251 Assert.assertNotNull(messages);
253 Assert.assertTrue(messages instanceof List);
255 List<Object> listMessages = (List<Object>) messages;
257 Assert.assertEquals(1, listMessages.size());
259 Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
262 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
263 return CreateTransactionReply.newBuilder()
264 .setTransactionActorPath(actorRef.path().toString())
265 .setTransactionId("txn-1")