d12dc2b55a17f0ebaf1a3999778b109efbd60a0f
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListenableFutureTask;
15 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
16 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34
35 /**
36  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
37  */
38 public class ThreePhaseCommitCohortProxy implements
39     DOMStoreThreePhaseCommitCohort{
40
41     private static final Logger
42         LOG = LoggerFactory.getLogger(DistributedDataStore.class);
43
44     private final ActorContext actorContext;
45     private final List<ActorPath> cohortPaths;
46     //FIXME : Use a thread pool here
47     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
48
49
50     public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
51         this.actorContext = actorContext;
52         this.cohortPaths = cohortPaths;
53     }
54
55     @Override public ListenableFuture<Boolean> canCommit() {
56         Callable<Boolean> call = new Callable() {
57
58             @Override public Boolean call() throws Exception {
59             for(ActorPath actorPath : cohortPaths){
60                 ActorSelection cohort = actorContext.actorSelection(actorPath);
61
62                 try {
63                     Object response =
64                         actorContext.executeRemoteOperation(cohort,
65                             new CanCommitTransaction(),
66                             ActorContext.ASK_DURATION);
67
68                     if (response instanceof CanCommitTransactionReply) {
69                         CanCommitTransactionReply reply =
70                             (CanCommitTransactionReply) response;
71                         if (!reply.getCanCommit()) {
72                             return false;
73                         }
74                     }
75                 } catch(RuntimeException e){
76                     LOG.error("Unexpected Exception", e);
77                     return false;
78                 }
79
80
81             }
82             return true;
83             }
84         };
85
86         ListenableFutureTask<Boolean>
87             future = ListenableFutureTask.create(call);
88
89         executorService.submit(future);
90
91         return future;
92     }
93
94     @Override public ListenableFuture<Void> preCommit() {
95         return voidOperation(new PreCommitTransaction(), PreCommitTransactionReply.class);
96     }
97
98     @Override public ListenableFuture<Void> abort() {
99         return voidOperation(new AbortTransaction(), AbortTransactionReply.class);
100     }
101
102     @Override public ListenableFuture<Void> commit() {
103         return voidOperation(new CommitTransaction(), CommitTransactionReply.class);
104     }
105
106     private ListenableFuture<Void> voidOperation(final Object message, final Class expectedResponseClass){
107         Callable<Void> call = new Callable<Void>() {
108
109             @Override public Void call() throws Exception {
110                 for(ActorPath actorPath : cohortPaths){
111                     ActorSelection cohort = actorContext.actorSelection(actorPath);
112
113                     try {
114                         Object response =
115                             actorContext.executeRemoteOperation(cohort,
116                                 message,
117                                 ActorContext.ASK_DURATION);
118
119                         if (response != null && !response.getClass()
120                             .equals(expectedResponseClass)) {
121                             throw new RuntimeException(
122                                 String.format(
123                                     "did not get the expected response \n\t\t expected : %s \n\t\t actual   : %s",
124                                     expectedResponseClass.toString(),
125                                     response.getClass().toString())
126                             );
127                         }
128                     } catch(TimeoutException e){
129                         LOG.error(String.format("A timeout occurred when processing operation : %s", message));
130                     }
131                 }
132                 return null;
133             }
134         };
135
136         ListenableFutureTask<Void>
137             future = ListenableFutureTask.create(call);
138
139         executorService.submit(future);
140
141         return future;
142
143     }
144
145     public List<ActorPath> getCohortPaths() {
146         return Collections.unmodifiableList(this.cohortPaths);
147     }
148 }