BUG-650: remove executor abstraction
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.BeforeClass;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
18 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
19 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
25 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
27 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
28 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
29 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
35 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
36 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
37 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
38 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
39 import org.opendaylight.controller.cluster.datastore.modification.Modification;
40 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
41 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
42 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
43 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import scala.concurrent.duration.Duration;
48
49 public class ShardTransactionTest extends AbstractActorTest {
50     private static final InMemoryDOMDataStore store =
51         new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
52
53     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
54
55     private static final ShardIdentifier SHARD_IDENTIFIER =
56         ShardIdentifier.builder().memberName("member-1")
57             .shardName("inventory").type("config").build();
58
59     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
60
61     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
62
63     @BeforeClass
64     public static void staticSetup() {
65         store.onGlobalContextUpdated(testSchemaContext);
66     }
67
68     private ActorRef createShard(){
69         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
70             Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
71     }
72
73     @Test
74     public void testOnReceiveReadData() throws Exception {
75         new JavaTestKit(getSystem()) {{
76             final ActorRef shard = createShard();
77             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
78                     testSchemaContext, datastoreContext, shardStats, "txn",
79                     CreateTransaction.CURRENT_VERSION);
80
81             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
82
83             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
84                     testSchemaContext, datastoreContext, shardStats, "txn",
85                     CreateTransaction.CURRENT_VERSION);
86
87             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
88         }
89
90         private void testOnReceiveReadData(final ActorRef transaction) {
91             //serialized read
92             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
93                 getRef());
94
95             ShardTransactionMessages.ReadDataReply replySerialized =
96                 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
97
98             assertNotNull(ReadDataReply.fromSerializable(
99                 testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
100                 .getNormalizedNode());
101
102             // unserialized read
103             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
104
105             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
106
107             assertNotNull(reply.getNormalizedNode());
108         }};
109     }
110
111     @Test
112     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
113         new JavaTestKit(getSystem()) {{
114             final ActorRef shard = createShard();
115             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
116                     testSchemaContext, datastoreContext, shardStats, "txn",
117                     CreateTransaction.CURRENT_VERSION);
118
119             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
120                     props, "testReadDataWhenDataNotFoundRO"));
121
122             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
123                     testSchemaContext, datastoreContext, shardStats, "txn",
124                     CreateTransaction.CURRENT_VERSION);
125
126             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
127                     props, "testReadDataWhenDataNotFoundRW"));
128         }
129
130         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
131             // serialized read
132             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
133
134             ShardTransactionMessages.ReadDataReply replySerialized =
135                 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
136
137             assertTrue(ReadDataReply.fromSerializable(
138                 testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
139
140             // unserialized read
141             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
142
143             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
144
145             assertTrue(reply.getNormalizedNode() == null);
146         }};
147     }
148
149     @Test
150     public void testOnReceiveDataExistsPositive() throws Exception {
151         new JavaTestKit(getSystem()) {{
152             final ActorRef shard = createShard();
153             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
154                     testSchemaContext, datastoreContext, shardStats, "txn",
155                     CreateTransaction.CURRENT_VERSION);
156
157             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
158
159             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
160                     testSchemaContext, datastoreContext, shardStats, "txn",
161                     CreateTransaction.CURRENT_VERSION);
162
163             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
164         }
165
166         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
167             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
168                 getRef());
169
170             ShardTransactionMessages.DataExistsReply replySerialized =
171                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
172
173             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
174
175             // unserialized read
176             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
177
178             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
179
180             assertTrue(reply.exists());
181         }};
182     }
183
184     @Test
185     public void testOnReceiveDataExistsNegative() throws Exception {
186         new JavaTestKit(getSystem()) {{
187             final ActorRef shard = createShard();
188             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
189                     testSchemaContext, datastoreContext, shardStats, "txn",
190                     CreateTransaction.CURRENT_VERSION);
191
192             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
193
194             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
195                     testSchemaContext, datastoreContext, shardStats, "txn",
196                     CreateTransaction.CURRENT_VERSION);
197
198             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
199         }
200
201         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
202             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
203
204             ShardTransactionMessages.DataExistsReply replySerialized =
205                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
206
207             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
208
209             // unserialized read
210             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
211
212             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
213
214             assertFalse(reply.exists());
215         }};
216     }
217
218     private void assertModification(final ActorRef subject,
219         final Class<? extends Modification> modificationType) {
220         new JavaTestKit(getSystem()) {{
221             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
222
223             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
224                     GetCompositeModificationReply.class).getModification();
225
226             assertTrue(compositeModification.getModifications().size() == 1);
227             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
228         }};
229     }
230
231     @Test
232     public void testOnReceiveWriteData() throws Exception {
233         new JavaTestKit(getSystem()) {{
234             final ActorRef shard = createShard();
235             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
236                     testSchemaContext, datastoreContext, shardStats, "txn",
237                     CreateTransaction.CURRENT_VERSION);
238             final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
239
240             transaction.tell(new WriteData(TestModel.TEST_PATH,
241                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
242                 getRef());
243
244             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
245
246             assertModification(transaction, WriteModification.class);
247
248             //unserialized write
249             transaction.tell(new WriteData(TestModel.TEST_PATH,
250                 ImmutableNodes.containerNode(TestModel.TEST_QNAME),
251                 TestModel.createTestContext()),
252                 getRef());
253
254             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
255         }};
256     }
257
258     @Test
259     public void testOnReceiveMergeData() throws Exception {
260         new JavaTestKit(getSystem()) {{
261             final ActorRef shard = createShard();
262             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
263                     testSchemaContext, datastoreContext, shardStats, "txn",
264                     CreateTransaction.CURRENT_VERSION);
265             final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
266
267             transaction.tell(new MergeData(TestModel.TEST_PATH,
268                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
269                 getRef());
270
271             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
272
273             assertModification(transaction, MergeModification.class);
274
275             //unserialized merge
276             transaction.tell(new MergeData(TestModel.TEST_PATH,
277                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
278                 getRef());
279
280             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
281         }};
282     }
283
284     @Test
285     public void testOnReceiveDeleteData() throws Exception {
286         new JavaTestKit(getSystem()) {{
287             final ActorRef shard = createShard();
288             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
289                     testSchemaContext, datastoreContext, shardStats, "txn",
290                     CreateTransaction.CURRENT_VERSION);
291             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
292
293             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
294
295             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
296
297             assertModification(transaction, DeleteModification.class);
298
299             //unserialized merge
300             transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
301
302             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
303         }};
304     }
305
306
307     @Test
308     public void testOnReceiveReadyTransaction() throws Exception {
309         new JavaTestKit(getSystem()) {{
310             final ActorRef shard = createShard();
311             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
312                     testSchemaContext, datastoreContext, shardStats, "txn",
313                     CreateTransaction.CURRENT_VERSION);
314             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
315
316             watch(transaction);
317
318             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
319
320             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
321                     Terminated.class);
322             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
323                     Terminated.class);
324         }};
325
326         // test
327         new JavaTestKit(getSystem()) {{
328             final ActorRef shard = createShard();
329             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
330                 testSchemaContext, datastoreContext, shardStats, "txn",
331                 CreateTransaction.CURRENT_VERSION);
332             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
333
334             watch(transaction);
335
336             transaction.tell(new ReadyTransaction(), getRef());
337
338             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
339                     Terminated.class);
340             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
341                     Terminated.class);
342         }};
343
344     }
345
346     @SuppressWarnings("unchecked")
347     @Test
348     public void testOnReceiveCloseTransaction() throws Exception {
349         new JavaTestKit(getSystem()) {{
350             final ActorRef shard = createShard();
351             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
352                     testSchemaContext, datastoreContext, shardStats, "txn",
353                     CreateTransaction.CURRENT_VERSION);
354             final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
355
356             watch(transaction);
357
358             transaction.tell(new CloseTransaction().toSerializable(), getRef());
359
360             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
361             expectTerminated(duration("3 seconds"), transaction);
362         }};
363     }
364
365     @Test(expected=UnknownMessageException.class)
366     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
367         final ActorRef shard = createShard();
368         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
369                 testSchemaContext, datastoreContext, shardStats, "txn",
370                 CreateTransaction.CURRENT_VERSION);
371         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
372
373         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
374     }
375
376     @Test
377     public void testShardTransactionInactivity() {
378
379         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
380                 Duration.create(500, TimeUnit.MILLISECONDS)).build();
381
382         new JavaTestKit(getSystem()) {{
383             final ActorRef shard = createShard();
384             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
385                     testSchemaContext, datastoreContext, shardStats, "txn",
386                     CreateTransaction.CURRENT_VERSION);
387             final ActorRef transaction =
388                 getSystem().actorOf(props, "testShardTransactionInactivity");
389
390             watch(transaction);
391
392             expectMsgClass(duration("3 seconds"), Terminated.class);
393         }};
394     }
395 }