2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
20 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
21 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
25 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
28 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 * The ShardTransaction Actor represents a remote transaction
35 * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
38 * Handles Messages <br/>
39 * ---------------- <br/>
40 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
41 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
42 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
43 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
44 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
45 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
48 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
50 protected static final boolean SERIALIZED_REPLY = true;
52 private final ActorRef shardActor;
53 private final ShardStats shardStats;
54 private final String transactionID;
55 private final short clientTxVersion;
57 protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
58 short clientTxVersion) {
59 super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
60 this.shardActor = shardActor;
61 this.shardStats = shardStats;
62 this.transactionID = transactionID;
63 this.clientTxVersion = clientTxVersion;
66 public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
67 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
68 return Props.create(new ShardTransactionCreator(type, transaction, shardActor,
69 datastoreContext, shardStats, transactionID, txnClientVersion));
72 protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
74 protected ActorRef getShardActor() {
78 protected String getTransactionID() {
82 protected short getClientTxVersion() {
83 return clientTxVersion;
87 public void handleReceive(Object message) throws Exception {
88 if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
89 closeTransaction(true);
90 } else if (message instanceof ReceiveTimeout) {
91 if(LOG.isDebugEnabled()) {
92 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
94 closeTransaction(false);
96 throw new UnknownMessageException(message);
100 protected boolean returnCloseTransactionReply() {
104 private void closeTransaction(boolean sendReply) {
105 getDOMStoreTransaction().abort();
107 if(sendReply && returnCloseTransactionReply()) {
108 getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
111 getSelf().tell(PoisonPill.getInstance(), getSelf());
114 private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
115 final boolean ret = transaction.isClosed();
117 shardStats.incrementFailedReadTransactionsCount();
118 getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf());
123 protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message,
124 final boolean returnSerialized) {
126 if (checkClosed(transaction)) {
130 final YangInstanceIdentifier path = message.getPath();
131 Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
132 ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
133 sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
136 protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message,
137 final boolean returnSerialized) {
139 if (checkClosed(transaction)) {
143 final YangInstanceIdentifier path = message.getPath();
144 boolean exists = transaction.getSnapshot().readNode(path).isPresent();
145 DataExistsReply dataExistsReply = DataExistsReply.create(exists);
146 getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
147 dataExistsReply, getSelf());
150 private static class ShardTransactionCreator implements Creator<ShardTransaction> {
152 private static final long serialVersionUID = 1L;
154 final AbstractShardDataTreeTransaction<?> transaction;
155 final ActorRef shardActor;
156 final DatastoreContext datastoreContext;
157 final ShardStats shardStats;
158 final String transactionID;
159 final short txnClientVersion;
160 final TransactionType type;
162 ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
163 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
164 this.transaction = Preconditions.checkNotNull(transaction);
165 this.shardActor = shardActor;
166 this.shardStats = shardStats;
167 this.datastoreContext = datastoreContext;
168 this.transactionID = transactionID;
169 this.txnClientVersion = txnClientVersion;
174 public ShardTransaction create() throws Exception {
175 final ShardTransaction tx;
178 tx = new ShardReadTransaction(transaction, shardActor,
179 shardStats, transactionID, txnClientVersion);
182 tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
183 shardActor, shardStats, transactionID, txnClientVersion);
186 tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
187 shardActor, shardStats, transactionID, txnClientVersion);
190 throw new IllegalArgumentException("Unhandled transaction type " + type);
193 tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());