/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13 import com.google.common.base.Preconditions;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
18 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.Future;
27 import scala.concurrent.Promise;
/**
 * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard.
 */
32 public class TransactionChainProxy implements DOMStoreTransactionChain {
34 private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
36 private interface State {
39 List<Future<ActorSelection>> getPreviousReadyFutures();
42 private static class Allocated implements State {
43 private final ChainedTransactionProxy transaction;
45 Allocated(ChainedTransactionProxy transaction) {
46 this.transaction = transaction;
50 public boolean isReady() {
51 return transaction.isReady();
55 public List<Future<ActorSelection>> getPreviousReadyFutures() {
56 return transaction.getReadyFutures();
60 private static abstract class AbstractDefaultState implements State {
62 public List<Future<ActorSelection>> getPreviousReadyFutures() {
63 return Collections.emptyList();
67 private static final State IDLE_STATE = new AbstractDefaultState() {
69 public boolean isReady() {
74 private static final State CLOSED_STATE = new AbstractDefaultState() {
76 public boolean isReady() {
77 throw new TransactionChainClosedException("Transaction chain has been closed");
81 private static final AtomicInteger counter = new AtomicInteger(0);
83 private final ActorContext actorContext;
84 private final String transactionChainId;
85 private volatile State currentState = IDLE_STATE;
/**
 * Creates a chain proxy and assigns it an identifier of the form
 * {@code <current member name>-txn-chain-<n>}, where {@code n} comes from a shared counter.
 *
 * @param actorContext context used to create transactions and to broadcast the close request
 */
public TransactionChainProxy(ActorContext actorContext) {
    this.actorContext = actorContext;
    this.transactionChainId = actorContext.getCurrentMemberName() + "-txn-chain-" + counter.incrementAndGet();
}
92 public String getTransactionChainId() {
93 return transactionChainId;
97 public DOMStoreReadTransaction newReadOnlyTransaction() {
98 State localState = currentState;
99 checkReadyState(localState);
101 return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
102 transactionChainId, localState.getPreviousReadyFutures());
106 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
107 return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
111 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
112 return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
116 public void close() {
117 currentState = CLOSED_STATE;
119 // Send a close transaction chain request to each and every shard
120 actorContext.broadcast(new CloseTransactionChain(transactionChainId));
123 private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) {
124 State localState = currentState;
126 checkReadyState(localState);
128 // Pass the ready Futures from the previous Tx.
129 ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type,
130 transactionChainId, localState.getPreviousReadyFutures());
132 currentState = new Allocated(txProxy);
137 private void checkReadyState(State state) {
138 Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet");
141 private static class ChainedTransactionProxy extends TransactionProxy {
144 * Stores the ready Futures from the previous Tx in the chain.
146 private final List<Future<ActorSelection>> previousReadyFutures;
149 * Stores the ready Futures from this transaction when it is readied.
151 private volatile List<Future<ActorSelection>> readyFutures;
153 private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
154 String transactionChainId, List<Future<ActorSelection>> previousReadyFutures) {
155 super(actorContext, transactionType, transactionChainId);
156 this.previousReadyFutures = previousReadyFutures;
159 List<Future<ActorSelection>> getReadyFutures() {
164 return readyFutures != null;
168 protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
169 LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
170 readyFutures.size(), getTransactionChainId());
171 this.readyFutures = readyFutures;
175 * This method is overridden to ensure the previous Tx's ready operations complete
176 * before we create the next shard Tx in the chain to avoid creation failures if the
177 * previous Tx's ready operations haven't completed yet.
180 protected Future<Object> sendCreateTransaction(final ActorSelection shard,
181 final Object serializedCreateMessage) {
183 // Check if there are any previous ready Futures, otherwise let the super class handle it.
184 if(previousReadyFutures.isEmpty()) {
185 return super.sendCreateTransaction(shard, serializedCreateMessage);
188 // Combine the ready Futures into 1.
189 Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
190 previousReadyFutures, getActorContext().getActorSystem().dispatcher());
192 // Add a callback for completion of the combined Futures.
193 final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
194 OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
196 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
197 if(failure != null) {
198 // A Ready Future failed so fail the returned Promise.
199 createTxPromise.failure(failure);
201 LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}",
202 getIdentifier(), getTransactionChainId());
204 // Send the CreateTx message and use the resulting Future to complete the
206 createTxPromise.completeWith(getActorContext().executeOperationAsync(shard,
207 serializedCreateMessage));
212 combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher());
214 return createTxPromise.future();