3 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 package org.opendaylight.controller.cluster.datastore;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.pattern.AskTimeoutException;
16 import akka.testkit.TestActorRef;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Collections;
19 import java.util.concurrent.TimeUnit;
20 import org.junit.BeforeClass;
21 import org.junit.Test;
22 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
24 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
25 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
26 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
27 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
28 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
29 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
32 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
33 import scala.concurrent.Await;
34 import scala.concurrent.Future;
35 import scala.concurrent.duration.Duration;
38 * Covers negative test cases
40 * @author Basheeruddin Ahmed <syedbahm@cisco.com>
42 public class ShardTransactionFailureTest extends AbstractActorTest {
43 private static final InMemoryDOMDataStore store =
44 new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
46 private static final SchemaContext testSchemaContext =
47 TestModel.createTestContext();
49 private static final ShardIdentifier SHARD_IDENTIFIER =
50 ShardIdentifier.builder().memberName("member-1")
51 .shardName("inventory").type("operational").build();
53 private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
55 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
58 public static void staticSetup() {
59 store.onGlobalContextUpdated(testSchemaContext);
62 private ActorRef createShard(){
63 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<String, String>emptyMap(), datastoreContext,
64 TestModel.createTestContext()));
67 @Test(expected = ReadFailedException.class)
68 public void testNegativeReadWithReadOnlyTransactionClosed()
71 final ActorRef shard = createShard();
72 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
73 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
75 final TestActorRef<ShardTransaction> subject = TestActorRef
76 .create(getSystem(), props,
77 "testNegativeReadWithReadOnlyTransactionClosed");
79 ShardTransactionMessages.ReadData readData =
80 ShardTransactionMessages.ReadData.newBuilder()
81 .setInstanceIdentifierPathArguments(
82 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
85 Future<Object> future =
86 akka.pattern.Patterns.ask(subject, readData, 3000);
87 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
89 subject.underlyingActor().getDOMStoreTransaction().close();
91 future = akka.pattern.Patterns.ask(subject, readData, 3000);
92 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
96 @Test(expected = ReadFailedException.class)
97 public void testNegativeReadWithReadWriteTransactionClosed()
100 final ActorRef shard = createShard();
101 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
102 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
104 final TestActorRef<ShardTransaction> subject = TestActorRef
105 .create(getSystem(), props,
106 "testNegativeReadWithReadWriteTransactionClosed");
108 ShardTransactionMessages.ReadData readData =
109 ShardTransactionMessages.ReadData.newBuilder()
110 .setInstanceIdentifierPathArguments(
111 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
115 Future<Object> future =
116 akka.pattern.Patterns.ask(subject, readData, 3000);
117 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
119 subject.underlyingActor().getDOMStoreTransaction().close();
121 future = akka.pattern.Patterns.ask(subject, readData, 3000);
122 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
125 @Test(expected = ReadFailedException.class)
126 public void testNegativeExistsWithReadWriteTransactionClosed()
129 final ActorRef shard = createShard();
130 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
131 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
133 final TestActorRef<ShardTransaction> subject = TestActorRef
134 .create(getSystem(), props,
135 "testNegativeExistsWithReadWriteTransactionClosed");
137 ShardTransactionMessages.DataExists dataExists =
138 ShardTransactionMessages.DataExists.newBuilder()
139 .setInstanceIdentifierPathArguments(
140 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
144 Future<Object> future =
145 akka.pattern.Patterns.ask(subject, dataExists, 3000);
146 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
148 subject.underlyingActor().getDOMStoreTransaction().close();
150 future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
151 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
154 @Test(expected = AskTimeoutException.class)
155 public void testNegativeWriteWithTransactionReady() throws Exception {
158 final ActorRef shard = createShard();
159 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
160 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
162 final TestActorRef<ShardTransaction> subject = TestActorRef
163 .create(getSystem(), props,
164 "testNegativeWriteWithTransactionReady");
166 ShardTransactionMessages.ReadyTransaction readyTransaction =
167 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
169 Future<Object> future =
170 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
171 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
173 ShardTransactionMessages.WriteData writeData =
174 ShardTransactionMessages.WriteData.newBuilder()
175 .setInstanceIdentifierPathArguments(
176 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
177 .build()).setNormalizedNode(
178 buildNormalizedNode()
182 future = akka.pattern.Patterns.ask(subject, writeData, 3000);
183 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
186 @Test(expected = AskTimeoutException.class)
187 public void testNegativeReadWriteWithTransactionReady() throws Exception {
190 final ActorRef shard = createShard();
191 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
192 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
194 final TestActorRef<ShardTransaction> subject = TestActorRef
195 .create(getSystem(), props,
196 "testNegativeReadWriteWithTransactionReady");
198 ShardTransactionMessages.ReadyTransaction readyTransaction =
199 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
201 Future<Object> future =
202 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
203 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
205 ShardTransactionMessages.WriteData writeData =
206 ShardTransactionMessages.WriteData.newBuilder()
207 .setInstanceIdentifierPathArguments(
208 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
211 .setNormalizedNode(buildNormalizedNode())
214 future = akka.pattern.Patterns.ask(subject, writeData, 3000);
215 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
218 private NormalizedNodeMessages.Node buildNormalizedNode() {
219 return NormalizedNodeSerializer
220 .serialize(Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).build());
223 @Test(expected = AskTimeoutException.class)
224 public void testNegativeMergeTransactionReady() throws Exception {
227 final ActorRef shard = createShard();
228 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
229 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
231 final TestActorRef<ShardTransaction> subject = TestActorRef
232 .create(getSystem(), props, "testNegativeMergeTransactionReady");
234 ShardTransactionMessages.ReadyTransaction readyTransaction =
235 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
237 Future<Object> future =
238 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
239 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
241 ShardTransactionMessages.MergeData mergeData =
242 ShardTransactionMessages.MergeData.newBuilder()
243 .setInstanceIdentifierPathArguments(
244 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
245 .build()).setNormalizedNode(
246 buildNormalizedNode()
250 future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
251 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
255 @Test(expected = AskTimeoutException.class)
256 public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
259 final ActorRef shard = createShard();
260 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
261 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
263 final TestActorRef<ShardTransaction> subject = TestActorRef
264 .create(getSystem(), props,
265 "testNegativeDeleteDataWhenTransactionReady");
267 ShardTransactionMessages.ReadyTransaction readyTransaction =
268 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
270 Future<Object> future =
271 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
272 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
274 ShardTransactionMessages.DeleteData deleteData =
275 ShardTransactionMessages.DeleteData.newBuilder()
276 .setInstanceIdentifierPathArguments(
277 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
280 future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
281 Await.result(future, Duration.create(3, TimeUnit.SECONDS));