Complete implementation of ThreePhaseCommitCohortProxy
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohort.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
28
29 import java.util.concurrent.ExecutionException;
30
31 public class ThreePhaseCommitCohort extends UntypedActor{
32   private final DOMStoreThreePhaseCommitCohort cohort;
33   private final ActorRef shardActor;
34   private final CompositeModification modification;
35
36   public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort, ActorRef shardActor, CompositeModification modification) {
37     this.cohort = cohort;
38     this.shardActor = shardActor;
39     this.modification = modification;
40   }
41
42   private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
43
44   public static Props props(final DOMStoreThreePhaseCommitCohort cohort, final ActorRef shardActor, final CompositeModification modification) {
45     return Props.create(new Creator<ThreePhaseCommitCohort>(){
46       @Override
47       public ThreePhaseCommitCohort create() throws Exception {
48         return new ThreePhaseCommitCohort(cohort, shardActor, modification);
49       }
50     });
51   }
52
53   @Override
54   public void onReceive(Object message) throws Exception {
55     if(message instanceof CanCommitTransaction){
56       canCommit((CanCommitTransaction) message);
57     } else if(message instanceof PreCommitTransaction) {
58       preCommit((PreCommitTransaction) message);
59     } else if(message instanceof CommitTransaction){
60       commit((CommitTransaction) message);
61     } else if (message instanceof AbortTransaction){
62       abort((AbortTransaction) message);
63     }
64   }
65
66   private void abort(AbortTransaction message) {
67     final ListenableFuture<Void> future = cohort.abort();
68     final ActorRef sender = getSender();
69     final ActorRef self = getSelf();
70
71     future.addListener(new Runnable() {
72       @Override
73       public void run() {
74         try {
75           future.get();
76           sender.tell(new AbortTransactionReply(), self);
77         } catch (InterruptedException | ExecutionException e) {
78           log.error(e, "An exception happened when aborting");
79         }
80       }
81     }, getContext().dispatcher());
82   }
83
84   private void commit(CommitTransaction message) {
85     // Forward the commit to the shard
86     log.info("Commit transaction now + " + shardActor);
87     shardActor.forward(new ForwardedCommitTransaction(cohort, modification), getContext());
88
89   }
90
91   private void preCommit(PreCommitTransaction message) {
92     final ListenableFuture<Void> future = cohort.preCommit();
93     final ActorRef sender = getSender();
94     final ActorRef self = getSelf();
95
96     future.addListener(new Runnable() {
97       @Override
98       public void run() {
99         try {
100           future.get();
101           sender.tell(new PreCommitTransactionReply(), self);
102         } catch (InterruptedException | ExecutionException e) {
103           log.error(e, "An exception happened when preCommitting");
104         }
105       }
106     }, getContext().dispatcher());
107
108   }
109
110   private void canCommit(CanCommitTransaction message) {
111     final ListenableFuture<Boolean> future = cohort.canCommit();
112     final ActorRef sender = getSender();
113     final ActorRef self = getSelf();
114
115     future.addListener(new Runnable() {
116       @Override
117       public void run() {
118         try {
119           Boolean canCommit = future.get();
120           sender.tell(new CanCommitTransactionReply(canCommit), self);
121         } catch (InterruptedException | ExecutionException e) {
122           log.error(e, "An exception happened when aborting");
123         }
124       }
125     }, getContext().dispatcher());
126
127   }
128 }