0e82116a0d009065bdc9e0ce910ee5d23ead779a
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
21 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
25 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
29 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32
33 /**
34  * The ShardTransaction Actor represents a remote transaction that delegates all actions to DOMDataReadWriteTransaction.
35  */
36 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
37     private final ActorRef shardActor;
38     private final ShardStats shardStats;
39     private final TransactionIdentifier transactionId;
40
41     protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats,
42             final TransactionIdentifier transactionId) {
43         // actor name override used for metering. This does not change the "real" actor name
44         super("shard-tx");
45         this.shardActor = shardActor;
46         this.shardStats = shardStats;
47         this.transactionId = Preconditions.checkNotNull(transactionId);
48     }
49
50     public static Props props(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
51             final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
52         return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
53     }
54
55     protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
56
57     protected ActorRef getShardActor() {
58         return shardActor;
59     }
60
61     protected final TransactionIdentifier getTransactionId() {
62         return transactionId;
63     }
64
65     @Override
66     public void handleReceive(final Object message) {
67         if (CloseTransaction.isSerializedType(message)) {
68             closeTransaction(true);
69         } else if (message instanceof ReceiveTimeout) {
70             LOG.debug("Got ReceiveTimeout for inactivity - closing transaction {}", transactionId);
71             closeTransaction(false);
72         } else {
73             unknownMessage(message);
74         }
75     }
76
77     protected boolean returnCloseTransactionReply() {
78         return true;
79     }
80
81     private void closeTransaction(final boolean sendReply) {
82         getDOMStoreTransaction().abortFromTransactionActor();
83         shardActor.tell(new PersistAbortTransactionPayload(transactionId), ActorRef.noSender());
84
85         if (sendReply && returnCloseTransactionReply()) {
86             getSender().tell(new CloseTransactionReply(), getSelf());
87         }
88
89         getSelf().tell(PoisonPill.getInstance(), getSelf());
90     }
91
92     private boolean checkClosed(final AbstractShardDataTreeTransaction<?> transaction) {
93         final boolean ret = transaction.isClosed();
94         if (ret) {
95             shardStats.incrementFailedReadTransactionsCount();
96             getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")),
97                     getSelf());
98         }
99         return ret;
100     }
101
102     protected void readData(final AbstractShardDataTreeTransaction<?> transaction, final ReadData message) {
103         if (checkClosed(transaction)) {
104             return;
105         }
106
107         final YangInstanceIdentifier path = message.getPath();
108         Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
109         ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), message.getVersion());
110         sender().tell(readDataReply.toSerializable(), self());
111     }
112
113     protected void dataExists(final AbstractShardDataTreeTransaction<?> transaction, final DataExists message) {
114         if (checkClosed(transaction)) {
115             return;
116         }
117
118         final YangInstanceIdentifier path = message.getPath();
119         boolean exists = transaction.getSnapshot().readNode(path).isPresent();
120         getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
121     }
122
123     @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Some fields are not Serializable but we don't "
124             + "create remote instances of this actor and thus don't need it to be Serializable.")
125     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
126
127         private static final long serialVersionUID = 1L;
128
129         final AbstractShardDataTreeTransaction<?> transaction;
130         final ActorRef shardActor;
131         final DatastoreContext datastoreContext;
132         final ShardStats shardStats;
133         final TransactionType type;
134
135         ShardTransactionCreator(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
136                 final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
137             this.transaction = Preconditions.checkNotNull(transaction);
138             this.shardActor = shardActor;
139             this.shardStats = shardStats;
140             this.datastoreContext = datastoreContext;
141             this.type = type;
142         }
143
144         @Override
145         public ShardTransaction create() throws Exception {
146             final ShardTransaction tx;
147             switch (type) {
148                 case READ_ONLY:
149                     tx = new ShardReadTransaction(transaction, shardActor, shardStats);
150                     break;
151                 case READ_WRITE:
152                     tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
153                             shardStats);
154                     break;
155                 case WRITE_ONLY:
156                     tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
157                             shardStats);
158                     break;
159                 default:
160                     throw new IllegalArgumentException("Unhandled transaction type " + type);
161             }
162
163             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
164             return tx;
165         }
166     }
167 }