2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
17 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
18 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 import java.util.Collections;
32 import java.util.List;
33 import java.util.concurrent.Callable;
36 * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
38 public class ThreePhaseCommitCohortProxy implements
39 DOMStoreThreePhaseCommitCohort{
41 private static final Logger
42 LOG = LoggerFactory.getLogger(DistributedDataStore.class);
44 private final ActorContext actorContext;
45 private final List<ActorPath> cohortPaths;
46 private final ListeningExecutorService executor;
47 private final String transactionId;
/**
 * Creates a proxy that drives the commit phases against the given cohorts.
 *
 * @param actorContext  context used to resolve and message the cohort actors
 * @param cohortPaths   paths of the cohort actors; the list is stored as given, not copied
 * @param transactionId identifier used to tag this transaction's log output
 * @param executor      executor on which each phase's remote calls are run
 */
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
    List<ActorPath> cohortPaths,
    String transactionId,
    ListeningExecutorService executor) {

    this.transactionId = transactionId;
    this.executor = executor;
    this.cohortPaths = cohortPaths;
    this.actorContext = actorContext;
}
61 @Override public ListenableFuture<Boolean> canCommit() {
62 LOG.debug("txn {} canCommit", transactionId);
63 Callable<Boolean> call = new Callable<Boolean>() {
66 public Boolean call() throws Exception {
67 for(ActorPath actorPath : cohortPaths){
69 Object message = new CanCommitTransaction().toSerializable();
70 LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
72 ActorSelection cohort = actorContext.actorSelection(actorPath);
76 actorContext.executeRemoteOperation(cohort,
78 ActorContext.ASK_DURATION);
80 if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
81 CanCommitTransactionReply reply =
82 CanCommitTransactionReply.fromSerializable(response);
83 if (!reply.getCanCommit()) {
87 } catch(RuntimeException e){
88 // FIXME : Need to properly handle this
89 LOG.error("Unexpected Exception", e);
98 return executor.submit(call);
101 @Override public ListenableFuture<Void> preCommit() {
102 LOG.debug("txn {} preCommit", transactionId);
103 return voidOperation(new PreCommitTransaction().toSerializable(), PreCommitTransactionReply.SERIALIZABLE_CLASS);
106 @Override public ListenableFuture<Void> abort() {
107 LOG.debug("txn {} abort", transactionId);
108 return voidOperation(new AbortTransaction().toSerializable(), AbortTransactionReply.SERIALIZABLE_CLASS);
111 @Override public ListenableFuture<Void> commit() {
112 LOG.debug("txn {} commit", transactionId);
113 return voidOperation(new CommitTransaction().toSerializable(), CommitTransactionReply.SERIALIZABLE_CLASS);
116 private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
117 Callable<Void> call = new Callable<Void>() {
119 @Override public Void call() throws Exception {
120 for(ActorPath actorPath : cohortPaths){
121 ActorSelection cohort = actorContext.actorSelection(actorPath);
123 LOG.debug("txn {} Sending {} to {}", transactionId, message, actorPath);
127 actorContext.executeRemoteOperation(cohort,
129 ActorContext.ASK_DURATION);
131 if (response != null && !response.getClass()
132 .equals(expectedResponseClass)) {
133 throw new RuntimeException(
135 "did not get the expected response \n\t\t expected : %s \n\t\t actual : %s",
136 expectedResponseClass.toString(),
137 response.getClass().toString())
140 } catch(TimeoutException e){
141 LOG.error(String.format("A timeout occurred when processing operation : %s", message));
148 return executor.submit(call);
151 public List<ActorPath> getCohortPaths() {
152 return Collections.unmodifiableList(this.cohortPaths);