2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorRef;
13 import akka.actor.PoisonPill;
14 import akka.actor.Props;
15 import akka.actor.ReceiveTimeout;
16 import akka.japi.Creator;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
20 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
26 import org.opendaylight.mdsal.common.api.ReadFailedException;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 * The ShardTransaction actor represents a remote transaction that delegates its operations to an AbstractShardDataTreeTransaction.
32 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
// Shard actor reference supplied at construction time; stored as given.
33 private final ActorRef shardActor;
// Statistics object; checkClosed() uses it to count failed read transactions.
34 private final ShardStats shardStats;
// Identifier of the transaction served by this actor; the constructor rejects null.
35 private final TransactionIdentifier transactionId;
/**
 * Initializes the state shared by all transaction actor implementations.
 *
 * @param shardActor reference to the shard actor; stored without a null check
 * @param shardStats statistics object; stored without a null check
 * @param transactionId identifier of the transaction served by this actor
 * @throws NullPointerException if {@code transactionId} is null
 */
37 protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats,
38 final TransactionIdentifier transactionId) {
39 // actor name override used for metering. This does not change the "real" actor name
41 this.shardActor = shardActor;
42 this.shardStats = shardStats;
// Fail fast: the identifier is required (it is used when logging in handleReceive()).
43 this.transactionId = requireNonNull(transactionId);
/**
 * Builds the Akka {@code Props} describing a ShardTransaction actor.
 *
 * <p>All arguments are captured in a {@code ShardTransactionCreator}, which is passed to
 * {@code Props.create} together with {@code ShardTransaction.class}.
 *
 * @param type transaction type, forwarded to the creator
 * @param transaction backing data tree transaction; must not be null
 * @param shardActor shard actor reference, forwarded to the creator
 * @param datastoreContext context whose shard transaction idle timeout the creator applies to the new actor
 * @param shardStats statistics object, forwarded to the creator
 * @return the Props produced by {@code Props.create}
 * @throws NullPointerException if {@code transaction} is null (checked by the creator's constructor)
 */
46 public static Props props(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
47 final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
48 return Props.create(ShardTransaction.class,
49 new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
/**
 * Supplies the backing data tree transaction of this actor.
 *
 * <p>closeTransaction() calls {@code abortFromTransactionActor()} on the returned object.
 *
 * @return the backing transaction, supplied by the concrete subclass
 */
52 protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
/**
 * Accessor for the shard actor reference, available to subclasses.
 *
 * @return the shard actor reference (presumably the one passed to the constructor; confirm against the body)
 */
54 protected ActorRef getShardActor() {
/**
 * Accessor for the transaction identifier; final, so subclasses cannot override it.
 *
 * @return the transaction identifier (presumably the one passed to the constructor; confirm against the body)
 */
58 protected final TransactionIdentifier getTransactionId() {
/**
 * Handles the messages common to every transaction actor.
 *
 * <ul>
 *   <li>A serialized {@code CloseTransaction} closes the transaction and allows a reply to the sender.</li>
 *   <li>A {@code ReceiveTimeout} is logged at debug level and closes the transaction without a reply.</li>
 *   <li>{@code unknownMessage(message)} is called for what appears to be every other message
 *       (TODO confirm the branch structure around that call).</li>
 * </ul>
 *
 * @param message the message delivered to this actor
 */
63 public void handleReceive(final Object message) {
64 if (CloseTransaction.isSerializedType(message)) {
// Explicit close request: a reply may be sent, subject to returnCloseTransactionReply().
65 closeTransaction(true);
66 } else if (message instanceof ReceiveTimeout) {
67 LOG.debug("Got ReceiveTimeout for inactivity - closing transaction {}", transactionId);
// Nobody requested this close, so no reply is sent.
68 closeTransaction(false);
70 unknownMessage(message);
/**
 * Hook consulted by closeTransaction(): a {@code CloseTransactionReply} is sent to the sender only when a reply
 * was requested and this method returns {@code true}. Being protected and non-final, it can be overridden.
 *
 * @return whether a close request should be answered with a reply
 */
74 protected boolean returnCloseTransactionReply() {
/**
 * Aborts the backing transaction, optionally replies to the sender, and asks this actor to stop.
 *
 * @param sendReply whether the caller wants a {@code CloseTransactionReply} sent to the sender; the reply is
 *     sent only if returnCloseTransactionReply() also returns {@code true}
 */
78 private void closeTransaction(final boolean sendReply) {
79 getDOMStoreTransaction().abortFromTransactionActor();
81 if (sendReply && returnCloseTransactionReply()) {
82 getSender().tell(new CloseTransactionReply(), getSelf());
// Send a PoisonPill to ourselves to terminate this actor.
85 getSelf().tell(PoisonPill.getInstance(), getSelf());
/**
 * Guard used by readData() and dataExists() before they touch the transaction.
 *
 * <p>Queries {@code transaction.isClosed()}. The visible code also increments the failed-read-transactions
 * counter and sends the sender a {@code Status.Failure} wrapping a {@code ReadFailedException}
 * ("Transaction is closed"), presumably only when the transaction is closed (TODO confirm the condition).
 *
 * @param transaction the transaction to check
 * @return presumably {@code true} when the transaction is closed, since callers treat {@code true} as
 *     "stop processing" (TODO confirm)
 */
88 private boolean checkClosed(final AbstractShardDataTreeTransaction<?> transaction) {
89 final boolean ret = transaction.isClosed();
91 shardStats.incrementFailedReadTransactionsCount();
92 getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")),
/**
 * Reads the node at the requested path from the transaction's snapshot and sends the result to the sender.
 *
 * <p>The reply is a {@code ReadDataReply} built with the message's version and sent in its serializable form.
 * A missing node is represented by {@code null} in the reply.
 *
 * @param transaction the transaction whose snapshot is read
 * @param message the read request, providing the path and the version
 */
98 protected void readData(final AbstractShardDataTreeTransaction<?> transaction, final ReadData message) {
// checkClosed() is the closed-transaction guard; presumably processing stops here when it returns true.
99 if (checkClosed(transaction)) {
103 final YangInstanceIdentifier path = message.getPath();
104 ReadDataReply readDataReply = new ReadDataReply(transaction.getSnapshot().readNode(path).orElse(null),
105 message.getVersion());
106 sender().tell(readDataReply.toSerializable(), self());
/**
 * Reports to the sender whether a node exists at the requested path in the transaction's snapshot.
 *
 * <p>The reply is a {@code DataExistsReply} built with the message's version and sent in its serializable form.
 *
 * @param transaction the transaction whose snapshot is consulted
 * @param message the existence request, providing the path and the version
 */
109 protected void dataExists(final AbstractShardDataTreeTransaction<?> transaction, final DataExists message) {
// checkClosed() is the closed-transaction guard; presumably processing stops here when it returns true.
110 if (checkClosed(transaction)) {
114 final YangInstanceIdentifier path = message.getPath();
115 boolean exists = transaction.getSnapshot().readNode(path).isPresent();
116 getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
/**
 * Akka {@code Creator} that instantiates the concrete ShardTransaction implementation and configures its
 * receive timeout. Instances are built by {@code ShardTransaction.props}.
 */
119 @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Some fields are not Serializable but we don't "
120 + "create remote instances of this actor and thus don't need it to be Serializable.")
121 private static class ShardTransactionCreator implements Creator<ShardTransaction> {
123 private static final long serialVersionUID = 1L;
// Backing transaction handed to the created actor; the constructor rejects null.
125 final AbstractShardDataTreeTransaction<?> transaction;
// Shard actor reference handed to the created actor.
126 final ActorRef shardActor;
// Source of the shard transaction idle timeout applied in create().
127 final DatastoreContext datastoreContext;
// Statistics object handed to the created actor.
128 final ShardStats shardStats;
// Transaction type; it appears in the "Unhandled transaction type" error raised by create().
129 final TransactionType type;
/**
 * Captures the arguments needed to instantiate the actor later in create().
 *
 * @param type transaction type (NOTE(review): its assignment to the {@code type} field is not visible here; confirm)
 * @param transaction backing transaction; must not be null
 * @param shardActor shard actor reference; stored without a null check
 * @param datastoreContext datastore context; stored without a null check
 * @param shardStats statistics object; stored without a null check
 * @throws NullPointerException if {@code transaction} is null
 */
131 ShardTransactionCreator(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
132 final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
133 this.transaction = requireNonNull(transaction);
134 this.shardActor = shardActor;
135 this.shardStats = shardStats;
136 this.datastoreContext = datastoreContext;
/**
 * Instantiates the transaction actor and configures its receive timeout.
 *
 * <p>The visible code builds a {@code ShardReadTransaction}, a {@code ShardReadWriteTransaction} or a
 * {@code ShardWriteTransaction}. The latter two cast the transaction to
 * {@code ReadWriteShardDataTreeTransaction}. The choice is presumably driven by {@code type}
 * (TODO confirm the selection logic).
 *
 * @return the created actor instance (presumably {@code tx}; TODO confirm)
 * @throws IllegalArgumentException for a transaction type that is not handled
 */
141 public ShardTransaction create() {
142 final ShardTransaction tx;
145 tx = new ShardReadTransaction(transaction, shardActor, shardStats);
148 tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
152 tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
156 throw new IllegalArgumentException("Unhandled transaction type " + type);
// Arm the idle timeout from the datastore context; handleReceive() closes the transaction on ReceiveTimeout.
159 tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());