BUG-1568 Ssh Handler for netconf client Mina implementation
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import akka.dispatch.OnComplete;
16
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.util.concurrent.CheckedFuture;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.SettableFuture;
22
23 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
24 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
28 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
30 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
36 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
41 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import scala.concurrent.Future;
49
50 import java.util.ArrayList;
51 import java.util.HashMap;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.concurrent.atomic.AtomicLong;
55
56 /**
57  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
58  * <p>
59  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
60  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
61  * be created on each of those shards by the TransactionProxy
62  *</p>
63  * <p>
64  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
65  * shards will be executed.
66  * </p>
67  */
68 public class TransactionProxy implements DOMStoreReadWriteTransaction {
69     public enum TransactionType {
70         READ_ONLY,
71         WRITE_ONLY,
72         READ_WRITE
73     }
74
75     private static final AtomicLong counter = new AtomicLong();
76
77     private static final Logger
78         LOG = LoggerFactory.getLogger(TransactionProxy.class);
79
80
81     private final TransactionType transactionType;
82     private final ActorContext actorContext;
83     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
84     private final TransactionIdentifier identifier;
85     private final SchemaContext schemaContext;
86     private boolean inReadyState;
87
88     public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
89             SchemaContext schemaContext) {
90         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
91         this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
92         this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
93
94         String memberName = actorContext.getCurrentMemberName();
95         if(memberName == null){
96             memberName = "UNKNOWN-MEMBER";
97         }
98
99         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
100                 counter.getAndIncrement()).build();
101
102         LOG.debug("Created txn {}", identifier);
103
104     }
105
106     @Override
107     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
108             final YangInstanceIdentifier path) {
109
110         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
111                 "Read operation on write-only transaction is not allowed");
112
113         LOG.debug("txn {} read {}", identifier, path);
114
115         createTransactionIfMissing(actorContext, path);
116
117         return transactionContext(path).readData(path);
118     }
119
120     @Override
121     public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
122
123         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
124                 "Exists operation on write-only transaction is not allowed");
125
126         LOG.debug("txn {} exists {}", identifier, path);
127
128         createTransactionIfMissing(actorContext, path);
129
130         return transactionContext(path).dataExists(path);
131     }
132
133     private void checkModificationState() {
134         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
135                 "Modification operation on read-only transaction is not allowed");
136         Preconditions.checkState(!inReadyState,
137                 "Transaction is sealed - further modifications are allowed");
138     }
139
140     @Override
141     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
142
143         checkModificationState();
144
145         LOG.debug("txn {} write {}", identifier, path);
146
147         createTransactionIfMissing(actorContext, path);
148
149         transactionContext(path).writeData(path, data);
150     }
151
152     @Override
153     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
154
155         checkModificationState();
156
157         LOG.debug("txn {} merge {}", identifier, path);
158
159         createTransactionIfMissing(actorContext, path);
160
161         transactionContext(path).mergeData(path, data);
162     }
163
164     @Override
165     public void delete(YangInstanceIdentifier path) {
166
167         checkModificationState();
168
169         LOG.debug("txn {} delete {}", identifier, path);
170
171         createTransactionIfMissing(actorContext, path);
172
173         transactionContext(path).deleteData(path);
174     }
175
176     @Override
177     public DOMStoreThreePhaseCommitCohort ready() {
178
179         checkModificationState();
180
181         inReadyState = true;
182
183         List<ActorPath> cohortPaths = new ArrayList<>();
184
185         LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier,
186                 remoteTransactionPaths.size());
187
188         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
189
190             LOG.debug("txn {} Readying transaction for shard {}", identifier,
191                     transactionContext.getShardName());
192
193             Object result = transactionContext.readyTransaction();
194
195             if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
196                 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
197                         actorContext.getActorSystem(),result);
198                 String resolvedCohortPath = transactionContext.getResolvedCohortPath(
199                         reply.getCohortPath().toString());
200                 cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
201             } else {
202                 LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS,
203                         result.getClass());
204             }
205         }
206
207         return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString());
208     }
209
210     @Override
211     public Object getIdentifier() {
212         return this.identifier;
213     }
214
215     @Override
216     public void close() {
217         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
218             transactionContext.closeTransaction();
219         }
220     }
221
222     private TransactionContext transactionContext(YangInstanceIdentifier path){
223         String shardName = shardNameFromIdentifier(path);
224         return remoteTransactionPaths.get(shardName);
225     }
226
227     private String shardNameFromIdentifier(YangInstanceIdentifier path){
228         return ShardStrategyFactory.getStrategy(path).findShard(path);
229     }
230
231     private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
232         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
233
234         TransactionContext transactionContext =
235             remoteTransactionPaths.get(shardName);
236
237         if(transactionContext != null){
238             // A transaction already exists with that shard
239             return;
240         }
241
242         try {
243             Object response = actorContext.executeShardOperation(shardName,
244                 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
245                 ActorContext.ASK_DURATION);
246             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
247                 CreateTransactionReply reply =
248                     CreateTransactionReply.fromSerializable(response);
249
250                 String transactionPath = reply.getTransactionPath();
251
252                 LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
253
254                 ActorSelection transactionActor =
255                     actorContext.actorSelection(transactionPath);
256                 transactionContext =
257                     new TransactionContextImpl(shardName, transactionPath,
258                         transactionActor);
259
260                 remoteTransactionPaths.put(shardName, transactionContext);
261             } else {
262                 LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS,
263                         response.getClass());
264             }
265         } catch(Exception e){
266             LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
267             remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
268         }
269     }
270
271     private interface TransactionContext {
272         String getShardName();
273
274         String getResolvedCohortPath(String cohortPath);
275
276         public void closeTransaction();
277
278         public Object readyTransaction();
279
280         void deleteData(YangInstanceIdentifier path);
281
282         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
283
284         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
285                 final YangInstanceIdentifier path);
286
287         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
288
289         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
290     }
291
292
293     private class TransactionContextImpl implements TransactionContext {
294         private final String shardName;
295         private final String actorPath;
296         private final ActorSelection actor;
297
298
299         private TransactionContextImpl(String shardName, String actorPath,
300             ActorSelection actor) {
301             this.shardName = shardName;
302             this.actorPath = actorPath;
303             this.actor = actor;
304         }
305
306         @Override
307         public String getShardName() {
308             return shardName;
309         }
310
311         private ActorSelection getActor() {
312             return actor;
313         }
314
315         @Override
316         public String getResolvedCohortPath(String cohortPath) {
317             return actorContext.resolvePath(actorPath, cohortPath);
318         }
319
320         @Override
321         public void closeTransaction() {
322             actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
323         }
324
325         @Override
326         public Object readyTransaction() {
327             return actorContext.executeRemoteOperation(getActor(),
328                     new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
329         }
330
331         @Override
332         public void deleteData(YangInstanceIdentifier path) {
333             actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() );
334         }
335
336         @Override
337         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
338             actorContext.sendRemoteOperationAsync(getActor(),
339                     new MergeData(path, data, schemaContext).toSerializable());
340         }
341
342         @Override
343         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
344             final YangInstanceIdentifier path) {
345
346             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
347
348             OnComplete<Object> onComplete = new OnComplete<Object>() {
349                 @Override
350                 public void onComplete(Throwable failure, Object response) throws Throwable {
351                     if(failure != null) {
352                         returnFuture.setException(new ReadFailedException(
353                                 "Error reading data for path " + path, failure));
354                     } else {
355                         if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
356                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
357                                     path, response);
358                             if (reply.getNormalizedNode() == null) {
359                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
360                             } else {
361                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
362                                         reply.getNormalizedNode()));
363                             }
364                         } else {
365                             returnFuture.setException(new ReadFailedException(
366                                     "Invalid response reading data for path " + path));
367                         }
368                     }
369                 }
370             };
371
372             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
373                     new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
374             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
375
376             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
377         }
378
379         @Override
380         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
381             actorContext.sendRemoteOperationAsync(getActor(),
382                     new WriteData(path, data, schemaContext).toSerializable());
383         }
384
385         @Override
386         public CheckedFuture<Boolean, ReadFailedException> dataExists(
387                 final YangInstanceIdentifier path) {
388
389             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
390
391             OnComplete<Object> onComplete = new OnComplete<Object>() {
392                 @Override
393                 public void onComplete(Throwable failure, Object response) throws Throwable {
394                     if(failure != null) {
395                         returnFuture.setException(new ReadFailedException(
396                                 "Error checking exists for path " + path, failure));
397                     } else {
398                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
399                             returnFuture.set(Boolean.valueOf(DataExistsReply.
400                                         fromSerializable(response).exists()));
401                         } else {
402                             returnFuture.setException(new ReadFailedException(
403                                     "Invalid response checking exists for path " + path));
404                         }
405                     }
406                 }
407             };
408
409             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
410                     new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
411             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
412
413             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
414         }
415     }
416
417     private class NoOpTransactionContext implements TransactionContext {
418
419         private final Logger
420             LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
421
422         private final String shardName;
423         private final Exception failure;
424
425         private ActorRef cohort;
426
427         public NoOpTransactionContext(String shardName, Exception failure){
428             this.shardName = shardName;
429             this.failure = failure;
430         }
431
432         @Override
433         public String getShardName() {
434             return  shardName;
435
436         }
437
438         @Override
439         public String getResolvedCohortPath(String cohortPath) {
440             return cohort.path().toString();
441         }
442
443         @Override
444         public void closeTransaction() {
445             LOG.warn("txn {} closeTransaction called", identifier);
446         }
447
448         @Override public Object readyTransaction() {
449             LOG.warn("txn {} readyTransaction called", identifier);
450             cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
451             return new ReadyTransactionReply(cohort.path()).toSerializable();
452         }
453
454         @Override
455         public void deleteData(YangInstanceIdentifier path) {
456             LOG.warn("txt {} deleteData called path = {}", identifier, path);
457         }
458
459         @Override
460         public void mergeData(YangInstanceIdentifier path,
461             NormalizedNode<?, ?> data) {
462             LOG.warn("txn {} mergeData called path = {}", identifier, path);
463         }
464
465         @Override
466         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
467             YangInstanceIdentifier path) {
468             LOG.warn("txn {} readData called path = {}", identifier, path);
469             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
470                     "Error reading data for path " + path, failure));
471         }
472
473         @Override public void writeData(YangInstanceIdentifier path,
474             NormalizedNode<?, ?> data) {
475             LOG.warn("txn {} writeData called path = {}", identifier, path);
476         }
477
478         @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
479             YangInstanceIdentifier path) {
480             LOG.warn("txn {} dataExists called path = {}", identifier, path);
481             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
482                     "Error checking exists for path " + path, failure));
483         }
484     }
485
486
487
488 }