Merge changes I880310f2,I9f437328,I552372db,I587fb203,I05f0bd94, ...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35
36 /**
37  * The ShardTransaction Actor represents a remote transaction
38  * <p>
39  * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
40  * </p>
41  * <p>
42  * Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions
43  * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this
44  * time there are no known advantages for creating a read-only or write-only transaction which may change over time
45  * at which point we can optimize things in the distributed store as well.
46  * </p>
47  * <p>
48  * Handles Messages <br/>
49  * ---------------- <br/>
50  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
51  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
52  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
53  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
54  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
55  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
56  * </p>
57  */
58 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
59
60     protected static final boolean SERIALIZED_REPLY = true;
61
62     private final ActorRef shardActor;
63     private final SchemaContext schemaContext;
64     private final ShardStats shardStats;
65     private final String transactionID;
66     private final short clientTxVersion;
67
68     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
69             ShardStats shardStats, String transactionID, short clientTxVersion) {
70         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
71         this.shardActor = shardActor;
72         this.schemaContext = schemaContext;
73         this.shardStats = shardStats;
74         this.transactionID = transactionID;
75         this.clientTxVersion = clientTxVersion;
76     }
77
78     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
79             SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
80             String transactionID, short txnClientVersion) {
81         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
82            datastoreContext, shardStats, transactionID, txnClientVersion));
83     }
84
85     protected abstract DOMStoreTransaction getDOMStoreTransaction();
86
87     protected ActorRef getShardActor() {
88         return shardActor;
89     }
90
91     protected String getTransactionID() {
92         return transactionID;
93     }
94
95     protected SchemaContext getSchemaContext() {
96         return schemaContext;
97     }
98
99     protected short getClientTxVersion() {
100         return clientTxVersion;
101     }
102
103     @Override
104     public void handleReceive(Object message) throws Exception {
105         if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
106             closeTransaction(true);
107         } else if (message instanceof ReceiveTimeout) {
108             if(LOG.isDebugEnabled()) {
109                 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
110             }
111             closeTransaction(false);
112         } else {
113             throw new UnknownMessageException(message);
114         }
115     }
116
117     private void closeTransaction(boolean sendReply) {
118         getDOMStoreTransaction().close();
119
120         if(sendReply) {
121             getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
122         }
123
124         getSelf().tell(PoisonPill.getInstance(), getSelf());
125     }
126
127     protected void readData(DOMStoreReadTransaction transaction, ReadData message,
128             final boolean returnSerialized) {
129         final ActorRef sender = getSender();
130         final ActorRef self = getSelf();
131         final YangInstanceIdentifier path = message.getPath();
132         final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
133                 transaction.read(path);
134
135         future.addListener(new Runnable() {
136             @Override
137             public void run() {
138                 try {
139                     Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
140                     ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
141
142                     sender.tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion):
143                         readDataReply), self);
144
145                 } catch (Exception e) {
146                     shardStats.incrementFailedReadTransactionsCount();
147                     sender.tell(new akka.actor.Status.Failure(e), self);
148                 }
149
150             }
151         }, getContext().dispatcher());
152     }
153
154     protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
155         final boolean returnSerialized) {
156         final YangInstanceIdentifier path = message.getPath();
157
158         try {
159             Boolean exists = transaction.exists(path).checkedGet();
160             DataExistsReply dataExistsReply = new DataExistsReply(exists);
161             getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
162                 dataExistsReply, getSelf());
163         } catch (ReadFailedException e) {
164             getSender().tell(new akka.actor.Status.Failure(e),getSelf());
165         }
166
167     }
168
169     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
170
171         private static final long serialVersionUID = 1L;
172
173         final DOMStoreTransaction transaction;
174         final ActorRef shardActor;
175         final SchemaContext schemaContext;
176         final DatastoreContext datastoreContext;
177         final ShardStats shardStats;
178         final String transactionID;
179         final short txnClientVersion;
180
181         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
182                 SchemaContext schemaContext, DatastoreContext datastoreContext,
183                 ShardStats shardStats, String transactionID, short txnClientVersion) {
184             this.transaction = transaction;
185             this.shardActor = shardActor;
186             this.shardStats = shardStats;
187             this.schemaContext = schemaContext;
188             this.datastoreContext = datastoreContext;
189             this.transactionID = transactionID;
190             this.txnClientVersion = txnClientVersion;
191         }
192
193         @Override
194         public ShardTransaction create() throws Exception {
195             ShardTransaction tx;
196             if(transaction instanceof DOMStoreReadWriteTransaction) {
197                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
198                         shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
199             } else if(transaction instanceof DOMStoreReadTransaction) {
200                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
201                         schemaContext, shardStats, transactionID, txnClientVersion);
202             } else {
203                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
204                         shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
205             }
206
207             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
208             return tx;
209         }
210     }
211 }