2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
17 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
18 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.concurrent.Callable;
36 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
38 public class ThreePhaseCommitCohortProxy implements
39 DOMStoreThreePhaseCommitCohort{
41 private static final Logger
42 LOG = LoggerFactory.getLogger(DistributedDataStore.class);
44 private final ActorContext actorContext;
45 private final List<ActorPath> cohortPaths;
46 private final ListeningExecutorService executor;
47 private final String transactionId;
50 public ThreePhaseCommitCohortProxy(ActorContext actorContext,
51 List<ActorPath> cohortPaths,
53 ListeningExecutorService executor) {
55 this.actorContext = actorContext;
56 this.cohortPaths = cohortPaths;
57 this.transactionId = transactionId;
58 this.executor = executor;
61 @Override public ListenableFuture<Boolean> canCommit() {
62 Callable<Boolean> call = new Callable<Boolean>() {
65 public Boolean call() throws Exception {
66 for(ActorPath actorPath : cohortPaths){
67 ActorSelection cohort = actorContext.actorSelection(actorPath);
71 actorContext.executeRemoteOperation(cohort,
72 new CanCommitTransaction().toSerializable(),
73 ActorContext.ASK_DURATION);
75 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
76 CanCommitTransactionReply reply =
77 CanCommitTransactionReply.fromSerializable(response);
78 if (!reply.getCanCommit()) {
82 } catch(RuntimeException e){
83 LOG.error("Unexpected Exception", e);
92 return executor.submit(call);
95 @Override public ListenableFuture<Void> preCommit() {
96 return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
99 @Override public ListenableFuture<Void> abort() {
100 return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
103 @Override public ListenableFuture<Void> commit() {
104 return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
107 private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
108 Callable<Void> call = new Callable<Void>() {
110 @Override public Void call() throws Exception {
111 for(ActorPath actorPath : cohortPaths){
112 ActorSelection cohort = actorContext.actorSelection(actorPath);
116 actorContext.executeRemoteOperation(cohort,
118 ActorContext.ASK_DURATION);
120 if (response != null && !response.getClass()
121 .equals(expectedResponseClass)) {
122 throw new RuntimeException(
124 "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s",
125 expectedResponseClass.toString(),
126 response.getClass().toString())
129 } catch(TimeoutException e){
130 LOG.error(String.format("A timeout occurred when processing operation : %s", message));
137 return executor.submit(call);
140 public List<ActorPath> getCohortPaths() {
141 return Collections.unmodifiableList(this.cohortPaths);