2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 * The ShardTransaction Actor represents a remote transaction
39 * The ShardTransaction Actor delegates all actions to the underlying DOMStoreTransaction
42 * Handles Messages <br/>
43 * ---------------- <br/>
44 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
45 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
46 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
47 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
48 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
49 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
52 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
// Constant true value; presumably passed by subclasses as the returnSerialized argument of
// readData()/dataExists() -- it is not referenced in the visible code, so confirm against callers.
54 protected static final boolean SERIALIZED_REPLY = true;
// Shard actor reference supplied to the constructor.
56 private final ActorRef shardActor;
// Schema context supplied to the constructor.
57 private final SchemaContext schemaContext;
// Shard statistics; readData() increments the failed-read-transactions counter on it when a read fails.
58 private final ShardStats shardStats;
// Identifier of this transaction, supplied to the constructor.
59 private final String transactionID;
// Client transaction version; readData() passes it to ReadDataReply.toSerializable().
60 private final short clientTxVersion;
/**
 * Stores the state shared by all shard transaction actors.
 *
 * @param shardActor      the shard actor reference kept by this transaction
 * @param schemaContext   the schema context kept by this transaction
 * @param shardStats      the statistics object updated when reads fail
 * @param transactionID   the identifier of this transaction
 * @param clientTxVersion the client transaction version, used when serializing read replies
 */
protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
        ShardStats shardStats, String transactionID, short clientTxVersion) {
    // "shard-tx" is an actor name override used for metering only; it does not
    // change the "real" actor name.
    super("shard-tx");

    this.transactionID = transactionID;
    this.clientTxVersion = clientTxVersion;
    this.shardActor = shardActor;
    this.shardStats = shardStats;
    this.schemaContext = schemaContext;
}
72 public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
73 SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
74 String transactionID, short txnClientVersion) {
75 return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
76 datastoreContext, shardStats, transactionID, txnClientVersion));
/**
 * Supplies the DOM store transaction backing this actor. closeTransaction() calls
 * close() on the returned transaction.
 *
 * @return the underlying DOM store transaction
 */
79 protected abstract DOMStoreTransaction getDOMStoreTransaction();
/**
 * Accessor for the shard actor reference.
 */
// NOTE(review): the method body is not visible in this view -- presumably it returns the
// shardActor field; confirm.
81 protected ActorRef getShardActor() {
/**
 * Accessor for the transaction identifier.
 */
// NOTE(review): the method body is not visible in this view -- presumably it returns the
// transactionID field; confirm.
85 protected String getTransactionID() {
/**
 * Accessor for the schema context.
 */
// NOTE(review): the method body is not visible in this view -- presumably it returns the
// schemaContext field; confirm.
89 protected SchemaContext getSchemaContext() {
93 protected short getClientTxVersion() {
94 return clientTxVersion;
98 public void handleReceive(Object message) throws Exception {
99 if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
100 closeTransaction(true);
101 } else if (message instanceof ReceiveTimeout) {
102 if(LOG.isDebugEnabled()) {
103 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
105 closeTransaction(false);
107 throw new UnknownMessageException(message);
/**
 * Consulted by closeTransaction(): a CloseTransactionReply is sent to the sender only when
 * a reply was requested and this method returns true.
 */
// NOTE(review): the method body is not visible in this view, so the value returned here
// cannot be confirmed.
111 protected boolean returnCloseTransactionReply() {
115 private void closeTransaction(boolean sendReply) {
116 getDOMStoreTransaction().close();
118 if(sendReply && returnCloseTransactionReply()) {
119 getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
122 getSelf().tell(PoisonPill.getInstance(), getSelf());
125 protected void readData(DOMStoreReadTransaction transaction, ReadData message,
126 final boolean returnSerialized) {
128 final YangInstanceIdentifier path = message.getPath();
130 final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
131 Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
132 ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
134 sender().tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion): readDataReply), self());
136 } catch (Exception e) {
137 LOG.error(String.format("Unexpected error reading path %s", path), e);
138 shardStats.incrementFailedReadTransactionsCount();
139 sender().tell(new akka.actor.Status.Failure(e), self());
143 protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
144 final boolean returnSerialized) {
145 final YangInstanceIdentifier path = message.getPath();
148 Boolean exists = transaction.exists(path).checkedGet();
149 DataExistsReply dataExistsReply = new DataExistsReply(exists);
150 getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
151 dataExistsReply, getSelf());
152 } catch (ReadFailedException e) {
153 getSender().tell(new akka.actor.Status.Failure(e),getSelf());
158 private static class ShardTransactionCreator implements Creator<ShardTransaction> {
// Serialization version identifier of this creator.
160 private static final long serialVersionUID = 1L;
// The fields below hold the constructor arguments until create() builds the actor.
// Transaction whose runtime type create() inspects to choose the concrete actor class.
162 final DOMStoreTransaction transaction;
// Shard actor reference passed on to the created actor.
163 final ActorRef shardActor;
// Schema context passed on to the created actor.
164 final SchemaContext schemaContext;
// Source of the shard transaction idle timeout that create() applies as the receive timeout.
165 final DatastoreContext datastoreContext;
// Shard statistics passed on to the created actor.
166 final ShardStats shardStats;
// Transaction identifier passed on to the created actor.
167 final String transactionID;
// Client transaction version passed on to the created actor.
168 final short txnClientVersion;
/**
 * Captures the arguments needed to instantiate the transaction actor later in create().
 *
 * @param transaction      the DOM store transaction the actor will operate on
 * @param shardActor       the shard actor reference for the new actor
 * @param schemaContext    the schema context for the new actor
 * @param datastoreContext the source of the transaction idle timeout
 * @param shardStats       the statistics object for the new actor
 * @param transactionID    the identifier of the transaction
 * @param txnClientVersion the client transaction version
 */
ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
        SchemaContext schemaContext, DatastoreContext datastoreContext,
        ShardStats shardStats, String transactionID, short txnClientVersion) {
    this.transaction = transaction;
    this.transactionID = transactionID;
    this.txnClientVersion = txnClientVersion;

    this.shardActor = shardActor;
    this.shardStats = shardStats;

    this.schemaContext = schemaContext;
    this.datastoreContext = datastoreContext;
}
183 public ShardTransaction create() throws Exception {
185 if(transaction instanceof DOMStoreReadWriteTransaction) {
186 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
187 shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
188 } else if(transaction instanceof DOMStoreReadTransaction) {
189 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
190 schemaContext, shardStats, transactionID, txnClientVersion);
192 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
193 shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
196 tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());