Merge "Add netconf-ssh as dependency to features-mdsal"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxyTest.java
1 /*
2  *
3  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore;
12
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Matchers.anyObject;
16 import static org.mockito.Matchers.eq;
17 import static org.mockito.Matchers.isA;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.never;
21 import static org.mockito.Mockito.timeout;
22 import static org.mockito.Mockito.times;
23 import static org.mockito.Mockito.verify;
24 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE;
25 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY;
26 import akka.actor.ActorRef;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.Assert;
31 import org.junit.Test;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
33 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
42 import scala.concurrent.Promise;
43
44 public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
45
46     @SuppressWarnings("resource")
47     @Test
48     public void testNewReadOnlyTransaction() throws Exception {
49
50      DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction();
51          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
52
53     }
54
55     @SuppressWarnings("resource")
56     @Test
57     public void testNewReadWriteTransaction() throws Exception {
58         DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction();
59         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
60
61     }
62
63     @SuppressWarnings("resource")
64     @Test
65     public void testNewWriteOnlyTransaction() throws Exception {
66         DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction();
67         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
68
69     }
70
71     @Test
72     public void testClose() throws Exception {
73         new TransactionChainProxy(mockActorContext).close();
74
75         verify(mockActorContext, times(1)).broadcast(anyObject());
76     }
77
78     @Test
79     public void testTransactionChainsHaveUniqueId(){
80         TransactionChainProxy one = new TransactionChainProxy(mock(ActorContext.class));
81         TransactionChainProxy two = new TransactionChainProxy(mock(ActorContext.class));
82
83         Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
84     }
85
86     @Test
87     public void testRateLimitingUsedInReadWriteTxCreation(){
88         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
89
90         txChainProxy.newReadWriteTransaction();
91
92         verify(mockActorContext, times(1)).acquireTxCreationPermit();
93     }
94
95     @Test
96     public void testRateLimitingUsedInWriteOnlyTxCreation(){
97         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
98
99         txChainProxy.newWriteOnlyTransaction();
100
101         verify(mockActorContext, times(1)).acquireTxCreationPermit();
102     }
103
104
105     @Test
106     public void testRateLimitingNotUsedInReadOnlyTxCreation(){
107         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
108
109         txChainProxy.newReadOnlyTransaction();
110
111         verify(mockActorContext, times(0)).acquireTxCreationPermit();
112     }
113
114     /**
115      * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
116      * initiated until the first one completes its read future.
117      */
118     @Test
119     public void testChainedReadWriteTransactions() throws Exception {
120         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
121
122         ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
123
124         expectBatchedModifications(txActorRef1, 1);
125
126         Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
127         doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
128                 eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
129
130         DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
131
132         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
133         writeTx1.write(TestModel.TEST_PATH, writeNode1);
134
135         writeTx1.ready();
136
137         verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1));
138
139         String tx2MemberName = "tx2MemberName";
140         doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
141         ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
142         ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
143                 DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
144
145         expectBatchedModifications(txActorRef2, 1);
146
147         final NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
148
149         final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
150
151         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
152         final CountDownLatch write2Complete = new CountDownLatch(1);
153         new Thread() {
154             @Override
155             public void run() {
156                 try {
157                     writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
158                 } catch (Exception e) {
159                     caughtEx.set(e);
160                 } finally {
161                     write2Complete.countDown();
162                 }
163             }
164         }.start();
165
166         assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
167
168         if(caughtEx.get() != null) {
169             throw caughtEx.get();
170         }
171
172         try {
173             verify(mockActorContext, never()).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
174                     eqCreateTransaction(tx2MemberName, READ_WRITE));
175         } catch (AssertionError e) {
176             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
177         }
178
179         readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
180
181         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
182                 eqCreateTransaction(tx2MemberName, READ_WRITE));
183     }
184
185     @Test(expected=IllegalStateException.class)
186     public void testChainedWriteTransactionsWithPreviousTxNotReady() throws Exception {
187         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
188
189         expectBatchedModifications(actorRef, 1);
190
191         TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
192
193         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
194
195         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
196         writeTx1.write(TestModel.TEST_PATH, writeNode1);
197
198         NormalizedNode<?, ?> writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
199
200         txChainProxy.newWriteOnlyTransaction();
201     }
202 }