2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 * The ShardTransaction Actor represents a remote transaction
38 * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
41 * Handles Messages <br/>
42 * ---------------- <br/>
43 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
44 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
45 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
46 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
47 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
48 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
51 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
53 protected static final boolean SERIALIZED_REPLY = true;
55 private final ActorRef shardActor;
56 private final ShardStats shardStats;
57 private final String transactionID;
58 private final short clientTxVersion;
60 protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
61 short clientTxVersion) {
62 super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
63 this.shardActor = shardActor;
64 this.shardStats = shardStats;
65 this.transactionID = transactionID;
66 this.clientTxVersion = clientTxVersion;
69 public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
70 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
71 return Props.create(new ShardTransactionCreator(transaction, shardActor,
72 datastoreContext, shardStats, transactionID, txnClientVersion));
75 protected abstract DOMStoreTransaction getDOMStoreTransaction();
77 protected ActorRef getShardActor() {
81 protected String getTransactionID() {
85 protected short getClientTxVersion() {
86 return clientTxVersion;
90 public void handleReceive(Object message) throws Exception {
91 if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
92 closeTransaction(true);
93 } else if (message instanceof ReceiveTimeout) {
94 if(LOG.isDebugEnabled()) {
95 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
97 closeTransaction(false);
99 throw new UnknownMessageException(message);
103 protected boolean returnCloseTransactionReply() {
107 private void closeTransaction(boolean sendReply) {
108 getDOMStoreTransaction().close();
110 if(sendReply && returnCloseTransactionReply()) {
111 getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
114 getSelf().tell(PoisonPill.getInstance(), getSelf());
117 protected void readData(DOMStoreReadTransaction transaction, ReadData message,
118 final boolean returnSerialized) {
120 final YangInstanceIdentifier path = message.getPath();
122 final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
123 Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
124 ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
126 sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
128 } catch (Exception e) {
129 LOG.debug(String.format("Unexpected error reading path %s", path), e);
130 shardStats.incrementFailedReadTransactionsCount();
131 sender().tell(new akka.actor.Status.Failure(e), self());
135 protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
136 final boolean returnSerialized) {
137 final YangInstanceIdentifier path = message.getPath();
140 boolean exists = transaction.exists(path).checkedGet();
141 DataExistsReply dataExistsReply = DataExistsReply.create(exists);
142 getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
143 dataExistsReply, getSelf());
144 } catch (ReadFailedException e) {
145 getSender().tell(new akka.actor.Status.Failure(e),getSelf());
149 private static class ShardTransactionCreator implements Creator<ShardTransaction> {
151 private static final long serialVersionUID = 1L;
153 final DOMStoreTransaction transaction;
154 final ActorRef shardActor;
155 final DatastoreContext datastoreContext;
156 final ShardStats shardStats;
157 final String transactionID;
158 final short txnClientVersion;
160 ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
161 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
162 this.transaction = transaction;
163 this.shardActor = shardActor;
164 this.shardStats = shardStats;
165 this.datastoreContext = datastoreContext;
166 this.transactionID = transactionID;
167 this.txnClientVersion = txnClientVersion;
171 public ShardTransaction create() throws Exception {
173 if(transaction instanceof DOMStoreReadWriteTransaction) {
174 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
175 shardActor, shardStats, transactionID, txnClientVersion);
176 } else if(transaction instanceof DOMStoreReadTransaction) {
177 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
178 shardStats, transactionID, txnClientVersion);
180 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
181 shardActor, shardStats, transactionID, txnClientVersion);
184 tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());