Merge "Make Raft messages serializable"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import com.google.common.base.Optional;
6 import com.google.common.util.concurrent.ListenableFuture;
7 import junit.framework.Assert;
8 import org.junit.Before;
9 import org.junit.Test;
10 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
11 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
12 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
13 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
14 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
15 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
16 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
17 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
18 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
19 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
20 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
21 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
22 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
23 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
24 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
25 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
27 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
28 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
29
30 import java.util.List;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33
34 public class TransactionProxyTest extends AbstractActorTest {
35
36     private final Configuration configuration = new MockConfiguration();
37
38     private final ActorContext testContext =
39         new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );
40
41     private ExecutorService transactionExecutor =
42         Executors.newSingleThreadExecutor();
43
44     @Before
45     public void setUp(){
46         ShardStrategyFactory.setConfiguration(configuration);
47     }
48
49     @Test
50     public void testRead() throws Exception {
51         final Props props = Props.create(DoNothingActor.class);
52         final ActorRef actorRef = getSystem().actorOf(props);
53
54         final MockActorContext actorContext = new MockActorContext(this.getSystem());
55         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
56         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
57         actorContext.setExecuteRemoteOperationResponse("message");
58
59
60         TransactionProxy transactionProxy =
61             new TransactionProxy(actorContext,
62                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
63
64
65         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
66             transactionProxy.read(TestModel.TEST_PATH);
67
68         Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
69
70         Assert.assertFalse(normalizedNodeOptional.isPresent());
71
72         actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
73             TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
74
75         read = transactionProxy.read(TestModel.TEST_PATH);
76
77         normalizedNodeOptional = read.get();
78
79         Assert.assertTrue(normalizedNodeOptional.isPresent());
80     }
81
82     @Test
83     public void testReadWhenANullIsReturned() throws Exception {
84         final Props props = Props.create(DoNothingActor.class);
85         final ActorRef actorRef = getSystem().actorOf(props);
86
87         final MockActorContext actorContext = new MockActorContext(this.getSystem());
88         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
89         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
90         actorContext.setExecuteRemoteOperationResponse("message");
91
92         TransactionProxy transactionProxy =
93             new TransactionProxy(actorContext,
94                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
95
96
97         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
98             transactionProxy.read(TestModel.TEST_PATH);
99
100         Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
101
102         Assert.assertFalse(normalizedNodeOptional.isPresent());
103
104         actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
105            TestModel.createTestContext(), null).toSerializable());
106
107         read = transactionProxy.read(TestModel.TEST_PATH);
108
109         normalizedNodeOptional = read.get();
110
111         Assert.assertFalse(normalizedNodeOptional.isPresent());
112     }
113
114     @Test
115     public void testWrite() throws Exception {
116         final Props props = Props.create(MessageCollectorActor.class);
117         final ActorRef actorRef = getSystem().actorOf(props);
118
119         final MockActorContext actorContext = new MockActorContext(this.getSystem());
120         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
121         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
122         actorContext.setExecuteRemoteOperationResponse("message");
123
124         TransactionProxy transactionProxy =
125             new TransactionProxy(actorContext,
126                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
127
128         transactionProxy.write(TestModel.TEST_PATH,
129             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
130
131         Object messages = testContext
132             .executeLocalOperation(actorRef, "messages",
133                 ActorContext.ASK_DURATION);
134
135         Assert.assertNotNull(messages);
136
137         Assert.assertTrue(messages instanceof List);
138
139         List<Object> listMessages = (List<Object>) messages;
140
141         Assert.assertEquals(1, listMessages.size());
142
143         Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
144     }
145
146     private Object createPrimaryFound(ActorRef actorRef) {
147         return new PrimaryFound(actorRef.path().toString()).toSerializable();
148     }
149
150     @Test
151     public void testMerge() throws Exception {
152         final Props props = Props.create(MessageCollectorActor.class);
153         final ActorRef actorRef = getSystem().actorOf(props);
154
155         final MockActorContext actorContext = new MockActorContext(this.getSystem());
156         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
157         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
158         actorContext.setExecuteRemoteOperationResponse("message");
159
160         TransactionProxy transactionProxy =
161             new TransactionProxy(actorContext,
162                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
163
164         transactionProxy.merge(TestModel.TEST_PATH,
165             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
166
167         Object messages = testContext
168             .executeLocalOperation(actorRef, "messages",
169                 ActorContext.ASK_DURATION);
170
171         Assert.assertNotNull(messages);
172
173         Assert.assertTrue(messages instanceof List);
174
175         List<Object> listMessages = (List<Object>) messages;
176
177         Assert.assertEquals(1, listMessages.size());
178
179         Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
180     }
181
182     @Test
183     public void testDelete() throws Exception {
184         final Props props = Props.create(MessageCollectorActor.class);
185         final ActorRef actorRef = getSystem().actorOf(props);
186
187         final MockActorContext actorContext = new MockActorContext(this.getSystem());
188         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
189         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
190         actorContext.setExecuteRemoteOperationResponse("message");
191
192         TransactionProxy transactionProxy =
193             new TransactionProxy(actorContext,
194                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
195
196         transactionProxy.delete(TestModel.TEST_PATH);
197
198         Object messages = testContext
199             .executeLocalOperation(actorRef, "messages",
200                 ActorContext.ASK_DURATION);
201
202         Assert.assertNotNull(messages);
203
204         Assert.assertTrue(messages instanceof List);
205
206         List<Object> listMessages = (List<Object>) messages;
207
208         Assert.assertEquals(1, listMessages.size());
209
210         Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
211     }
212
213     @Test
214     public void testReady() throws Exception {
215         final Props props = Props.create(DoNothingActor.class);
216         final ActorRef doNothingActorRef = getSystem().actorOf(props);
217
218         final MockActorContext actorContext = new MockActorContext(this.getSystem());
219         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef));
220         actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
221         actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable());
222
223         TransactionProxy transactionProxy =
224             new TransactionProxy(actorContext,
225                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
226
227
228         transactionProxy.read(TestModel.TEST_PATH);
229
230         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
231
232         Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
233
234         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
235
236         Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
237
238     }
239
240     @Test
241     public void testGetIdentifier(){
242         final Props props = Props.create(DoNothingActor.class);
243         final ActorRef doNothingActorRef = getSystem().actorOf(props);
244
245         final MockActorContext actorContext = new MockActorContext(this.getSystem());
246         actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
247
248         TransactionProxy transactionProxy =
249             new TransactionProxy(actorContext,
250                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
251
252         Assert.assertNotNull(transactionProxy.getIdentifier());
253     }
254
255     @Test
256     public void testClose(){
257         final Props props = Props.create(MessageCollectorActor.class);
258         final ActorRef actorRef = getSystem().actorOf(props);
259
260         final MockActorContext actorContext = new MockActorContext(this.getSystem());
261         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
262         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
263         actorContext.setExecuteRemoteOperationResponse("message");
264
265         TransactionProxy transactionProxy =
266             new TransactionProxy(actorContext,
267                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
268
269         transactionProxy.read(TestModel.TEST_PATH);
270
271         transactionProxy.close();
272
273         Object messages = testContext
274             .executeLocalOperation(actorRef, "messages",
275                 ActorContext.ASK_DURATION);
276
277         Assert.assertNotNull(messages);
278
279         Assert.assertTrue(messages instanceof List);
280
281         List<Object> listMessages = (List<Object>) messages;
282
283         Assert.assertEquals(1, listMessages.size());
284
285         Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS));
286     }
287
288     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
289         return CreateTransactionReply.newBuilder()
290             .setTransactionActorPath(actorRef.path().toString())
291             .setTransactionId("txn-1")
292             .build();
293     }
294 }