2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListenableFutureTask;
15 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
16 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.ExecutorService;
35 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
37 public class ThreePhaseCommitCohortProxy implements
38 DOMStoreThreePhaseCommitCohort{
40 private static final Logger
41 LOG = LoggerFactory.getLogger(DistributedDataStore.class);
43 private final ActorContext actorContext;
44 private final List<ActorPath> cohortPaths;
45 private final ExecutorService executor;
46 private final String transactionId;
49 public ThreePhaseCommitCohortProxy(ActorContext actorContext,
50 List<ActorPath> cohortPaths,
52 ExecutorService executor) {
54 this.actorContext = actorContext;
55 this.cohortPaths = cohortPaths;
56 this.transactionId = transactionId;
57 this.executor = executor;
60 @Override public ListenableFuture<Boolean> canCommit() {
61 Callable<Boolean> call = new Callable() {
63 @Override public Boolean call() throws Exception {
64 for(ActorPath actorPath : cohortPaths){
65 ActorSelection cohort = actorContext.actorSelection(actorPath);
69 actorContext.executeRemoteOperation(cohort,
70 new CanCommitTransaction(),
71 ActorContext.ASK_DURATION);
73 if (response instanceof CanCommitTransactionReply) {
74 CanCommitTransactionReply reply =
75 (CanCommitTransactionReply) response;
76 if (!reply.getCanCommit()) {
80 } catch(RuntimeException e){
81 LOG.error("Unexpected Exception", e);
91 ListenableFutureTask<Boolean>
92 future = ListenableFutureTask.create(call);
94 executor.submit(future);
99 @Override public ListenableFuture<Void> preCommit() {
100 return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class);
103 @Override public ListenableFuture<Void> abort() {
104 return voidOperation(new AbortTransaction(), AbortTransactionReply.class);
107 @Override public ListenableFuture<Void> commit() {
108 return voidOperation(new CommitTransaction(), CommitTransactionReply.class);
111 private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
112 Callable<Void> call = new Callable<Void>() {
114 @Override public Void call() throws Exception {
115 for(ActorPath actorPath : cohortPaths){
116 ActorSelection cohort = actorContext.actorSelection(actorPath);
120 actorContext.executeRemoteOperation(cohort,
122 ActorContext.ASK_DURATION);
124 if (response != null && !response.getClass()
125 .equals(expectedResponseClass)) {
126 throw new RuntimeException(
128 "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s",
129 expectedResponseClass.toString(),
130 response.getClass().toString())
133 } catch(TimeoutException e){
134 LOG.error(String.format("A timeout occurred when processing operation : %s", message));
141 ListenableFutureTask<Void>
142 future = ListenableFutureTask.create(call);
144 executor.submit(future);
150 public List<ActorPath> getCohortPaths() {
151 return Collections.unmodifiableList(this.cohortPaths);