3 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 package org.opendaylight.controller.cluster.datastore;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.pattern.AskTimeoutException;
16 import akka.testkit.TestActorRef;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.util.Collections;
20 import java.util.concurrent.TimeUnit;
21 import org.junit.BeforeClass;
22 import org.junit.Test;
23 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
26 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
27 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
28 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
29 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
30 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
31 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import scala.concurrent.Await;
36 import scala.concurrent.Future;
37 import scala.concurrent.duration.Duration;
40 * Covers negative test cases
42 * @author Basheeruddin Ahmed <syedbahm@cisco.com>
44 public class ShardTransactionFailureTest extends AbstractActorTest {
45 private static ListeningExecutorService storeExecutor =
46 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
48 private static final InMemoryDOMDataStore store =
49 new InMemoryDOMDataStore("OPER", storeExecutor,
50 MoreExecutors.sameThreadExecutor());
52 private static final SchemaContext testSchemaContext =
53 TestModel.createTestContext();
55 private static final ShardIdentifier SHARD_IDENTIFIER =
56 ShardIdentifier.builder().memberName("member-1")
57 .shardName("inventory").type("operational").build();
59 private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
61 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
64 public static void staticSetup() {
65 store.onGlobalContextUpdated(testSchemaContext);
68 private ActorRef createShard(){
69 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<ShardIdentifier, String>emptyMap(), datastoreContext,
70 TestModel.createTestContext()));
73 @Test(expected = ReadFailedException.class)
74 public void testNegativeReadWithReadOnlyTransactionClosed()
77 final ActorRef shard = createShard();
78 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
79 testSchemaContext, datastoreContext, shardStats, "txn",
80 CreateTransaction.CURRENT_VERSION);
82 final TestActorRef<ShardTransaction> subject = TestActorRef
83 .create(getSystem(), props,
84 "testNegativeReadWithReadOnlyTransactionClosed");
86 ShardTransactionMessages.ReadData readData =
87 ShardTransactionMessages.ReadData.newBuilder()
88 .setInstanceIdentifierPathArguments(
89 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
92 Future<Object> future =
93 akka.pattern.Patterns.ask(subject, readData, 3000);
94 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
96 subject.underlyingActor().getDOMStoreTransaction().close();
98 future = akka.pattern.Patterns.ask(subject, readData, 3000);
99 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
103 @Test(expected = ReadFailedException.class)
104 public void testNegativeReadWithReadWriteTransactionClosed()
107 final ActorRef shard = createShard();
108 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
109 testSchemaContext, datastoreContext, shardStats, "txn",
110 CreateTransaction.CURRENT_VERSION);
112 final TestActorRef<ShardTransaction> subject = TestActorRef
113 .create(getSystem(), props,
114 "testNegativeReadWithReadWriteTransactionClosed");
116 ShardTransactionMessages.ReadData readData =
117 ShardTransactionMessages.ReadData.newBuilder()
118 .setInstanceIdentifierPathArguments(
119 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
123 Future<Object> future =
124 akka.pattern.Patterns.ask(subject, readData, 3000);
125 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
127 subject.underlyingActor().getDOMStoreTransaction().close();
129 future = akka.pattern.Patterns.ask(subject, readData, 3000);
130 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
133 @Test(expected = ReadFailedException.class)
134 public void testNegativeExistsWithReadWriteTransactionClosed()
137 final ActorRef shard = createShard();
138 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
139 testSchemaContext, datastoreContext, shardStats, "txn",
140 CreateTransaction.CURRENT_VERSION);
142 final TestActorRef<ShardTransaction> subject = TestActorRef
143 .create(getSystem(), props,
144 "testNegativeExistsWithReadWriteTransactionClosed");
146 ShardTransactionMessages.DataExists dataExists =
147 ShardTransactionMessages.DataExists.newBuilder()
148 .setInstanceIdentifierPathArguments(
149 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
153 Future<Object> future =
154 akka.pattern.Patterns.ask(subject, dataExists, 3000);
155 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
157 subject.underlyingActor().getDOMStoreTransaction().close();
159 future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
160 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
163 @Test(expected = AskTimeoutException.class)
164 public void testNegativeWriteWithTransactionReady() throws Exception {
167 final ActorRef shard = createShard();
168 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
169 testSchemaContext, datastoreContext, shardStats, "txn",
170 CreateTransaction.CURRENT_VERSION);
172 final TestActorRef<ShardTransaction> subject = TestActorRef
173 .create(getSystem(), props,
174 "testNegativeWriteWithTransactionReady");
176 ShardTransactionMessages.ReadyTransaction readyTransaction =
177 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
179 Future<Object> future =
180 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
181 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
183 ShardTransactionMessages.WriteData writeData =
184 ShardTransactionMessages.WriteData.newBuilder()
185 .setInstanceIdentifierPathArguments(
186 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
187 .build()).setNormalizedNode(
188 buildNormalizedNode()
192 future = akka.pattern.Patterns.ask(subject, writeData, 3000);
193 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
196 @Test(expected = AskTimeoutException.class)
197 public void testNegativeReadWriteWithTransactionReady() throws Exception {
200 final ActorRef shard = createShard();
201 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
202 testSchemaContext, datastoreContext, shardStats, "txn",
203 CreateTransaction.CURRENT_VERSION);
205 final TestActorRef<ShardTransaction> subject = TestActorRef
206 .create(getSystem(), props,
207 "testNegativeReadWriteWithTransactionReady");
209 ShardTransactionMessages.ReadyTransaction readyTransaction =
210 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
212 Future<Object> future =
213 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
214 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
216 ShardTransactionMessages.WriteData writeData =
217 ShardTransactionMessages.WriteData.newBuilder()
218 .setInstanceIdentifierPathArguments(
219 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
222 .setNormalizedNode(buildNormalizedNode())
225 future = akka.pattern.Patterns.ask(subject, writeData, 3000);
226 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
229 private NormalizedNodeMessages.Node buildNormalizedNode() {
230 return NormalizedNodeSerializer
231 .serialize(Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).build());
234 @Test(expected = AskTimeoutException.class)
235 public void testNegativeMergeTransactionReady() throws Exception {
238 final ActorRef shard = createShard();
239 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
240 testSchemaContext, datastoreContext, shardStats, "txn",
241 CreateTransaction.CURRENT_VERSION);
243 final TestActorRef<ShardTransaction> subject = TestActorRef
244 .create(getSystem(), props, "testNegativeMergeTransactionReady");
246 ShardTransactionMessages.ReadyTransaction readyTransaction =
247 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
249 Future<Object> future =
250 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
251 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
253 ShardTransactionMessages.MergeData mergeData =
254 ShardTransactionMessages.MergeData.newBuilder()
255 .setInstanceIdentifierPathArguments(
256 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
257 .build()).setNormalizedNode(
258 buildNormalizedNode()
262 future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
263 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
267 @Test(expected = AskTimeoutException.class)
268 public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
271 final ActorRef shard = createShard();
272 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
273 testSchemaContext, datastoreContext, shardStats, "txn",
274 CreateTransaction.CURRENT_VERSION);
276 final TestActorRef<ShardTransaction> subject = TestActorRef
277 .create(getSystem(), props,
278 "testNegativeDeleteDataWhenTransactionReady");
280 ShardTransactionMessages.ReadyTransaction readyTransaction =
281 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
283 Future<Object> future =
284 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
285 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
287 ShardTransactionMessages.DeleteData deleteData =
288 ShardTransactionMessages.DeleteData.newBuilder()
289 .setInstanceIdentifierPathArguments(
290 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
293 future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
294 Await.result(future, Duration.create(3, TimeUnit.SECONDS));