/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
11 package org.opendaylight.controller.cluster.datastore;
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.anyObject;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.timeout;
22 import static org.mockito.Mockito.times;
23 import static org.mockito.Mockito.verify;
24 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
25 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
26 import akka.actor.ActorRef;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.Assert;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
35 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
36 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
45 import scala.concurrent.Promise;
47 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
49 @SuppressWarnings("resource")
51 public void testNewReadOnlyTransaction() throws Exception {
53 DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction();
54 Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
58 @SuppressWarnings("resource")
60 public void testNewReadWriteTransaction() throws Exception {
61 DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction();
62 Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
66 @SuppressWarnings("resource")
68 public void testNewWriteOnlyTransaction() throws Exception {
69 DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction();
70 Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
75 public void testClose() throws Exception {
76 new TransactionChainProxy(mockActorContext).close();
78 verify(mockActorContext, times(1)).broadcast(anyObject());
82 public void testTransactionChainsHaveUniqueId(){
83 TransactionChainProxy one = new TransactionChainProxy(mock(ActorContext.class));
84 TransactionChainProxy two = new TransactionChainProxy(mock(ActorContext.class));
86 Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
90 public void testRateLimitingUsedInReadWriteTxCreation(){
91 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
93 txChainProxy.newReadWriteTransaction();
95 verify(mockActorContext, times(1)).acquireTxCreationPermit();
99 public void testRateLimitingUsedInWriteOnlyTxCreation(){
100 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
102 txChainProxy.newWriteOnlyTransaction();
104 verify(mockActorContext, times(1)).acquireTxCreationPermit();
109 public void testRateLimitingNotUsedInReadOnlyTxCreation(){
110 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
112 txChainProxy.newReadOnlyTransaction();
114 verify(mockActorContext, times(0)).acquireTxCreationPermit();
118 * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
119 * initiated until the first one completes its read future.
122 public void testChainedWriteOnlyTransactions() throws Exception {
123 dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
125 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
127 ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
129 Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
130 doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
131 eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
133 DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
135 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
136 writeTx1.write(TestModel.TEST_PATH, writeNode1);
140 verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
142 verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
144 ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
146 expectBatchedModifications(txActorRef2, 1);
148 final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
150 final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
152 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
153 final CountDownLatch write2Complete = new CountDownLatch(1);
158 writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
159 } catch (Exception e) {
162 write2Complete.countDown();
167 assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
169 if(caughtEx.get() != null) {
170 throw caughtEx.get();
174 verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
175 } catch (AssertionError e) {
176 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
179 batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString()));
181 // Tx 2 should've proceeded to find the primary shard.
182 verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
186 * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
187 * initiated until the first one completes its read future.
190 public void testChainedReadWriteTransactions() throws Exception {
191 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
193 ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
195 expectBatchedModifications(txActorRef1, 1);
197 Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
198 doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
199 eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
201 DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
203 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
204 writeTx1.write(TestModel.TEST_PATH, writeNode1);
208 verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false);
210 String tx2MemberName = "tx2MemberName";
211 doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
212 ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
213 ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
214 DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
216 expectBatchedModifications(txActorRef2, 1);
218 final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
220 final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
222 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
223 final CountDownLatch write2Complete = new CountDownLatch(1);
228 writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
229 } catch (Exception e) {
232 write2Complete.countDown();
237 assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
239 if(caughtEx.get() != null) {
240 throw caughtEx.get();
244 verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
245 eqCreateTransaction(tx2MemberName, READ_WRITE));
246 } catch (AssertionError e) {
247 fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
250 readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
252 verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
253 eqCreateTransaction(tx2MemberName, READ_WRITE));
256 @Test(expected=IllegalStateException.class)
257 public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
258 ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
260 expectBatchedModifications(actorRef, 1);
262 TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
264 DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
266 NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
267 writeTx1.write(TestModel.TEST_PATH, writeNode1);
269 NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
271 txChainProxy.newWriteOnlyTransaction();