2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.cluster.datastore;
12 import akka.actor.ActorRef;
13 import akka.actor.PoisonPill;
14 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
15 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
16 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
17 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
18 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
19 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
20 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
23 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
26 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
33 public class ShardWriteTransaction extends ShardTransaction {
35 private int totalBatchedModificationsReceived;
36 private Exception lastBatchedModificationsException;
37 private final ReadWriteShardDataTreeTransaction transaction;
39 public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
40 ShardStats shardStats, String transactionID, short clientTxVersion) {
41 super(shardActor, shardStats, transactionID, clientTxVersion);
42 this.transaction = transaction;
46 protected ReadWriteShardDataTreeTransaction getDOMStoreTransaction() {
51 public void handleReceive(Object message) throws Exception {
53 if (message instanceof BatchedModifications) {
54 batchedModifications((BatchedModifications)message);
55 } else if (message instanceof ReadyTransaction) {
56 readyTransaction(!SERIALIZED_REPLY, false);
57 } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
58 readyTransaction(SERIALIZED_REPLY, false);
59 } else if(WriteData.isSerializedType(message)) {
60 writeData(WriteData.fromSerializable(message), SERIALIZED_REPLY);
62 } else if(MergeData.isSerializedType(message)) {
63 mergeData(MergeData.fromSerializable(message), SERIALIZED_REPLY);
65 } else if(DeleteData.isSerializedType(message)) {
66 deleteData(DeleteData.fromSerializable(message), SERIALIZED_REPLY);
68 super.handleReceive(message);
72 private void batchedModifications(BatchedModifications batched) {
74 if (batched.isReady()) {
75 getSelf().tell(PoisonPill.getInstance(), getSelf());
81 for(Modification modification: batched.getModifications()) {
82 modification.apply(transaction.getSnapshot());
85 totalBatchedModificationsReceived++;
86 if(batched.isReady()) {
87 if(lastBatchedModificationsException != null) {
88 throw lastBatchedModificationsException;
91 if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
92 throw new IllegalStateException(String.format(
93 "The total number of batched messages received %d does not match the number sent %d",
94 totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
97 readyTransaction(false, batched.isDoCommitOnReady());
99 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
101 } catch (Exception e) {
102 lastBatchedModificationsException = e;
103 getSender().tell(new akka.actor.Status.Failure(e), getSelf());
105 if(batched.isReady()) {
106 getSelf().tell(PoisonPill.getInstance(), getSelf());
111 protected final void dataExists(DataExists message, final boolean returnSerialized) {
112 super.dataExists(transaction, message, returnSerialized);
115 protected final void readData(ReadData message, final boolean returnSerialized) {
116 super.readData(transaction, message, returnSerialized);
119 private boolean checkClosed() {
120 if (transaction.isClosed()) {
121 getSender().tell(new akka.actor.Status.Failure(new IllegalStateException("Transaction is closed, no modifications allowed")), getSelf());
128 private void writeData(WriteData message, boolean returnSerialized) {
129 LOG.debug("writeData at path : {}", message.getPath());
135 transaction.getSnapshot().write(message.getPath(), message.getData());
136 WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
137 getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
138 writeDataReply, getSelf());
140 getSender().tell(new akka.actor.Status.Failure(e), getSelf());
144 private void mergeData(MergeData message, boolean returnSerialized) {
145 LOG.debug("mergeData at path : {}", message.getPath());
151 transaction.getSnapshot().merge(message.getPath(), message.getData());
152 MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
153 getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
154 mergeDataReply, getSelf());
156 getSender().tell(new akka.actor.Status.Failure(e), getSelf());
160 private void deleteData(DeleteData message, boolean returnSerialized) {
161 LOG.debug("deleteData at path : {}", message.getPath());
167 transaction.getSnapshot().delete(message.getPath());
168 DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
169 getSender().tell(returnSerialized ? deleteDataReply.toSerializable(message.getVersion()) :
170 deleteDataReply, getSelf());
171 } catch(Exception e) {
172 getSender().tell(new akka.actor.Status.Failure(e), getSelf());
176 private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit) {
177 String transactionID = getTransactionID();
179 LOG.debug("readyTransaction : {}", transactionID);
181 getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
182 transaction, returnSerialized, doImmediateCommit), getContext());
184 // The shard will handle the commit from here so we're no longer needed - self-destruct.
185 getSelf().tell(PoisonPill.getInstance(), getSelf());