Kill Dynamic Actors when we're done with them
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohort.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.UntypedActor;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Creator;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
29
30 import java.util.concurrent.ExecutionException;
31
32 public class ThreePhaseCommitCohort extends UntypedActor {
33     private final DOMStoreThreePhaseCommitCohort cohort;
34     private final ActorRef shardActor;
35     private final CompositeModification modification;
36
37     public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
38         ActorRef shardActor, CompositeModification modification) {
39
40         this.cohort = cohort;
41         this.shardActor = shardActor;
42         this.modification = modification;
43     }
44
45     private final LoggingAdapter log =
46         Logging.getLogger(getContext().system(), this);
47
48     public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
49         final ActorRef shardActor, final CompositeModification modification) {
50         return Props.create(new Creator<ThreePhaseCommitCohort>() {
51             @Override
52             public ThreePhaseCommitCohort create() throws Exception {
53                 return new ThreePhaseCommitCohort(cohort, shardActor,
54                     modification);
55             }
56         });
57     }
58
59     @Override
60     public void onReceive(Object message) throws Exception {
61         log.debug("Received message {}", message);
62
63         if (message instanceof CanCommitTransaction) {
64             canCommit((CanCommitTransaction) message);
65         } else if (message instanceof PreCommitTransaction) {
66             preCommit((PreCommitTransaction) message);
67         } else if (message instanceof CommitTransaction) {
68             commit((CommitTransaction) message);
69         } else if (message instanceof AbortTransaction) {
70             abort((AbortTransaction) message);
71         }
72     }
73
74     private void abort(AbortTransaction message) {
75         final ListenableFuture<Void> future = cohort.abort();
76         final ActorRef sender = getSender();
77         final ActorRef self = getSelf();
78
79         future.addListener(new Runnable() {
80             @Override
81             public void run() {
82                 try {
83                     future.get();
84                     sender.tell(new AbortTransactionReply(), self);
85                 } catch (InterruptedException | ExecutionException e) {
86                     log.error(e, "An exception happened when aborting");
87                 }
88             }
89         }, getContext().dispatcher());
90     }
91
92     private void commit(CommitTransaction message) {
93         // Forward the commit to the shard
94         log.debug("Forward commit transaction to Shard {} ", shardActor);
95         shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
96             getContext());
97
98         getContext().parent().tell(PoisonPill.getInstance(), getSelf());
99
100     }
101
102     private void preCommit(PreCommitTransaction message) {
103         final ListenableFuture<Void> future = cohort.preCommit();
104         final ActorRef sender = getSender();
105         final ActorRef self = getSelf();
106
107         future.addListener(new Runnable() {
108             @Override
109             public void run() {
110                 try {
111                     future.get();
112                     sender.tell(new PreCommitTransactionReply(), self);
113                 } catch (InterruptedException | ExecutionException e) {
114                     log.error(e, "An exception happened when preCommitting");
115                 }
116             }
117         }, getContext().dispatcher());
118
119     }
120
121     private void canCommit(CanCommitTransaction message) {
122         final ListenableFuture<Boolean> future = cohort.canCommit();
123         final ActorRef sender = getSender();
124         final ActorRef self = getSelf();
125
126         future.addListener(new Runnable() {
127             @Override
128             public void run() {
129                 try {
130                     Boolean canCommit = future.get();
131                     sender.tell(new CanCommitTransactionReply(canCommit), self);
132                 } catch (InterruptedException | ExecutionException e) {
133                     log.error(e, "An exception happened when aborting");
134                 }
135             }
136         }, getContext().dispatcher());
137
138     }
139 }