Bug 2252: Terminate ShardWriteTransaction actor on ready
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.actor.Terminated;
6 import akka.testkit.JavaTestKit;
7 import akka.testkit.TestActorRef;
8 import com.google.common.util.concurrent.ListeningExecutorService;
9 import com.google.common.util.concurrent.MoreExecutors;
10 import org.junit.BeforeClass;
11 import org.junit.Test;
12 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
13 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
14 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
15 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
18 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
19 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
20 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
21 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
23 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
29 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
30 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
31 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
32 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
33 import org.opendaylight.controller.cluster.datastore.modification.Modification;
34 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
35 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
36 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
37 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import scala.concurrent.duration.Duration;
42 import java.util.Collections;
43 import java.util.concurrent.TimeUnit;
44 import static org.junit.Assert.assertEquals;
45 import static org.junit.Assert.assertFalse;
46 import static org.junit.Assert.assertNotNull;
47 import static org.junit.Assert.assertTrue;
48
49 public class ShardTransactionTest extends AbstractActorTest {
50     private static ListeningExecutorService storeExecutor =
51         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
52
53     private static final InMemoryDOMDataStore store =
54         new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
55
56     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
57
58     private static final ShardIdentifier SHARD_IDENTIFIER =
59         ShardIdentifier.builder().memberName("member-1")
60             .shardName("inventory").type("config").build();
61
62     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
63
64     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
65
66     @BeforeClass
67     public static void staticSetup() {
68         store.onGlobalContextUpdated(testSchemaContext);
69     }
70
71     private ActorRef createShard(){
72         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
73             Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
74     }
75
76     @Test
77     public void testOnReceiveReadData() throws Exception {
78         new JavaTestKit(getSystem()) {{
79             final ActorRef shard = createShard();
80             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
81                     testSchemaContext, datastoreContext, shardStats, "txn");
82
83             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
84
85             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
86                     testSchemaContext, datastoreContext, shardStats, "txn");
87
88             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
89         }
90
91         private void testOnReceiveReadData(final ActorRef transaction) {
92             //serialized read
93             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
94                 getRef());
95
96             ShardTransactionMessages.ReadDataReply replySerialized =
97                 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
98
99             assertNotNull(ReadDataReply.fromSerializable(
100                 testSchemaContext,YangInstanceIdentifier.builder().build(), replySerialized)
101                 .getNormalizedNode());
102
103             // unserialized read
104             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
105
106             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
107
108             assertNotNull(reply.getNormalizedNode());
109         }};
110     }
111
112     @Test
113     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
114         new JavaTestKit(getSystem()) {{
115             final ActorRef shard = createShard();
116             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
117                     testSchemaContext, datastoreContext, shardStats, "txn");
118
119             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
120                     props, "testReadDataWhenDataNotFoundRO"));
121
122             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
123                     testSchemaContext, datastoreContext, shardStats, "txn");
124
125             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
126                     props, "testReadDataWhenDataNotFoundRW"));
127         }
128
129         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
130             // serialized read
131             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
132
133             ShardTransactionMessages.ReadDataReply replySerialized =
134                 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
135
136             assertTrue(ReadDataReply.fromSerializable(
137                 testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
138
139             // unserialized read
140             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
141
142             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
143
144             assertTrue(reply.getNormalizedNode() == null);
145         }};
146     }
147
148     @Test
149     public void testOnReceiveDataExistsPositive() throws Exception {
150         new JavaTestKit(getSystem()) {{
151             final ActorRef shard = createShard();
152             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
153                     testSchemaContext, datastoreContext, shardStats, "txn");
154
155             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
156
157             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
158                     testSchemaContext, datastoreContext, shardStats, "txn");
159
160             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
161         }
162
163         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
164             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
165                 getRef());
166
167             ShardTransactionMessages.DataExistsReply replySerialized =
168                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
169
170             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
171
172             // unserialized read
173             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
174
175             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
176
177             assertTrue(reply.exists());
178         }};
179     }
180
181     @Test
182     public void testOnReceiveDataExistsNegative() throws Exception {
183         new JavaTestKit(getSystem()) {{
184             final ActorRef shard = createShard();
185             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
186                     testSchemaContext, datastoreContext, shardStats, "txn");
187
188             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
189
190             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
191                     testSchemaContext, datastoreContext, shardStats, "txn");
192
193             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
194         }
195
196         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
197             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
198
199             ShardTransactionMessages.DataExistsReply replySerialized =
200                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
201
202             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
203
204             // unserialized read
205             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
206
207             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
208
209             assertFalse(reply.exists());
210         }};
211     }
212
213     private void assertModification(final ActorRef subject,
214         final Class<? extends Modification> modificationType) {
215         new JavaTestKit(getSystem()) {{
216             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
217
218             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
219                     GetCompositeModificationReply.class).getModification();
220
221             assertTrue(compositeModification.getModifications().size() == 1);
222             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
223         }};
224     }
225
226     @Test
227     public void testOnReceiveWriteData() throws Exception {
228         new JavaTestKit(getSystem()) {{
229             final ActorRef shard = createShard();
230             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
231                     testSchemaContext, datastoreContext, shardStats, "txn");
232             final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
233
234             transaction.tell(new WriteData(TestModel.TEST_PATH,
235                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
236                 getRef());
237
238             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
239
240             assertModification(transaction, WriteModification.class);
241
242             //unserialized write
243             transaction.tell(new WriteData(TestModel.TEST_PATH,
244                 ImmutableNodes.containerNode(TestModel.TEST_QNAME),
245                 TestModel.createTestContext()),
246                 getRef());
247
248             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
249         }};
250     }
251
252     @Test
253     public void testOnReceiveMergeData() throws Exception {
254         new JavaTestKit(getSystem()) {{
255             final ActorRef shard = createShard();
256             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
257                     testSchemaContext, datastoreContext, shardStats, "txn");
258             final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
259
260             transaction.tell(new MergeData(TestModel.TEST_PATH,
261                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
262                 getRef());
263
264             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
265
266             assertModification(transaction, MergeModification.class);
267
268             //unserialized merge
269             transaction.tell(new MergeData(TestModel.TEST_PATH,
270                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
271                 getRef());
272
273             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
274         }};
275     }
276
277     @Test
278     public void testOnReceiveDeleteData() throws Exception {
279         new JavaTestKit(getSystem()) {{
280             final ActorRef shard = createShard();
281             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
282                     testSchemaContext, datastoreContext, shardStats, "txn");
283             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
284
285             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
286
287             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
288
289             assertModification(transaction, DeleteModification.class);
290
291             //unserialized merge
292             transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
293
294             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
295         }};
296     }
297
298
299     @Test
300     public void testOnReceiveReadyTransaction() throws Exception {
301         new JavaTestKit(getSystem()) {{
302             final ActorRef shard = createShard();
303             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
304                     testSchemaContext, datastoreContext, shardStats, "txn");
305             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
306
307             watch(transaction);
308
309             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
310
311             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
312                     Terminated.class);
313             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
314                     Terminated.class);
315         }};
316
317         // test
318         new JavaTestKit(getSystem()) {{
319             final ActorRef shard = createShard();
320             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
321                 testSchemaContext, datastoreContext, shardStats, "txn");
322             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
323
324             watch(transaction);
325
326             transaction.tell(new ReadyTransaction(), getRef());
327
328             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
329                     Terminated.class);
330             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
331                     Terminated.class);
332         }};
333
334     }
335
336     @SuppressWarnings("unchecked")
337     @Test
338     public void testOnReceiveCloseTransaction() throws Exception {
339         new JavaTestKit(getSystem()) {{
340             final ActorRef shard = createShard();
341             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
342                     testSchemaContext, datastoreContext, shardStats, "txn");
343             final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
344
345             watch(transaction);
346
347             transaction.tell(new CloseTransaction().toSerializable(), getRef());
348
349             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
350             expectTerminated(duration("3 seconds"), transaction);
351         }};
352     }
353
354     @Test(expected=UnknownMessageException.class)
355     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
356         final ActorRef shard = createShard();
357         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
358                 testSchemaContext, datastoreContext, shardStats, "txn");
359         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
360
361         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
362     }
363
364     @Test
365     public void testShardTransactionInactivity() {
366
367         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
368                 Duration.create(500, TimeUnit.MILLISECONDS)).build();
369
370         new JavaTestKit(getSystem()) {{
371             final ActorRef shard = createShard();
372             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
373                     testSchemaContext, datastoreContext, shardStats, "txn");
374             final ActorRef transaction =
375                 getSystem().actorOf(props, "testShardTransactionInactivity");
376
377             watch(transaction);
378
379             expectMsgClass(duration("3 seconds"), Terminated.class);
380         }};
381     }
382 }