00d4ab5782e3e7f0a365bdd9ace5be917b423dc1
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohort.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
28
29 import java.util.concurrent.ExecutionException;
30
31 public class ThreePhaseCommitCohort extends UntypedActor {
32     private final DOMStoreThreePhaseCommitCohort cohort;
33     private final ActorRef shardActor;
34     private final CompositeModification modification;
35
36     public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
37         ActorRef shardActor, CompositeModification modification) {
38
39         this.cohort = cohort;
40         this.shardActor = shardActor;
41         this.modification = modification;
42     }
43
44     private final LoggingAdapter log =
45         Logging.getLogger(getContext().system(), this);
46
47     public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
48         final ActorRef shardActor, final CompositeModification modification) {
49         return Props.create(new Creator<ThreePhaseCommitCohort>() {
50             @Override
51             public ThreePhaseCommitCohort create() throws Exception {
52                 return new ThreePhaseCommitCohort(cohort, shardActor,
53                     modification);
54             }
55         });
56     }
57
58     @Override
59     public void onReceive(Object message) throws Exception {
60         log.debug("Received message {}", message);
61
62         if (message instanceof CanCommitTransaction) {
63             canCommit((CanCommitTransaction) message);
64         } else if (message instanceof PreCommitTransaction) {
65             preCommit((PreCommitTransaction) message);
66         } else if (message instanceof CommitTransaction) {
67             commit((CommitTransaction) message);
68         } else if (message instanceof AbortTransaction) {
69             abort((AbortTransaction) message);
70         }
71     }
72
73     private void abort(AbortTransaction message) {
74         final ListenableFuture<Void> future = cohort.abort();
75         final ActorRef sender = getSender();
76         final ActorRef self = getSelf();
77
78         future.addListener(new Runnable() {
79             @Override
80             public void run() {
81                 try {
82                     future.get();
83                     sender.tell(new AbortTransactionReply(), self);
84                 } catch (InterruptedException | ExecutionException e) {
85                     log.error(e, "An exception happened when aborting");
86                 }
87             }
88         }, getContext().dispatcher());
89     }
90
91     private void commit(CommitTransaction message) {
92         // Forward the commit to the shard
93         log.debug("Forward commit transaction to Shard {} ", shardActor);
94         shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
95             getContext());
96
97     }
98
99     private void preCommit(PreCommitTransaction message) {
100         final ListenableFuture<Void> future = cohort.preCommit();
101         final ActorRef sender = getSender();
102         final ActorRef self = getSelf();
103
104         future.addListener(new Runnable() {
105             @Override
106             public void run() {
107                 try {
108                     future.get();
109                     sender.tell(new PreCommitTransactionReply(), self);
110                 } catch (InterruptedException | ExecutionException e) {
111                     log.error(e, "An exception happened when preCommitting");
112                 }
113             }
114         }, getContext().dispatcher());
115
116     }
117
118     private void canCommit(CanCommitTransaction message) {
119         final ListenableFuture<Boolean> future = cohort.canCommit();
120         final ActorRef sender = getSender();
121         final ActorRef self = getSelf();
122
123         future.addListener(new Runnable() {
124             @Override
125             public void run() {
126                 try {
127                     Boolean canCommit = future.get();
128                     sender.tell(new CanCommitTransactionReply(canCommit), self);
129                 } catch (InterruptedException | ExecutionException e) {
130                     log.error(e, "An exception happened when aborting");
131                 }
132             }
133         }, getContext().dispatcher());
134
135     }
136 }