Switch to using protocol buffer serialization for the WriteData message
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import com.google.common.base.Optional;
6 import com.google.common.util.concurrent.ListenableFuture;
7 import junit.framework.Assert;
8 import org.junit.Test;
9 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
10 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
11 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
12 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
13 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
14 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
15 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
16 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
17 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
18 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
19 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
20 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
23 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
24
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28
29 public class TransactionProxyTest extends AbstractActorTest {
30
31     private ExecutorService transactionExecutor =
32         Executors.newSingleThreadExecutor();
33
34     @Test
35     public void testRead() throws Exception {
36         final Props props = Props.create(DoNothingActor.class);
37         final ActorRef actorRef = getSystem().actorOf(props);
38
39         final MockActorContext actorContext = new MockActorContext(this.getSystem());
40         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
41         actorContext.setExecuteRemoteOperationResponse("message");
42
43         TransactionProxy transactionProxy =
44             new TransactionProxy(actorContext,
45                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
46
47
48         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
49             transactionProxy.read(TestModel.TEST_PATH);
50
51         Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
52
53         Assert.assertFalse(normalizedNodeOptional.isPresent());
54
55         actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
56             ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
57
58         read = transactionProxy.read(TestModel.TEST_PATH);
59
60         normalizedNodeOptional = read.get();
61
62         Assert.assertTrue(normalizedNodeOptional.isPresent());
63     }
64
65     @Test
66     public void testReadWhenANullIsReturned() throws Exception {
67         final Props props = Props.create(DoNothingActor.class);
68         final ActorRef actorRef = getSystem().actorOf(props);
69
70         final MockActorContext actorContext = new MockActorContext(this.getSystem());
71         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
72         actorContext.setExecuteRemoteOperationResponse("message");
73
74         TransactionProxy transactionProxy =
75             new TransactionProxy(actorContext,
76                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
77
78
79         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
80             transactionProxy.read(TestModel.TEST_PATH);
81
82         Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
83
84         Assert.assertFalse(normalizedNodeOptional.isPresent());
85
86         actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
87             null));
88
89         read = transactionProxy.read(TestModel.TEST_PATH);
90
91         normalizedNodeOptional = read.get();
92
93         Assert.assertFalse(normalizedNodeOptional.isPresent());
94     }
95
96     @Test
97     public void testWrite() throws Exception {
98         final Props props = Props.create(MessageCollectorActor.class);
99         final ActorRef actorRef = getSystem().actorOf(props);
100
101         final MockActorContext actorContext = new MockActorContext(this.getSystem());
102         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
103         actorContext.setExecuteRemoteOperationResponse("message");
104
105         TransactionProxy transactionProxy =
106             new TransactionProxy(actorContext,
107                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
108
109         transactionProxy.write(TestModel.TEST_PATH,
110             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
111
112         ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
113         Object messages = testContext
114             .executeLocalOperation(actorRef, "messages",
115                 ActorContext.ASK_DURATION);
116
117         Assert.assertNotNull(messages);
118
119         Assert.assertTrue(messages instanceof List);
120
121         List<Object> listMessages = (List<Object>) messages;
122
123         Assert.assertEquals(1, listMessages.size());
124
125         Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
126     }
127
128     @Test
129     public void testMerge() throws Exception {
130         final Props props = Props.create(MessageCollectorActor.class);
131         final ActorRef actorRef = getSystem().actorOf(props);
132
133         final MockActorContext actorContext = new MockActorContext(this.getSystem());
134         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
135         actorContext.setExecuteRemoteOperationResponse("message");
136
137         TransactionProxy transactionProxy =
138             new TransactionProxy(actorContext,
139                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
140
141         transactionProxy.merge(TestModel.TEST_PATH,
142             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
143
144         ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
145         Object messages = testContext
146             .executeLocalOperation(actorRef, "messages",
147                 ActorContext.ASK_DURATION);
148
149         Assert.assertNotNull(messages);
150
151         Assert.assertTrue(messages instanceof List);
152
153         List<Object> listMessages = (List<Object>) messages;
154
155         Assert.assertEquals(1, listMessages.size());
156
157         Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
158     }
159
160     @Test
161     public void testDelete() throws Exception {
162         final Props props = Props.create(MessageCollectorActor.class);
163         final ActorRef actorRef = getSystem().actorOf(props);
164
165         final MockActorContext actorContext = new MockActorContext(this.getSystem());
166         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
167         actorContext.setExecuteRemoteOperationResponse("message");
168
169         TransactionProxy transactionProxy =
170             new TransactionProxy(actorContext,
171                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
172
173         transactionProxy.delete(TestModel.TEST_PATH);
174
175         ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
176         Object messages = testContext
177             .executeLocalOperation(actorRef, "messages",
178                 ActorContext.ASK_DURATION);
179
180         Assert.assertNotNull(messages);
181
182         Assert.assertTrue(messages instanceof List);
183
184         List<Object> listMessages = (List<Object>) messages;
185
186         Assert.assertEquals(1, listMessages.size());
187
188         Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
189     }
190
191     @Test
192     public void testReady() throws Exception {
193         final Props props = Props.create(DoNothingActor.class);
194         final ActorRef doNothingActorRef = getSystem().actorOf(props);
195
196         final MockActorContext actorContext = new MockActorContext(this.getSystem());
197         actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
198         actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
199
200         TransactionProxy transactionProxy =
201             new TransactionProxy(actorContext,
202                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
203
204
205         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
206
207         Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
208
209         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
210
211         Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
212
213     }
214
215     @Test
216     public void testGetIdentifier(){
217         final Props props = Props.create(DoNothingActor.class);
218         final ActorRef doNothingActorRef = getSystem().actorOf(props);
219
220         final MockActorContext actorContext = new MockActorContext(this.getSystem());
221         actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
222
223         TransactionProxy transactionProxy =
224             new TransactionProxy(actorContext,
225                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
226
227         Assert.assertNotNull(transactionProxy.getIdentifier());
228     }
229
230     @Test
231     public void testClose(){
232         final Props props = Props.create(MessageCollectorActor.class);
233         final ActorRef actorRef = getSystem().actorOf(props);
234
235         final MockActorContext actorContext = new MockActorContext(this.getSystem());
236         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
237         actorContext.setExecuteRemoteOperationResponse("message");
238
239         TransactionProxy transactionProxy =
240             new TransactionProxy(actorContext,
241                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
242
243         transactionProxy.close();
244
245         ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
246         Object messages = testContext
247             .executeLocalOperation(actorRef, "messages",
248                 ActorContext.ASK_DURATION);
249
250         Assert.assertNotNull(messages);
251
252         Assert.assertTrue(messages instanceof List);
253
254         List<Object> listMessages = (List<Object>) messages;
255
256         Assert.assertEquals(1, listMessages.size());
257
258         Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
259     }
260
261     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
262         return CreateTransactionReply.newBuilder()
263             .setTransactionActorPath(actorRef.path().toString())
264             .setTransactionId("txn-1")
265             .build();
266     }
267 }