Merge "BUG 2221 : Add metering to ShardTransaction actor"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35
36 /**
37  * The ShardTransaction Actor represents a remote transaction
38  * <p>
39  * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
40  * </p>
41  * <p>
42  * Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions
43  * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this
44  * time there are no known advantages for creating a read-only or write-only transaction which may change over time
45  * at which point we can optimize things in the distributed store as well.
46  * </p>
47  * <p>
48  * Handles Messages <br/>
49  * ---------------- <br/>
50  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
51  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
52  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
53  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
54  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
55  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
56  * </p>
57  */
58 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
59
60     private final ActorRef shardActor;
61     private final SchemaContext schemaContext;
62     private final ShardStats shardStats;
63     private final String transactionID;
64     protected static final boolean SERIALIZED_REPLY = true;
65
66     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
67             ShardStats shardStats, String transactionID) {
68         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
69         this.shardActor = shardActor;
70         this.schemaContext = schemaContext;
71         this.shardStats = shardStats;
72         this.transactionID = transactionID;
73     }
74
75     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
76             SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
77             String transactionID) {
78         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
79            datastoreContext, shardStats, transactionID));
80     }
81
82     protected abstract DOMStoreTransaction getDOMStoreTransaction();
83
84     protected ActorRef getShardActor() {
85         return shardActor;
86     }
87
88     protected String getTransactionID() {
89         return transactionID;
90     }
91
92     protected SchemaContext getSchemaContext() {
93         return schemaContext;
94     }
95
96     @Override
97     public void handleReceive(Object message) throws Exception {
98         if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
99             closeTransaction(true);
100         } else if (message instanceof ReceiveTimeout) {
101             if(LOG.isDebugEnabled()) {
102                 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
103             }
104             closeTransaction(false);
105         } else {
106             throw new UnknownMessageException(message);
107         }
108     }
109
110     private void closeTransaction(boolean sendReply) {
111         getDOMStoreTransaction().close();
112
113         if(sendReply) {
114             getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
115         }
116
117         getSelf().tell(PoisonPill.getInstance(), getSelf());
118     }
119
120     protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) {
121         final ActorRef sender = getSender();
122         final ActorRef self = getSelf();
123         final YangInstanceIdentifier path = message.getPath();
124         final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
125                 transaction.read(path);
126
127
128         future.addListener(new Runnable() {
129             @Override
130             public void run() {
131                 try {
132                     Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
133                     ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull());
134
135                     sender.tell((returnSerialized ? readDataReply.toSerializable():
136                         readDataReply), self);
137
138                 } catch (Exception e) {
139                     shardStats.incrementFailedReadTransactionsCount();
140                     sender.tell(new akka.actor.Status.Failure(e), self);
141                 }
142
143             }
144         }, getContext().dispatcher());
145     }
146
147     protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
148         final boolean returnSerialized) {
149         final YangInstanceIdentifier path = message.getPath();
150
151         try {
152             Boolean exists = transaction.exists(path).checkedGet();
153             DataExistsReply dataExistsReply = new DataExistsReply(exists);
154             getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
155                 dataExistsReply, getSelf());
156         } catch (ReadFailedException e) {
157             getSender().tell(new akka.actor.Status.Failure(e),getSelf());
158         }
159
160     }
161
162     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
163
164         private static final long serialVersionUID = 1L;
165
166         final DOMStoreTransaction transaction;
167         final ActorRef shardActor;
168         final SchemaContext schemaContext;
169         final DatastoreContext datastoreContext;
170         final ShardStats shardStats;
171         final String transactionID;
172
173         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
174                 SchemaContext schemaContext, DatastoreContext datastoreContext,
175                 ShardStats shardStats, String transactionID) {
176             this.transaction = transaction;
177             this.shardActor = shardActor;
178             this.shardStats = shardStats;
179             this.schemaContext = schemaContext;
180             this.datastoreContext = datastoreContext;
181             this.transactionID = transactionID;
182         }
183
184         @Override
185         public ShardTransaction create() throws Exception {
186             ShardTransaction tx;
187             if(transaction instanceof DOMStoreReadWriteTransaction) {
188                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
189                         shardActor, schemaContext, shardStats, transactionID);
190             } else if(transaction instanceof DOMStoreReadTransaction) {
191                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
192                         schemaContext, shardStats, transactionID);
193             } else {
194                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
195                         shardActor, schemaContext, shardStats, transactionID);
196             }
197
198             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
199             return tx;
200         }
201     }
202 }