Merge "BUG-868: migrate ListenerRegistry"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import com.google.common.base.Optional;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListeningExecutorService;
19 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
20 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
25 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
31 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
33 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import java.util.ArrayList;
44 import java.util.HashMap;
45 import java.util.List;
46 import java.util.Map;
47 import java.util.concurrent.Callable;
48 import java.util.concurrent.atomic.AtomicLong;
49
50 /**
51  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
52  * <p>
53  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
54  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
55  * be created on each of those shards by the TransactionProxy
56  *</p>
57  * <p>
58  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
59  * shards will be executed.
60  * </p>
61  */
62 public class TransactionProxy implements DOMStoreReadWriteTransaction {
63     public enum TransactionType {
64         READ_ONLY,
65         WRITE_ONLY,
66         READ_WRITE
67     }
68
69     private static final AtomicLong counter = new AtomicLong();
70
71     private static final Logger
72         LOG = LoggerFactory.getLogger(TransactionProxy.class);
73
74
75     private final TransactionType transactionType;
76     private final ActorContext actorContext;
77     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
78     private final String identifier;
79     private final ListeningExecutorService executor;
80     private final SchemaContext schemaContext;
81
82     public TransactionProxy(
83         ActorContext actorContext,
84         TransactionType transactionType,
85         ListeningExecutorService executor,
86         SchemaContext schemaContext
87     ) {
88
89         this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement();
90         this.transactionType = transactionType;
91         this.actorContext = actorContext;
92         this.executor = executor;
93         this.schemaContext = schemaContext;
94
95
96     }
97
98     @Override
99     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
100             final YangInstanceIdentifier path) {
101
102         createTransactionIfMissing(actorContext, path);
103
104         return transactionContext(path).readData(path);
105     }
106
107     @Override
108     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
109
110         createTransactionIfMissing(actorContext, path);
111
112         transactionContext(path).writeData(path, data);
113     }
114
115     @Override
116     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
117
118         createTransactionIfMissing(actorContext, path);
119
120         transactionContext(path).mergeData(path, data);
121     }
122
123     @Override
124     public void delete(YangInstanceIdentifier path) {
125
126         createTransactionIfMissing(actorContext, path);
127
128         transactionContext(path).deleteData(path);
129     }
130
131     @Override
132     public DOMStoreThreePhaseCommitCohort ready() {
133         List<ActorPath> cohortPaths = new ArrayList<>();
134
135         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
136             Object result = transactionContext.readyTransaction();
137
138             if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
139                 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
140                 String resolvedCohortPath = transactionContext
141                     .getResolvedCohortPath(reply.getCohortPath().toString());
142                 cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
143             }
144         }
145
146         return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
147     }
148
149     @Override
150     public Object getIdentifier() {
151         return this.identifier;
152     }
153
154     @Override
155     public void close() {
156         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
157             transactionContext.closeTransaction();
158         }
159     }
160
161     private TransactionContext transactionContext(YangInstanceIdentifier path){
162         String shardName = shardNameFromIdentifier(path);
163         return remoteTransactionPaths.get(shardName);
164     }
165
166     private String shardNameFromIdentifier(YangInstanceIdentifier path){
167         return ShardStrategyFactory.getStrategy(path).findShard(path);
168     }
169
170     private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
171         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
172
173         TransactionContext transactionContext =
174             remoteTransactionPaths.get(shardName);
175
176         if(transactionContext != null){
177             // A transaction already exists with that shard
178             return;
179         }
180
181         try {
182             Object response = actorContext.executeShardOperation(shardName,
183                 new CreateTransaction(identifier,this.transactionType.ordinal() ).toSerializable(),
184                 ActorContext.ASK_DURATION);
185             if (response.getClass()
186                 .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
187                 CreateTransactionReply reply =
188                     CreateTransactionReply.fromSerializable(response);
189
190                 String transactionPath = reply.getTransactionPath();
191
192                 LOG.info("Received transaction path = {}"  , transactionPath );
193
194                 ActorSelection transactionActor =
195                     actorContext.actorSelection(transactionPath);
196                 transactionContext =
197                     new TransactionContextImpl(shardName, transactionPath,
198                         transactionActor);
199
200                 remoteTransactionPaths.put(shardName, transactionContext);
201             }
202         } catch(TimeoutException | PrimaryNotFoundException e){
203             LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
204             remoteTransactionPaths.put(shardName,
205                 new NoOpTransactionContext(shardName));
206         }
207     }
208
209     private interface TransactionContext {
210         String getShardName();
211
212         String getResolvedCohortPath(String cohortPath);
213
214         public void closeTransaction();
215
216         public Object readyTransaction();
217
218         void deleteData(YangInstanceIdentifier path);
219
220         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
221
222         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
223                 final YangInstanceIdentifier path);
224
225         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
226     }
227
228
229     private class TransactionContextImpl implements TransactionContext{
230         private final String shardName;
231         private final String actorPath;
232         private final ActorSelection  actor;
233
234
235         private TransactionContextImpl(String shardName, String actorPath,
236             ActorSelection actor) {
237             this.shardName = shardName;
238             this.actorPath = actorPath;
239             this.actor = actor;
240         }
241
242         @Override public String getShardName() {
243             return shardName;
244         }
245
246         private ActorSelection getActor() {
247             return actor;
248         }
249
250         @Override public String getResolvedCohortPath(String cohortPath){
251             return actorContext.resolvePath(actorPath, cohortPath);
252         }
253
254         @Override public void closeTransaction() {
255             getActor().tell(
256                 new CloseTransaction().toSerializable(), null);
257         }
258
259         @Override public Object readyTransaction() {
260             return actorContext.executeRemoteOperation(getActor(),
261                 new ReadyTransaction().toSerializable(),
262                 ActorContext.ASK_DURATION
263             );
264
265         }
266
267         @Override public void deleteData(YangInstanceIdentifier path) {
268             getActor().tell(new DeleteData(path).toSerializable(), null);
269         }
270
271         @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data){
272             getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
273         }
274
275         @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
276                 final YangInstanceIdentifier path) {
277
278             Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
279
280                 @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
281                     Object response = actorContext
282                         .executeRemoteOperation(getActor(), new ReadData(path).toSerializable(),
283                             ActorContext.ASK_DURATION);
284                     if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
285                         ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
286                         if(reply.getNormalizedNode() == null){
287                             return Optional.absent();
288                         }
289                         return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
290                     }
291
292                     return Optional.absent();
293                 }
294             };
295
296             return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
297         }
298
299         @Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
300             getActor().tell(new WriteData(path, data, schemaContext).toSerializable(), null);
301         }
302
303     }
304
305     private class NoOpTransactionContext implements TransactionContext {
306
307         private final Logger
308             LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
309
310         private final String shardName;
311
312         private ActorRef cohort;
313
314         public NoOpTransactionContext(String shardName){
315             this.shardName = shardName;
316         }
317         @Override public String getShardName() {
318             return  shardName;
319
320         }
321
322         @Override public String getResolvedCohortPath(String cohortPath) {
323             return cohort.path().toString();
324         }
325
326         @Override public void closeTransaction() {
327             LOG.error("closeTransaction called");
328         }
329
330         @Override public Object readyTransaction() {
331             LOG.error("readyTransaction called");
332             cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
333             return new ReadyTransactionReply(cohort.path()).toSerializable();
334         }
335
336         @Override public void deleteData(YangInstanceIdentifier path) {
337             LOG.error("deleteData called path = {}", path);
338         }
339
340         @Override public void mergeData(YangInstanceIdentifier path,
341             NormalizedNode<?, ?> data) {
342             LOG.error("mergeData called path = {}", path);
343         }
344
345         @Override
346         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
347             YangInstanceIdentifier path) {
348             LOG.error("readData called path = {}", path);
349             return Futures.immediateCheckedFuture(
350                 Optional.<NormalizedNode<?, ?>>absent());
351         }
352
353         @Override public void writeData(YangInstanceIdentifier path,
354             NormalizedNode<?, ?> data) {
355             LOG.error("writeData called path = {}", path);
356         }
357     }
358
359
360
361 }