1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.mockito.InOrder;
18 import org.mockito.Mockito;
19 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
20 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
28 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
30 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
39 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
40 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
41 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.Modification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
46 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
47 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
49 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
50 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
51 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
57 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
 * Tests for the ShardTransaction actor. Most tests create a transaction actor over the
 * in-memory store declared below, send it a message and check the reply.
 */
60 public class ShardTransactionTest extends AbstractActorTest {
// Schema context built once from the test model; handed to the store and to every transaction actor.
62 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
// Identifier used when creating shards and the shard statistics object.
64 private static final ShardIdentifier SHARD_IDENTIFIER =
65 ShardIdentifier.builder().memberName("member-1")
66 .shardName("inventory").type("config").build();
// Deliberately not final: testShardTransactionInactivity replaces it with a short idle timeout.
68 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
// Statistics object passed to ShardTransaction.props for every transaction actor.
70 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
// In-memory store named "OPER" that runs its work on the calling thread.
72 private final InMemoryDOMDataStore store =
73 new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
// NOTE(review): the enclosing method header is not visible in this view; presumably a setup
// method (Before is imported). Supplies the schema context to the store.
77 store.onGlobalContextUpdated(testSchemaContext);
80 private ActorRef createShard(){
81 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
82 Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
85 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
86 return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
89 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
90 return newTransactionActor(transaction, null, name, version);
93 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
94 return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
97 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
99 Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
100 testSchemaContext, datastoreContext, shardStats, "txn", version);
101 return getSystem().actorOf(props, name);
/**
 * Checks that reading the empty (root) path returns a non-null node, for a read-only and a
 * read-write transaction actor that are both given the same shard reference.
 */
105 public void testOnReceiveReadData() throws Exception {
106 new JavaTestKit(getSystem()) {{
107 final ActorRef shard = createShard();
109 testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
111 testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
// Helper run against each actor: issues the read in serialized, then unserialized form.
114 private void testOnReceiveReadData(final ActorRef transaction) {
// Serialized request; the reply must arrive as ReadDataReply.SERIALIZABLE_CLASS.
// NOTE(review): the sender argument of this tell() is on a line not visible in this view.
116 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
119 Object replySerialized =
120 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
122 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
// Unserialized request; the reply must arrive as a plain ReadDataReply.
125 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
127 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
129 assertNotNull(reply.getNormalizedNode());
/**
 * Checks that reading TestModel.TEST_PATH, which these tests have not written, returns a reply
 * with a null node, for a read-only and a read-write transaction actor.
 */
134 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
135 new JavaTestKit(getSystem()) {{
136 final ActorRef shard = createShard();
138 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
139 store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
141 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
142 store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
// Helper run against each actor: issues the read in serialized, then unserialized form.
145 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
// Serialized request and reply.
147 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
149 Object replySerialized =
150 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
152 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
// Unserialized request and reply.
155 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
157 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
159 assertTrue(reply.getNormalizedNode() == null);
/**
 * Checks that a transaction actor created with DataStoreVersions.HELIUM_1_VERSION answers a
 * serialized root read with a ShardTransactionMessages.ReadDataReply carrying a non-null node.
 */
164 public void testOnReceiveReadDataHeliumR1() throws Exception {
165 new JavaTestKit(getSystem()) {{
166 ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
167 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
// NOTE(review): the sender argument of this tell() is on a line not visible in this view.
169 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
172 ShardTransactionMessages.ReadDataReply replySerialized =
173 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
175 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
/**
 * Checks that DataExists for the empty (root) path is answered with exists() == true, for a
 * read-only and a read-write transaction actor.
 */
180 public void testOnReceiveDataExistsPositive() throws Exception {
181 new JavaTestKit(getSystem()) {{
182 final ActorRef shard = createShard();
184 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
185 "testDataExistsPositiveRO"));
187 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
188 "testDataExistsPositiveRW"));
// Helper run against each actor: issues the query in serialized, then unserialized form.
191 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
// NOTE(review): the sender argument of this tell() is on a line not visible in this view.
192 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
195 ShardTransactionMessages.DataExistsReply replySerialized =
196 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
198 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
// Unserialized request and reply.
201 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
203 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
205 assertTrue(reply.exists());
/**
 * Checks that DataExists for TestModel.TEST_PATH is answered with exists() == false, for a
 * read-only and a read-write transaction actor.
 */
210 public void testOnReceiveDataExistsNegative() throws Exception {
211 new JavaTestKit(getSystem()) {{
212 final ActorRef shard = createShard();
214 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
215 "testDataExistsNegativeRO"));
217 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
218 "testDataExistsNegativeRW"));
// Helper run against each actor: issues the query in serialized, then unserialized form.
221 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
// Serialized request and reply.
222 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
224 ShardTransactionMessages.DataExistsReply replySerialized =
225 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
227 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
// Unserialized request and reply.
230 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
232 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
234 assertFalse(reply.exists());
238 private void assertModification(final ActorRef subject,
239 final Class<? extends Modification> modificationType) {
240 new JavaTestKit(getSystem()) {{
241 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
243 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
244 GetCompositeModificationReply.class).getModification();
246 assertTrue(compositeModification.getModifications().size() == 1);
247 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
/**
 * Checks that a write-only transaction actor acknowledges a WriteData sent in serialized form
 * (built with HELIUM_2_VERSION) and in unserialized form (built with CURRENT_VERSION), and that
 * the serialized write is recorded as a single WriteModification.
 */
252 public void testOnReceiveWriteData() throws Exception {
253 new JavaTestKit(getSystem()) {{
254 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
255 "testOnReceiveWriteData");
// Serialized write; the reply is expected as ShardTransactionMessages.WriteDataReply.
257 transaction.tell(new WriteData(TestModel.TEST_PATH,
258 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
259 toSerializable(), getRef());
261 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
263 assertModification(transaction, WriteModification.class);
265 // unserialized write
// NOTE(review): the sender argument of this tell() is on a line not visible in this view.
266 transaction.tell(new WriteData(TestModel.TEST_PATH,
267 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
270 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
/**
 * Checks that a transaction actor created with HELIUM_1_VERSION accepts a
 * ShardTransactionMessages.WriteData built directly from codec output, replies with
 * ShardTransactionMessages.WriteDataReply and records a single WriteModification.
 */
275 public void testOnReceiveHeliumR1WriteData() throws Exception {
276 new JavaTestKit(getSystem()) {{
277 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
278 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
// Encode path and node with the codec, then assemble the message by hand instead of
// going through WriteData.toSerializable().
280 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
281 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
282 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
283 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
284 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
286 transaction.tell(serialized, getRef());
288 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
290 assertModification(transaction, WriteModification.class);
/**
 * Checks that a read-write transaction actor acknowledges a MergeData sent in serialized form
 * (built with HELIUM_2_VERSION) and in unserialized form (built with CURRENT_VERSION), and that
 * the serialized merge is recorded as a single MergeModification.
 */
295 public void testOnReceiveMergeData() throws Exception {
296 new JavaTestKit(getSystem()) {{
// NOTE(review): the remaining argument(s) of this call are on a line not visible in this view.
297 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
// Serialized merge; the reply is expected as ShardTransactionMessages.MergeDataReply.
300 transaction.tell(new MergeData(TestModel.TEST_PATH,
301 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
302 toSerializable(), getRef());
304 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
306 assertModification(transaction, MergeModification.class);
// Unserialized merge; the reply is expected as a plain MergeDataReply.
// NOTE(review): the sender argument of this tell() is on a line not visible in this view.
309 transaction.tell(new MergeData(TestModel.TEST_PATH,
310 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
313 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
/**
 * Checks that a transaction actor created with HELIUM_1_VERSION accepts a
 * ShardTransactionMessages.MergeData built directly from codec output, replies with
 * ShardTransactionMessages.MergeDataReply and records a single MergeModification.
 */
318 public void testOnReceiveHeliumR1MergeData() throws Exception {
319 new JavaTestKit(getSystem()) {{
320 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
321 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
// Encode path and node with the codec, then assemble the message by hand instead of
// going through MergeData.toSerializable().
323 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
324 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
325 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
326 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
327 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
329 transaction.tell(serialized, getRef());
331 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
333 assertModification(transaction, MergeModification.class);
/**
 * Checks that a write-only transaction actor acknowledges a DeleteData sent in serialized form
 * (built with HELIUM_2_VERSION) and in unserialized form (built with CURRENT_VERSION), and that
 * the serialized delete is recorded as a single DeleteModification.
 */
338 public void testOnReceiveDeleteData() throws Exception {
339 new JavaTestKit(getSystem()) {{
// NOTE(review): the remaining argument(s) of this call are on a line not visible in this view.
340 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
// Serialized delete; the reply is expected as ShardTransactionMessages.DeleteDataReply.
343 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
344 toSerializable(), getRef());
346 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
348 assertModification(transaction, DeleteModification.class);
// Unserialized delete; the reply is expected as a plain DeleteDataReply.
351 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
353 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
/**
 * Sends one BatchedModifications message holding a write, a merge and a delete to an actor
 * wrapping a mocked write transaction. Verifies the reply count, the modifications the actor
 * recorded, and that the mock received write, merge and delete in that order.
 */
358 public void testOnReceiveBatchedModifications() throws Exception {
359 new JavaTestKit(getSystem()) {{
// A Mockito mock stands in for the store transaction so the calls can be verified below.
361 DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
362 final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
364 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
365 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
366 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
367 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
369 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
370 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
371 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
373 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
// Batch the three modifications in the order they are expected to be applied.
375 BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
376 batched.addModification(new WriteModification(writePath, writeData));
377 batched.addModification(new MergeModification(mergePath, mergeData));
378 batched.addModification(new DeleteModification(deletePath));
380 transaction.tell(batched, getRef());
382 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
383 assertEquals("getNumBatched", 3, reply.getNumBatched());
// A second test kit receives the composite-modification reply.
385 JavaTestKit verification = new JavaTestKit(getSystem());
386 transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
388 CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
389 GetCompositeModificationReply.class).getModification();
391 assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
// The recorded modifications must match the batch in position, path and data.
393 WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
394 assertEquals("getPath", writePath, write.getPath());
395 assertEquals("getData", writeData, write.getData());
397 MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
398 assertEquals("getPath", mergePath, merge.getPath());
399 assertEquals("getData", mergeData, merge.getData());
401 DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
402 assertEquals("getPath", deletePath, delete.getPath());
// The underlying transaction must have seen the operations in batch order.
404 InOrder inOrder = Mockito.inOrder(mockWriteTx);
405 inOrder.verify(mockWriteTx).write(writePath, writeData);
406 inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
407 inOrder.verify(mockWriteTx).delete(deletePath);
/**
 * Sends ReadyTransaction to a read-write transaction actor, first in serialized and then in
 * unserialized form (each against its own actor), and expects two messages after each send.
 */
412 public void testOnReceiveReadyTransaction() throws Exception {
413 new JavaTestKit(getSystem()) {{
414 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
415 "testReadyTransaction");
// Serialized form.
419 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
// NOTE(review): the remaining accepted class(es) of both expectMsgAnyClassOf calls are on
// lines not visible in this view.
421 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
423 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
// Unserialized form, against a second actor.
428 new JavaTestKit(getSystem()) {{
429 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
430 "testReadyTransaction2");
434 transaction.tell(new ReadyTransaction(), getRef());
// NOTE(review): as above, the remaining accepted class(es) are not visible in this view.
436 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
438 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
/**
 * Writes a container at TestModel.TEST_PATH, asks a read-only transaction actor for a snapshot
 * and checks that the deserialized snapshot equals the store's root node. The actor is then
 * expected to terminate.
 */
444 public void testOnReceiveCreateSnapshot() throws Exception {
445 new JavaTestKit(getSystem()) {{
446 ShardTest.writeToStore(store, TestModel.TEST_PATH,
447 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Read the root node up front; it is the expected snapshot content.
449 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
450 YangInstanceIdentifier.builder().build());
452 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
453 "testOnReceiveCreateSnapshot");
457 transaction.tell(CreateSnapshot.INSTANCE, getRef());
459 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
461 assertNotNull("getSnapshot is null", reply.getSnapshot());
463 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
464 reply.getSnapshot());
466 assertEquals("Root node", expectedRoot, actualRoot);
// NOTE(review): presumably a watch(transaction) call sits on a line not visible here — confirm.
468 expectTerminated(duration("3 seconds"), transaction);
/**
 * Checks that a read-write transaction actor answers a serialized CloseTransaction with
 * CloseTransactionReply.SERIALIZABLE_CLASS and is then reported as terminated.
 */
473 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
474 new JavaTestKit(getSystem()) {{
475 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
476 "testReadWriteTxOnReceiveCloseTransaction");
480 transaction.tell(new CloseTransaction().toSerializable(), getRef());
482 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
// NOTE(review): presumably a watch(transaction) call sits on a line not visible here — confirm.
483 expectTerminated(duration("3 seconds"), transaction);
/**
 * Checks that a write-only transaction actor answers a serialized CloseTransaction with
 * CloseTransactionReply.SERIALIZABLE_CLASS and is then reported as terminated.
 */
488 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
489 new JavaTestKit(getSystem()) {{
490 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
491 "testWriteTxOnReceiveCloseTransaction");
495 transaction.tell(new CloseTransaction().toSerializable(), getRef());
497 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
// NOTE(review): presumably a watch(transaction) call sits on a line not visible here — confirm.
498 expectTerminated(duration("3 seconds"), transaction);
/**
 * Sends a serialized CloseTransaction to a read-only transaction actor and expects a Terminated
 * message. Unlike the read-write and write-only variants, no CloseTransactionReply is awaited
 * first.
 */
503 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
504 new JavaTestKit(getSystem()) {{
505 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
506 "testReadOnlyTxOnReceiveCloseTransaction");
510 transaction.tell(new CloseTransaction().toSerializable(), getRef());
// NOTE(review): presumably a watch(transaction) call sits on a line not visible here — confirm.
512 expectMsgClass(duration("3 seconds"), Terminated.class);
516 @Test(expected=UnknownMessageException.class)
517 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
518 final ActorRef shard = createShard();
519 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
520 testSchemaContext, datastoreContext, shardStats, "txn",
521 DataStoreVersions.CURRENT_VERSION);
522 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
524 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
525 toSerializable(), ActorRef.noSender());
/**
 * Rebuilds the datastore context with a 500 ms transaction idle timeout, creates a read-write
 * transaction actor, sends it nothing and expects a Terminated message within 3 seconds.
 */
529 public void testShardTransactionInactivity() {
// Replace the default context before the actor is created so it picks up the short timeout.
531 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
532 500, TimeUnit.MILLISECONDS).build();
534 new JavaTestKit(getSystem()) {{
535 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
536 "testShardTransactionInactivity");
// NOTE(review): presumably a watch(transaction) call sits on a line not visible here — confirm.
540 expectMsgClass(duration("3 seconds"), Terminated.class);