Bug 1446: Add JMX stats for clustered data store
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortFailureTest.java
1 /*
2  *
3  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.testkit.TestActorRef;
16 import akka.util.Timeout;
17
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import com.google.common.util.concurrent.MoreExecutors;
21
22 import org.junit.BeforeClass;
23 import org.junit.Test;
24 import org.mockito.Mockito;
25 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
26 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
27 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
29 import org.opendaylight.controller.cluster.datastore.modification.Modification;
30 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
31 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
32 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
33 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
34 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
35 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
36 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39
40 import scala.concurrent.Await;
41 import scala.concurrent.Future;
42 import scala.concurrent.duration.Duration;
43 import scala.concurrent.duration.FiniteDuration;
44
45 import java.util.Collections;
46 import java.util.concurrent.TimeUnit;
47
48 import static org.junit.Assert.assertTrue;
49 import static org.mockito.Mockito.when;
50
51
52 public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
53
54     private static ListeningExecutorService storeExecutor =
55         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
56
57     private static final InMemoryDOMDataStore store =
58         new InMemoryDOMDataStore("OPER", storeExecutor,
59             MoreExecutors.sameThreadExecutor());
60
61     private static final SchemaContext testSchemaContext =
62         TestModel.createTestContext();
63
64     private static final ShardIdentifier SHARD_IDENTIFIER =
65         ShardIdentifier.builder().memberName("member-1")
66             .shardName("inventory").type("config").build();
67
68     private final DatastoreContext datastoreContext = new DatastoreContext();
69
70     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
71
72     @BeforeClass
73     public static void staticSetup() {
74         store.onGlobalContextUpdated(testSchemaContext);
75     }
76
77     private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
78
79
80     @Test(expected = TestException.class)
81     public void testNegativeAbortResultsInException() throws Exception {
82
83         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
84                 Collections.EMPTY_MAP, datastoreContext));
85         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
86             .mock(DOMStoreThreePhaseCommitCohort.class);
87         final CompositeModification mockComposite =
88             Mockito.mock(CompositeModification.class);
89         final Props props =
90             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
91
92         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
93             .create(getSystem(), props,
94                 "testNegativeAbortResultsInException");
95
96         when(mockCohort.abort()).thenReturn(
97             Futures.<Void>immediateFailedFuture(new TestException()));
98
99         Future<Object> future =
100             akka.pattern.Patterns.ask(subject,
101                 ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
102                     .build(), 3000);
103         assertTrue(future.isCompleted());
104
105         Await.result(future, ASK_RESULT_DURATION);
106     }
107
108
109     @Test(expected = OptimisticLockFailedException.class)
110     public void testNegativeCanCommitResultsInException() throws Exception {
111
112         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
113                 Collections.EMPTY_MAP, datastoreContext));
114         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
115             .mock(DOMStoreThreePhaseCommitCohort.class);
116         final CompositeModification mockComposite =
117             Mockito.mock(CompositeModification.class);
118         final Props props =
119             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
120
121         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
122             .create(getSystem(), props,
123                 "testNegativeCanCommitResultsInException");
124
125         when(mockCohort.canCommit()).thenReturn(
126             Futures
127                 .<Boolean>immediateFailedFuture(
128                     new OptimisticLockFailedException("some exception")));
129
130         Future<Object> future =
131             akka.pattern.Patterns.ask(subject,
132                 ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
133                     .build(), 3000);
134
135
136         Await.result(future, ASK_RESULT_DURATION);
137
138     }
139
140
141     @Test(expected = TestException.class)
142     public void testNegativePreCommitResultsInException() throws Exception {
143
144         final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
145                 Collections.EMPTY_MAP, datastoreContext));
146         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
147             .mock(DOMStoreThreePhaseCommitCohort.class);
148         final CompositeModification mockComposite =
149             Mockito.mock(CompositeModification.class);
150         final Props props =
151             ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite, shardStats);
152
153         final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
154             .create(getSystem(), props,
155                 "testNegativePreCommitResultsInException");
156
157         when(mockCohort.preCommit()).thenReturn(
158             Futures
159                 .<Void>immediateFailedFuture(
160                     new TestException()));
161
162         Future<Object> future =
163             akka.pattern.Patterns.ask(subject,
164                 ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
165                     .build(), 3000);
166
167         Await.result(future, ASK_RESULT_DURATION);
168
169     }
170
171     @Test(expected = TestException.class)
172     public void testNegativeCommitResultsInException() throws Exception {
173
174         final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
175                 Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext),
176                 "testNegativeCommitResultsInException");
177
178         final ActorRef shardTransaction =
179             getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
180                     testSchemaContext, datastoreContext, shardStats));
181
182         ShardTransactionMessages.WriteData writeData =
183             ShardTransactionMessages.WriteData.newBuilder()
184                 .setInstanceIdentifierPathArguments(
185                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
186                         .build()).setNormalizedNode(
187                 NormalizedNodeMessages.Node.newBuilder().build()
188
189             ).build();
190
191         Timeout askTimeout = new Timeout(ASK_RESULT_DURATION);
192
193         //This is done so that Modification list is updated which is used during commit
194         Future<Object> future = akka.pattern.Patterns.ask(shardTransaction, writeData, askTimeout);
195
196         //ready transaction creates the cohort so that we get into the
197         //block where in commmit is done
198         ShardTransactionMessages.ReadyTransaction readyTransaction =
199             ShardTransactionMessages.ReadyTransaction.newBuilder().build();
200
201         future = akka.pattern.Patterns.ask(shardTransaction, readyTransaction, askTimeout);
202
203         //but when the message is sent it will have the MockCommit object
204         //so that we can simulate throwing of exception
205         ForwardedCommitTransaction mockForwardCommitTransaction =
206             Mockito.mock(ForwardedCommitTransaction.class);
207         DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
208             Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
209         when(mockForwardCommitTransaction.getCohort())
210             .thenReturn(mockThreePhaseCommitTransaction);
211         when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
212             .<Void>immediateFailedFuture(
213                 new TestException()));
214         Modification mockModification = Mockito.mock(
215             Modification.class);
216         when(mockForwardCommitTransaction.getModification())
217             .thenReturn(mockModification);
218
219         when(mockModification.toSerializable()).thenReturn(
220             PersistentMessages.CompositeModification.newBuilder().build());
221
222         future = akka.pattern.Patterns.ask(subject, mockForwardCommitTransaction, askTimeout);
223         Await.result(future, ASK_RESULT_DURATION);
224     }
225
226     private class TestException extends Exception {
227     }
228 }