2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.OnComplete;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import com.google.common.collect.Lists;
19 import com.google.common.util.concurrent.CheckedFuture;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.SettableFuture;
23 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
24 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
28 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
30 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
36 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
41 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 import scala.Function1;
49 import scala.concurrent.Future;
50 import scala.runtime.AbstractFunction1;
52 import java.util.HashMap;
53 import java.util.List;
55 import java.util.concurrent.atomic.AtomicLong;
58 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
60 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
61 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
62 * be created on each of those shards by the TransactionProxy
65 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
66 * shards will be executed.
69 public class TransactionProxy implements DOMStoreReadWriteTransaction {
70 public enum TransactionType {
76 static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
77 Throwable, Throwable>() {
79 public Throwable apply(Throwable failure) {
84 private static final AtomicLong counter = new AtomicLong();
86 private static final Logger
87 LOG = LoggerFactory.getLogger(TransactionProxy.class);
90 private final TransactionType transactionType;
91 private final ActorContext actorContext;
92 private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
93 private final TransactionIdentifier identifier;
94 private final SchemaContext schemaContext;
95 private boolean inReadyState;
97 public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
98 SchemaContext schemaContext) {
99 this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
100 this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
101 this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
103 String memberName = actorContext.getCurrentMemberName();
104 if(memberName == null){
105 memberName = "UNKNOWN-MEMBER";
108 this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
109 counter.getAndIncrement()).build();
111 LOG.debug("Created txn {} of type {}", identifier, transactionType);
116 List<Future<Object>> getRecordedOperationFutures() {
117 List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
118 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
119 recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
122 return recordedOperationFutures;
126 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
127 final YangInstanceIdentifier path) {
129 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
130 "Read operation on write-only transaction is not allowed");
132 LOG.debug("Tx {} read {}", identifier, path);
134 createTransactionIfMissing(actorContext, path);
136 return transactionContext(path).readData(path);
140 public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
142 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
143 "Exists operation on write-only transaction is not allowed");
145 LOG.debug("Tx {} exists {}", identifier, path);
147 createTransactionIfMissing(actorContext, path);
149 return transactionContext(path).dataExists(path);
152 private void checkModificationState() {
153 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
154 "Modification operation on read-only transaction is not allowed");
155 Preconditions.checkState(!inReadyState,
156 "Transaction is sealed - further modifications are allowed");
160 public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
162 checkModificationState();
164 LOG.debug("Tx {} write {}", identifier, path);
166 createTransactionIfMissing(actorContext, path);
168 transactionContext(path).writeData(path, data);
172 public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
174 checkModificationState();
176 LOG.debug("Tx {} merge {}", identifier, path);
178 createTransactionIfMissing(actorContext, path);
180 transactionContext(path).mergeData(path, data);
184 public void delete(YangInstanceIdentifier path) {
186 checkModificationState();
188 LOG.debug("Tx {} delete {}", identifier, path);
190 createTransactionIfMissing(actorContext, path);
192 transactionContext(path).deleteData(path);
196 public DOMStoreThreePhaseCommitCohort ready() {
198 checkModificationState();
202 LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
203 remoteTransactionPaths.size());
205 List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
207 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
209 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
210 transactionContext.getShardName());
212 cohortPathFutures.add(transactionContext.readyTransaction());
215 return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
216 identifier.toString());
220 public Object getIdentifier() {
221 return this.identifier;
225 public void close() {
226 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
227 transactionContext.closeTransaction();
231 private TransactionContext transactionContext(YangInstanceIdentifier path){
232 String shardName = shardNameFromIdentifier(path);
233 return remoteTransactionPaths.get(shardName);
236 private String shardNameFromIdentifier(YangInstanceIdentifier path){
237 return ShardStrategyFactory.getStrategy(path).findShard(path);
240 private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
241 String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
243 TransactionContext transactionContext =
244 remoteTransactionPaths.get(shardName);
246 if(transactionContext != null){
247 // A transaction already exists with that shard
252 Object response = actorContext.executeShardOperation(shardName,
253 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
254 ActorContext.ASK_DURATION);
255 if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
256 CreateTransactionReply reply =
257 CreateTransactionReply.fromSerializable(response);
259 String transactionPath = reply.getTransactionPath();
261 LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
263 ActorSelection transactionActor =
264 actorContext.actorSelection(transactionPath);
266 new TransactionContextImpl(shardName, transactionPath,
269 remoteTransactionPaths.put(shardName, transactionContext);
271 throw new IllegalArgumentException(String.format(
272 "Invalid reply type {} for CreateTransaction", response.getClass()));
274 } catch(Exception e){
275 LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
276 remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
280 private interface TransactionContext {
281 String getShardName();
283 void closeTransaction();
285 Future<ActorPath> readyTransaction();
287 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
289 void deleteData(YangInstanceIdentifier path);
291 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
293 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
294 final YangInstanceIdentifier path);
296 CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
298 List<Future<Object>> getRecordedOperationFutures();
301 private abstract class AbstractTransactionContext implements TransactionContext {
303 protected final String shardName;
304 protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
306 AbstractTransactionContext(String shardName) {
307 this.shardName = shardName;
311 public String getShardName() {
316 public List<Future<Object>> getRecordedOperationFutures() {
317 return recordedOperationFutures;
321 private class TransactionContextImpl extends AbstractTransactionContext {
322 private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
324 private final String actorPath;
325 private final ActorSelection actor;
327 private TransactionContextImpl(String shardName, String actorPath,
328 ActorSelection actor) {
330 this.actorPath = actorPath;
334 private ActorSelection getActor() {
338 private String getResolvedCohortPath(String cohortPath) {
339 return actorContext.resolvePath(actorPath, cohortPath);
343 public void closeTransaction() {
344 LOG.debug("Tx {} closeTransaction called", identifier);
345 actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
349 public Future<ActorPath> readyTransaction() {
350 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
351 identifier, recordedOperationFutures.size());
353 // Send the ReadyTransaction message to the Tx actor.
355 final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
356 new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
358 // Combine all the previously recorded put/merge/delete operation reply Futures and the
359 // ReadyTransactionReply Future into one Future. If any one fails then the combined
360 // Future will fail. We need all prior operations and the ready operation to succeed
361 // in order to attempt commit.
363 List<Future<Object>> futureList =
364 Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
365 futureList.addAll(recordedOperationFutures);
366 futureList.add(replyFuture);
368 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
369 actorContext.getActorSystem().dispatcher());
371 // Transform the combined Future into a Future that returns the cohort actor path from
372 // the ReadyTransactionReply. That's the end result of the ready operation.
374 return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
376 public ActorPath apply(Iterable<Object> notUsed) {
378 LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
381 // At this point all the Futures succeeded and we need to extract the cohort
382 // actor path from the ReadyTransactionReply. For the recorded operations, they
383 // don't return any data so we're only interested that they completed
384 // successfully. We could be paranoid and verify the correct reply types but
385 // that really should never happen so it's not worth the overhead of
386 // de-serializing each reply.
388 // Note the Future get call here won't block as it's complete.
389 Object serializedReadyReply = replyFuture.value().get().get();
390 if(serializedReadyReply.getClass().equals(
391 ReadyTransactionReply.SERIALIZABLE_CLASS)) {
392 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
393 actorContext.getActorSystem(), serializedReadyReply);
395 String resolvedCohortPath = getResolvedCohortPath(
396 reply.getCohortPath().toString());
398 LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
399 identifier, resolvedCohortPath);
401 return actorContext.actorFor(resolvedCohortPath);
403 // Throwing an exception here will fail the Future.
405 throw new IllegalArgumentException(String.format("Invalid reply type {}",
406 serializedReadyReply.getClass()));
409 }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
413 public void deleteData(YangInstanceIdentifier path) {
414 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
415 recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
416 new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
420 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
421 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
422 recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
423 new MergeData(path, data, schemaContext).toSerializable(),
424 ActorContext.ASK_DURATION));
428 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
429 LOG.debug("Tx {} writeData called path = {}", identifier, path);
430 recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
431 new WriteData(path, data, schemaContext).toSerializable(),
432 ActorContext.ASK_DURATION));
436 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
437 final YangInstanceIdentifier path) {
439 LOG.debug("Tx {} readData called path = {}", identifier, path);
441 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
443 // If there were any previous recorded put/merge/delete operation reply Futures then we
444 // must wait for them to successfully complete. This is necessary to honor the read
445 // uncommitted semantics of the public API contract. If any one fails then fail the read.
447 if(recordedOperationFutures.isEmpty()) {
448 finishReadData(path, returnFuture);
450 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
451 identifier, recordedOperationFutures.size());
453 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
454 // Futures#sequence accesses the passed List on a different thread, as
455 // recordedOperationFutures is not synchronized.
457 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
458 Lists.newArrayList(recordedOperationFutures),
459 actorContext.getActorSystem().dispatcher());
460 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
462 public void onComplete(Throwable failure, Iterable<Object> notUsed)
464 if(failure != null) {
465 LOG.debug("Tx {} readData: a recorded operation failed: {}",
466 identifier, failure);
468 returnFuture.setException(new ReadFailedException(
469 "The read could not be performed because a previous put, merge,"
470 + "or delete operation failed", failure));
472 finishReadData(path, returnFuture);
477 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
480 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
483 private void finishReadData(final YangInstanceIdentifier path,
484 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
486 LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
488 OnComplete<Object> onComplete = new OnComplete<Object>() {
490 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
491 if(failure != null) {
492 LOG.debug("Tx {} read operation failed: {}", identifier, failure);
494 returnFuture.setException(new ReadFailedException(
495 "Error reading data for path " + path, failure));
497 LOG.debug("Tx {} read operation succeeded", identifier, failure);
499 if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
500 ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
502 if (reply.getNormalizedNode() == null) {
503 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
505 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
506 reply.getNormalizedNode()));
509 returnFuture.setException(new ReadFailedException(
510 "Invalid response reading data for path " + path));
516 Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
517 new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
518 readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
522 public CheckedFuture<Boolean, ReadFailedException> dataExists(
523 final YangInstanceIdentifier path) {
525 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
527 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
529 // If there were any previous recorded put/merge/delete operation reply Futures then we
530 // must wait for them to successfully complete. This is necessary to honor the read
531 // uncommitted semantics of the public API contract. If any one fails then fail this
534 if(recordedOperationFutures.isEmpty()) {
535 finishDataExists(path, returnFuture);
537 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
538 identifier, recordedOperationFutures.size());
540 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
541 // Futures#sequence accesses the passed List on a different thread, as
542 // recordedOperationFutures is not synchronized.
544 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
545 Lists.newArrayList(recordedOperationFutures),
546 actorContext.getActorSystem().dispatcher());
547 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
549 public void onComplete(Throwable failure, Iterable<Object> notUsed)
551 if(failure != null) {
552 LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
553 identifier, failure);
555 returnFuture.setException(new ReadFailedException(
556 "The data exists could not be performed because a previous "
557 + "put, merge, or delete operation failed", failure));
559 finishDataExists(path, returnFuture);
564 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
567 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
570 private void finishDataExists(final YangInstanceIdentifier path,
571 final SettableFuture<Boolean> returnFuture) {
573 LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
575 OnComplete<Object> onComplete = new OnComplete<Object>() {
577 public void onComplete(Throwable failure, Object response) throws Throwable {
578 if(failure != null) {
579 LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
581 returnFuture.setException(new ReadFailedException(
582 "Error checking data exists for path " + path, failure));
584 LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
586 if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
587 returnFuture.set(Boolean.valueOf(DataExistsReply.
588 fromSerializable(response).exists()));
590 returnFuture.setException(new ReadFailedException(
591 "Invalid response checking exists for path " + path));
597 Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
598 new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
599 future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
603 private class NoOpTransactionContext extends AbstractTransactionContext {
605 private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
607 private final Exception failure;
609 public NoOpTransactionContext(String shardName, Exception failure){
611 this.failure = failure;
615 public void closeTransaction() {
616 LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
620 public Future<ActorPath> readyTransaction() {
621 LOG.debug("Tx {} readyTransaction called", identifier);
622 return akka.dispatch.Futures.failed(failure);
626 public void deleteData(YangInstanceIdentifier path) {
627 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
631 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
632 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
636 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
637 LOG.debug("Tx {} writeData called path = {}", identifier, path);
641 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
642 YangInstanceIdentifier path) {
643 LOG.debug("Tx {} readData called path = {}", identifier, path);
644 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
645 "Error reading data for path " + path, failure));
649 public CheckedFuture<Boolean, ReadFailedException> dataExists(
650 YangInstanceIdentifier path) {
651 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
652 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
653 "Error checking exists for path " + path, failure));