1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.mockito.InOrder;
18 import org.mockito.Mockito;
19 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
20 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
28 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
30 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
39 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
40 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
41 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.Modification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
46 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
47 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
49 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
50 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
51 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
57 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59 import scala.concurrent.duration.Duration;
61 public class ShardTransactionTest extends AbstractActorTest {
63 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
65 private static final ShardIdentifier SHARD_IDENTIFIER =
66 ShardIdentifier.builder().memberName("member-1")
67 .shardName("inventory").type("config").build();
69 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
71 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
73 private final InMemoryDOMDataStore store =
74 new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
78 store.onGlobalContextUpdated(testSchemaContext);
81 private ActorRef createShard(){
82 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
83 Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
86 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
87 return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
90 private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
91 return newTransactionActor(transaction, null, name, version);
94 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
95 return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
98 private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
100 Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
101 testSchemaContext, datastoreContext, shardStats, "txn", version);
102 return getSystem().actorOf(props, name);
106 public void testOnReceiveReadData() throws Exception {
107 new JavaTestKit(getSystem()) {{
108 final ActorRef shard = createShard();
110 testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
112 testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
115 private void testOnReceiveReadData(final ActorRef transaction) {
117 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
120 Object replySerialized =
121 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
123 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
126 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
128 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
130 assertNotNull(reply.getNormalizedNode());
135 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
136 new JavaTestKit(getSystem()) {{
137 final ActorRef shard = createShard();
139 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
140 store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
142 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
143 store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
146 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
148 transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
150 Object replySerialized =
151 expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
153 assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
156 transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
158 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
160 assertTrue(reply.getNormalizedNode() == null);
165 public void testOnReceiveReadDataHeliumR1() throws Exception {
166 new JavaTestKit(getSystem()) {{
167 ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
168 "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
170 transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
173 ShardTransactionMessages.ReadDataReply replySerialized =
174 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
176 assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
181 public void testOnReceiveDataExistsPositive() throws Exception {
182 new JavaTestKit(getSystem()) {{
183 final ActorRef shard = createShard();
185 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
186 "testDataExistsPositiveRO"));
188 testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
189 "testDataExistsPositiveRW"));
192 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
193 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
196 ShardTransactionMessages.DataExistsReply replySerialized =
197 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
199 assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
202 transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
204 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
206 assertTrue(reply.exists());
211 public void testOnReceiveDataExistsNegative() throws Exception {
212 new JavaTestKit(getSystem()) {{
213 final ActorRef shard = createShard();
215 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
216 "testDataExistsNegativeRO"));
218 testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
219 "testDataExistsNegativeRW"));
222 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
223 transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
225 ShardTransactionMessages.DataExistsReply replySerialized =
226 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
228 assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
231 transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
233 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
235 assertFalse(reply.exists());
239 private void assertModification(final ActorRef subject,
240 final Class<? extends Modification> modificationType) {
241 new JavaTestKit(getSystem()) {{
242 subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
244 CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
245 GetCompositeModificationReply.class).getModification();
247 assertTrue(compositeModification.getModifications().size() == 1);
248 assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
253 public void testOnReceiveWriteData() throws Exception {
254 new JavaTestKit(getSystem()) {{
255 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
256 "testOnReceiveWriteData");
258 transaction.tell(new WriteData(TestModel.TEST_PATH,
259 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
260 toSerializable(), getRef());
262 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
264 assertModification(transaction, WriteModification.class);
266 // unserialized write
267 transaction.tell(new WriteData(TestModel.TEST_PATH,
268 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
271 expectMsgClass(duration("5 seconds"), WriteDataReply.class);
276 public void testOnReceiveHeliumR1WriteData() throws Exception {
277 new JavaTestKit(getSystem()) {{
278 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
279 "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
281 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
282 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
283 ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
284 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
285 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
287 transaction.tell(serialized, getRef());
289 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
291 assertModification(transaction, WriteModification.class);
296 public void testOnReceiveMergeData() throws Exception {
297 new JavaTestKit(getSystem()) {{
298 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
301 transaction.tell(new MergeData(TestModel.TEST_PATH,
302 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
303 toSerializable(), getRef());
305 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
307 assertModification(transaction, MergeModification.class);
310 transaction.tell(new MergeData(TestModel.TEST_PATH,
311 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
314 expectMsgClass(duration("5 seconds"), MergeDataReply.class);
319 public void testOnReceiveHeliumR1MergeData() throws Exception {
320 new JavaTestKit(getSystem()) {{
321 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
322 "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
324 Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
325 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
326 ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
327 .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
328 .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
330 transaction.tell(serialized, getRef());
332 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
334 assertModification(transaction, MergeModification.class);
339 public void testOnReceiveDeleteData() throws Exception {
340 new JavaTestKit(getSystem()) {{
341 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
344 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
345 toSerializable(), getRef());
347 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
349 assertModification(transaction, DeleteModification.class);
352 transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
354 expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
359 public void testOnReceiveBatchedModifications() throws Exception {
360 new JavaTestKit(getSystem()) {{
362 DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
363 final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
365 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
366 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
367 new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
368 withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
370 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
371 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
372 new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
374 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
376 BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
377 batched.addModification(new WriteModification(writePath, writeData));
378 batched.addModification(new MergeModification(mergePath, mergeData));
379 batched.addModification(new DeleteModification(deletePath));
381 transaction.tell(batched, getRef());
383 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
384 assertEquals("getNumBatched", 3, reply.getNumBatched());
386 JavaTestKit verification = new JavaTestKit(getSystem());
387 transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
389 CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
390 GetCompositeModificationReply.class).getModification();
392 assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
394 WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
395 assertEquals("getPath", writePath, write.getPath());
396 assertEquals("getData", writeData, write.getData());
398 MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
399 assertEquals("getPath", mergePath, merge.getPath());
400 assertEquals("getData", mergeData, merge.getData());
402 DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
403 assertEquals("getPath", deletePath, delete.getPath());
405 InOrder inOrder = Mockito.inOrder(mockWriteTx);
406 inOrder.verify(mockWriteTx).write(writePath, writeData);
407 inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
408 inOrder.verify(mockWriteTx).delete(deletePath);
413 public void testOnReceiveReadyTransaction() throws Exception {
414 new JavaTestKit(getSystem()) {{
415 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
416 "testReadyTransaction");
420 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
422 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
424 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
429 new JavaTestKit(getSystem()) {{
430 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
431 "testReadyTransaction2");
435 transaction.tell(new ReadyTransaction(), getRef());
437 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
439 expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
445 public void testOnReceiveCreateSnapshot() throws Exception {
446 new JavaTestKit(getSystem()) {{
447 ShardTest.writeToStore(store, TestModel.TEST_PATH,
448 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
450 NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
451 YangInstanceIdentifier.builder().build());
453 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
454 "testOnReceiveCreateSnapshot");
458 transaction.tell(CreateSnapshot.INSTANCE, getRef());
460 CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
462 assertNotNull("getSnapshot is null", reply.getSnapshot());
464 NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
465 reply.getSnapshot());
467 assertEquals("Root node", expectedRoot, actualRoot);
469 expectTerminated(duration("3 seconds"), transaction);
474 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
475 new JavaTestKit(getSystem()) {{
476 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
477 "testReadWriteTxOnReceiveCloseTransaction");
481 transaction.tell(new CloseTransaction().toSerializable(), getRef());
483 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
484 expectTerminated(duration("3 seconds"), transaction);
489 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
490 new JavaTestKit(getSystem()) {{
491 final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
492 "testWriteTxOnReceiveCloseTransaction");
496 transaction.tell(new CloseTransaction().toSerializable(), getRef());
498 expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
499 expectTerminated(duration("3 seconds"), transaction);
504 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
505 new JavaTestKit(getSystem()) {{
506 final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
507 "testReadOnlyTxOnReceiveCloseTransaction");
511 transaction.tell(new CloseTransaction().toSerializable(), getRef());
513 expectMsgClass(duration("3 seconds"), Terminated.class);
517 @Test(expected=UnknownMessageException.class)
518 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
519 final ActorRef shard = createShard();
520 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
521 testSchemaContext, datastoreContext, shardStats, "txn",
522 DataStoreVersions.CURRENT_VERSION);
523 final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
525 transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
526 toSerializable(), ActorRef.noSender());
530 public void testShardTransactionInactivity() {
532 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
533 Duration.create(500, TimeUnit.MILLISECONDS)).build();
535 new JavaTestKit(getSystem()) {{
536 final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
537 "testShardTransactionInactivity");
541 expectMsgClass(duration("3 seconds"), Terminated.class);