/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
11 package org.opendaylight.controller.cluster.datastore;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.testkit.TestActorRef;
16 import com.google.common.util.concurrent.ListeningExecutorService;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import org.junit.BeforeClass;
19 import org.junit.Test;
20 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
21 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
22 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
23 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
24 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
25 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
26 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
27 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
30 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
31 import scala.concurrent.Await;
32 import scala.concurrent.Future;
33 import scala.concurrent.duration.Duration;
34 import java.util.Collections;
35 import java.util.concurrent.TimeUnit;
38 * Covers negative test cases
40 * @author Basheeruddin Ahmed <syedbahm@cisco.com>
42 public class ShardTransactionFailureTest extends AbstractActorTest {
43 private static ListeningExecutorService storeExecutor =
44 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
46 private static final InMemoryDOMDataStore store =
47 new InMemoryDOMDataStore("OPER", storeExecutor,
48 MoreExecutors.sameThreadExecutor());
50 private static final SchemaContext testSchemaContext =
51 TestModel.createTestContext();
53 private static final ShardIdentifier SHARD_IDENTIFIER =
54 ShardIdentifier.builder().memberName("member-1")
55 .shardName("inventory").type("operational").build();
57 private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
59 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
62 public static void staticSetup() {
63 store.onGlobalContextUpdated(testSchemaContext);
66 private ActorRef createShard(){
67 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, datastoreContext,
68 TestModel.createTestContext()));
71 @Test(expected = ReadFailedException.class)
72 public void testNegativeReadWithReadOnlyTransactionClosed()
75 final ActorRef shard = createShard();
76 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
77 testSchemaContext, datastoreContext, shardStats, "txn");
79 final TestActorRef<ShardTransaction> subject = TestActorRef
80 .create(getSystem(), props,
81 "testNegativeReadWithReadOnlyTransactionClosed");
83 ShardTransactionMessages.ReadData readData =
84 ShardTransactionMessages.ReadData.newBuilder()
85 .setInstanceIdentifierPathArguments(
86 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
89 Future<Object> future =
90 akka.pattern.Patterns.ask(subject, readData, 3000);
91 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
93 subject.underlyingActor().getDOMStoreTransaction().close();
95 future = akka.pattern.Patterns.ask(subject, readData, 3000);
96 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
100 @Test(expected = ReadFailedException.class)
101 public void testNegativeReadWithReadWriteTransactionClosed()
104 final ActorRef shard = createShard();
105 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
106 testSchemaContext, datastoreContext, shardStats, "txn");
108 final TestActorRef<ShardTransaction> subject = TestActorRef
109 .create(getSystem(), props,
110 "testNegativeReadWithReadWriteTransactionClosed");
112 ShardTransactionMessages.ReadData readData =
113 ShardTransactionMessages.ReadData.newBuilder()
114 .setInstanceIdentifierPathArguments(
115 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
119 Future<Object> future =
120 akka.pattern.Patterns.ask(subject, readData, 3000);
121 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
123 subject.underlyingActor().getDOMStoreTransaction().close();
125 future = akka.pattern.Patterns.ask(subject, readData, 3000);
126 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
129 @Test(expected = ReadFailedException.class)
130 public void testNegativeExistsWithReadWriteTransactionClosed()
133 final ActorRef shard = createShard();
134 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
135 testSchemaContext, datastoreContext, shardStats, "txn");
137 final TestActorRef<ShardTransaction> subject = TestActorRef
138 .create(getSystem(), props,
139 "testNegativeExistsWithReadWriteTransactionClosed");
141 ShardTransactionMessages.DataExists dataExists =
142 ShardTransactionMessages.DataExists.newBuilder()
143 .setInstanceIdentifierPathArguments(
144 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
148 Future<Object> future =
149 akka.pattern.Patterns.ask(subject, dataExists, 3000);
150 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
152 subject.underlyingActor().getDOMStoreTransaction().close();
154 future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
155 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
158 @Test(expected = IllegalStateException.class)
159 public void testNegativeWriteWithTransactionReady() throws Exception {
162 final ActorRef shard = createShard();
163 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
164 testSchemaContext, datastoreContext, shardStats, "txn");
166 final TestActorRef<ShardTransaction> subject = TestActorRef
167 .create(getSystem(), props,
168 "testNegativeWriteWithTransactionReady");
170 ShardTransactionMessages.ReadyTransaction readyTransaction =
171 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
173 Future<Object> future =
174 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
175 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
177 ShardTransactionMessages.WriteData writeData =
178 ShardTransactionMessages.WriteData.newBuilder()
179 .setInstanceIdentifierPathArguments(
180 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
181 .build()).setNormalizedNode(
182 buildNormalizedNode()
186 future = akka.pattern.Patterns.ask(subject, writeData, 3000);
187 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
190 @Test(expected = IllegalStateException.class)
191 public void testNegativeReadWriteWithTransactionReady() throws Exception {
194 final ActorRef shard = createShard();
195 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
196 testSchemaContext, datastoreContext, shardStats, "txn");
198 final TestActorRef<ShardTransaction> subject = TestActorRef
199 .create(getSystem(), props,
200 "testNegativeReadWriteWithTransactionReady");
202 ShardTransactionMessages.ReadyTransaction readyTransaction =
203 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
205 Future<Object> future =
206 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
207 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
209 ShardTransactionMessages.WriteData writeData =
210 ShardTransactionMessages.WriteData.newBuilder()
211 .setInstanceIdentifierPathArguments(
212 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
215 .setNormalizedNode(buildNormalizedNode())
218 future = akka.pattern.Patterns.ask(subject, writeData, 3000);
219 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
222 private NormalizedNodeMessages.Node buildNormalizedNode() {
223 return NormalizedNodeSerializer
224 .serialize(Builders.containerBuilder().withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).build());
227 @Test(expected = IllegalStateException.class)
228 public void testNegativeMergeTransactionReady() throws Exception {
231 final ActorRef shard = createShard();
232 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
233 testSchemaContext, datastoreContext, shardStats, "txn");
235 final TestActorRef<ShardTransaction> subject = TestActorRef
236 .create(getSystem(), props, "testNegativeMergeTransactionReady");
238 ShardTransactionMessages.ReadyTransaction readyTransaction =
239 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
241 Future<Object> future =
242 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
243 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
245 ShardTransactionMessages.MergeData mergeData =
246 ShardTransactionMessages.MergeData.newBuilder()
247 .setInstanceIdentifierPathArguments(
248 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
249 .build()).setNormalizedNode(
250 buildNormalizedNode()
254 future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
255 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
259 @Test(expected = IllegalStateException.class)
260 public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
263 final ActorRef shard = createShard();
264 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
265 testSchemaContext, datastoreContext, shardStats, "txn");
267 final TestActorRef<ShardTransaction> subject = TestActorRef
268 .create(getSystem(), props,
269 "testNegativeDeleteDataWhenTransactionReady");
271 ShardTransactionMessages.ReadyTransaction readyTransaction =
272 ShardTransactionMessages.ReadyTransaction.newBuilder().build();
274 Future<Object> future =
275 akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
276 Await.result(future, Duration.create(3, TimeUnit.SECONDS));
278 ShardTransactionMessages.DeleteData deleteData =
279 ShardTransactionMessages.DeleteData.newBuilder()
280 .setInstanceIdentifierPathArguments(
281 NormalizedNodeMessages.InstanceIdentifier.newBuilder()
284 future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
285 Await.result(future, Duration.create(3, TimeUnit.SECONDS));