/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import java.util.AbstractMap.SimpleEntry;
14 import java.util.List;
15 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
16 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
17 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
18 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
19 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
21 import scala.concurrent.Future;
22 import scala.concurrent.Promise;
25 * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
27 public class TransactionChainProxy implements DOMStoreTransactionChain{
28 private final ActorContext actorContext;
29 private final String transactionChainId;
30 private volatile SimpleEntry<Object, List<Future<ActorSelection>>> previousTxReadyFutures;
32 public TransactionChainProxy(ActorContext actorContext) {
33 this.actorContext = actorContext;
34 transactionChainId = actorContext.getCurrentMemberName() + "-" + System.currentTimeMillis();
38 public DOMStoreReadTransaction newReadOnlyTransaction() {
39 return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
43 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
44 return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
48 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
49 return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
54 // Send a close transaction chain request to each and every shard
55 actorContext.broadcast(new CloseTransactionChain(transactionChainId));
58 public String getTransactionChainId() {
59 return transactionChainId;
62 private class ChainedTransactionProxy extends TransactionProxy {
64 ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) {
65 super(actorContext, transactionType, transactionChainId);
69 protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
70 if(!cohortFutures.isEmpty()) {
71 previousTxReadyFutures = new SimpleEntry<>(getIdentifier(), cohortFutures);
73 previousTxReadyFutures = null;
78 * This method is overridden to ensure the previous Tx's ready operations complete
79 * before we create the next shard Tx in the chain to avoid creation failures if the
80 * previous Tx's ready operations haven't completed yet.
83 protected Future<Object> sendCreateTransaction(final ActorSelection shard,
84 final Object serializedCreateMessage) {
85 // Check if there are any previous ready Futures. Also make sure the previous ready
86 // Futures aren't for this Tx as deadlock would occur if tried to wait on our own
87 // Futures. This may happen b/c the shard Tx creates are done async so it's possible
88 // for the client to ready this Tx before we've even attempted to create a shard Tx.
89 if(previousTxReadyFutures == null ||
90 previousTxReadyFutures.getKey().equals(getIdentifier())) {
91 return super.sendCreateTransaction(shard, serializedCreateMessage);
94 // Combine the ready Futures into 1.
95 Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
96 previousTxReadyFutures.getValue(), actorContext.getActorSystem().dispatcher());
98 // Add a callback for completion of the combined Futures.
99 final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
100 OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
102 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
103 if(failure != null) {
104 // A Ready Future failed so fail the returned Promise.
105 createTxPromise.failure(failure);
107 // Send the CreateTx message and use the resulting Future to complete the
109 createTxPromise.completeWith(actorContext.executeOperationAsync(shard,
110 serializedCreateMessage));
115 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
117 return createTxPromise.future();