Fix license header violations in sal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardWriteTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.cluster.datastore;
11
12 import akka.actor.ActorRef;
13 import akka.actor.PoisonPill;
14 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
15 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
16 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
17 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
18 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
19 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
20 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
23 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
26 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
27 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
28 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
29 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
30 import org.opendaylight.controller.cluster.datastore.modification.Modification;
31 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
32 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
33
34 /**
35  * @author: syedbahm
36  * Date: 8/6/14
37  */
38 public class ShardWriteTransaction extends ShardTransaction {
39
40     private final MutableCompositeModification compositeModification = new MutableCompositeModification();
41     private int totalBatchedModificationsReceived;
42     private Exception lastBatchedModificationsException;
43     private final ReadWriteShardDataTreeTransaction transaction;
44
45     public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
46             ShardStats shardStats, String transactionID, short clientTxVersion) {
47         super(shardActor, shardStats, transactionID, clientTxVersion);
48         this.transaction = transaction;
49     }
50
51     @Override
52     protected ReadWriteShardDataTreeTransaction getDOMStoreTransaction() {
53         return transaction;
54     }
55
56     @Override
57     public void handleReceive(Object message) throws Exception {
58
59         if (message instanceof BatchedModifications) {
60             batchedModifications((BatchedModifications)message);
61         } else if (message instanceof ReadyTransaction) {
62             readyTransaction(!SERIALIZED_REPLY, false);
63         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
64             readyTransaction(SERIALIZED_REPLY, false);
65         } else if(WriteData.isSerializedType(message)) {
66             writeData(WriteData.fromSerializable(message), SERIALIZED_REPLY);
67
68         } else if(MergeData.isSerializedType(message)) {
69             mergeData(MergeData.fromSerializable(message), SERIALIZED_REPLY);
70
71         } else if(DeleteData.isSerializedType(message)) {
72             deleteData(DeleteData.fromSerializable(message), SERIALIZED_REPLY);
73
74         } else if (message instanceof GetCompositedModification) {
75             // This is here for testing only
76             getSender().tell(new GetCompositeModificationReply(compositeModification), getSelf());
77         } else {
78             super.handleReceive(message);
79         }
80     }
81
82     private void batchedModifications(BatchedModifications batched) {
83         if (checkClosed()) {
84             if (batched.isReady()) {
85                 getSelf().tell(PoisonPill.getInstance(), getSelf());
86             }
87             return;
88         }
89
90         try {
91             for(Modification modification: batched.getModifications()) {
92                 compositeModification.addModification(modification);
93                 modification.apply(transaction.getSnapshot());
94             }
95
96             totalBatchedModificationsReceived++;
97             if(batched.isReady()) {
98                 if(lastBatchedModificationsException != null) {
99                     throw lastBatchedModificationsException;
100                 }
101
102                 if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
103                     throw new IllegalStateException(String.format(
104                             "The total number of batched messages received %d does not match the number sent %d",
105                             totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
106                 }
107
108                 readyTransaction(false, batched.isDoCommitOnReady());
109             } else {
110                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
111             }
112         } catch (Exception e) {
113             lastBatchedModificationsException = e;
114             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
115
116             if(batched.isReady()) {
117                 getSelf().tell(PoisonPill.getInstance(), getSelf());
118             }
119         }
120     }
121
122     protected final void dataExists(DataExists message, final boolean returnSerialized) {
123         super.dataExists(transaction, message, returnSerialized);
124     }
125
126     protected final void readData(ReadData message, final boolean returnSerialized) {
127         super.readData(transaction, message, returnSerialized);
128     }
129
130     private boolean checkClosed() {
131         if (transaction.isClosed()) {
132             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException("Transaction is closed, no modifications allowed")), getSelf());
133             return true;
134         } else {
135             return false;
136         }
137     }
138
139     private void writeData(WriteData message, boolean returnSerialized) {
140         LOG.debug("writeData at path : {}", message.getPath());
141         if (checkClosed()) {
142             return;
143         }
144
145         compositeModification.addModification(
146                 new WriteModification(message.getPath(), message.getData()));
147         try {
148             transaction.getSnapshot().write(message.getPath(), message.getData());
149             WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
150             getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
151                 writeDataReply, getSelf());
152         }catch(Exception e){
153             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
154         }
155     }
156
157     private void mergeData(MergeData message, boolean returnSerialized) {
158         LOG.debug("mergeData at path : {}", message.getPath());
159         if (checkClosed()) {
160             return;
161         }
162
163         compositeModification.addModification(
164                 new MergeModification(message.getPath(), message.getData()));
165
166         try {
167             transaction.getSnapshot().merge(message.getPath(), message.getData());
168             MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
169             getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
170                 mergeDataReply, getSelf());
171         }catch(Exception e){
172             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
173         }
174     }
175
176     private void deleteData(DeleteData message, boolean returnSerialized) {
177         LOG.debug("deleteData at path : {}", message.getPath());
178         if (checkClosed()) {
179             return;
180         }
181
182         compositeModification.addModification(new DeleteModification(message.getPath()));
183         try {
184             transaction.getSnapshot().delete(message.getPath());
185             DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
186             getSender().tell(returnSerialized ? deleteDataReply.toSerializable(message.getVersion()) :
187                 deleteDataReply, getSelf());
188         } catch(Exception e) {
189             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
190         }
191     }
192
193     private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit) {
194         String transactionID = getTransactionID();
195
196         LOG.debug("readyTransaction : {}", transactionID);
197
198         ShardDataTreeCohort cohort =  transaction.ready();
199
200         getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
201                 cohort, compositeModification, returnSerialized, doImmediateCommit), getContext());
202
203         // The shard will handle the commit from here so we're no longer needed - self-destruct.
204         getSelf().tell(PoisonPill.getInstance(), getSelf());
205     }
206
207     // These classes are in here for test purposes only
208
209     static class GetCompositedModification {
210     }
211
212     static class GetCompositeModificationReply {
213         private final CompositeModification modification;
214
215
216         GetCompositeModificationReply(CompositeModification modification) {
217             this.modification = modification;
218         }
219
220         public CompositeModification getModification() {
221             return modification;
222         }
223     }
224 }