2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListenableFutureTask;
15 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
16 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
17 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
19 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
20 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.concurrent.Callable;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
35 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
37 public class ThreePhaseCommitCohortProxy implements
38 DOMStoreThreePhaseCommitCohort{
40 private static final Logger
41 LOG = LoggerFactory.getLogger(DistributedDataStore.class);
43 private final ActorContext actorContext;
44 private final List<ActorPath> cohortPaths;
45 //FIXME : Use a thread pool here
46 private final ExecutorService executorService = Executors.newSingleThreadExecutor();
49 public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
50 this.actorContext = actorContext;
51 this.cohortPaths = cohortPaths;
54 @Override public ListenableFuture<Boolean> canCommit() {
55 Callable<Boolean> call = new Callable() {
57 @Override public Boolean call() throws Exception {
58 for(ActorPath actorPath : cohortPaths){
59 ActorSelection cohort = actorContext.actorSelection(actorPath);
63 actorContext.executeRemoteOperation(cohort,
64 new CanCommitTransaction(),
65 ActorContext.ASK_DURATION);
67 if (response instanceof CanCommitTransactionReply) {
68 CanCommitTransactionReply reply =
69 (CanCommitTransactionReply) response;
70 if (!reply.getCanCommit()) {
74 } catch(RuntimeException e){
75 LOG.error("Unexpected Exception", e);
85 ListenableFutureTask<Boolean>
86 future = ListenableFutureTask.create(call);
88 executorService.submit(future);
93 @Override public ListenableFuture<Void> preCommit() {
94 return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class);
97 @Override public ListenableFuture<Void> abort() {
98 return voidOperation(new AbortTransaction(), AbortTransactionReply.class);
101 @Override public ListenableFuture<Void> commit() {
102 return voidOperation(new CommitTransaction(), CommitTransactionReply.class);
105 private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
106 Callable<Void> call = new Callable<Void>() {
108 @Override public Void call() throws Exception {
109 for(ActorPath actorPath : cohortPaths){
110 ActorSelection cohort = actorContext.actorSelection(actorPath);
112 Object response = actorContext.executeRemoteOperation(cohort,
114 ActorContext.ASK_DURATION);
116 if(response != null && !response.getClass().equals(expectedResponseClass)){
117 throw new RuntimeException(
119 "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s",
120 expectedResponseClass.toString(),
121 response.getClass().toString()));
128 ListenableFutureTask<Void>
129 future = ListenableFutureTask.create(call);
131 executorService.submit(future);
137 public List<ActorPath> getCohortPaths() {
138 return Collections.unmodifiableList(this.cohortPaths);