Move Tx ready call from ShardWriteTransaction to Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardWriteTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.cluster.datastore;
11
12 import akka.actor.ActorRef;
13 import akka.actor.PoisonPill;
14 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
15 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
16 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
17 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
18 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
19 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
20 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
23 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
26 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
28
29 /**
30  * @author: syedbahm
31  * Date: 8/6/14
32  */
33 public class ShardWriteTransaction extends ShardTransaction {
34
35     private int totalBatchedModificationsReceived;
36     private Exception lastBatchedModificationsException;
37     private final ReadWriteShardDataTreeTransaction transaction;
38
39     public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
40             ShardStats shardStats, String transactionID, short clientTxVersion) {
41         super(shardActor, shardStats, transactionID, clientTxVersion);
42         this.transaction = transaction;
43     }
44
45     @Override
46     protected ReadWriteShardDataTreeTransaction getDOMStoreTransaction() {
47         return transaction;
48     }
49
50     @Override
51     public void handleReceive(Object message) throws Exception {
52
53         if (message instanceof BatchedModifications) {
54             batchedModifications((BatchedModifications)message);
55         } else if (message instanceof ReadyTransaction) {
56             readyTransaction(!SERIALIZED_REPLY, false);
57         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
58             readyTransaction(SERIALIZED_REPLY, false);
59         } else if(WriteData.isSerializedType(message)) {
60             writeData(WriteData.fromSerializable(message), SERIALIZED_REPLY);
61
62         } else if(MergeData.isSerializedType(message)) {
63             mergeData(MergeData.fromSerializable(message), SERIALIZED_REPLY);
64
65         } else if(DeleteData.isSerializedType(message)) {
66             deleteData(DeleteData.fromSerializable(message), SERIALIZED_REPLY);
67         } else {
68             super.handleReceive(message);
69         }
70     }
71
72     private void batchedModifications(BatchedModifications batched) {
73         if (checkClosed()) {
74             if (batched.isReady()) {
75                 getSelf().tell(PoisonPill.getInstance(), getSelf());
76             }
77             return;
78         }
79
80         try {
81             for(Modification modification: batched.getModifications()) {
82                 modification.apply(transaction.getSnapshot());
83             }
84
85             totalBatchedModificationsReceived++;
86             if(batched.isReady()) {
87                 if(lastBatchedModificationsException != null) {
88                     throw lastBatchedModificationsException;
89                 }
90
91                 if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
92                     throw new IllegalStateException(String.format(
93                             "The total number of batched messages received %d does not match the number sent %d",
94                             totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
95                 }
96
97                 readyTransaction(false, batched.isDoCommitOnReady());
98             } else {
99                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
100             }
101         } catch (Exception e) {
102             lastBatchedModificationsException = e;
103             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
104
105             if(batched.isReady()) {
106                 getSelf().tell(PoisonPill.getInstance(), getSelf());
107             }
108         }
109     }
110
111     protected final void dataExists(DataExists message, final boolean returnSerialized) {
112         super.dataExists(transaction, message, returnSerialized);
113     }
114
115     protected final void readData(ReadData message, final boolean returnSerialized) {
116         super.readData(transaction, message, returnSerialized);
117     }
118
119     private boolean checkClosed() {
120         if (transaction.isClosed()) {
121             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException("Transaction is closed, no modifications allowed")), getSelf());
122             return true;
123         } else {
124             return false;
125         }
126     }
127
128     private void writeData(WriteData message, boolean returnSerialized) {
129         LOG.debug("writeData at path : {}", message.getPath());
130         if (checkClosed()) {
131             return;
132         }
133
134         try {
135             transaction.getSnapshot().write(message.getPath(), message.getData());
136             WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
137             getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
138                 writeDataReply, getSelf());
139         }catch(Exception e){
140             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
141         }
142     }
143
144     private void mergeData(MergeData message, boolean returnSerialized) {
145         LOG.debug("mergeData at path : {}", message.getPath());
146         if (checkClosed()) {
147             return;
148         }
149
150         try {
151             transaction.getSnapshot().merge(message.getPath(), message.getData());
152             MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
153             getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
154                 mergeDataReply, getSelf());
155         }catch(Exception e){
156             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
157         }
158     }
159
160     private void deleteData(DeleteData message, boolean returnSerialized) {
161         LOG.debug("deleteData at path : {}", message.getPath());
162         if (checkClosed()) {
163             return;
164         }
165
166         try {
167             transaction.getSnapshot().delete(message.getPath());
168             DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
169             getSender().tell(returnSerialized ? deleteDataReply.toSerializable(message.getVersion()) :
170                 deleteDataReply, getSelf());
171         } catch(Exception e) {
172             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
173         }
174     }
175
176     private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit) {
177         String transactionID = getTransactionID();
178
179         LOG.debug("readyTransaction : {}", transactionID);
180
181         getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
182                 transaction, returnSerialized, doImmediateCommit), getContext());
183
184         // The shard will handle the commit from here so we're no longer needed - self-destruct.
185         getSelf().tell(PoisonPill.getInstance(), getSelf());
186     }
187 }