2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
17 import akka.actor.ActorRef;
18 import akka.actor.Props;
19 import akka.actor.Status.Failure;
20 import akka.actor.Terminated;
21 import akka.dispatch.Dispatchers;
22 import akka.testkit.TestActorRef;
23 import akka.testkit.javadsl.TestKit;
24 import com.google.common.base.Throwables;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.mockito.InOrder;
29 import org.mockito.Mockito;
30 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
31 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
38 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.raft.TestActorFactory;
46 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
51 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
52 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
53 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 public class ShardTransactionTest extends AbstractActorTest {
57 private static final TransactionType RO = TransactionType.READ_ONLY;
58 private static final TransactionType RW = TransactionType.READ_WRITE;
59 private static final TransactionType WO = TransactionType.WRITE_ONLY;
61 private static final ShardIdentifier SHARD_IDENTIFIER =
62 ShardIdentifier.create("inventory", MEMBER_NAME, "config");
63 private static final SchemaContext TEST_MODEL = TestModel.createTestContext();
65 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
67 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
69 private TestActorRef<Shard> shard;
70 private ShardDataTree store;
74 shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
75 .schemaContextProvider(() -> TEST_MODEL).props()
76 .withDispatcher(Dispatchers.DefaultDispatcherId()));
77 ShardTestKit.waitUntilLeader(shard);
78 store = shard.underlyingActor().getDataStore();
81 private ActorRef newTransactionActor(final TransactionType type,
82 final AbstractShardDataTreeTransaction<?> transaction, final String name) {
83 Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
84 shard.underlyingActor().getShardMBean());
85 return actorFactory.createActorNoVerify(props, name);
88 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
89 return store.newReadOnlyTransaction(nextTransactionId());
92 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
93 return store.newReadWriteTransaction(nextTransactionId());
97 public void testOnReceiveReadData() throws Exception {
98 new TestKit(getSystem()) {
100 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
102 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
105 private void testOnReceiveReadData(final ActorRef transaction) {
106 transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
109 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
111 assertNotNull(reply.getNormalizedNode());
117 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
118 new TestKit(getSystem()) {
120 testOnReceiveReadDataWhenDataNotFound(
121 newTransactionActor(RO, readOnlyTransaction(), "testReadDataWhenDataNotFoundRO"));
123 testOnReceiveReadDataWhenDataNotFound(
124 newTransactionActor(RW, readWriteTransaction(), "testReadDataWhenDataNotFoundRW"));
127 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
128 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
130 ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
132 assertTrue(reply.getNormalizedNode() == null);
138 public void testOnReceiveDataExistsPositive() throws Exception {
139 new TestKit(getSystem()) {
141 testOnReceiveDataExistsPositive(
142 newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
144 testOnReceiveDataExistsPositive(
145 newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
148 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
149 transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION),
152 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
154 assertTrue(reply.exists());
160 public void testOnReceiveDataExistsNegative() throws Exception {
161 new TestKit(getSystem()) {
163 testOnReceiveDataExistsNegative(
164 newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
166 testOnReceiveDataExistsNegative(
167 newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
170 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
171 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
173 DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
175 assertFalse(reply.exists());
181 public void testOnReceiveBatchedModifications() throws Exception {
182 new TestKit(getSystem()) {
184 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
185 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
186 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
187 nextTransactionId(), mockModification);
188 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
190 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
191 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
192 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
193 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
195 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
196 NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create()
197 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME))
200 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
202 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
203 DataStoreVersions.CURRENT_VERSION);
204 batched.addModification(new WriteModification(writePath, writeData));
205 batched.addModification(new MergeModification(mergePath, mergeData));
206 batched.addModification(new DeleteModification(deletePath));
208 transaction.tell(batched, getRef());
210 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
211 BatchedModificationsReply.class);
212 assertEquals("getNumBatched", 3, reply.getNumBatched());
214 InOrder inOrder = Mockito.inOrder(mockModification);
215 inOrder.verify(mockModification).write(writePath, writeData);
216 inOrder.verify(mockModification).merge(mergePath, mergeData);
217 inOrder.verify(mockModification).delete(deletePath);
223 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
224 new TestKit(getSystem()) {
226 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
227 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
229 TestKit watcher = new TestKit(getSystem());
230 watcher.watch(transaction);
232 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
233 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
234 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
235 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
237 final TransactionIdentifier tx1 = nextTransactionId();
238 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
239 batched.addModification(new WriteModification(writePath, writeData));
241 transaction.tell(batched, getRef());
242 BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"),
243 BatchedModificationsReply.class);
244 assertEquals("getNumBatched", 1, reply.getNumBatched());
246 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
248 batched.setTotalMessagesSent(2);
250 transaction.tell(batched, getRef());
251 expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
252 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
258 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
259 new TestKit(getSystem()) {
261 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
262 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
264 TestKit watcher = new TestKit(getSystem());
265 watcher.watch(transaction);
267 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
268 NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create()
269 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
270 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
272 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
273 DataStoreVersions.CURRENT_VERSION);
274 batched.addModification(new WriteModification(writePath, writeData));
276 batched.setDoCommitOnReady(true);
277 batched.setTotalMessagesSent(1);
279 transaction.tell(batched, getRef());
280 expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
281 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
286 @Test(expected = TestException.class)
287 public void testOnReceiveBatchedModificationsFailure() throws Exception {
288 new TestKit(getSystem()) {
291 ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
292 DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
293 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
294 nextTransactionId(), mockModification);
295 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
296 "testOnReceiveBatchedModificationsFailure");
298 TestKit watcher = new TestKit(getSystem());
299 watcher.watch(transaction);
301 YangInstanceIdentifier path = TestModel.TEST_PATH;
302 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
304 doThrow(new TestException()).when(mockModification).write(path, node);
306 final TransactionIdentifier tx1 = nextTransactionId();
307 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
308 batched.addModification(new WriteModification(path, node));
310 transaction.tell(batched, getRef());
311 expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
313 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
315 batched.setTotalMessagesSent(2);
317 transaction.tell(batched, getRef());
318 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
319 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
321 if (failure != null) {
322 Throwables.propagateIfPossible(failure.cause(), Exception.class);
323 throw new RuntimeException(failure.cause());
329 @Test(expected = IllegalStateException.class)
330 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
331 new TestKit(getSystem()) {
334 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
335 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
337 TestKit watcher = new TestKit(getSystem());
338 watcher.watch(transaction);
340 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
341 DataStoreVersions.CURRENT_VERSION);
343 batched.setTotalMessagesSent(2);
345 transaction.tell(batched, getRef());
347 Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
348 watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
350 if (failure != null) {
351 Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
352 Throwables.throwIfUnchecked(failure.cause());
353 throw new RuntimeException(failure.cause());
360 public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
361 new TestKit(getSystem()) {
363 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
364 "testReadWriteTxOnReceiveCloseTransaction");
368 transaction.tell(new CloseTransaction().toSerializable(), getRef());
370 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
371 expectTerminated(duration("3 seconds"), transaction);
377 public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
378 new TestKit(getSystem()) {
380 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
381 "testWriteTxOnReceiveCloseTransaction");
385 transaction.tell(new CloseTransaction().toSerializable(), getRef());
387 expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
388 expectTerminated(duration("3 seconds"), transaction);
394 public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
395 new TestKit(getSystem()) {
397 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
398 "testReadOnlyTxOnReceiveCloseTransaction");
402 transaction.tell(new CloseTransaction().toSerializable(), getRef());
404 expectMsgClass(duration("3 seconds"), Terminated.class);
410 public void testShardTransactionInactivity() {
411 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
412 500, TimeUnit.MILLISECONDS).build();
414 new TestKit(getSystem()) {
416 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
417 "testShardTransactionInactivity");
421 expectMsgClass(duration("3 seconds"), Terminated.class);
426 public static class TestException extends RuntimeException {
427 private static final long serialVersionUID = 1L;