2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
21 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
25 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
29 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 * The ShardTransaction Actor represents a remote transaction that delegates all actions to an AbstractShardDataTreeTransaction.
36 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
// Actor that is sent a PersistAbortTransactionPayload when this transaction is closed.
37 private final ActorRef shardActor;
// Statistics object; checkClosed() calls incrementFailedReadTransactionsCount() on it.
38 private final ShardStats shardStats;
// Identifier of the transaction served by this actor; never null (checked in the constructor).
39 private final TransactionIdentifier transactionId;
/**
 * Initializes the state shared by all shard transaction actors.
 *
 * @param shardActor actor that is notified when this transaction is closed
 * @param shardStats statistics object updated by this actor
 * @param transactionId identifier of the transaction; must not be null
 * @throws NullPointerException if {@code transactionId} is null
 */
41 protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats,
42 final TransactionIdentifier transactionId) {
43 super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
44 this.shardActor = shardActor;
45 this.shardStats = shardStats;
46 this.transactionId = Preconditions.checkNotNull(transactionId);
/**
 * Builds the Akka {@link Props} for a shard transaction actor.
 *
 * <p>All arguments are captured in a ShardTransactionCreator, which is handed to {@code Props.create}.
 *
 * @param type kind of transaction actor to create
 * @param transaction backing data-tree transaction; must not be null (checked by the creator)
 * @param shardActor shard actor reference passed on to the created transaction actor
 * @param datastoreContext supplies the idle timeout applied to the created actor
 * @param shardStats statistics object passed on to the created transaction actor
 * @return Props wrapping a ShardTransactionCreator for the given arguments
 */
49 public static Props props(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
50 final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
51 return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
/**
 * Returns the data-tree transaction backing this actor.
 *
 * <p>closeTransaction() calls abortFromTransactionActor() on the returned transaction.
 */
54 protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
/**
 * Accessor giving subclasses the shard actor reference.
 * NOTE(review): the method body is not visible in this view; presumably it returns the shardActor field - confirm.
 */
56 protected ActorRef getShardActor() {
/**
 * Accessor giving subclasses the transaction identifier; final, so it cannot be overridden.
 * NOTE(review): the method body is not visible in this view; presumably it returns the transactionId field - confirm.
 */
60 protected final TransactionIdentifier getTransactionId() {
/**
 * Handles the messages common to all shard transaction actors.
 *
 * <p>A serialized CloseTransaction closes the transaction and requests a reply; a ReceiveTimeout closes the
 * transaction without a reply.
 * NOTE(review): the branch introducing unknownMessage(message) is not visible in this view; it appears to be
 * the fallback for any other message - confirm.
 *
 * @param message the message delivered to this actor
 */
65 public void handleReceive(final Object message) {
66 if (CloseTransaction.isSerializedType(message)) {
// Explicit close request: a reply is sent if returnCloseTransactionReply() allows it.
67 closeTransaction(true);
68 } else if (message instanceof ReceiveTimeout) {
69 LOG.debug("Got ReceiveTimeout for inactivity - closing transaction {}", transactionId);
// Inactivity timeout: close without sending a reply.
70 closeTransaction(false);
72 unknownMessage(message);
/**
 * Tells closeTransaction() whether a CloseTransactionReply may be sent to the sender; a reply is only sent
 * when this returns true and the close was requested with a reply.
 * NOTE(review): the returned value is not visible in this view - confirm the default.
 */
76 protected boolean returnCloseTransactionReply() {
/**
 * Aborts the backing transaction, notifies the shard actor and stops this actor.
 *
 * @param sendReply whether the sender should receive a CloseTransactionReply; the reply is only sent if
 *     returnCloseTransactionReply() also returns true
 */
80 private void closeTransaction(final boolean sendReply) {
81 getDOMStoreTransaction().abortFromTransactionActor();
// Send the shard a PersistAbortTransactionPayload for this transaction; no sender is attached to the message.
82 shardActor.tell(new PersistAbortTransactionPayload(transactionId), ActorRef.noSender());
84 if (sendReply && returnCloseTransactionReply()) {
85 getSender().tell(new CloseTransactionReply(), getSelf());
// Send this actor a PoisonPill so that it stops itself.
88 getSelf().tell(PoisonPill.getInstance(), getSelf());
/**
 * Checks whether the given transaction is closed and reports a read failure to the sender.
 *
 * <p>The visible failure path increments the failed-read counter and sends the sender a Status.Failure wrapping
 * a ReadFailedException with the message "Transaction is closed".
 * NOTE(review): the guard around the failure path and the return statement are not visible in this view;
 * presumably the failure is reported only when the transaction is closed, and that flag is returned - confirm.
 *
 * @param transaction transaction whose closed state is checked
 */
91 private boolean checkClosed(final AbstractShardDataTreeTransaction<?> transaction) {
92 final boolean ret = transaction.isClosed();
94 shardStats.incrementFailedReadTransactionsCount();
95 getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")),
/**
 * Reads the node at the path carried by the message from the transaction's snapshot and replies to the sender.
 *
 * <p>The reply is the serializable form of a ReadDataReply built from the node (null when the snapshot has no
 * node at that path) and the version carried by the message.
 * NOTE(review): the body of the checkClosed() guard is not visible in this view; presumably it returns early
 * when the transaction is closed - confirm.
 *
 * @param transaction transaction whose snapshot is read
 * @param message read request carrying the path and the version
 */
101 protected void readData(final AbstractShardDataTreeTransaction<?> transaction, final ReadData message) {
102 if (checkClosed(transaction)) {
106 final YangInstanceIdentifier path = message.getPath();
107 Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
// orNull() turns an absent node into a null payload in the reply.
108 ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), message.getVersion());
109 sender().tell(readDataReply.toSerializable(), self());
/**
 * Checks whether the transaction's snapshot has a node at the path carried by the message and replies to the
 * sender with the serializable form of a DataExistsReply carrying that flag and the message's version.
 * NOTE(review): the body of the checkClosed() guard is not visible in this view; presumably it returns early
 * when the transaction is closed - confirm.
 *
 * @param transaction transaction whose snapshot is queried
 * @param message existence request carrying the path and the version
 */
112 protected void dataExists(final AbstractShardDataTreeTransaction<?> transaction, final DataExists message) {
113 if (checkClosed(transaction)) {
117 final YangInstanceIdentifier path = message.getPath();
118 boolean exists = transaction.getSnapshot().readNode(path).isPresent();
119 getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
122 @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Some fields are not Serializable but we don't "
123 + "create remote instances of this actor and thus don't need it to be Serializable.")
124 private static class ShardTransactionCreator implements Creator<ShardTransaction> {
126 private static final long serialVersionUID = 1L;
// Constructor arguments captured for later use by create().
128 final AbstractShardDataTreeTransaction<?> transaction;
129 final ActorRef shardActor;
// Supplies the idle timeout that create() sets as the new actor's receive timeout.
130 final DatastoreContext datastoreContext;
131 final ShardStats shardStats;
// Named in the "Unhandled transaction type" error raised by create().
132 final TransactionType type;
/**
 * Captures the arguments needed to instantiate a shard transaction actor.
 * NOTE(review): no assignment of the final {@code type} field is visible in this view; presumably it is set
 * here as well - confirm.
 *
 * @param type kind of transaction actor to create
 * @param transaction backing data-tree transaction; must not be null
 * @param shardActor shard actor reference handed to the created actor
 * @param datastoreContext supplies the idle timeout for the created actor
 * @param shardStats statistics object handed to the created actor
 * @throws NullPointerException if {@code transaction} is null
 */
134 ShardTransactionCreator(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
135 final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
136 this.transaction = Preconditions.checkNotNull(transaction);
137 this.shardActor = shardActor;
138 this.shardStats = shardStats;
139 this.datastoreContext = datastoreContext;
144 public ShardTransaction create() throws Exception {
145 final ShardTransaction tx;
148 tx = new ShardReadTransaction(transaction, shardActor, shardStats);
151 tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
155 tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
159 throw new IllegalArgumentException("Unhandled transaction type " + type);
162 tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());