2dce6a1079c4fdbb0a8e2fa090fa018908d3f5ce
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohort.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
33
34 public class ThreePhaseCommitCohort extends AbstractUntypedActor {
35     private final DOMStoreThreePhaseCommitCohort cohort;
36     private final ActorRef shardActor;
37     private final CompositeModification modification;
38     private final ShardStats shardStats;
39
40     public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
41         ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
42
43         this.cohort = cohort;
44         this.shardActor = shardActor;
45         this.modification = modification;
46         this.shardStats = shardStats;
47     }
48
49     private final LoggingAdapter log =
50         Logging.getLogger(getContext().system(), this);
51
52     public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
53             final ActorRef shardActor, final CompositeModification modification,
54             ShardStats shardStats) {
55         return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification,
56                 shardStats));
57     }
58
59     @Override
60     public void handleReceive(Object message) throws Exception {
61         if (message.getClass()
62             .equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
63             canCommit(new CanCommitTransaction());
64         } else if (message.getClass()
65             .equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
66             preCommit(new PreCommitTransaction());
67         } else if (message.getClass()
68             .equals(CommitTransaction.SERIALIZABLE_CLASS)) {
69             commit(new CommitTransaction());
70         } else if (message.getClass()
71             .equals(AbortTransaction.SERIALIZABLE_CLASS)) {
72             abort(new AbortTransaction());
73         } else {
74             unknownMessage(message);
75         }
76     }
77
78     private void abort(AbortTransaction message) {
79         final ListenableFuture<Void> future = cohort.abort();
80         final ActorRef sender = getSender();
81         final ActorRef self = getSelf();
82
83         Futures.addCallback(future, new FutureCallback<Void>() {
84             @Override
85             public void onSuccess(Void v) {
86                 shardStats.incrementAbortTransactionsCount();
87                 sender
88                     .tell(new AbortTransactionReply().toSerializable(),
89                     self);
90             }
91
92             @Override
93             public void onFailure(Throwable t) {
94                 LOG.error(t, "An exception happened during abort");
95                 sender
96                     .tell(new akka.actor.Status.Failure(t), self);
97             }
98         });
99     }
100
101     private void commit(CommitTransaction message) {
102         // Forward the commit to the shard
103         log.debug("Forward commit transaction to Shard {} ", shardActor);
104         shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
105             getContext());
106
107         getContext().parent().tell(PoisonPill.getInstance(), getSelf());
108
109     }
110
111     private void preCommit(PreCommitTransaction message) {
112         final ListenableFuture<Void> future = cohort.preCommit();
113         final ActorRef sender = getSender();
114         final ActorRef self = getSelf();
115         Futures.addCallback(future, new FutureCallback<Void>() {
116             @Override
117             public void onSuccess(Void v) {
118                 sender
119                     .tell(new PreCommitTransactionReply().toSerializable(),
120                         self);
121             }
122
123             @Override
124             public void onFailure(Throwable t) {
125                 LOG.error(t, "An exception happened during pre-commit");
126                 sender
127                     .tell(new akka.actor.Status.Failure(t), self);
128             }
129         });
130
131     }
132
133     private void canCommit(CanCommitTransaction message) {
134         final ListenableFuture<Boolean> future = cohort.canCommit();
135         final ActorRef sender = getSender();
136         final ActorRef self = getSelf();
137         Futures.addCallback(future, new FutureCallback<Boolean>() {
138             @Override
139             public void onSuccess(Boolean canCommit) {
140                 sender.tell(new CanCommitTransactionReply(canCommit)
141                     .toSerializable(), self);
142             }
143
144             @Override
145             public void onFailure(Throwable t) {
146                 LOG.error(t, "An exception happened during canCommit");
147                 sender
148                     .tell(new akka.actor.Status.Failure(t), self);
149             }
150         });
151     }
152
153     private static class ThreePhaseCommitCohortCreator implements Creator<ThreePhaseCommitCohort> {
154         final DOMStoreThreePhaseCommitCohort cohort;
155         final ActorRef shardActor;
156         final CompositeModification modification;
157         final ShardStats shardStats;
158
159         ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
160             ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
161             this.cohort = cohort;
162             this.shardActor = shardActor;
163             this.modification = modification;
164             this.shardStats = shardStats;
165         }
166
167         @Override
168         public ThreePhaseCommitCohort create() throws Exception {
169             return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardStats);
170         }
171     }
172 }