2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListenableFutureTask;
15 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
16 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
36 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
38 public class ThreePhaseCommitCohortProxy implements
39 DOMStoreThreePhaseCommitCohort{
41 private static final Logger
42 LOG = LoggerFactory.getLogger(DistributedDataStore.class);
44 private final ActorContext actorContext;
45 private final List<ActorPath> cohortPaths;
46 //FIXME : Use a thread pool here
47 private final ExecutorService executorService = Executors.newSingleThreadExecutor();
50 public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
51 this.actorContext = actorContext;
52 this.cohortPaths = cohortPaths;
55 @Override public ListenableFuture<Boolean> canCommit() {
56 Callable<Boolean> call = new Callable() {
58 @Override public Boolean call() throws Exception {
59 for(ActorPath actorPath : cohortPaths){
60 ActorSelection cohort = actorContext.actorSelection(actorPath);
64 actorContext.executeRemoteOperation(cohort,
65 new CanCommitTransaction(),
66 ActorContext.ASK_DURATION);
68 if (response instanceof CanCommitTransactionReply) {
69 CanCommitTransactionReply reply =
70 (CanCommitTransactionReply) response;
71 if (!reply.getCanCommit()) {
75 } catch(RuntimeException e){
76 LOG.error("Unexpected Exception", e);
86 ListenableFutureTask<Boolean>
87 future = ListenableFutureTask.create(call);
89 executorService.submit(future);
94 @Override public ListenableFuture<Void> preCommit() {
95 return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class);
98 @Override public ListenableFuture<Void> abort() {
99 return voidOperation(new AbortTransaction(), AbortTransactionReply.class);
102 @Override public ListenableFuture<Void> commit() {
103 return voidOperation(new CommitTransaction(), CommitTransactionReply.class);
106 private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
107 Callable<Void> call = new Callable<Void>() {
109 @Override public Void call() throws Exception {
110 for(ActorPath actorPath : cohortPaths){
111 ActorSelection cohort = actorContext.actorSelection(actorPath);
115 actorContext.executeRemoteOperation(cohort,
117 ActorContext.ASK_DURATION);
119 if (response != null && !response.getClass()
120 .equals(expectedResponseClass)) {
121 throw new RuntimeException(
123 "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s",
124 expectedResponseClass.toString(),
125 response.getClass().toString())
128 } catch(TimeoutException e){
129 LOG.error(String.format("A timeout occurred when processing operation : %s", message));
136 ListenableFutureTask<Void>
137 future = ListenableFutureTask.create(call);
139 executorService.submit(future);
145 public List<ActorPath> getCohortPaths() {
146 return Collections.unmodifiableList(this.cohortPaths);