Merge "Fixes Bug 2935"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34
35 /**
36  * The ShardTransaction Actor represents a remote transaction
37  * <p>
38  * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
39  * </p>
40  * <p>
41  * Handles Messages <br/>
42  * ---------------- <br/>
43  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
44  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
45  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
46  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
47  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
48  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
49  * </p>
50  */
51 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
52
53     protected static final boolean SERIALIZED_REPLY = true;
54
55     private final ActorRef shardActor;
56     private final ShardStats shardStats;
57     private final String transactionID;
58     private final short clientTxVersion;
59
60     protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
61             short clientTxVersion) {
62         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
63         this.shardActor = shardActor;
64         this.shardStats = shardStats;
65         this.transactionID = transactionID;
66         this.clientTxVersion = clientTxVersion;
67     }
68
69     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
70             DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
71         return Props.create(new ShardTransactionCreator(transaction, shardActor,
72            datastoreContext, shardStats, transactionID, txnClientVersion));
73     }
74
75     protected abstract DOMStoreTransaction getDOMStoreTransaction();
76
77     protected ActorRef getShardActor() {
78         return shardActor;
79     }
80
81     protected String getTransactionID() {
82         return transactionID;
83     }
84
85     protected short getClientTxVersion() {
86         return clientTxVersion;
87     }
88
89     @Override
90     public void handleReceive(Object message) throws Exception {
91         if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
92             closeTransaction(true);
93         } else if (message instanceof ReceiveTimeout) {
94             if(LOG.isDebugEnabled()) {
95                 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
96             }
97             closeTransaction(false);
98         } else {
99             throw new UnknownMessageException(message);
100         }
101     }
102
103     protected boolean returnCloseTransactionReply() {
104         return true;
105     }
106
107     private void closeTransaction(boolean sendReply) {
108         getDOMStoreTransaction().close();
109
110         if(sendReply && returnCloseTransactionReply()) {
111             getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
112         }
113
114         getSelf().tell(PoisonPill.getInstance(), getSelf());
115     }
116
117     protected void readData(DOMStoreReadTransaction transaction, ReadData message,
118             final boolean returnSerialized) {
119
120         final YangInstanceIdentifier path = message.getPath();
121         try {
122             final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
123             Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
124             ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
125
126             sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
127
128         } catch (Exception e) {
129             LOG.debug(String.format("Unexpected error reading path %s", path), e);
130             shardStats.incrementFailedReadTransactionsCount();
131             sender().tell(new akka.actor.Status.Failure(e), self());
132         }
133     }
134
135     protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
136         final boolean returnSerialized) {
137         final YangInstanceIdentifier path = message.getPath();
138
139         try {
140             Boolean exists = transaction.exists(path).checkedGet();
141             DataExistsReply dataExistsReply = new DataExistsReply(exists);
142             getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
143                 dataExistsReply, getSelf());
144         } catch (ReadFailedException e) {
145             getSender().tell(new akka.actor.Status.Failure(e),getSelf());
146         }
147
148     }
149
150     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
151
152         private static final long serialVersionUID = 1L;
153
154         final DOMStoreTransaction transaction;
155         final ActorRef shardActor;
156         final DatastoreContext datastoreContext;
157         final ShardStats shardStats;
158         final String transactionID;
159         final short txnClientVersion;
160
161         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
162                 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
163             this.transaction = transaction;
164             this.shardActor = shardActor;
165             this.shardStats = shardStats;
166             this.datastoreContext = datastoreContext;
167             this.transactionID = transactionID;
168             this.txnClientVersion = txnClientVersion;
169         }
170
171         @Override
172         public ShardTransaction create() throws Exception {
173             ShardTransaction tx;
174             if(transaction instanceof DOMStoreReadWriteTransaction) {
175                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
176                         shardActor, shardStats, transactionID, txnClientVersion);
177             } else if(transaction instanceof DOMStoreReadTransaction) {
178                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
179                         shardStats, transactionID, txnClientVersion);
180             } else {
181                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
182                         shardActor, shardStats, transactionID, txnClientVersion);
183             }
184
185             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
186             return tx;
187         }
188     }
189 }