2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 * The ShardTransaction Actor represents a remote transaction
34 * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
37 * Handles Messages <br/>
38 * ---------------- <br/>
39 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
40 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
43 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
44 private final ActorRef shardActor;
45 private final ShardStats shardStats;
46 private final String transactionID;
47 private final short clientTxVersion;
49 protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
50 short clientTxVersion) {
51 super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
52 this.shardActor = shardActor;
53 this.shardStats = shardStats;
54 this.transactionID = Preconditions.checkNotNull(transactionID);
55 this.clientTxVersion = clientTxVersion;
58 public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
59 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
60 return Props.create(new ShardTransactionCreator(type, transaction, shardActor,
61 datastoreContext, shardStats, transactionID, txnClientVersion));
64 protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
66 protected ActorRef getShardActor() {
70 protected String getTransactionID() {
74 protected short getClientTxVersion() {
75 return clientTxVersion;
79 public void handleReceive(Object message) throws Exception {
80 if (CloseTransaction.isSerializedType(message)) {
81 closeTransaction(true);
82 } else if (message instanceof ReceiveTimeout) {
83 if(LOG.isDebugEnabled()) {
84 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
86 closeTransaction(false);
88 throw new UnknownMessageException(message);
92 protected boolean returnCloseTransactionReply() {
96 private void closeTransaction(boolean sendReply) {
97 getDOMStoreTransaction().abort();
99 if(sendReply && returnCloseTransactionReply()) {
100 getSender().tell(new CloseTransactionReply(), getSelf());
103 getSelf().tell(PoisonPill.getInstance(), getSelf());
106 private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
107 final boolean ret = transaction.isClosed();
109 shardStats.incrementFailedReadTransactionsCount();
110 getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf());
115 protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message) {
116 if (checkClosed(transaction)) {
120 final YangInstanceIdentifier path = message.getPath();
121 Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
122 ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), message.getVersion());
123 sender().tell(readDataReply.toSerializable(), self());
126 protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message) {
127 if (checkClosed(transaction)) {
131 final YangInstanceIdentifier path = message.getPath();
132 boolean exists = transaction.getSnapshot().readNode(path).isPresent();
133 getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
136 private static class ShardTransactionCreator implements Creator<ShardTransaction> {
138 private static final long serialVersionUID = 1L;
140 final AbstractShardDataTreeTransaction<?> transaction;
141 final ActorRef shardActor;
142 final DatastoreContext datastoreContext;
143 final ShardStats shardStats;
144 final String transactionID;
145 final short txnClientVersion;
146 final TransactionType type;
148 ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
149 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
150 this.transaction = Preconditions.checkNotNull(transaction);
151 this.shardActor = shardActor;
152 this.shardStats = shardStats;
153 this.datastoreContext = datastoreContext;
154 this.transactionID = Preconditions.checkNotNull(transactionID);
155 this.txnClientVersion = txnClientVersion;
160 public ShardTransaction create() throws Exception {
161 final ShardTransaction tx;
164 tx = new ShardReadTransaction(transaction, shardActor,
165 shardStats, transactionID, txnClientVersion);
168 tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
169 shardActor, shardStats, transactionID, txnClientVersion);
172 tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
173 shardActor, shardStats, transactionID, txnClientVersion);
176 throw new IllegalArgumentException("Unhandled transaction type " + type);
179 tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());