2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 * The ShardTransaction Actor represents a remote transaction
39 * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
42 * Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions
43 * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this
44 * time there are no known advantages for creating a read-only or write-only transaction which may change over time
45 * at which point we can optimize things in the distributed store as well.
48 * Handles Messages <br/>
49 * ---------------- <br/>
50 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
51 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
52 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
53 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
54 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
55 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
58 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
60 protected static final boolean SERIALIZED_REPLY = true;
62 private final ActorRef shardActor;
63 private final SchemaContext schemaContext;
64 private final ShardStats shardStats;
65 private final String transactionID;
66 private final int txnClientVersion;
68 protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
69 ShardStats shardStats, String transactionID, int txnClientVersion) {
70 super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
71 this.shardActor = shardActor;
72 this.schemaContext = schemaContext;
73 this.shardStats = shardStats;
74 this.transactionID = transactionID;
75 this.txnClientVersion = txnClientVersion;
78 public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
79 SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
80 String transactionID, int txnClientVersion) {
81 return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
82 datastoreContext, shardStats, transactionID, txnClientVersion));
85 protected abstract DOMStoreTransaction getDOMStoreTransaction();
87 protected ActorRef getShardActor() {
91 protected String getTransactionID() {
95 protected SchemaContext getSchemaContext() {
99 protected int getTxnClientVersion() {
100 return txnClientVersion;
104 public void handleReceive(Object message) throws Exception {
105 if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
106 closeTransaction(true);
107 } else if (message instanceof ReceiveTimeout) {
108 if(LOG.isDebugEnabled()) {
109 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
111 closeTransaction(false);
113 throw new UnknownMessageException(message);
117 private void closeTransaction(boolean sendReply) {
118 getDOMStoreTransaction().close();
121 getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
124 getSelf().tell(PoisonPill.getInstance(), getSelf());
127 protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) {
128 final ActorRef sender = getSender();
129 final ActorRef self = getSelf();
130 final YangInstanceIdentifier path = message.getPath();
131 final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
132 transaction.read(path);
135 future.addListener(new Runnable() {
139 Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
140 ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull());
142 sender.tell((returnSerialized ? readDataReply.toSerializable():
143 readDataReply), self);
145 } catch (Exception e) {
146 shardStats.incrementFailedReadTransactionsCount();
147 sender.tell(new akka.actor.Status.Failure(e), self);
151 }, getContext().dispatcher());
154 protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
155 final boolean returnSerialized) {
156 final YangInstanceIdentifier path = message.getPath();
159 Boolean exists = transaction.exists(path).checkedGet();
160 DataExistsReply dataExistsReply = new DataExistsReply(exists);
161 getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
162 dataExistsReply, getSelf());
163 } catch (ReadFailedException e) {
164 getSender().tell(new akka.actor.Status.Failure(e),getSelf());
169 private static class ShardTransactionCreator implements Creator<ShardTransaction> {
171 private static final long serialVersionUID = 1L;
173 final DOMStoreTransaction transaction;
174 final ActorRef shardActor;
175 final SchemaContext schemaContext;
176 final DatastoreContext datastoreContext;
177 final ShardStats shardStats;
178 final String transactionID;
179 final int txnClientVersion;
181 ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
182 SchemaContext schemaContext, DatastoreContext datastoreContext,
183 ShardStats shardStats, String transactionID, int txnClientVersion) {
184 this.transaction = transaction;
185 this.shardActor = shardActor;
186 this.shardStats = shardStats;
187 this.schemaContext = schemaContext;
188 this.datastoreContext = datastoreContext;
189 this.transactionID = transactionID;
190 this.txnClientVersion = txnClientVersion;
194 public ShardTransaction create() throws Exception {
196 if(transaction instanceof DOMStoreReadWriteTransaction) {
197 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
198 shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
199 } else if(transaction instanceof DOMStoreReadTransaction) {
200 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
201 schemaContext, shardStats, transactionID, txnClientVersion);
203 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
204 shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
207 tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());