Define DataStoreVersions.MAGNESIUM_VERSION
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardWriteTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.cluster.datastore;
11
12 import akka.actor.ActorRef;
13 import akka.actor.PoisonPill;
14 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
15 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
16 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
17 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
18 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
19 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
20 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
21 import org.opendaylight.controller.cluster.datastore.modification.Modification;
22
23 /**
24  * Actor for a shard write-only transaction.
25  *
26  * @author syedbahm
27  */
28 public class ShardWriteTransaction extends ShardTransaction {
29
30     private int totalBatchedModificationsReceived;
31     private Exception lastBatchedModificationsException;
32     private final ReadWriteShardDataTreeTransaction transaction;
33
34     public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
35             ShardStats shardStats) {
36         super(shardActor, shardStats, transaction.getIdentifier());
37         this.transaction = transaction;
38     }
39
40     @Override
41     protected ReadWriteShardDataTreeTransaction getDOMStoreTransaction() {
42         return transaction;
43     }
44
45     @Override
46     public void handleReceive(Object message) {
47         if (message instanceof BatchedModifications) {
48             batchedModifications((BatchedModifications)message);
49         } else {
50             super.handleReceive(message);
51         }
52     }
53
54     @SuppressWarnings("checkstyle:IllegalCatch")
55     private void batchedModifications(BatchedModifications batched) {
56         if (checkClosed()) {
57             if (batched.isReady()) {
58                 getSelf().tell(PoisonPill.getInstance(), getSelf());
59             }
60             return;
61         }
62
63         try {
64             for (Modification modification: batched.getModifications()) {
65                 modification.apply(transaction.getSnapshot());
66             }
67
68             totalBatchedModificationsReceived++;
69             if (batched.isReady()) {
70                 if (lastBatchedModificationsException != null) {
71                     throw lastBatchedModificationsException;
72                 }
73
74                 if (totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
75                     throw new IllegalStateException(String.format(
76                             "The total number of batched messages received %d does not match the number sent %d",
77                             totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
78                 }
79
80                 readyTransaction(batched);
81             } else {
82                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
83             }
84         } catch (Exception e) {
85             lastBatchedModificationsException = e;
86             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
87
88             if (batched.isReady()) {
89                 getSelf().tell(PoisonPill.getInstance(), getSelf());
90             }
91         }
92     }
93
94     protected final void dataExists(DataExists message) {
95         super.dataExists(transaction, message);
96     }
97
98     protected final void readData(ReadData message) {
99         super.readData(transaction, message);
100     }
101
102     private boolean checkClosed() {
103         if (transaction.isClosed()) {
104             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
105                     "Transaction is closed, no modifications allowed")), getSelf());
106             return true;
107         } else {
108             return false;
109         }
110     }
111
112     private void readyTransaction(BatchedModifications batched) {
113         TransactionIdentifier transactionID = getTransactionId();
114
115         LOG.debug("readyTransaction : {}", transactionID);
116
117         getShardActor().forward(new ForwardedReadyTransaction(transactionID, batched.getVersion(),
118                 transaction, batched.isDoCommitOnReady(), batched.getParticipatingShardNames()), getContext());
119
120         // The shard will handle the commit from here so we're no longer needed - self-destruct.
121         getSelf().tell(PoisonPill.getInstance(), getSelf());
122     }
123 }