Implementation of ModuleShardStrategy
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
import akka.actor.ActorRef;
import akka.actor.Props;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import junit.framework.Assert;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
29
30 public class TransactionProxyTest extends AbstractActorTest {
31
32     private final ActorContext testContext =
33         new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockConfiguration());
34
35     private ExecutorService transactionExecutor =
36         Executors.newSingleThreadExecutor();
37
38     @Test
39     public void testRead() throws Exception {
40         final Props props = Props.create(DoNothingActor.class);
41         final ActorRef actorRef = getSystem().actorOf(props);
42
43         final MockActorContext actorContext = new MockActorContext(this.getSystem());
44         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
45         actorContext.setExecuteRemoteOperationResponse("message");
46
47
48         TransactionProxy transactionProxy =
49             new TransactionProxy(actorContext,
50                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
51
52
53         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
54             transactionProxy.read(TestModel.TEST_PATH);
55
56         Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
57
58         Assert.assertFalse(normalizedNodeOptional.isPresent());
59
60         actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
61             TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
62
63         read = transactionProxy.read(TestModel.TEST_PATH);
64
65         normalizedNodeOptional = read.get();
66
67         Assert.assertTrue(normalizedNodeOptional.isPresent());
68     }
69
70     @Test
71     public void testReadWhenANullIsReturned() throws Exception {
72         final Props props = Props.create(DoNothingActor.class);
73         final ActorRef actorRef = getSystem().actorOf(props);
74
75         final MockActorContext actorContext = new MockActorContext(this.getSystem());
76         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
77         actorContext.setExecuteRemoteOperationResponse("message");
78
79         TransactionProxy transactionProxy =
80             new TransactionProxy(actorContext,
81                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
82
83
84         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
85             transactionProxy.read(TestModel.TEST_PATH);
86
87         Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
88
89         Assert.assertFalse(normalizedNodeOptional.isPresent());
90
91         actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
92            TestModel.createTestContext(), null).toSerializable());
93
94         read = transactionProxy.read(TestModel.TEST_PATH);
95
96         normalizedNodeOptional = read.get();
97
98         Assert.assertFalse(normalizedNodeOptional.isPresent());
99     }
100
101     @Test
102     public void testWrite() throws Exception {
103         final Props props = Props.create(MessageCollectorActor.class);
104         final ActorRef actorRef = getSystem().actorOf(props);
105
106         final MockActorContext actorContext = new MockActorContext(this.getSystem());
107         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
108         actorContext.setExecuteRemoteOperationResponse("message");
109
110         TransactionProxy transactionProxy =
111             new TransactionProxy(actorContext,
112                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
113
114         transactionProxy.write(TestModel.TEST_PATH,
115             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
116
117         Object messages = testContext
118             .executeLocalOperation(actorRef, "messages",
119                 ActorContext.ASK_DURATION);
120
121         Assert.assertNotNull(messages);
122
123         Assert.assertTrue(messages instanceof List);
124
125         List<Object> listMessages = (List<Object>) messages;
126
127         Assert.assertEquals(1, listMessages.size());
128
129         Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
130     }
131
132     @Test
133     public void testMerge() throws Exception {
134         final Props props = Props.create(MessageCollectorActor.class);
135         final ActorRef actorRef = getSystem().actorOf(props);
136
137         final MockActorContext actorContext = new MockActorContext(this.getSystem());
138         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
139         actorContext.setExecuteRemoteOperationResponse("message");
140
141         TransactionProxy transactionProxy =
142             new TransactionProxy(actorContext,
143                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
144
145         transactionProxy.merge(TestModel.TEST_PATH,
146             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
147
148         Object messages = testContext
149             .executeLocalOperation(actorRef, "messages",
150                 ActorContext.ASK_DURATION);
151
152         Assert.assertNotNull(messages);
153
154         Assert.assertTrue(messages instanceof List);
155
156         List<Object> listMessages = (List<Object>) messages;
157
158         Assert.assertEquals(1, listMessages.size());
159
160         Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
161     }
162
163     @Test
164     public void testDelete() throws Exception {
165         final Props props = Props.create(MessageCollectorActor.class);
166         final ActorRef actorRef = getSystem().actorOf(props);
167
168         final MockActorContext actorContext = new MockActorContext(this.getSystem());
169         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
170         actorContext.setExecuteRemoteOperationResponse("message");
171
172         TransactionProxy transactionProxy =
173             new TransactionProxy(actorContext,
174                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
175
176         transactionProxy.delete(TestModel.TEST_PATH);
177
178         Object messages = testContext
179             .executeLocalOperation(actorRef, "messages",
180                 ActorContext.ASK_DURATION);
181
182         Assert.assertNotNull(messages);
183
184         Assert.assertTrue(messages instanceof List);
185
186         List<Object> listMessages = (List<Object>) messages;
187
188         Assert.assertEquals(1, listMessages.size());
189
190         Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
191     }
192
193     @Test
194     public void testReady() throws Exception {
195         final Props props = Props.create(DoNothingActor.class);
196         final ActorRef doNothingActorRef = getSystem().actorOf(props);
197
198         final MockActorContext actorContext = new MockActorContext(this.getSystem());
199         actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
200         actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
201
202         TransactionProxy transactionProxy =
203             new TransactionProxy(actorContext,
204                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
205
206
207         transactionProxy.read(TestModel.TEST_PATH);
208
209         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
210
211         Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
212
213         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
214
215         Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
216
217     }
218
219     @Test
220     public void testGetIdentifier(){
221         final Props props = Props.create(DoNothingActor.class);
222         final ActorRef doNothingActorRef = getSystem().actorOf(props);
223
224         final MockActorContext actorContext = new MockActorContext(this.getSystem());
225         actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
226
227         TransactionProxy transactionProxy =
228             new TransactionProxy(actorContext,
229                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
230
231         Assert.assertNotNull(transactionProxy.getIdentifier());
232     }
233
234     @Test
235     public void testClose(){
236         final Props props = Props.create(MessageCollectorActor.class);
237         final ActorRef actorRef = getSystem().actorOf(props);
238
239         final MockActorContext actorContext = new MockActorContext(this.getSystem());
240         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
241         actorContext.setExecuteRemoteOperationResponse("message");
242
243         TransactionProxy transactionProxy =
244             new TransactionProxy(actorContext,
245                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
246
247         transactionProxy.read(TestModel.TEST_PATH);
248
249         transactionProxy.close();
250
251         Object messages = testContext
252             .executeLocalOperation(actorRef, "messages",
253                 ActorContext.ASK_DURATION);
254
255         Assert.assertNotNull(messages);
256
257         Assert.assertTrue(messages instanceof List);
258
259         List<Object> listMessages = (List<Object>) messages;
260
261         Assert.assertEquals(1, listMessages.size());
262
263         Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
264     }
265
266     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
267         return CreateTransactionReply.newBuilder()
268             .setTransactionActorPath(actorRef.path().toString())
269             .setTransactionId("txn-1")
270             .build();
271     }
272 }