Merge "Bug 2160: Added concurrent 3-phase commit coordinator"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import scala.concurrent.Await;
import scala.concurrent.Future;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
24
25 /**
26  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
27  */
28 public class TransactionChainProxy implements DOMStoreTransactionChain{
29     private final ActorContext actorContext;
30     private final String transactionChainId;
31     private volatile List<Future<ActorSelection>> cohortFutures = Collections.emptyList();
32
33     public TransactionChainProxy(ActorContext actorContext) {
34         this.actorContext = actorContext;
35         transactionChainId = actorContext.getCurrentMemberName() + "-" + System.currentTimeMillis();
36     }
37
38     @Override
39     public DOMStoreReadTransaction newReadOnlyTransaction() {
40         return new TransactionProxy(actorContext,
41             TransactionProxy.TransactionType.READ_ONLY, this);
42     }
43
44     @Override
45     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
46         return new TransactionProxy(actorContext,
47             TransactionProxy.TransactionType.READ_WRITE, this);
48     }
49
50     @Override
51     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
52         return new TransactionProxy(actorContext,
53             TransactionProxy.TransactionType.WRITE_ONLY, this);
54     }
55
56     @Override
57     public void close() {
58         // Send a close transaction chain request to each and every shard
59         actorContext.broadcast(new CloseTransactionChain(transactionChainId));
60     }
61
62     public String getTransactionChainId() {
63         return transactionChainId;
64     }
65
66     public void onTransactionReady(List<Future<ActorSelection>> cohortFutures){
67         this.cohortFutures = cohortFutures;
68     }
69
70     public void waitTillCurrentTransactionReady(){
71         try {
72             Await.result(Futures
73                 .sequence(this.cohortFutures, actorContext.getActorSystem().dispatcher()),
74                 actorContext.getOperationDuration());
75         } catch (Exception e) {
76             throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e);
77         }
78     }
79 }