/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import static org.mockito.Mockito.inOrder;
17 import static org.mockito.Mockito.mock;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.actor.Status.Failure;
22 import akka.actor.Terminated;
23 import akka.dispatch.Dispatchers;
24 import akka.testkit.TestActorRef;
25 import akka.testkit.javadsl.TestKit;
26 import com.google.common.base.Throwables;
27 import java.util.concurrent.TimeUnit;
28 import org.junit.Before;
29 import org.junit.Test;
30 import org.mockito.InOrder;
31 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
32 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
34 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
39 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
44 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
45 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
46 import org.opendaylight.controller.cluster.raft.TestActorFactory;
47 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
52 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
53 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 public class ShardTransactionTest extends AbstractActorTest {
58 private static final TransactionType RO = TransactionType.READ_ONLY;
59 private static final TransactionType RW = TransactionType.READ_WRITE;
60 private static final TransactionType WO = TransactionType.WRITE_ONLY;
62 private static final ShardIdentifier SHARD_IDENTIFIER =
63 ShardIdentifier.create("inventory", MEMBER_NAME, "config");
64 private static final SchemaContext TEST_MODEL = TestModel.createTestContext();
66 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
68 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
70 private TestActorRef<Shard> shard;
71 private ShardDataTree store;
72 private TestKit testKit;
76 shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
77 .schemaContextProvider(() -> TEST_MODEL).props()
78 .withDispatcher(Dispatchers.DefaultDispatcherId()));
79 ShardTestKit.waitUntilLeader(shard);
80 store = shard.underlyingActor().getDataStore();
81 testKit = new TestKit(getSystem());
84 private ActorRef newTransactionActor(final TransactionType type,
85 final AbstractShardDataTreeTransaction<?> transaction, final String name) {
86 Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
87 shard.underlyingActor().getShardMBean());
88 return actorFactory.createActorNoVerify(props, name);
91 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
92 return store.newReadOnlyTransaction(nextTransactionId());
95 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
96 return store.newReadWriteTransaction(nextTransactionId());
100 public void testOnReceiveReadData() {
101 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
102 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
105 private void testOnReceiveReadData(final ActorRef transaction) {
106 transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
109 ReadDataReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ReadDataReply.class);
111 assertNotNull(reply.getNormalizedNode());
115 public void testOnReceiveReadDataWhenDataNotFound() {
116 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
117 "testReadDataWhenDataNotFoundRO"));
118 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
119 "testReadDataWhenDataNotFoundRW"));
122 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
123 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
125 ReadDataReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), ReadDataReply.class);
127 assertNull(reply.getNormalizedNode());
131 public void testOnReceiveDataExistsPositive() {
132 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
133 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
136 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
137 transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
140 DataExistsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), DataExistsReply.class);
142 assertTrue(reply.exists());
146 public void testOnReceiveDataExistsNegative() {
147 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
148 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
151 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
152 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
154 DataExistsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"), DataExistsReply.class);
156 assertFalse(reply.exists());
160 public void testOnReceiveBatchedModifications() {
161 ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
162 DataTreeModification mockModification = mock(DataTreeModification.class);
163 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
164 nextTransactionId(), mockModification);
165 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
167 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
168 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
169 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
170 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
172 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
173 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create()
174 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
177 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
179 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
180 DataStoreVersions.CURRENT_VERSION);
181 batched.addModification(new WriteModification(writePath, writeData));
182 batched.addModification(new MergeModification(mergePath, mergeData));
183 batched.addModification(new DeleteModification(deletePath));
185 transaction.tell(batched, testKit.getRef());
187 BatchedModificationsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
188 BatchedModificationsReply.class);
189 assertEquals("getNumBatched", 3, reply.getNumBatched());
191 InOrder inOrder = inOrder(mockModification);
192 inOrder.verify(mockModification).write(writePath, writeData);
193 inOrder.verify(mockModification).merge(mergePath, mergeData);
194 inOrder.verify(mockModification).delete(deletePath);
198 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
199 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
200 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
202 TestKit watcher = new TestKit(getSystem());
203 watcher.watch(transaction);
205 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
206 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
207 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
208 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
210 final TransactionIdentifier tx1 = nextTransactionId();
211 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
212 batched.addModification(new WriteModification(writePath, writeData));
214 transaction.tell(batched, testKit.getRef());
215 BatchedModificationsReply reply = testKit.expectMsgClass(testKit.duration("5 seconds"),
216 BatchedModificationsReply.class);
217 assertEquals("getNumBatched", 1, reply.getNumBatched());
219 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
221 batched.setTotalMessagesSent(2);
223 transaction.tell(batched, testKit.getRef());
224 testKit.expectMsgClass(testKit.duration("5 seconds"), ReadyTransactionReply.class);
225 watcher.expectMsgClass(watcher.duration("5 seconds"), Terminated.class);
229 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
230 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
231 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
233 TestKit watcher = new TestKit(getSystem());
234 watcher.watch(transaction);
236 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
237 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
238 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
239 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
241 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
242 DataStoreVersions.CURRENT_VERSION);
243 batched.addModification(new WriteModification(writePath, writeData));
245 batched.setDoCommitOnReady(true);
246 batched.setTotalMessagesSent(1);
248 transaction.tell(batched, testKit.getRef());
249 testKit.expectMsgClass(testKit.duration("5 seconds"), CommitTransactionReply.class);
250 watcher.expectMsgClass(testKit.duration("5 seconds"), Terminated.class);
253 @Test(expected = TestException.class)
254 public void testOnReceiveBatchedModificationsFailure() throws Exception {
255 ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
256 DataTreeModification mockModification = mock(DataTreeModification.class);
257 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
258 nextTransactionId(), mockModification);
259 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
260 "testOnReceiveBatchedModificationsFailure");
262 TestKit watcher = new TestKit(getSystem());
263 watcher.watch(transaction);
265 YangInstanceIdentifier path = TestModel.TEST_PATH;
266 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
268 doThrow(new TestException()).when(mockModification).write(path, node);
270 final TransactionIdentifier tx1 = nextTransactionId();
271 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
272 batched.addModification(new WriteModification(path, node));
274 transaction.tell(batched, testKit.getRef());
275 testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
277 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
279 batched.setTotalMessagesSent(2);
281 transaction.tell(batched, testKit.getRef());
282 Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
283 watcher.expectMsgClass(testKit.duration("5 seconds"), Terminated.class);
285 if (failure != null) {
286 Throwables.propagateIfPossible(failure.cause(), Exception.class);
287 throw new RuntimeException(failure.cause());
291 @Test(expected = IllegalStateException.class)
292 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
293 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
294 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
296 TestKit watcher = new TestKit(getSystem());
297 watcher.watch(transaction);
299 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
300 DataStoreVersions.CURRENT_VERSION);
302 batched.setTotalMessagesSent(2);
304 transaction.tell(batched, testKit.getRef());
306 Failure failure = testKit.expectMsgClass(testKit.duration("5 seconds"), akka.actor.Status.Failure.class);
307 watcher.expectMsgClass(watcher.duration("5 seconds"), Terminated.class);
309 if (failure != null) {
310 Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
311 Throwables.throwIfUnchecked(failure.cause());
312 throw new RuntimeException(failure.cause());
317 public void testReadWriteTxOnReceiveCloseTransaction() {
318 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
319 "testReadWriteTxOnReceiveCloseTransaction");
321 testKit.watch(transaction);
323 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
325 testKit.expectMsgClass(testKit.duration("3 seconds"), CloseTransactionReply.class);
326 testKit.expectTerminated(testKit.duration("3 seconds"), transaction);
330 public void testWriteOnlyTxOnReceiveCloseTransaction() {
331 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
332 "testWriteTxOnReceiveCloseTransaction");
334 testKit.watch(transaction);
336 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
338 testKit.expectMsgClass(testKit.duration("3 seconds"), CloseTransactionReply.class);
339 testKit.expectTerminated(testKit.duration("3 seconds"), transaction);
343 public void testReadOnlyTxOnReceiveCloseTransaction() {
344 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
345 "testReadOnlyTxOnReceiveCloseTransaction");
347 testKit.watch(transaction);
349 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
351 testKit.expectMsgClass(testKit.duration("3 seconds"), Terminated.class);
355 public void testShardTransactionInactivity() {
356 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
357 500, TimeUnit.MILLISECONDS).build();
359 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
360 "testShardTransactionInactivity");
362 testKit.watch(transaction);
364 testKit.expectMsgClass(testKit.duration("3 seconds"), Terminated.class);
367 public static class TestException extends RuntimeException {
368 private static final long serialVersionUID = 1L;