Merge "Bug 2684 - Fixing support for Neutron binding extended attributes"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35
36 /**
37  * The ShardTransaction Actor represents a remote transaction
38  * <p>
39  * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
40  * </p>
41  * <p>
42  * Handles Messages <br/>
43  * ---------------- <br/>
44  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
45  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
46  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
47  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
48  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
49  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
50  * </p>
51  */
52 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
53
54     protected static final boolean SERIALIZED_REPLY = true;
55
56     private final ActorRef shardActor;
57     private final SchemaContext schemaContext;
58     private final ShardStats shardStats;
59     private final String transactionID;
60     private final short clientTxVersion;
61
62     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
63             ShardStats shardStats, String transactionID, short clientTxVersion) {
64         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
65         this.shardActor = shardActor;
66         this.schemaContext = schemaContext;
67         this.shardStats = shardStats;
68         this.transactionID = transactionID;
69         this.clientTxVersion = clientTxVersion;
70     }
71
72     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
73             SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
74             String transactionID, short txnClientVersion) {
75         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
76            datastoreContext, shardStats, transactionID, txnClientVersion));
77     }
78
79     protected abstract DOMStoreTransaction getDOMStoreTransaction();
80
81     protected ActorRef getShardActor() {
82         return shardActor;
83     }
84
85     protected String getTransactionID() {
86         return transactionID;
87     }
88
89     protected SchemaContext getSchemaContext() {
90         return schemaContext;
91     }
92
93     protected short getClientTxVersion() {
94         return clientTxVersion;
95     }
96
97     @Override
98     public void handleReceive(Object message) throws Exception {
99         if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
100             closeTransaction(true);
101         } else if (message instanceof ReceiveTimeout) {
102             if(LOG.isDebugEnabled()) {
103                 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
104             }
105             closeTransaction(false);
106         } else {
107             throw new UnknownMessageException(message);
108         }
109     }
110
111     protected boolean returnCloseTransactionReply() {
112         return true;
113     }
114
115     private void closeTransaction(boolean sendReply) {
116         getDOMStoreTransaction().close();
117
118         if(sendReply && returnCloseTransactionReply()) {
119             getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
120         }
121
122         getSelf().tell(PoisonPill.getInstance(), getSelf());
123     }
124
125     protected void readData(DOMStoreReadTransaction transaction, ReadData message,
126             final boolean returnSerialized) {
127         final ActorRef sender = getSender();
128         final ActorRef self = getSelf();
129         final YangInstanceIdentifier path = message.getPath();
130         final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
131                 transaction.read(path);
132
133         future.addListener(new Runnable() {
134             @Override
135             public void run() {
136                 try {
137                     Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
138                     ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
139
140                     sender.tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion):
141                         readDataReply), self);
142
143                 } catch (Exception e) {
144                     shardStats.incrementFailedReadTransactionsCount();
145                     sender.tell(new akka.actor.Status.Failure(e), self);
146                 }
147
148             }
149         }, getContext().dispatcher());
150     }
151
152     protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
153         final boolean returnSerialized) {
154         final YangInstanceIdentifier path = message.getPath();
155
156         try {
157             Boolean exists = transaction.exists(path).checkedGet();
158             DataExistsReply dataExistsReply = new DataExistsReply(exists);
159             getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
160                 dataExistsReply, getSelf());
161         } catch (ReadFailedException e) {
162             getSender().tell(new akka.actor.Status.Failure(e),getSelf());
163         }
164
165     }
166
167     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
168
169         private static final long serialVersionUID = 1L;
170
171         final DOMStoreTransaction transaction;
172         final ActorRef shardActor;
173         final SchemaContext schemaContext;
174         final DatastoreContext datastoreContext;
175         final ShardStats shardStats;
176         final String transactionID;
177         final short txnClientVersion;
178
179         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
180                 SchemaContext schemaContext, DatastoreContext datastoreContext,
181                 ShardStats shardStats, String transactionID, short txnClientVersion) {
182             this.transaction = transaction;
183             this.shardActor = shardActor;
184             this.shardStats = shardStats;
185             this.schemaContext = schemaContext;
186             this.datastoreContext = datastoreContext;
187             this.transactionID = transactionID;
188             this.txnClientVersion = txnClientVersion;
189         }
190
191         @Override
192         public ShardTransaction create() throws Exception {
193             ShardTransaction tx;
194             if(transaction instanceof DOMStoreReadWriteTransaction) {
195                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
196                         shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
197             } else if(transaction instanceof DOMStoreReadTransaction) {
198                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
199                         schemaContext, shardStats, transactionID, txnClientVersion);
200             } else {
201                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
202                         shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
203             }
204
205             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
206             return tx;
207         }
208     }
209 }