Bug-2136 : Clustering : When a transaction is local then do not serialize the Reading...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.actor.Terminated;
6 import akka.testkit.JavaTestKit;
7 import akka.testkit.TestActorRef;
8 import com.google.common.util.concurrent.ListeningExecutorService;
9 import com.google.common.util.concurrent.MoreExecutors;
10 import org.junit.BeforeClass;
11 import org.junit.Test;
12 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
13 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
14 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
15 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
18 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
19 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
20 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
21 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
23 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
29 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
30 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
31 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
32 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
33 import org.opendaylight.controller.cluster.datastore.modification.Modification;
34 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
35 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
36 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
37 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import scala.concurrent.duration.Duration;
42 import java.util.Collections;
43 import java.util.concurrent.TimeUnit;
44 import static org.junit.Assert.assertEquals;
45 import static org.junit.Assert.assertFalse;
46 import static org.junit.Assert.assertNotNull;
47 import static org.junit.Assert.assertTrue;
48
49 public class ShardTransactionTest extends AbstractActorTest {
50     private static ListeningExecutorService storeExecutor =
51         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
52
53     private static final InMemoryDOMDataStore store =
54         new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
55
56     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
57
58     private static final ShardIdentifier SHARD_IDENTIFIER =
59         ShardIdentifier.builder().memberName("member-1")
60             .shardName("inventory").type("config").build();
61
62     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
63
64     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
65
66     @BeforeClass
67     public static void staticSetup() {
68         store.onGlobalContextUpdated(testSchemaContext);
69     }
70
71     private ActorRef createShard(){
72         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
73             Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
74     }
75
76     @Test
77     public void testOnReceiveReadData() throws Exception {
78         new JavaTestKit(getSystem()) {{
79             final ActorRef shard = createShard();
80             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
81                     testSchemaContext, datastoreContext, shardStats, "txn");
82
83             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
84
85             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
86                     testSchemaContext, datastoreContext, shardStats, "txn");
87
88             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
89         }
90
91         private void testOnReceiveReadData(final ActorRef subject) {
92             //serialized read
93             subject.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
94                 getRef());
95
96             ShardTransactionMessages.ReadDataReply replySerialized =
97                 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
98
99             assertNotNull(ReadDataReply.fromSerializable(
100                 testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
101                 .getNormalizedNode());
102
103             // unserialized read
104             subject.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
105
106             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
107
108             assertNotNull(reply.getNormalizedNode());
109         }};
110     }
111
112     @Test
113     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
114         new JavaTestKit(getSystem()) {{
115             final ActorRef shard = createShard();
116             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
117                     testSchemaContext, datastoreContext, shardStats, "txn");
118
119             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
120                     props, "testReadDataWhenDataNotFoundRO"));
121
122             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
123                     testSchemaContext, datastoreContext, shardStats, "txn");
124
125             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
126                     props, "testReadDataWhenDataNotFoundRW"));
127         }
128
129         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef subject) {
130             // serialized read
131             subject.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
132
133             ShardTransactionMessages.ReadDataReply replySerialized =
134                 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
135
136             assertTrue(ReadDataReply.fromSerializable(
137                 testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
138
139             // unserialized read
140             subject.tell(new ReadData(TestModel.TEST_PATH),getRef());
141
142             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
143
144             assertTrue(reply.getNormalizedNode() == null);
145         }};
146     }
147
148     @Test
149     public void testOnReceiveDataExistsPositive() throws Exception {
150         new JavaTestKit(getSystem()) {{
151             final ActorRef shard = createShard();
152             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
153                     testSchemaContext, datastoreContext, shardStats, "txn");
154
155             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
156
157             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
158                     testSchemaContext, datastoreContext, shardStats, "txn");
159
160             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
161         }
162
163         private void testOnReceiveDataExistsPositive(final ActorRef subject) {
164             subject.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
165                 getRef());
166
167             ShardTransactionMessages.DataExistsReply replySerialized =
168                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
169
170             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
171
172             // unserialized read
173             subject.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
174
175             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
176
177             assertTrue(reply.exists());
178         }};
179     }
180
181     @Test
182     public void testOnReceiveDataExistsNegative() throws Exception {
183         new JavaTestKit(getSystem()) {{
184             final ActorRef shard = createShard();
185             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
186                     testSchemaContext, datastoreContext, shardStats, "txn");
187
188             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
189
190             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
191                     testSchemaContext, datastoreContext, shardStats, "txn");
192
193             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
194         }
195
196         private void testOnReceiveDataExistsNegative(final ActorRef subject) {
197             subject.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
198
199             ShardTransactionMessages.DataExistsReply replySerialized =
200                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
201
202             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
203
204             // unserialized read
205             subject.tell(new DataExists(TestModel.TEST_PATH),getRef());
206
207             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
208
209             assertFalse(reply.exists());
210         }};
211     }
212
213     private void assertModification(final ActorRef subject,
214         final Class<? extends Modification> modificationType) {
215         new JavaTestKit(getSystem()) {{
216             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
217
218             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
219                     GetCompositeModificationReply.class).getModification();
220
221             assertTrue(compositeModification.getModifications().size() == 1);
222             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
223         }};
224     }
225
226     @Test
227     public void testOnReceiveWriteData() throws Exception {
228         new JavaTestKit(getSystem()) {{
229             final ActorRef shard = createShard();
230             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
231                     testSchemaContext, datastoreContext, shardStats, "txn");
232             final ActorRef subject =
233                 getSystem().actorOf(props, "testWriteData");
234
235             subject.tell(new WriteData(TestModel.TEST_PATH,
236                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
237                 getRef());
238
239             ShardTransactionMessages.WriteDataReply replySerialized =
240                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
241
242             assertModification(subject, WriteModification.class);
243
244             //unserialized write
245             subject.tell(new WriteData(TestModel.TEST_PATH,
246                 ImmutableNodes.containerNode(TestModel.TEST_QNAME),
247                 TestModel.createTestContext()),
248                 getRef());
249
250             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
251         }};
252     }
253
254     @Test
255     public void testOnReceiveMergeData() throws Exception {
256         new JavaTestKit(getSystem()) {{
257             final ActorRef shard = createShard();
258             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
259                     testSchemaContext, datastoreContext, shardStats, "txn");
260             final ActorRef subject =
261                 getSystem().actorOf(props, "testMergeData");
262
263             subject.tell(new MergeData(TestModel.TEST_PATH,
264                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
265                 getRef());
266
267             ShardTransactionMessages.MergeDataReply replySerialized =
268                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
269
270             assertModification(subject, MergeModification.class);
271
272             //unserialized merge
273             subject.tell(new MergeData(TestModel.TEST_PATH,
274                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
275                 getRef());
276
277             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
278         }};
279     }
280
281     @Test
282     public void testOnReceiveDeleteData() throws Exception {
283         new JavaTestKit(getSystem()) {{
284             final ActorRef shard = createShard();
285             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
286                     testSchemaContext, datastoreContext, shardStats, "txn");
287             final ActorRef subject =
288                 getSystem().actorOf(props, "testDeleteData");
289
290             subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
291
292             ShardTransactionMessages.DeleteDataReply replySerialized =
293                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
294
295             assertModification(subject, DeleteModification.class);
296
297             //unserialized merge
298             subject.tell(new DeleteData(TestModel.TEST_PATH), getRef());
299
300             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
301         }};
302     }
303
304
305     @Test
306     public void testOnReceiveReadyTransaction() throws Exception {
307         new JavaTestKit(getSystem()) {{
308             final ActorRef shard = createShard();
309             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
310                     testSchemaContext, datastoreContext, shardStats, "txn");
311             final ActorRef subject =
312                 getSystem().actorOf(props, "testReadyTransaction");
313
314             subject.tell(new ReadyTransaction().toSerializable(), getRef());
315
316             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
317         }};
318
319         // test
320         new JavaTestKit(getSystem()) {{
321             final ActorRef shard = createShard();
322             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
323                 testSchemaContext, datastoreContext, shardStats, "txn");
324             final ActorRef subject =
325                 getSystem().actorOf(props, "testReadyTransaction2");
326
327             subject.tell(new ReadyTransaction(), getRef());
328
329             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
330         }};
331
332     }
333
334     @SuppressWarnings("unchecked")
335     @Test
336     public void testOnReceiveCloseTransaction() throws Exception {
337         new JavaTestKit(getSystem()) {{
338             final ActorRef shard = createShard();
339             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
340                     testSchemaContext, datastoreContext, shardStats, "txn");
341             final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
342
343             watch(subject);
344
345             subject.tell(new CloseTransaction().toSerializable(), getRef());
346
347             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
348             expectMsgClass(duration("3 seconds"), Terminated.class);
349         }};
350     }
351
352     @Test(expected=UnknownMessageException.class)
353     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
354         final ActorRef shard = createShard();
355         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
356                 testSchemaContext, datastoreContext, shardStats, "txn");
357         final TestActorRef subject = TestActorRef.apply(props,getSystem());
358
359         subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
360     }
361
362     @Test
363     public void testShardTransactionInactivity() {
364
365         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
366                 Duration.create(500, TimeUnit.MILLISECONDS)).build();
367
368         new JavaTestKit(getSystem()) {{
369             final ActorRef shard = createShard();
370             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
371                     testSchemaContext, datastoreContext, shardStats, "txn");
372             final ActorRef subject =
373                 getSystem().actorOf(props, "testShardTransactionInactivity");
374
375             watch(subject);
376
377             // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
378
379             final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
380                 // do not put code outside this method, will run afterwards
381                 @Override
382                 protected String match(Object in) {
383                     if (in instanceof Terminated) {
384                         return "match";
385                     } else {
386                         throw noMatch();
387                     }
388                 }
389             }.get(); // this extracts the received message
390
391             assertEquals("match", termination);
392         }};
393     }
394 }