Guard against null transaction IDs
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
30
31 /**
32  * The ShardTransaction Actor represents a remote transaction
33  * <p>
34  * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
35  * </p>
36  * <p>
37  * Handles Messages <br/>
38  * ---------------- <br/>
39  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
40  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
41  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
42  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
43  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
44  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
45  * </p>
46  */
47 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
48
49     protected static final boolean SERIALIZED_REPLY = true;
50
51     private final ActorRef shardActor;
52     private final ShardStats shardStats;
53     private final String transactionID;
54     private final short clientTxVersion;
55
56     protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
57             short clientTxVersion) {
58         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
59         this.shardActor = shardActor;
60         this.shardStats = shardStats;
61         this.transactionID = Preconditions.checkNotNull(transactionID);
62         this.clientTxVersion = clientTxVersion;
63     }
64
65     public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
66             DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
67         return Props.create(new ShardTransactionCreator(type, transaction, shardActor,
68            datastoreContext, shardStats, transactionID, txnClientVersion));
69     }
70
71     protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
72
73     protected ActorRef getShardActor() {
74         return shardActor;
75     }
76
77     protected String getTransactionID() {
78         return transactionID;
79     }
80
81     protected short getClientTxVersion() {
82         return clientTxVersion;
83     }
84
85     @Override
86     public void handleReceive(Object message) throws Exception {
87         if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
88             closeTransaction(true);
89         } else if (message instanceof ReceiveTimeout) {
90             if(LOG.isDebugEnabled()) {
91                 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
92             }
93             closeTransaction(false);
94         } else {
95             throw new UnknownMessageException(message);
96         }
97     }
98
99     protected boolean returnCloseTransactionReply() {
100         return true;
101     }
102
103     private void closeTransaction(boolean sendReply) {
104         getDOMStoreTransaction().abort();
105
106         if(sendReply && returnCloseTransactionReply()) {
107             getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
108         }
109
110         getSelf().tell(PoisonPill.getInstance(), getSelf());
111     }
112
113     private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
114         final boolean ret = transaction.isClosed();
115         if (ret) {
116             shardStats.incrementFailedReadTransactionsCount();
117             getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf());
118         }
119         return ret;
120     }
121
122     protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message,
123             final boolean returnSerialized) {
124
125         if (checkClosed(transaction)) {
126             return;
127         }
128
129         final YangInstanceIdentifier path = message.getPath();
130         Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
131         ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
132         sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
133     }
134
135     protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message,
136         final boolean returnSerialized) {
137
138         if (checkClosed(transaction)) {
139             return;
140         }
141
142         final YangInstanceIdentifier path = message.getPath();
143         boolean exists = transaction.getSnapshot().readNode(path).isPresent();
144         DataExistsReply dataExistsReply = DataExistsReply.create(exists);
145         getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
146             dataExistsReply, getSelf());
147     }
148
149     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
150
151         private static final long serialVersionUID = 1L;
152
153         final AbstractShardDataTreeTransaction<?> transaction;
154         final ActorRef shardActor;
155         final DatastoreContext datastoreContext;
156         final ShardStats shardStats;
157         final String transactionID;
158         final short txnClientVersion;
159         final TransactionType type;
160
161         ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
162                 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
163             this.transaction = Preconditions.checkNotNull(transaction);
164             this.shardActor = shardActor;
165             this.shardStats = shardStats;
166             this.datastoreContext = datastoreContext;
167             this.transactionID = Preconditions.checkNotNull(transactionID);
168             this.txnClientVersion = txnClientVersion;
169             this.type = type;
170         }
171
172         @Override
173         public ShardTransaction create() throws Exception {
174             final ShardTransaction tx;
175             switch (type) {
176             case READ_ONLY:
177                 tx = new ShardReadTransaction(transaction, shardActor,
178                     shardStats, transactionID, txnClientVersion);
179                 break;
180             case READ_WRITE:
181                 tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
182                     shardActor, shardStats, transactionID, txnClientVersion);
183                 break;
184             case WRITE_ONLY:
185                 tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
186                     shardActor, shardStats, transactionID, txnClientVersion);
187                 break;
188             default:
189                 throw new IllegalArgumentException("Unhandled transaction type " + type);
190             }
191
192             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
193             return tx;
194         }
195     }
196 }