/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import static org.mockito.Mockito.inOrder;
17 import static org.mockito.Mockito.mock;
19 import akka.actor.ActorRef;
20 import akka.actor.Props;
21 import akka.actor.Status.Failure;
22 import akka.actor.Terminated;
23 import akka.dispatch.Dispatchers;
24 import akka.testkit.TestActorRef;
25 import akka.testkit.javadsl.TestKit;
26 import com.google.common.base.Throwables;
27 import java.time.Duration;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.InOrder;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
35 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
40 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
45 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.raft.TestActorFactory;
48 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
54 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
55 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
56 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
58 @Deprecated(since = "9.0.0", forRemoval = true)
59 public class ShardTransactionTest extends AbstractActorTest {
61 private static final TransactionType RO = TransactionType.READ_ONLY;
62 private static final TransactionType RW = TransactionType.READ_WRITE;
63 private static final TransactionType WO = TransactionType.WRITE_ONLY;
65 private static final ShardIdentifier SHARD_IDENTIFIER =
66 ShardIdentifier.create("inventory", MEMBER_NAME, "config");
67 private static final EffectiveModelContext TEST_MODEL = TestModel.createTestContext();
69 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
71 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
73 private TestActorRef<Shard> shard;
74 private ShardDataTree store;
75 private TestKit testKit;
79 shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
80 .schemaContextProvider(() -> TEST_MODEL).props()
81 .withDispatcher(Dispatchers.DefaultDispatcherId()));
82 ShardTestKit.waitUntilLeader(shard);
83 store = shard.underlyingActor().getDataStore();
84 testKit = new TestKit(getSystem());
87 private ActorRef newTransactionActor(final TransactionType type,
88 final AbstractShardDataTreeTransaction<?> transaction, final String name) {
89 Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
90 shard.underlyingActor().getShardMBean());
91 return actorFactory.createActorNoVerify(props, name);
94 private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
95 return store.newReadOnlyTransaction(nextTransactionId());
98 private ReadWriteShardDataTreeTransaction readWriteTransaction() {
99 return store.newReadWriteTransaction(nextTransactionId());
103 public void testOnReceiveReadData() {
104 testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
105 testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
108 private void testOnReceiveReadData(final ActorRef transaction) {
109 transaction.tell(new ReadData(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION),
112 ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
114 assertNotNull(reply.getNormalizedNode());
118 public void testOnReceiveReadDataWhenDataNotFound() {
119 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
120 "testReadDataWhenDataNotFoundRO"));
121 testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
122 "testReadDataWhenDataNotFoundRW"));
125 private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
126 transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
128 ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
130 assertNull(reply.getNormalizedNode());
134 public void testOnReceiveDataExistsPositive() {
135 testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
136 testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
139 private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
140 transaction.tell(new DataExists(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION),
143 DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
145 assertTrue(reply.exists());
149 public void testOnReceiveDataExistsNegative() {
150 testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
151 testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
154 private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
155 transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
157 DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
159 assertFalse(reply.exists());
163 public void testOnReceiveBatchedModifications() {
164 ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
165 DataTreeModification mockModification = mock(DataTreeModification.class);
166 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
167 nextTransactionId(), mockModification);
168 final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
170 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
171 NormalizedNode writeData = Builders.containerBuilder()
172 .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
173 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
176 YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
177 NormalizedNode mergeData = Builders.containerBuilder()
178 .withNodeIdentifier(new NodeIdentifier(TestModel.OUTER_LIST_QNAME))
181 YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
183 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
184 DataStoreVersions.CURRENT_VERSION);
185 batched.addModification(new WriteModification(writePath, writeData));
186 batched.addModification(new MergeModification(mergePath, mergeData));
187 batched.addModification(new DeleteModification(deletePath));
189 transaction.tell(batched, testKit.getRef());
191 BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
192 BatchedModificationsReply.class);
193 assertEquals("getNumBatched", 3, reply.getNumBatched());
195 InOrder inOrder = inOrder(mockModification);
196 inOrder.verify(mockModification).write(writePath, writeData);
197 inOrder.verify(mockModification).merge(mergePath, mergeData);
198 inOrder.verify(mockModification).delete(deletePath);
202 public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
203 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
204 "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
206 TestKit watcher = new TestKit(getSystem());
207 watcher.watch(transaction);
209 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
210 NormalizedNode writeData = Builders.containerBuilder()
211 .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
212 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
215 final TransactionIdentifier tx1 = nextTransactionId();
216 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
217 batched.addModification(new WriteModification(writePath, writeData));
219 transaction.tell(batched, testKit.getRef());
220 BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
221 BatchedModificationsReply.class);
222 assertEquals("getNumBatched", 1, reply.getNumBatched());
224 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
226 batched.setTotalMessagesSent(2);
228 transaction.tell(batched, testKit.getRef());
229 testKit.expectMsgClass(Duration.ofSeconds(5), ReadyTransactionReply.class);
230 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
234 public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
235 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
236 "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
238 TestKit watcher = new TestKit(getSystem());
239 watcher.watch(transaction);
241 YangInstanceIdentifier writePath = TestModel.TEST_PATH;
242 NormalizedNode writeData = Builders.containerBuilder()
243 .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
244 .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
247 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
248 DataStoreVersions.CURRENT_VERSION);
249 batched.addModification(new WriteModification(writePath, writeData));
251 batched.setDoCommitOnReady(true);
252 batched.setTotalMessagesSent(1);
254 transaction.tell(batched, testKit.getRef());
255 testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
256 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
259 @Test(expected = TestException.class)
260 public void testOnReceiveBatchedModificationsFailure() throws Exception {
261 ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
262 DataTreeModification mockModification = mock(DataTreeModification.class);
263 ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
264 nextTransactionId(), mockModification);
265 final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
266 "testOnReceiveBatchedModificationsFailure");
268 TestKit watcher = new TestKit(getSystem());
269 watcher.watch(transaction);
271 YangInstanceIdentifier path = TestModel.TEST_PATH;
272 ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
274 doThrow(new TestException()).when(mockModification).write(path, node);
276 final TransactionIdentifier tx1 = nextTransactionId();
277 BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
278 batched.addModification(new WriteModification(path, node));
280 transaction.tell(batched, testKit.getRef());
281 testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
283 batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
285 batched.setTotalMessagesSent(2);
287 transaction.tell(batched, testKit.getRef());
288 Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
289 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
291 if (failure != null) {
292 Throwables.propagateIfPossible(failure.cause(), Exception.class);
293 throw new RuntimeException(failure.cause());
297 @Test(expected = IllegalStateException.class)
298 public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
299 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
300 "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
302 TestKit watcher = new TestKit(getSystem());
303 watcher.watch(transaction);
305 BatchedModifications batched = new BatchedModifications(nextTransactionId(),
306 DataStoreVersions.CURRENT_VERSION);
308 batched.setTotalMessagesSent(2);
310 transaction.tell(batched, testKit.getRef());
312 Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), akka.actor.Status.Failure.class);
313 watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
315 if (failure != null) {
316 Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
317 Throwables.throwIfUnchecked(failure.cause());
318 throw new RuntimeException(failure.cause());
323 public void testReadWriteTxOnReceiveCloseTransaction() {
324 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
325 "testReadWriteTxOnReceiveCloseTransaction");
327 testKit.watch(transaction);
329 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
331 testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
332 testKit.expectTerminated(Duration.ofSeconds(3), transaction);
336 public void testWriteOnlyTxOnReceiveCloseTransaction() {
337 final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
338 "testWriteTxOnReceiveCloseTransaction");
340 testKit.watch(transaction);
342 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
344 testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
345 testKit.expectTerminated(Duration.ofSeconds(3), transaction);
349 public void testReadOnlyTxOnReceiveCloseTransaction() {
350 final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
351 "testReadOnlyTxOnReceiveCloseTransaction");
353 testKit.watch(transaction);
355 transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
357 testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
361 public void testShardTransactionInactivity() {
362 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
363 500, TimeUnit.MILLISECONDS).build();
365 final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
366 "testShardTransactionInactivity");
368 testKit.watch(transaction);
370 testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
373 public static class TestException extends RuntimeException {
374 private static final long serialVersionUID = 1L;