Keep DataChange registrations and notifications local
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5
6 import com.google.common.base.Optional;
7 import com.google.common.util.concurrent.ListenableFuture;
8 import com.google.common.util.concurrent.ListeningExecutorService;
9 import com.google.common.util.concurrent.MoreExecutors;
10
11 import junit.framework.Assert;
12
13 import org.junit.After;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
18 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
19 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
20 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
21 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
23 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
24 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
25 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
26 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
27 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
28 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
29 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
30 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
31 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
35
36 import java.util.List;
37 import java.util.concurrent.Executors;
38
39 public class TransactionProxyTest extends AbstractActorTest {
40
41     private final Configuration configuration = new MockConfiguration();
42
43     private final ActorContext testContext =
44         new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration );
45
46     private final ListeningExecutorService transactionExecutor =
47         MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
48
49     @Before
50     public void setUp(){
51         ShardStrategyFactory.setConfiguration(configuration);
52     }
53
54     @After
55     public void tearDown() {
56         transactionExecutor.shutdownNow();
57     }
58
59     @Test
60     public void testRead() throws Exception {
61         final Props props = Props.create(DoNothingActor.class);
62         final ActorRef actorRef = getSystem().actorOf(props);
63
64         final MockActorContext actorContext = new MockActorContext(this.getSystem());
65         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
66         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
67         actorContext.setExecuteRemoteOperationResponse("message");
68
69
70         TransactionProxy transactionProxy =
71             new TransactionProxy(actorContext,
72                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
73
74
75         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
76             transactionProxy.read(TestModel.TEST_PATH);
77
78         Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
79
80         Assert.assertFalse(normalizedNodeOptional.isPresent());
81
82         actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
83             TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable());
84
85         read = transactionProxy.read(TestModel.TEST_PATH);
86
87         normalizedNodeOptional = read.get();
88
89         Assert.assertTrue(normalizedNodeOptional.isPresent());
90     }
91
92     @Test
93     public void testReadWhenANullIsReturned() throws Exception {
94         final Props props = Props.create(DoNothingActor.class);
95         final ActorRef actorRef = getSystem().actorOf(props);
96
97         final MockActorContext actorContext = new MockActorContext(this.getSystem());
98         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
99         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
100         actorContext.setExecuteRemoteOperationResponse("message");
101
102         TransactionProxy transactionProxy =
103             new TransactionProxy(actorContext,
104                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
105
106
107         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
108             transactionProxy.read(TestModel.TEST_PATH);
109
110         Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
111
112         Assert.assertFalse(normalizedNodeOptional.isPresent());
113
114         actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
115            TestModel.createTestContext(), null).toSerializable());
116
117         read = transactionProxy.read(TestModel.TEST_PATH);
118
119         normalizedNodeOptional = read.get();
120
121         Assert.assertFalse(normalizedNodeOptional.isPresent());
122     }
123
124     @Test
125     public void testWrite() throws Exception {
126         final Props props = Props.create(MessageCollectorActor.class);
127         final ActorRef actorRef = getSystem().actorOf(props);
128
129         final MockActorContext actorContext = new MockActorContext(this.getSystem());
130         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
131         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
132         actorContext.setExecuteRemoteOperationResponse("message");
133
134         TransactionProxy transactionProxy =
135             new TransactionProxy(actorContext,
136                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
137
138         transactionProxy.write(TestModel.TEST_PATH,
139             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
140
141         Object messages = testContext
142             .executeLocalOperation(actorRef, "messages",
143                 ActorContext.ASK_DURATION);
144
145         Assert.assertNotNull(messages);
146
147         Assert.assertTrue(messages instanceof List);
148
149         List<Object> listMessages = (List<Object>) messages;
150
151         Assert.assertEquals(1, listMessages.size());
152
153         Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
154     }
155
156     private Object createPrimaryFound(ActorRef actorRef) {
157         return new PrimaryFound(actorRef.path().toString()).toSerializable();
158     }
159
160     @Test
161     public void testMerge() throws Exception {
162         final Props props = Props.create(MessageCollectorActor.class);
163         final ActorRef actorRef = getSystem().actorOf(props);
164
165         final MockActorContext actorContext = new MockActorContext(this.getSystem());
166         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
167         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
168         actorContext.setExecuteRemoteOperationResponse("message");
169
170         TransactionProxy transactionProxy =
171             new TransactionProxy(actorContext,
172                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
173
174         transactionProxy.merge(TestModel.TEST_PATH,
175             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
176
177         Object messages = testContext
178             .executeLocalOperation(actorRef, "messages",
179                 ActorContext.ASK_DURATION);
180
181         Assert.assertNotNull(messages);
182
183         Assert.assertTrue(messages instanceof List);
184
185         List<Object> listMessages = (List<Object>) messages;
186
187         Assert.assertEquals(1, listMessages.size());
188
189         Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
190     }
191
192     @Test
193     public void testDelete() throws Exception {
194         final Props props = Props.create(MessageCollectorActor.class);
195         final ActorRef actorRef = getSystem().actorOf(props);
196
197         final MockActorContext actorContext = new MockActorContext(this.getSystem());
198         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
199         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
200         actorContext.setExecuteRemoteOperationResponse("message");
201
202         TransactionProxy transactionProxy =
203             new TransactionProxy(actorContext,
204                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
205
206         transactionProxy.delete(TestModel.TEST_PATH);
207
208         Object messages = testContext
209             .executeLocalOperation(actorRef, "messages",
210                 ActorContext.ASK_DURATION);
211
212         Assert.assertNotNull(messages);
213
214         Assert.assertTrue(messages instanceof List);
215
216         List<Object> listMessages = (List<Object>) messages;
217
218         Assert.assertEquals(1, listMessages.size());
219
220         Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass());
221     }
222
223     @Test
224     public void testReady() throws Exception {
225         final Props props = Props.create(DoNothingActor.class);
226         final ActorRef doNothingActorRef = getSystem().actorOf(props);
227
228         final MockActorContext actorContext = new MockActorContext(this.getSystem());
229         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef));
230         actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
231         actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable());
232
233         TransactionProxy transactionProxy =
234             new TransactionProxy(actorContext,
235                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
236
237
238         transactionProxy.read(TestModel.TEST_PATH);
239
240         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
241
242         Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
243
244         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
245
246         Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
247
248     }
249
250     @Test
251     public void testGetIdentifier(){
252         final Props props = Props.create(DoNothingActor.class);
253         final ActorRef doNothingActorRef = getSystem().actorOf(props);
254
255         final MockActorContext actorContext = new MockActorContext(this.getSystem());
256         actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
257
258         TransactionProxy transactionProxy =
259             new TransactionProxy(actorContext,
260                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
261
262         Assert.assertNotNull(transactionProxy.getIdentifier());
263     }
264
265     @Test
266     public void testClose(){
267         final Props props = Props.create(MessageCollectorActor.class);
268         final ActorRef actorRef = getSystem().actorOf(props);
269
270         final MockActorContext actorContext = new MockActorContext(this.getSystem());
271         actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef));
272         actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
273         actorContext.setExecuteRemoteOperationResponse("message");
274
275         TransactionProxy transactionProxy =
276             new TransactionProxy(actorContext,
277                 TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext());
278
279         transactionProxy.read(TestModel.TEST_PATH);
280
281         transactionProxy.close();
282
283         Object messages = testContext
284             .executeLocalOperation(actorRef, "messages",
285                 ActorContext.ASK_DURATION);
286
287         Assert.assertNotNull(messages);
288
289         Assert.assertTrue(messages instanceof List);
290
291         List<Object> listMessages = (List<Object>) messages;
292
293         Assert.assertEquals(1, listMessages.size());
294
295         Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS));
296     }
297
298     private CreateTransactionReply createTransactionReply(ActorRef actorRef){
299         return CreateTransactionReply.newBuilder()
300             .setTransactionActorPath(actorRef.path().toString())
301             .setTransactionId("txn-1")
302             .build();
303     }
304 }