2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import static org.mockito.Mockito.inOrder;
17 import static org.mockito.Mockito.mock;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.actor.Status.Failure;
22 import akka.actor.Terminated;
23 import akka.dispatch.Dispatchers;
24 import akka.testkit.TestActorRef;
25 import akka.testkit.javadsl.TestKit;
26 import com.google.common.base.Throwables;
27 import java.time.Duration;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.InOrder;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
35 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
40 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
45 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.raft.TestActorFactory;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
54 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
55 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
56 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
/**
 * Tests for the {@code ShardTransaction} actor. Transaction actors are created against a
 * {@code Shard} test actor and exercised by sending them messages from a {@link TestKit} probe.
 */
58 public class ShardTransactionTest extends AbstractActorTest {
// Short aliases for the transaction types passed to newTransactionActor().
60 private static final TransactionType RO = TransactionType.READ_ONLY;
61 private static final TransactionType RW = TransactionType.READ_WRITE;
62 private static final TransactionType WO = TransactionType.WRITE_ONLY;
// Identifier given to the shard under test. MEMBER_NAME is not declared in this file —
// presumably inherited from AbstractActorTest; confirm.
64 private static final ShardIdentifier SHARD_IDENTIFIER =
65 ShardIdentifier.create("inventory", MEMBER_NAME, "config");
// Model context handed to the shard through its schemaContextProvider.
66 private static final EffectiveModelContext TEST_MODEL = TestModel.createTestContext();
// Non-persistent by default; not final because testShardTransactionInactivity() replaces it.
68 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
// Factory through which the shard and every transaction actor in this class are created.
70 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// The three fields below are assigned by the fixture initialisation before each use.
72 private TestActorRef<Shard> shard;
73 private ShardDataTree store;
74 private TestKit testKit;
// Fixture initialisation.
// NOTE(review): the enclosing method header is not visible in this listing — presumably a
// JUnit @Before method (org.junit.Before is imported); confirm.
// Creates the Shard test actor with SHARD_IDENTIFIER, the current datastoreContext and
// TEST_MODEL, placed on the default dispatcher.
78 shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
79 .schemaContextProvider(() -> TEST_MODEL).props()
80 .withDispatcher(Dispatchers.DefaultDispatcherId()));
// Going by its name, this waits for the shard to become leader before the tests use it — confirm.
81 ShardTestKit.waitUntilLeader(shard);
// Data tree from which the tests open their read-only / read-write transactions.
82 store = shard.underlyingActor().getDataStore();
// Probe used as the sender for requests and as the receiver of replies.
83 testKit = new TestKit(getSystem());
/**
 * Creates a ShardTransaction actor for the given transaction.
 *
 * <p>The props are built from the transaction type, the transaction, the shard under test,
 * the current {@code datastoreContext} and the shard's MBean; the actor is then created
 * through {@code actorFactory.createActorNoVerify}.
 *
 * @param type the transaction type the actor is created for
 * @param transaction the data-tree transaction backing the actor
 * @param name the actor name passed to the factory
 * @return reference to the newly created transaction actor
 */
86 private ActorRef newTransactionActor(final TransactionType type,
87 final AbstractShardDataTreeTransaction<?> transaction, final String name) {
88 Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
89 shard.underlyingActor().getShardMBean());
90 return actorFactory.createActorNoVerify(props, name);
/**
 * Opens a new read-only transaction on the shard's data tree under a fresh transaction id.
 *
 * @return the new read-only transaction
 */
93 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
94 return store.newReadOnlyTransaction(nextTransactionId());
/**
 * Opens a new read-write transaction on the shard's data tree under a fresh transaction id.
 *
 * @return the new read-write transaction
 */
97 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
98 return store.newReadWriteTransaction(nextTransactionId());
/**
 * Runs the ReadData check once against a read-only and once against a read-write
 * transaction actor.
 */
