1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import com.google.common.base.Optional;
6 import com.google.common.util.concurrent.ListenableFuture;
7 import junit.framework.Assert;
8 import org.junit.Before;
10 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
11 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
12 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
13 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
14 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
15 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
16 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
17 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
18 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
19 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
20 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
21 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
22 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
23 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
24 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
25 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
27 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
28 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
30 import java.util.List;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
34 public class TransactionProxyTest extends AbstractActorTest {
36 private final Configuration configuration = new MockConfiguration();
38 private final ActorContext testContext =
39 new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );
41 private ExecutorService transactionExecutor =
42 Executors.newSingleThreadExecutor();
46 ShardStrategyFactory.setConfiguration(configuration);
50 public void testRead() throws Exception {
51 final Props props = Props.create(DoNothingActor.class);
52 final ActorRef actorRef = getSystem().actorOf(props);
54 final MockActorContext actorContext = new MockActorContext(this.getSystem());
55 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
56 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
57 actorContext.setExecuteRemoteOperationResponse("message");
60 TransactionProxy transactionProxy =
61 new TransactionProxy(actorContext,
62 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
65 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
66 transactionProxy.read(TestModel.TEST_PATH);
68 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
70 Assert.assertFalse(normalizedNodeOptional.isPresent());
72 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
73 TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
75 read = transactionProxy.read(TestModel.TEST_PATH);
77 normalizedNodeOptional = read.get();
79 Assert.assertTrue(normalizedNodeOptional.isPresent());
83 public void testReadWhenANullIsReturned() throws Exception {
84 final Props props = Props.create(DoNothingActor.class);
85 final ActorRef actorRef = getSystem().actorOf(props);
87 final MockActorContext actorContext = new MockActorContext(this.getSystem());
88 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
89 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
90 actorContext.setExecuteRemoteOperationResponse("message");
92 TransactionProxy transactionProxy =
93 new TransactionProxy(actorContext,
94 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
97 ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
98 transactionProxy.read(TestModel.TEST_PATH);
100 Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
102 Assert.assertFalse(normalizedNodeOptional.isPresent());
104 actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
105 TestModel.createTestContext(), null).toSerializable());
107 read = transactionProxy.read(TestModel.TEST_PATH);
109 normalizedNodeOptional = read.get();
111 Assert.assertFalse(normalizedNodeOptional.isPresent());
115 public void testWrite() throws Exception {
116 final Props props = Props.create(MessageCollectorActor.class);
117 final ActorRef actorRef = getSystem().actorOf(props);
119 final MockActorContext actorContext = new MockActorContext(this.getSystem());
120 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
121 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
122 actorContext.setExecuteRemoteOperationResponse("message");
124 TransactionProxy transactionProxy =
125 new TransactionProxy(actorContext,
126 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
128 transactionProxy.write(TestModel.TEST_PATH,
129 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
131 Object messages = testContext
132 .executeLocalOperation(actorRef, "messages",
133 ActorContext.ASK_DURATION);
135 Assert.assertNotNull(messages);
137 Assert.assertTrue(messages instanceof List);
139 List<Object> listMessages = (List<Object>) messages;
141 Assert.assertEquals(1, listMessages.size());
143 Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
146 private Object createPrimaryFound(ActorRef actorRef) {
147 return new PrimaryFound(actorRef.path().toString()).toSerializable();
151 public void testMerge() throws Exception {
152 final Props props = Props.create(MessageCollectorActor.class);
153 final ActorRef actorRef = getSystem().actorOf(props);
155 final MockActorContext actorContext = new MockActorContext(this.getSystem());
156 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
157 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
158 actorContext.setExecuteRemoteOperationResponse("message");
160 TransactionProxy transactionProxy =
161 new TransactionProxy(actorContext,
162 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
164 transactionProxy.merge(TestModel.TEST_PATH,
165 ImmutableNodes.containerNode(TestModel.NAME_QNAME));
167 Object messages = testContext
168 .executeLocalOperation(actorRef, "messages",
169 ActorContext.ASK_DURATION);
171 Assert.assertNotNull(messages);
173 Assert.assertTrue(messages instanceof List);
175 List<Object> listMessages = (List<Object>) messages;
177 Assert.assertEquals(1, listMessages.size());
179 Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
183 public void testDelete() throws Exception {
184 final Props props = Props.create(MessageCollectorActor.class);
185 final ActorRef actorRef = getSystem().actorOf(props);
187 final MockActorContext actorContext = new MockActorContext(this.getSystem());
188 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
189 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
190 actorContext.setExecuteRemoteOperationResponse("message");
192 TransactionProxy transactionProxy =
193 new TransactionProxy(actorContext,
194 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
196 transactionProxy.delete(TestModel.TEST_PATH);
198 Object messages = testContext
199 .executeLocalOperation(actorRef, "messages",
200 ActorContext.ASK_DURATION);
202 Assert.assertNotNull(messages);
204 Assert.assertTrue(messages instanceof List);
206 List<Object> listMessages = (List<Object>) messages;
208 Assert.assertEquals(1, listMessages.size());
210 Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
214 public void testReady() throws Exception {
215 final Props props = Props.create(DoNothingActor.class);
216 final ActorRef doNothingActorRef = getSystem().actorOf(props);
218 final MockActorContext actorContext = new MockActorContext(this.getSystem());
219 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef));
220 actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
221 actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable());
223 TransactionProxy transactionProxy =
224 new TransactionProxy(actorContext,
225 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
228 transactionProxy.read(TestModel.TEST_PATH);
230 DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
232 Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
234 ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
236 Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
241 public void testGetIdentifier(){
242 final Props props = Props.create(DoNothingActor.class);
243 final ActorRef doNothingActorRef = getSystem().actorOf(props);
245 final MockActorContext actorContext = new MockActorContext(this.getSystem());
246 actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
248 TransactionProxy transactionProxy =
249 new TransactionProxy(actorContext,
250 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
252 Assert.assertNotNull(transactionProxy.getIdentifier());
256 public void testClose(){
257 final Props props = Props.create(MessageCollectorActor.class);
258 final ActorRef actorRef = getSystem().actorOf(props);
260 final MockActorContext actorContext = new MockActorContext(this.getSystem());
261 actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
262 actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
263 actorContext.setExecuteRemoteOperationResponse("message");
265 TransactionProxy transactionProxy =
266 new TransactionProxy(actorContext,
267 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
269 transactionProxy.read(TestModel.TEST_PATH);
271 transactionProxy.close();
273 Object messages = testContext
274 .executeLocalOperation(actorRef, "messages",
275 ActorContext.ASK_DURATION);
277 Assert.assertNotNull(messages);
279 Assert.assertTrue(messages instanceof List);
281 List<Object> listMessages = (List<Object>) messages;
283 Assert.assertEquals(1, listMessages.size());
285 Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS));
288 private CreateTransactionReply createTransactionReply(ActorRef actorRef){
289 return CreateTransactionReply.newBuilder()
290 .setTransactionActorPath(actorRef.path().toString())
291 .setTransactionId("txn-1")