Merge "BUG 2820 - LLDP refactor"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardWriteTransaction.java
1 /*
2  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *  Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore;
12
13 import akka.actor.ActorRef;
14 import akka.actor.PoisonPill;
15 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
16 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
17 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
18 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
19 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
20 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
21 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
23 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
27 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
28 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
29 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
30 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
31 import org.opendaylight.controller.cluster.datastore.modification.Modification;
32 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
33 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
34
35 /**
36  * @author: syedbahm
37  * Date: 8/6/14
38  */
39 public class ShardWriteTransaction extends ShardTransaction {
40
41     private final MutableCompositeModification compositeModification = new MutableCompositeModification();
42     private int totalBatchedModificationsReceived;
43     private Exception lastBatchedModificationsException;
44     private final ReadWriteShardDataTreeTransaction transaction;
45
46     public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
47             ShardStats shardStats, String transactionID, short clientTxVersion) {
48         super(shardActor, shardStats, transactionID, clientTxVersion);
49         this.transaction = transaction;
50     }
51
52     @Override
53     protected ReadWriteShardDataTreeTransaction getDOMStoreTransaction() {
54         return transaction;
55     }
56
57     @Override
58     public void handleReceive(Object message) throws Exception {
59
60         if (message instanceof BatchedModifications) {
61             batchedModifications((BatchedModifications)message);
62         } else if (message instanceof ReadyTransaction) {
63             readyTransaction(!SERIALIZED_REPLY, false);
64         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
65             readyTransaction(SERIALIZED_REPLY, false);
66         } else if(WriteData.isSerializedType(message)) {
67             writeData(WriteData.fromSerializable(message), SERIALIZED_REPLY);
68
69         } else if(MergeData.isSerializedType(message)) {
70             mergeData(MergeData.fromSerializable(message), SERIALIZED_REPLY);
71
72         } else if(DeleteData.isSerializedType(message)) {
73             deleteData(DeleteData.fromSerializable(message), SERIALIZED_REPLY);
74
75         } else if (message instanceof GetCompositedModification) {
76             // This is here for testing only
77             getSender().tell(new GetCompositeModificationReply(compositeModification), getSelf());
78         } else {
79             super.handleReceive(message);
80         }
81     }
82
83     private void batchedModifications(BatchedModifications batched) {
84         if (checkClosed()) {
85             if (batched.isReady()) {
86                 getSelf().tell(PoisonPill.getInstance(), getSelf());
87             }
88             return;
89         }
90
91         try {
92             for(Modification modification: batched.getModifications()) {
93                 compositeModification.addModification(modification);
94                 modification.apply(transaction.getSnapshot());
95             }
96
97             totalBatchedModificationsReceived++;
98             if(batched.isReady()) {
99                 if(lastBatchedModificationsException != null) {
100                     throw lastBatchedModificationsException;
101                 }
102
103                 if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
104                     throw new IllegalStateException(String.format(
105                             "The total number of batched messages received %d does not match the number sent %d",
106                             totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
107                 }
108
109                 readyTransaction(false, batched.isDoCommitOnReady());
110             } else {
111                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
112             }
113         } catch (Exception e) {
114             lastBatchedModificationsException = e;
115             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
116
117             if(batched.isReady()) {
118                 getSelf().tell(PoisonPill.getInstance(), getSelf());
119             }
120         }
121     }
122
123     protected final void dataExists(DataExists message, final boolean returnSerialized) {
124         super.dataExists(transaction, message, returnSerialized);
125     }
126
127     protected final void readData(ReadData message, final boolean returnSerialized) {
128         super.readData(transaction, message, returnSerialized);
129     }
130
131     private boolean checkClosed() {
132         if (transaction.isClosed()) {
133             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException("Transaction is closed, no modifications allowed")), getSelf());
134             return true;
135         } else {
136             return false;
137         }
138     }
139
140     private void writeData(WriteData message, boolean returnSerialized) {
141         LOG.debug("writeData at path : {}", message.getPath());
142         if (checkClosed()) {
143             return;
144         }
145
146         compositeModification.addModification(
147                 new WriteModification(message.getPath(), message.getData()));
148         try {
149             transaction.getSnapshot().write(message.getPath(), message.getData());
150             WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
151             getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
152                 writeDataReply, getSelf());
153         }catch(Exception e){
154             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
155         }
156     }
157
158     private void mergeData(MergeData message, boolean returnSerialized) {
159         LOG.debug("mergeData at path : {}", message.getPath());
160         if (checkClosed()) {
161             return;
162         }
163
164         compositeModification.addModification(
165                 new MergeModification(message.getPath(), message.getData()));
166
167         try {
168             transaction.getSnapshot().merge(message.getPath(), message.getData());
169             MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
170             getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
171                 mergeDataReply, getSelf());
172         }catch(Exception e){
173             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
174         }
175     }
176
177     private void deleteData(DeleteData message, boolean returnSerialized) {
178         LOG.debug("deleteData at path : {}", message.getPath());
179         if (checkClosed()) {
180             return;
181         }
182
183         compositeModification.addModification(new DeleteModification(message.getPath()));
184         try {
185             transaction.getSnapshot().delete(message.getPath());
186             DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
187             getSender().tell(returnSerialized ? deleteDataReply.toSerializable(message.getVersion()) :
188                 deleteDataReply, getSelf());
189         } catch(Exception e) {
190             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
191         }
192     }
193
194     private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit) {
195         String transactionID = getTransactionID();
196
197         LOG.debug("readyTransaction : {}", transactionID);
198
199         ShardDataTreeCohort cohort =  transaction.ready();
200
201         getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
202                 cohort, compositeModification, returnSerialized, doImmediateCommit), getContext());
203
204         // The shard will handle the commit from here so we're no longer needed - self-destruct.
205         getSelf().tell(PoisonPill.getInstance(), getSelf());
206     }
207
208     // These classes are in here for test purposes only
209
210     static class GetCompositedModification {
211     }
212
213     static class GetCompositeModificationReply {
214         private final CompositeModification modification;
215
216
217         GetCompositeModificationReply(CompositeModification modification) {
218             this.modification = modification;
219         }
220
221         public CompositeModification getModification() {
222             return modification;
223         }
224     }
225 }