BUG 3045 : Use non-strict parsing in hello message.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
33 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
36 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
37 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
38 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
40 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import scala.concurrent.Future;
48 import scala.concurrent.Promise;
49
50 /**
51  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
52  * <p>
53  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
54  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
55  * be created on each of those shards by the TransactionProxy
56  *</p>
57  * <p>
58  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
59  * shards will be executed.
60  * </p>
61  */
62 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
63
64     public static enum TransactionType {
65         READ_ONLY,
66         WRITE_ONLY,
67         READ_WRITE;
68
69         // Cache all values
70         private static final TransactionType[] VALUES = values();
71
72         public static TransactionType fromInt(final int type) {
73             try {
74                 return VALUES[type];
75             } catch (IndexOutOfBoundsException e) {
76                 throw new IllegalArgumentException("In TransactionType enum value " + type, e);
77             }
78         }
79     }
80
81     private static enum TransactionState {
82         OPEN,
83         READY,
84         CLOSED,
85     }
86
87     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
88                                                               new Mapper<Throwable, Throwable>() {
89         @Override
90         public Throwable apply(Throwable failure) {
91             return failure;
92         }
93     };
94
95     private static final AtomicLong counter = new AtomicLong();
96
97     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
98
99     /**
100      * Stores the remote Tx actors for each requested data store path to be used by the
101      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
102      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
103      * remoteTransactionActors list so they will be visible to the thread accessing the
104      * PhantomReference.
105      */
106     List<ActorSelection> remoteTransactionActors;
107     volatile AtomicBoolean remoteTransactionActorsMB;
108
109     /**
110      * Stores the create transaction results per shard.
111      */
112     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
113
114     private final TransactionType transactionType;
115     final ActorContext actorContext;
116     private final String transactionChainId;
117     private final SchemaContext schemaContext;
118     private TransactionState state = TransactionState.OPEN;
119
120     private volatile boolean initialized;
121     private Semaphore operationLimiter;
122     private OperationCompleter operationCompleter;
123
124     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
125         this(actorContext, transactionType, "");
126     }
127
128     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
129         super(createIdentifier(actorContext));
130         this.actorContext = Preconditions.checkNotNull(actorContext,
131             "actorContext should not be null");
132         this.transactionType = Preconditions.checkNotNull(transactionType,
133             "transactionType should not be null");
134         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
135             "schemaContext should not be null");
136         this.transactionChainId = transactionChainId;
137
138         LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
139     }
140
141     private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
142         String memberName = actorContext.getCurrentMemberName();
143         if (memberName == null) {
144             memberName = "UNKNOWN-MEMBER";
145         }
146
147         return new TransactionIdentifier(memberName, counter.getAndIncrement());
148     }
149
150     @VisibleForTesting
151     boolean hasTransactionContext() {
152         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
153             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
154             if(transactionContext != null) {
155                 return true;
156             }
157         }
158
159         return false;
160     }
161
162     private boolean isRootPath(YangInstanceIdentifier path){
163         return !path.getPathArguments().iterator().hasNext();
164     }
165
166     @Override
167     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
168
169         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
170                 "Read operation on write-only transaction is not allowed");
171
172         LOG.debug("Tx {} read {}", getIdentifier(), path);
173
174         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
175
176         if(isRootPath(path)){
177             readAllData(path, proxyFuture);
178         } else {
179             throttleOperation();
180
181             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
182             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
183                 @Override
184                 public void invoke(TransactionContext transactionContext) {
185                     transactionContext.readData(path, proxyFuture);
186                 }
187             });
188
189         }
190
191         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
192     }
193
194     private void readAllData(final YangInstanceIdentifier path,
195                              final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
196         Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
197         List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
198
199         for(String shardName : allShardNames){
200             final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
201
202             throttleOperation();
203
204             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
205             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
206                 @Override
207                 public void invoke(TransactionContext transactionContext) {
208                     transactionContext.readData(path, subProxyFuture);
209                 }
210             });
211
212             futures.add(subProxyFuture);
213         }
214
215         final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
216
217         future.addListener(new Runnable() {
218             @Override
219             public void run() {
220                 try {
221                     proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
222                             future.get(), actorContext.getSchemaContext()));
223                 } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
224                     proxyFuture.setException(e);
225                 }
226             }
227         }, actorContext.getActorSystem().dispatcher());
228     }
229
230     @Override
231     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
232
233         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
234                 "Exists operation on write-only transaction is not allowed");
235
236         LOG.debug("Tx {} exists {}", getIdentifier(), path);
237
238         throttleOperation();
239
240         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
241
242         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
243         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
244             @Override
245             public void invoke(TransactionContext transactionContext) {
246                 transactionContext.dataExists(path, proxyFuture);
247             }
248         });
249
250         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
251     }
252
253     private void checkModificationState() {
254         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
255                 "Modification operation on read-only transaction is not allowed");
256         Preconditions.checkState(state == TransactionState.OPEN,
257                 "Transaction is sealed - further modifications are not allowed");
258     }
259
260     private void throttleOperation() {
261         throttleOperation(1);
262     }
263
264     private void throttleOperation(int acquirePermits) {
265         if(!initialized) {
266             // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
267             operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
268             operationCompleter = new OperationCompleter(operationLimiter);
269
270             // Make sure we write this last because it's volatile and will also publish the non-volatile writes
271             // above as well so they'll be visible to other threads.
272             initialized = true;
273         }
274
275         try {
276             if(!operationLimiter.tryAcquire(acquirePermits,
277                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
278                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
279             }
280         } catch (InterruptedException e) {
281             if(LOG.isDebugEnabled()) {
282                 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
283             } else {
284                 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
285             }
286         }
287     }
288
289     final void ensureInitializied() {
290         Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier());
291     }
292
293     @Override
294     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
295
296         checkModificationState();
297
298         LOG.debug("Tx {} write {}", getIdentifier(), path);
299
300         throttleOperation();
301
302         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
303         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
304             @Override
305             public void invoke(TransactionContext transactionContext) {
306                 transactionContext.writeData(path, data);
307             }
308         });
309     }
310
311     @Override
312     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
313
314         checkModificationState();
315
316         LOG.debug("Tx {} merge {}", getIdentifier(), path);
317
318         throttleOperation();
319
320         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
321         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
322             @Override
323             public void invoke(TransactionContext transactionContext) {
324                 transactionContext.mergeData(path, data);
325             }
326         });
327     }
328
329     @Override
330     public void delete(final YangInstanceIdentifier path) {
331
332         checkModificationState();
333
334         LOG.debug("Tx {} delete {}", getIdentifier(), path);
335
336         throttleOperation();
337
338         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
339         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
340             @Override
341             public void invoke(TransactionContext transactionContext) {
342                 transactionContext.deleteData(path);
343             }
344         });
345     }
346
347     private boolean seal(final TransactionState newState) {
348         if (state == TransactionState.OPEN) {
349             state = newState;
350             return true;
351         } else {
352             return false;
353         }
354     }
355
356     @Override
357     public AbstractThreePhaseCommitCohort<?> ready() {
358         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
359                 "Read-only transactions cannot be readied");
360
361         final boolean success = seal(TransactionState.READY);
362         Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
363
364         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
365                     txFutureCallbackMap.size());
366
367         if (txFutureCallbackMap.isEmpty()) {
368             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
369             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
370         }
371
372         throttleOperation(txFutureCallbackMap.size());
373
374         final boolean isSingleShard = txFutureCallbackMap.size() == 1;
375         return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
376     }
377
378     @SuppressWarnings({ "rawtypes", "unchecked" })
379     private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
380         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
381
382         LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
383                 txFutureCallback.getShardName(), transactionChainId);
384
385         final OperationCallback.Reference operationCallbackRef =
386                 new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
387         final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
388         final Future future;
389         if (transactionContext != null) {
390             // avoid the creation of a promise and a TransactionOperation
391             future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
392         } else {
393             final Promise promise = akka.dispatch.Futures.promise();
394             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
395                 @Override
396                 public void invoke(TransactionContext transactionContext) {
397                     promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
398                 }
399             });
400             future = promise.future();
401         }
402
403         return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
404     }
405
406     private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
407             OperationCallback.Reference operationCallbackRef) {
408         if(transactionContext.supportsDirectCommit()) {
409             TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
410             operationCallbackRef.set(rateLimitingCallback);
411             rateLimitingCallback.run();
412             return transactionContext.directCommit();
413         } else {
414             return transactionContext.readyTransaction();
415         }
416     }
417
418     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
419         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
420         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
421
422             LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
423                         txFutureCallback.getShardName(), transactionChainId);
424
425             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
426             final Future<ActorSelection> future;
427             if (transactionContext != null) {
428                 // avoid the creation of a promise and a TransactionOperation
429                 future = transactionContext.readyTransaction();
430             } else {
431                 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
432                 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
433                     @Override
434                     public void invoke(TransactionContext transactionContext) {
435                         promise.completeWith(transactionContext.readyTransaction());
436                     }
437                 });
438                 future = promise.future();
439             }
440
441             cohortFutures.add(future);
442         }
443
444         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
445     }
446
447     @Override
448     public void close() {
449         if (!seal(TransactionState.CLOSED)) {
450             if (state == TransactionState.CLOSED) {
451                 // Idempotent no-op as per AutoCloseable recommendation
452                 return;
453             }
454
455             throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
456                 getIdentifier()));
457         }
458
459         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
460             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
461                 @Override
462                 public void invoke(TransactionContext transactionContext) {
463                     transactionContext.closeTransaction();
464                 }
465             });
466         }
467
468         txFutureCallbackMap.clear();
469
470         if(remoteTransactionActorsMB != null) {
471             remoteTransactionActors.clear();
472             remoteTransactionActorsMB.set(true);
473         }
474     }
475
476     private String shardNameFromIdentifier(YangInstanceIdentifier path){
477         return ShardStrategyFactory.getStrategy(path).findShard(path);
478     }
479
480     protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
481         return actorContext.findPrimaryShardAsync(shardName);
482     }
483
484     final TransactionType getTransactionType() {
485         return transactionType;
486     }
487
488     final Semaphore getOperationLimiter() {
489         return operationLimiter;
490     }
491
492     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
493         String shardName = shardNameFromIdentifier(path);
494         return getOrCreateTxFutureCallback(shardName);
495     }
496
497     private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
498         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
499         if(txFutureCallback == null) {
500             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
501
502             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
503
504             txFutureCallback = newTxFutureCallback;
505             txFutureCallbackMap.put(shardName, txFutureCallback);
506
507             findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
508                 @Override
509                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
510                     if(failure != null) {
511                         newTxFutureCallback.createTransactionContext(failure, null);
512                     } else {
513                         newTxFutureCallback.setPrimaryShard(primaryShard);
514                     }
515                 }
516             }, actorContext.getClientDispatcher());
517         }
518
519         return txFutureCallback;
520     }
521
522     String getTransactionChainId() {
523         return transactionChainId;
524     }
525
526     protected ActorContext getActorContext() {
527         return actorContext;
528     }
529
530     TransactionContext createValidTransactionContext(ActorSelection transactionActor,
531             String transactionPath, short remoteTransactionVersion) {
532
533         if (transactionType == TransactionType.READ_ONLY) {
534             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
535             // to close the remote Tx's when this instance is no longer in use and is garbage
536             // collected.
537
538             if(remoteTransactionActorsMB == null) {
539                 remoteTransactionActors = Lists.newArrayList();
540                 remoteTransactionActorsMB = new AtomicBoolean();
541
542                 TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
543             }
544
545             // Add the actor to the remoteTransactionActors list for access by the
546             // cleanup PhantonReference.
547             remoteTransactionActors.add(transactionActor);
548
549             // Write to the memory barrier volatile to publish the above update to the
550             // remoteTransactionActors list for thread visibility.
551             remoteTransactionActorsMB.set(true);
552         }
553
554         // TxActor is always created where the leader of the shard is.
555         // Check if TxActor is created in the same node
556         boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
557
558         if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
559             return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
560                     transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
561                     operationCompleter);
562         } else {
563             return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
564                     actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
565         }
566     }
567 }