Modify ChainedTransactionProxy to override sending of FindPrimaryShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionHeliumBackwardsCompatibilityTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.dispatch.Dispatchers;
15 import akka.testkit.TestActorRef;
16 import java.util.Collections;
17 import org.junit.Assert;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
27 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
28 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
29 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
30 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
31 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import scala.concurrent.duration.FiniteDuration;
36
37 /**
38  * Tests backwards compatibility support from Helium-1 to Helium.
39  *
40  * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the
41  * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction,
42  * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages
43  * would be sans transactionId so this test verifies the Shard handles that properly.
44  *
45  * @author Thomas Pantelis
46  */
47 public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
48
49     @SuppressWarnings("unchecked")
50     @Test
51     public void testTransactionCommit() throws Exception {
52         new ShardTestKit(getSystem()) {{
53             SchemaContext schemaContext = TestModel.createTestContext();
54             Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
55                     shardName("inventory").type("config").build(),
56                     Collections.<ShardIdentifier,String>emptyMap(),
57                     DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
58                     schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
59
60             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
61                     "testTransactionCommit");
62
63             waitUntilLeader(shard);
64
65             // Send CreateTransaction message with no messages version
66
67             String transactionID = "txn-1";
68             shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
69                     .setTransactionId(transactionID)
70                     .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
71                     .setTransactionChainId("").build(), getRef());
72
73             final FiniteDuration duration = duration("5 seconds");
74
75             CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
76
77             ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
78
79             // Write data to the Tx
80
81             txActor.tell(new WriteData(TestModel.TEST_PATH,
82                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.BASE_HELIUM_VERSION).
83                         toSerializable(), getRef());
84
85             expectMsgClass(duration, ShardTransactionMessages.WriteDataReply.class);
86
87             // Ready the Tx
88
89             txActor.tell(new ReadyTransaction().toSerializable(), getRef());
90
91             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
92                     duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
93
94             ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
95
96             // Send the CanCommitTransaction message with no transactionId.
97
98             cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
99                     getRef());
100
101             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
102
103             // Send the PreCommitTransaction message with no transactionId.
104
105             cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(),
106                     getRef());
107
108             expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS);
109
110             // Send the CommitTransaction message with no transactionId.
111
112             cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(),
113                     getRef());
114
115             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
116
117             NormalizedNode<?, ?> node = ShardTest.readStore(shard, TestModel.TEST_PATH);
118             Assert.assertNotNull("Data not found in store", node);
119
120             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
121         }};
122     }
123
124     @Test
125     public void testTransactionAbort() throws Exception {
126         new ShardTestKit(getSystem()) {{
127             SchemaContext schemaContext = TestModel.createTestContext();
128             Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
129                     shardName("inventory").type("config").build(),
130                     Collections.<ShardIdentifier,String>emptyMap(),
131                     DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
132                     schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
133
134             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
135                     "testTransactionAbort");
136
137             waitUntilLeader(shard);
138
139             // Send CreateTransaction message with no messages version
140
141             String transactionID = "txn-1";
142             shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
143                     .setTransactionId(transactionID)
144                     .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
145                     .setTransactionChainId("").build(), getRef());
146
147             final FiniteDuration duration = duration("5 seconds");
148
149             CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
150
151             ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
152
153             // Write data to the Tx
154
155             txActor.tell(new WriteData(TestModel.TEST_PATH,
156                     ImmutableNodes.containerNode(TestModel.TEST_QNAME),
157                     DataStoreVersions.BASE_HELIUM_VERSION).toSerializable(), getRef());
158
159             expectMsgClass(duration, WriteDataReply.INSTANCE.toSerializable(
160                     DataStoreVersions.BASE_HELIUM_VERSION).getClass());
161
162             // Ready the Tx
163
164             txActor.tell(new ReadyTransaction().toSerializable(), getRef());
165
166             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
167                     duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
168
169             ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
170
171             // Send the CanCommitTransaction message with no transactionId.
172
173             cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
174                     getRef());
175
176             expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
177
178             // Send the AbortTransaction message with no transactionId.
179
180             cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(),
181                     getRef());
182
183             expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
184
185             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
186         }};
187     }
188 }