102 public void testOnReceiveReadData() {
103 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
104 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
/**
 * Sends a ReadData request for the empty path to the given transaction actor and expects,
 * within 5 seconds, a ReadDataReply that carries a non-null node.
 *
 * @param transaction the transaction actor under test
 */
107 private void testOnReceiveReadData(final ActorRef transaction) {
// NOTE(review): the sender argument of this tell() sits on a continuation line that is not
// visible in this listing — presumably testKit.getRef(), as in the sibling helpers; confirm.
108 transaction.tell(new ReadData(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
111 ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
113 assertNotNull(reply.getNormalizedNode());
/**
 * Runs the "data not found" ReadData check once against a read-only and once against a
 * read-write transaction actor.
 */
117 public void testOnReceiveReadDataWhenDataNotFound() {
118 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
119 "testReadDataWhenDataNotFoundRO"));
120 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
121 "testReadDataWhenDataNotFoundRW"));
/**
 * Sends a ReadData request for {@code TestModel.TEST_PATH} — a path this test never writes —
 * and expects, within 5 seconds, a ReadDataReply whose node is null.
 *
 * @param transaction the transaction actor under test
 */
124 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
125 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
127 ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
129 assertNull(reply.getNormalizedNode());
/**
 * Runs the positive DataExists check once against a read-only and once against a read-write
 * transaction actor.
 */
133 public void testOnReceiveDataExistsPositive() {
134 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
135 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
/**
 * Sends a DataExists request for the empty path and expects, within 5 seconds, a
 * DataExistsReply reporting that the data exists.
 *
 * @param transaction the transaction actor under test
 */
138 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
// NOTE(review): the sender argument of this tell() sits on a continuation line that is not
// visible in this listing — presumably testKit.getRef(), as in the sibling helpers; confirm.
139 transaction.tell(new DataExists(YangInstanceIdentifier.empty(), DataStoreVersions.CURRENT_VERSION),
142 DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
144 assertTrue(reply.exists());
/**
 * Runs the negative DataExists check once against a read-only and once against a read-write
 * transaction actor.
 */
148 public void testOnReceiveDataExistsNegative() {
149 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
150 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
/**
 * Sends a DataExists request for {@code TestModel.TEST_PATH} — a path this test never
 * writes — and expects, within 5 seconds, a DataExistsReply reporting that nothing exists.
 *
 * @param transaction the transaction actor under test
 */
153 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
154 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
156 DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
158 assertFalse(reply.exists());
/**
 * Sends one BatchedModifications message containing a write, a merge and a delete to a
 * read-write transaction actor backed by a mocked DataTreeModification. Expects a
 * BatchedModificationsReply counting three modifications and verifies that the mock received
 * write, merge and delete in that order.
 */
162 public void testOnReceiveBatchedModifications() {
// A mocked modification lets the test observe exactly which operations the actor applies.
163 ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
164 DataTreeModification mockModification = mock(DataTreeModification.class);
165 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
166 nextTransactionId(), mockModification);
167 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
// NOTE(review): the terminating call of each builder chain below is not visible in this
// listing — presumably .build(); confirm.
169 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
170 NormalizedNode writeData = Builders.containerBuilder()
171 .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
172 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
175 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
176 NormalizedNode mergeData = Builders.containerBuilder()
177 .withNodeIdentifier(new NodeIdentifier(TestModel.OUTER_LIST_QNAME))
180 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
// The three modifications are added in the order the verification below expects.
182 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
183 DataStoreVersions.CURRENT_VERSION);
184 batched.addModification(new WriteModification(writePath, writeData));
185 batched.addModification(new MergeModification(mergePath, mergeData));
186 batched.addModification(new DeleteModification(deletePath));
188 transaction.tell(batched, testKit.getRef());
190 BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
191 BatchedModificationsReply.class);
192 assertEquals("getNumBatched", 3, reply.getNumBatched());
// Ordered verification: write, then merge, then delete.
194 InOrder inOrder = inOrder(mockModification);
195 inOrder.verify(mockModification).write(writePath, writeData);
196 inOrder.verify(mockModification).merge(mergePath, mergeData);
197 inOrder.verify(mockModification).delete(deletePath);
/**
 * Sends two BatchedModifications messages under the same transaction id to a write-only
 * transaction actor. The first carries one write and is answered with a
 * BatchedModificationsReply; the second declares a total of two messages sent and is
 * answered with a ReadyTransactionReply, after which the actor is expected to terminate.
 */
201 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
202 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
203 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
// Separate probe so the Terminated notification does not mix with replies on testKit.
205 TestKit watcher = new TestKit(getSystem());
206 watcher.watch(transaction);
// NOTE(review): the terminating call of this builder chain is not visible in this listing —
// presumably .build(); confirm.
208 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
209 NormalizedNode writeData = Builders.containerBuilder()
210 .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
211 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
// First message: a single write.
214 final TransactionIdentifier tx1 = nextTransactionId();
215 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
216 batched.addModification(new WriteModification(writePath, writeData));
218 transaction.tell(batched, testKit.getRef());
219 BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
220 BatchedModificationsReply.class);
221 assertEquals("getNumBatched", 1, reply.getNumBatched());
// Second message: no modifications, total message count of 2 (this one plus the first).
// NOTE(review): a statement between the next two lines is elided in this listing (the
// embedded numbering skips 224) — presumably it marks the batch as ready; confirm.
223 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
225 batched.setTotalMessagesSent(2);
227 transaction.tell(batched, testKit.getRef());
228 testKit.expectMsgClass(Duration.ofSeconds(5), ReadyTransactionReply.class);
229 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
/**
 * Sends a single BatchedModifications message with one write, commit-on-ready enabled and a
 * total message count of one to a write-only transaction actor. Expects a
 * CommitTransactionReply, after which the actor is expected to terminate.
 */
233 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
234 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
235 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
// Separate probe so the Terminated notification does not mix with replies on testKit.
237 TestKit watcher = new TestKit(getSystem());
238 watcher.watch(transaction);
// NOTE(review): the terminating call of this builder chain is not visible in this listing —
// presumably .build(); confirm.
240 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
241 NormalizedNode writeData = Builders.containerBuilder()
242 .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
243 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
246 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
247 DataStoreVersions.CURRENT_VERSION);
248 batched.addModification(new WriteModification(writePath, writeData));
// NOTE(review): a statement between the previous line and the next is elided in this
// listing (the embedded numbering skips 249) — presumably it marks the batch as ready; confirm.
250 batched.setDoCommitOnReady(true);
251 batched.setTotalMessagesSent(1);
// The reply expected is the commit reply rather than a ReadyTransactionReply.
253 transaction.tell(batched, testKit.getRef());
254 testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
255 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
/**
 * Makes the mocked DataTreeModification throw a TestException on write and sends two
 * BatchedModifications messages under the same transaction id. Both are expected to be
 * answered with a Status.Failure and the actor is expected to terminate. The cause of the
 * second failure is rethrown so that the {@code expected = TestException.class} declaration
 * on the annotation checks it.
 */
