Merge "Fix a typo in javadoc"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.ListeningExecutorService;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.Collections;
15 import java.util.concurrent.TimeUnit;
16 import org.junit.BeforeClass;
17 import org.junit.Test;
18 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
21 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
26 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
27 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
28 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
29 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
30 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
36 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
37 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
38 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
39 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
40 import org.opendaylight.controller.cluster.datastore.modification.Modification;
41 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
42 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
43 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
44 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
47 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
48 import scala.concurrent.duration.Duration;
49
50 public class ShardTransactionTest extends AbstractActorTest {
51     private static ListeningExecutorService storeExecutor =
52         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
53
54     private static final InMemoryDOMDataStore store =
55         new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
56
57     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
58
59     private static final ShardIdentifier SHARD_IDENTIFIER =
60         ShardIdentifier.builder().memberName("member-1")
61             .shardName("inventory").type("config").build();
62
63     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
64
65     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
66
67     @BeforeClass
68     public static void staticSetup() {
69         store.onGlobalContextUpdated(testSchemaContext);
70     }
71
72     private ActorRef createShard(){
73         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
74             Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
75     }
76
77     @Test
78     public void testOnReceiveReadData() throws Exception {
79         new JavaTestKit(getSystem()) {{
80             final ActorRef shard = createShard();
81             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
82                     testSchemaContext, datastoreContext, shardStats, "txn",
83                     CreateTransaction.CURRENT_VERSION);
84
85             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
86
87             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
88                     testSchemaContext, datastoreContext, shardStats, "txn",
89                     CreateTransaction.CURRENT_VERSION);
90
91             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
92         }
93
94         private void testOnReceiveReadData(final ActorRef transaction) {
95             //serialized read
96             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
97                 getRef());
98
99             ShardTransactionMessages.ReadDataReply replySerialized =
100                 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
101
102             assertNotNull(ReadDataReply.fromSerializable(
103                 testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
104                 .getNormalizedNode());
105
106             // unserialized read
107             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
108
109             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
110
111             assertNotNull(reply.getNormalizedNode());
112         }};
113     }
114
115     @Test
116     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
117         new JavaTestKit(getSystem()) {{
118             final ActorRef shard = createShard();
119             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
120                     testSchemaContext, datastoreContext, shardStats, "txn",
121                     CreateTransaction.CURRENT_VERSION);
122
123             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
124                     props, "testReadDataWhenDataNotFoundRO"));
125
126             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
127                     testSchemaContext, datastoreContext, shardStats, "txn",
128                     CreateTransaction.CURRENT_VERSION);
129
130             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
131                     props, "testReadDataWhenDataNotFoundRW"));
132         }
133
134         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
135             // serialized read
136             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
137
138             ShardTransactionMessages.ReadDataReply replySerialized =
139                 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
140
141             assertTrue(ReadDataReply.fromSerializable(
142                 testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
143
144             // unserialized read
145             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
146
147             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
148
149             assertTrue(reply.getNormalizedNode() == null);
150         }};
151     }
152
153     @Test
154     public void testOnReceiveDataExistsPositive() throws Exception {
155         new JavaTestKit(getSystem()) {{
156             final ActorRef shard = createShard();
157             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
158                     testSchemaContext, datastoreContext, shardStats, "txn",
159                     CreateTransaction.CURRENT_VERSION);
160
161             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
162
163             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
164                     testSchemaContext, datastoreContext, shardStats, "txn",
165                     CreateTransaction.CURRENT_VERSION);
166
167             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
168         }
169
170         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
171             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
172                 getRef());
173
174             ShardTransactionMessages.DataExistsReply replySerialized =
175                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
176
177             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
178
179             // unserialized read
180             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
181
182             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
183
184             assertTrue(reply.exists());
185         }};
186     }
187
188     @Test
189     public void testOnReceiveDataExistsNegative() throws Exception {
190         new JavaTestKit(getSystem()) {{
191             final ActorRef shard = createShard();
192             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
193                     testSchemaContext, datastoreContext, shardStats, "txn",
194                     CreateTransaction.CURRENT_VERSION);
195
196             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
197
198             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
199                     testSchemaContext, datastoreContext, shardStats, "txn",
200                     CreateTransaction.CURRENT_VERSION);
201
202             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
203         }
204
205         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
206             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
207
208             ShardTransactionMessages.DataExistsReply replySerialized =
209                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
210
211             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
212
213             // unserialized read
214             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
215
216             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
217
218             assertFalse(reply.exists());
219         }};
220     }
221
222     private void assertModification(final ActorRef subject,
223         final Class<? extends Modification> modificationType) {
224         new JavaTestKit(getSystem()) {{
225             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
226
227             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
228                     GetCompositeModificationReply.class).getModification();
229
230             assertTrue(compositeModification.getModifications().size() == 1);
231             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
232         }};
233     }
234
235     @Test
236     public void testOnReceiveWriteData() throws Exception {
237         new JavaTestKit(getSystem()) {{
238             final ActorRef shard = createShard();
239             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
240                     testSchemaContext, datastoreContext, shardStats, "txn",
241                     CreateTransaction.CURRENT_VERSION);
242             final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
243
244             transaction.tell(new WriteData(TestModel.TEST_PATH,
245                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
246                 getRef());
247
248             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
249
250             assertModification(transaction, WriteModification.class);
251
252             //unserialized write
253             transaction.tell(new WriteData(TestModel.TEST_PATH,
254                 ImmutableNodes.containerNode(TestModel.TEST_QNAME),
255                 TestModel.createTestContext()),
256                 getRef());
257
258             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
259         }};
260     }
261
262     @Test
263     public void testOnReceiveMergeData() throws Exception {
264         new JavaTestKit(getSystem()) {{
265             final ActorRef shard = createShard();
266             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
267                     testSchemaContext, datastoreContext, shardStats, "txn",
268                     CreateTransaction.CURRENT_VERSION);
269             final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
270
271             transaction.tell(new MergeData(TestModel.TEST_PATH,
272                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
273                 getRef());
274
275             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
276
277             assertModification(transaction, MergeModification.class);
278
279             //unserialized merge
280             transaction.tell(new MergeData(TestModel.TEST_PATH,
281                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
282                 getRef());
283
284             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
285         }};
286     }
287
288     @Test
289     public void testOnReceiveDeleteData() throws Exception {
290         new JavaTestKit(getSystem()) {{
291             final ActorRef shard = createShard();
292             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
293                     testSchemaContext, datastoreContext, shardStats, "txn",
294                     CreateTransaction.CURRENT_VERSION);
295             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
296
297             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
298
299             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
300
301             assertModification(transaction, DeleteModification.class);
302
303             //unserialized merge
304             transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
305
306             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
307         }};
308     }
309
310
311     @Test
312     public void testOnReceiveReadyTransaction() throws Exception {
313         new JavaTestKit(getSystem()) {{
314             final ActorRef shard = createShard();
315             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
316                     testSchemaContext, datastoreContext, shardStats, "txn",
317                     CreateTransaction.CURRENT_VERSION);
318             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
319
320             watch(transaction);
321
322             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
323
324             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
325                     Terminated.class);
326             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
327                     Terminated.class);
328         }};
329
330         // test
331         new JavaTestKit(getSystem()) {{
332             final ActorRef shard = createShard();
333             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
334                 testSchemaContext, datastoreContext, shardStats, "txn",
335                 CreateTransaction.CURRENT_VERSION);
336             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
337
338             watch(transaction);
339
340             transaction.tell(new ReadyTransaction(), getRef());
341
342             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
343                     Terminated.class);
344             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
345                     Terminated.class);
346         }};
347
348     }
349
350     @SuppressWarnings("unchecked")
351     @Test
352     public void testOnReceiveCloseTransaction() throws Exception {
353         new JavaTestKit(getSystem()) {{
354             final ActorRef shard = createShard();
355             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
356                     testSchemaContext, datastoreContext, shardStats, "txn",
357                     CreateTransaction.CURRENT_VERSION);
358             final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
359
360             watch(transaction);
361
362             transaction.tell(new CloseTransaction().toSerializable(), getRef());
363
364             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
365             expectTerminated(duration("3 seconds"), transaction);
366         }};
367     }
368
369     @Test(expected=UnknownMessageException.class)
370     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
371         final ActorRef shard = createShard();
372         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
373                 testSchemaContext, datastoreContext, shardStats, "txn",
374                 CreateTransaction.CURRENT_VERSION);
375         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
376
377         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
378     }
379
380     @Test
381     public void testShardTransactionInactivity() {
382
383         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
384                 Duration.create(500, TimeUnit.MILLISECONDS)).build();
385
386         new JavaTestKit(getSystem()) {{
387             final ActorRef shard = createShard();
388             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
389                     testSchemaContext, datastoreContext, shardStats, "txn",
390                     CreateTransaction.CURRENT_VERSION);
391             final ActorRef transaction =
392                 getSystem().actorOf(props, "testShardTransactionInactivity");
393
394             watch(transaction);
395
396             expectMsgClass(duration("3 seconds"), Terminated.class);
397         }};
398     }
399 }