5a0049aa6d05acd5f0e9114ff93e5dcc4ca53dad
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListenableFutureTask;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
18 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
19 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
20 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
21 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
22 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
23 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
26 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
27 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
33
34 import java.util.ArrayList;
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.Callable;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.atomic.AtomicLong;
41
42 /**
43  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
44  * <p>
45  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
46  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
47  * be created on each of those shards by the TransactionProxy
48  *</p>
49  * <p>
50  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
51  * shards will be executed.
52  * </p>
53  */
54 public class TransactionProxy implements DOMStoreReadWriteTransaction {
55     public enum TransactionType {
56         READ_ONLY,
57         WRITE_ONLY,
58         READ_WRITE
59     }
60
61     private static final AtomicLong counter = new AtomicLong();
62
63     private final TransactionType transactionType;
64     private final ActorContext actorContext;
65     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
66     private final String identifier;
67     private final ExecutorService executor;
68     private final SchemaContext schemaContext;
69
70     public TransactionProxy(
71         ActorContext actorContext,
72         TransactionType transactionType,
73         ExecutorService executor,
74         SchemaContext schemaContext
75         ) {
76
77         this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement();
78         this.transactionType = transactionType;
79         this.actorContext = actorContext;
80         this.executor = executor;
81         this.schemaContext = schemaContext;
82
83
84     }
85
86     @Override
87     public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final YangInstanceIdentifier path) {
88
89         createTransactionIfMissing(actorContext, path);
90
91         final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
92
93         Callable<Optional<NormalizedNode<?,?>>> call = new Callable() {
94
95             @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
96                 Object response = actorContext
97                     .executeRemoteOperation(remoteTransaction, new ReadData(path).toSerializable(),
98                         ActorContext.ASK_DURATION);
99                 if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
100                     ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
101                     if(reply.getNormalizedNode() == null){
102                         return Optional.absent();
103                     }
104                     //FIXME : A cast should not be required here ???
105                     return (Optional<NormalizedNode<?, ?>>) Optional.of(reply.getNormalizedNode());
106                 }
107
108                 return Optional.absent();
109             }
110         };
111
112         ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
113             future = ListenableFutureTask.create(call);
114
115         executor.submit(future);
116
117         return future;
118     }
119
120     @Override
121     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
122
123         createTransactionIfMissing(actorContext, path);
124
125         final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
126         remoteTransaction.tell(new WriteData(path, data, schemaContext).toSerializable(), null);
127     }
128
129     @Override
130     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
131
132         createTransactionIfMissing(actorContext, path);
133
134         final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
135         remoteTransaction.tell(new MergeData(path, data, schemaContext).toSerializable(), null);
136     }
137
138     @Override
139     public void delete(YangInstanceIdentifier path) {
140
141         createTransactionIfMissing(actorContext, path);
142
143         final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
144         remoteTransaction.tell(new DeleteData(path).toSerializable(), null);
145     }
146
147     @Override
148     public DOMStoreThreePhaseCommitCohort ready() {
149         List<ActorPath> cohortPaths = new ArrayList<>();
150
151         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
152             Object result = actorContext.executeRemoteOperation(transactionContext.getActor(),
153                 new ReadyTransaction().toSerializable(),
154                 ActorContext.ASK_DURATION
155             );
156
157             if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
158                 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
159                 String resolvedCohortPath = transactionContext
160                     .getResolvedCohortPath(reply.getCohortPath().toString());
161                 cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
162             }
163         }
164
165         return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
166     }
167
168     @Override
169     public Object getIdentifier() {
170         return this.identifier;
171     }
172
173     @Override
174     public void close() {
175         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
176             transactionContext.getActor().tell(
177                 new CloseTransaction().toSerializable(), null);
178         }
179     }
180
181     private ActorSelection remoteTransactionFromIdentifier(YangInstanceIdentifier path){
182         String shardName = shardNameFromIdentifier(path);
183         return remoteTransactionPaths.get(shardName).getActor();
184     }
185
186     private String shardNameFromIdentifier(YangInstanceIdentifier path){
187         return ShardStrategyFactory.getStrategy(path).findShard(path);
188     }
189
190     private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
191         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
192
193         TransactionContext transactionContext =
194             remoteTransactionPaths.get(shardName);
195
196         if(transactionContext != null){
197             // A transaction already exists with that shard
198             return;
199         }
200
201         Object response = actorContext.executeShardOperation(shardName, new CreateTransaction(identifier).toSerializable(), ActorContext.ASK_DURATION);
202         if(response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)){
203             CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response);
204             String transactionPath = actorContext.getRemoteActorPath(shardName, reply.getTransactionPath());
205
206             ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
207             transactionContext = new TransactionContext(shardName, transactionPath, transactionActor);
208
209             remoteTransactionPaths.put(shardName, transactionContext);
210         }
211     }
212
213
214     private class TransactionContext {
215         private final String shardName;
216         private final String actorPath;
217         private final ActorSelection  actor;
218
219
220         private TransactionContext(String shardName, String actorPath,
221             ActorSelection actor) {
222             this.shardName = shardName;
223             this.actorPath = actorPath;
224             this.actor = actor;
225         }
226
227
228         public String getShardName() {
229             return shardName;
230         }
231
232         public String getActorPath() {
233             return actorPath;
234         }
235
236         public ActorSelection getActor() {
237             return actor;
238         }
239
240         public String getResolvedCohortPath(String cohortPath){
241             return actorContext.resolvePath(actorPath, cohortPath);
242         }
243     }
244
245 }