258 @Test(expected = TestException.class)
259 public void testOnReceiveBatchedModificationsFailure() throws Exception {
260 ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
261 DataTreeModification mockModification = mock(DataTreeModification.class);
262 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
263 nextTransactionId(), mockModification);
264 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
265 "testOnReceiveBatchedModificationsFailure");
// Separate probe so the Terminated notification does not mix with replies on testKit.
267 TestKit watcher = new TestKit(getSystem());
268 watcher.watch(transaction);
270 YangInstanceIdentifier path = TestModel.TEST_PATH;
271 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
// Inject the failure: writing this exact path/node pair to the mock throws.
273 doThrow(new TestException()).when(mockModification).write(path, node);
// First message: the failing write.
275 final TransactionIdentifier tx1 = nextTransactionId();
276 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
277 batched.addModification(new WriteModification(path, node));
279 transaction.tell(batched, testKit.getRef());
280 testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
// Second message: no modifications, total message count of 2.
// NOTE(review): a statement between the next two lines is elided in this listing (the
// embedded numbering skips 283) — presumably it marks the batch as ready; confirm.
282 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
284 batched.setTotalMessagesSent(2);
286 transaction.tell(batched, testKit.getRef());
287 Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
288 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
// Rethrow the reported cause; anything propagateIfPossible does not throw is wrapped.
// NOTE(review): testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount performs
// the same rethrow with Throwables.throwIfInstanceOf + throwIfUnchecked; the Guava docs
// appear to recommend that pair over propagateIfPossible — consider aligning the two tests.
290 if (failure != null) {
291 Throwables.propagateIfPossible(failure.cause(), Exception.class);
292 throw new RuntimeException(failure.cause());
/**
 * Sends a single BatchedModifications message that declares a total of two messages sent,
 * although it is the only message this test sends. Expects a Status.Failure reply and
 * termination of the actor, then rethrows the failure cause so that the
 * {@code expected = IllegalStateException.class} declaration on the annotation checks it.
 */
296 @Test(expected = IllegalStateException.class)
297 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
298 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
299 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
// Separate probe so the Terminated notification does not mix with replies on testKit.
301 TestKit watcher = new TestKit(getSystem());
302 watcher.watch(transaction);
304 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
305 DataStoreVersions.CURRENT_VERSION);
// NOTE(review): a statement between the previous line and the next is elided in this
// listing (the embedded numbering skips 306) — presumably it marks the batch as ready; confirm.
// The declared total (2) deliberately disagrees with the single message actually sent.
307 batched.setTotalMessagesSent(2);
309 transaction.tell(batched, testKit.getRef());
311 Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
312 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
// Rethrow the reported cause as-is when it is an Exception or unchecked; otherwise wrap it.
314 if (failure != null) {
315 Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
316 Throwables.throwIfUnchecked(failure.cause());
317 throw new RuntimeException(failure.cause());
/**
 * Sends a serialized CloseTransaction to a read-write transaction actor and expects a
 * CloseTransactionReply followed by termination of the actor, each within 3 seconds.
 */
322 public void testReadWriteTxOnReceiveCloseTransaction() {
323 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
324 "testReadWriteTxOnReceiveCloseTransaction");
// testKit is both the requester and the death-watcher here, so the reply is expected first.
326 testKit.watch(transaction);
328 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
330 testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
331 testKit.expectTerminated(Duration.ofSeconds(3), transaction);
/**
 * Sends a serialized CloseTransaction to a write-only transaction actor and expects a
 * CloseTransactionReply followed by termination of the actor, each within 3 seconds.
 */
335 public void testWriteOnlyTxOnReceiveCloseTransaction() {
// Note: the actor name below omits "Only" and so differs slightly from the method name.
336 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
337 "testWriteTxOnReceiveCloseTransaction");
// testKit is both the requester and the death-watcher here, so the reply is expected first.
339 testKit.watch(transaction);
341 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
343 testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
344 testKit.expectTerminated(Duration.ofSeconds(3), transaction);
/**
 * Sends a serialized CloseTransaction to a read-only transaction actor and expects a
 * Terminated notification within 3 seconds. Unlike the read-write and write-only variants,
 * this test does not expect a CloseTransactionReply before the termination.
 */
348 public void testReadOnlyTxOnReceiveCloseTransaction() {
// NOTE(review): every other test passes the RO alias; TransactionType.READ_ONLY is the same
// value spelled out — consider using RO for consistency.
349 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
350 "testReadOnlyTxOnReceiveCloseTransaction");
352 testKit.watch(transaction);
354 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
// The next message seen by the probe must be the Terminated notification itself.
356 testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
/**
 * Creates a read-write transaction actor with a 500 ms shard-transaction idle timeout,
 * sends it nothing, and expects it to terminate within 3 seconds.
 */
360 public void testShardTransactionInactivity() {
// Replaces the field for this test only; the new context reaches the transaction actor
// through newTransactionActor(). The shard itself was already created with the previous one.
361 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
362 500, TimeUnit.MILLISECONDS).build();
364 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
365 "testShardTransactionInactivity");
367 testKit.watch(transaction);
// No message is sent to the actor; the Terminated notification is the only thing awaited.
369 testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
/**
 * Unchecked exception that testOnReceiveBatchedModificationsFailure() makes the mocked
 * modification throw, and which that test expects to see as the reported failure cause.
 */
372 public static class TestException extends RuntimeException {
373 private static final long serialVersionUID = 1L;