1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
6 import com.google.common.base.Optional;
7 import com.google.common.util.concurrent.ListenableFuture;
8 import com.google.common.util.concurrent.ListeningExecutorService;
9 import com.google.common.util.concurrent.MoreExecutors;
11 import junit.framework.Assert;
13 import org.junit.After;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
18 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
19 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
20 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
21 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
23 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
24 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
25 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
26 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
27 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
28 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
29 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
30 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
31 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
36 import java.util.List;
37 import java.util.concurrent.Executors;
39 public class TransactionProxyTest extends AbstractActorTest {
// Configuration instance handed to the ActorContext below.
private final Configuration configuration = new MockConfiguration();

// Real (non-mock) ActorContext built on the test actor system with a DoNothingActor and a
// MockClusterWrapper; the tests use it to ask collector actors for their "messages".
private final ActorContext testContext =
    new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );

// Single-threaded listening executor passed to every TransactionProxy created by the tests;
// it is shut down in tearDown().
private final ListeningExecutorService transactionExecutor =
    MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
51 ShardStrategyFactory.setConfiguration(configuration);
55 public void tearDown() {
56 transactionExecutor.shutdownNow();
60 public void testRead() throws Exception {
61 final Props props = Props.create(DoNothingActor.class);
62 final ActorRef actorRef = getSystem().actorOf(props);
64 final MockActorContext actorContext = new MockActorContext(this.getSystem());
65 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
66 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
67 actorContext.setExecuteRemoteOperationResponse("message");
70 TransactionProxy transactionProxy =
71 new TransactionProxy(actorContext,
72 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
75 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
76 transactionProxy.read(TestModel.TEST_PATH);
78 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
80 Assert.assertFalse(normalizedNodeOptional.isPresent());
82 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
83 TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
85 read = transactionProxy.read(TestModel.TEST_PATH);
87 normalizedNodeOptional = read.get();
89 Assert.assertTrue(normalizedNodeOptional.isPresent());
93 public void testReadWhenANullIsReturned() throws Exception {
94 final Props props = Props.create(DoNothingActor.class);
95 final ActorRef actorRef = getSystem().actorOf(props);
97 final MockActorContext actorContext = new MockActorContext(this.getSystem());
98 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
99 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
100 actorContext.setExecuteRemoteOperationResponse("message");
102 TransactionProxy transactionProxy =
103 new TransactionProxy(actorContext,
104 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
107 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
108 transactionProxy.read(TestModel.TEST_PATH);
110 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
112 Assert.assertFalse(normalizedNodeOptional.isPresent());
114 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
115 TestModel.createTestContext(), null).toSerializable());
117 read = transactionProxy.read(TestModel.TEST_PATH);
119 normalizedNodeOptional = read.get();
121 Assert.assertFalse(normalizedNodeOptional.isPresent());
125 public void testWrite() throws Exception {
126 final Props props = Props.create(MessageCollectorActor.class);
127 final ActorRef actorRef = getSystem().actorOf(props);
129 final MockActorContext actorContext = new MockActorContext(this.getSystem());
130 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
131 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
132 actorContext.setExecuteRemoteOperationResponse("message");
134 TransactionProxy transactionProxy =
135 new TransactionProxy(actorContext,
136 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
138 transactionProxy.write(TestModel.TEST_PATH,
139 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
141 Object messages = testContext
142 .executeLocalOperation(actorRef, "messages",
143 ActorContext.ASK_DURATION);
145 Assert.assertNotNull(messages);
147 Assert.assertTrue(messages instanceof List);
149 List<Object> listMessages = (List<Object>) messages;
151 Assert.assertEquals(1, listMessages.size());
153 Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
156 private Object createPrimaryFound(ActorRef actorRef) {
157 return new PrimaryFound(actorRef.path().toString()).toSerializable();
161 public void testMerge() throws Exception {
162 final Props props = Props.create(MessageCollectorActor.class);
163 final ActorRef actorRef = getSystem().actorOf(props);
165 final MockActorContext actorContext = new MockActorContext(this.getSystem());
166 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
167 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
168 actorContext.setExecuteRemoteOperationResponse("message");
170 TransactionProxy transactionProxy =
171 new TransactionProxy(actorContext,
172 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
174 transactionProxy.merge(TestModel.TEST_PATH,
175 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
177 Object messages = testContext
178 .executeLocalOperation(actorRef, "messages",
179 ActorContext.ASK_DURATION);
181 Assert.assertNotNull(messages);
183 Assert.assertTrue(messages instanceof List);
185 List<Object> listMessages = (List<Object>) messages;
187 Assert.assertEquals(1, listMessages.size());
189 Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
193 public void testDelete() throws Exception {
194 final Props props = Props.create(MessageCollectorActor.class);
195 final ActorRef actorRef = getSystem().actorOf(props);
197 final MockActorContext actorContext = new MockActorContext(this.getSystem());
198 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
199 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
200 actorContext.setExecuteRemoteOperationResponse("message");
202 TransactionProxy transactionProxy =
203 new TransactionProxy(actorContext,
204 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
206 transactionProxy.delete(TestModel.TEST_PATH);
208 Object messages = testContext
209 .executeLocalOperation(actorRef, "messages",
210 ActorContext.ASK_DURATION);
212 Assert.assertNotNull(messages);
214 Assert.assertTrue(messages instanceof List);
216 List<Object> listMessages = (List<Object>) messages;
218 Assert.assertEquals(1, listMessages.size());
220 Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
224 public void testReady() throws Exception {
225 final Props props = Props.create(DoNothingActor.class);
226 final ActorRef doNothingActorRef = getSystem().actorOf(props);
228 final MockActorContext actorContext = new MockActorContext(this.getSystem());
229 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef));
230 actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
231 actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable());
233 TransactionProxy transactionProxy =
234 new TransactionProxy(actorContext,
235 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
238 transactionProxy.read(TestModel.TEST_PATH);
240 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
242 Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
244 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
246 Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
251 public void testGetIdentifier(){
252 final Props props = Props.create(DoNothingActor.class);
253 final ActorRef doNothingActorRef = getSystem().actorOf(props);
255 final MockActorContext actorContext = new MockActorContext(this.getSystem());
256 actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
258 TransactionProxy transactionProxy =
259 new TransactionProxy(actorContext,
260 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
262 Assert.assertNotNull(transactionProxy.getIdentifier());
266 public void testClose(){
267 final Props props = Props.create(MessageCollectorActor.class);
268 final ActorRef actorRef = getSystem().actorOf(props);
270 final MockActorContext actorContext = new MockActorContext(this.getSystem());
271 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
272 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
273 actorContext.setExecuteRemoteOperationResponse("message");
275 TransactionProxy transactionProxy =
276 new TransactionProxy(actorContext,
277 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
279 transactionProxy.read(TestModel.TEST_PATH);
281 transactionProxy.close();
283 Object messages = testContext
284 .executeLocalOperation(actorRef, "messages",
285 ActorContext.ASK_DURATION);
287 Assert.assertNotNull(messages);
289 Assert.assertTrue(messages instanceof List);
291 List<Object> listMessages = (List<Object>) messages;
293 Assert.assertEquals(1, listMessages.size());
295 Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS));
298 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
299 return CreateTransactionReply.newBuilder()
300 .setTransactionActorPath(actorRef.path().toString())
301 .setTransactionId("txn-1")