2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
35 public class ThreePhaseCommitCohort extends AbstractUntypedActor {
36 private final DOMStoreThreePhaseCommitCohort cohort;
37 private final ActorRef shardActor;
38 private final CompositeModification modification;
39 private final ShardStats shardStats;
41 public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
42 ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
45 this.shardActor = shardActor;
46 this.modification = modification;
47 this.shardStats = shardStats;
50 private final LoggingAdapter log =
51 Logging.getLogger(getContext().system(), this);
53 public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
54 final ActorRef shardActor, final CompositeModification modification,
55 ShardStats shardStats) {
56 return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification,
61 public void handleReceive(Object message) throws Exception {
62 if (message.getClass()
63 .equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
64 canCommit(new CanCommitTransaction());
65 } else if (message.getClass()
66 .equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
67 preCommit(new PreCommitTransaction());
68 } else if (message.getClass()
69 .equals(CommitTransaction.SERIALIZABLE_CLASS)) {
70 commit(new CommitTransaction());
71 } else if (message.getClass()
72 .equals(AbortTransaction.SERIALIZABLE_CLASS)) {
73 abort(new AbortTransaction());
75 unknownMessage(message);
79 private void abort(AbortTransaction message) {
80 final ListenableFuture<Void> future = cohort.abort();
81 final ActorRef sender = getSender();
82 final ActorRef self = getSelf();
84 Futures.addCallback(future, new FutureCallback<Void>() {
86 public void onSuccess(Void v) {
87 shardStats.incrementAbortTransactionsCount();
89 .tell(new AbortTransactionReply().toSerializable(),
94 public void onFailure(Throwable t) {
95 LOG.error(t, "An exception happened during abort");
97 .tell(new akka.actor.Status.Failure(t), self);
102 private void commit(CommitTransaction message) {
103 // Forward the commit to the shard
104 if(log.isDebugEnabled()) {
105 log.debug("Forward commit transaction to Shard {} ", shardActor);
107 shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
110 getContext().parent().tell(PoisonPill.getInstance(), getSelf());
114 private void preCommit(PreCommitTransaction message) {
115 final ListenableFuture<Void> future = cohort.preCommit();
116 final ActorRef sender = getSender();
117 final ActorRef self = getSelf();
118 Futures.addCallback(future, new FutureCallback<Void>() {
120 public void onSuccess(Void v) {
122 .tell(new PreCommitTransactionReply().toSerializable(),
127 public void onFailure(Throwable t) {
128 LOG.error(t, "An exception happened during pre-commit");
130 .tell(new akka.actor.Status.Failure(t), self);
136 private void canCommit(CanCommitTransaction message) {
137 final ListenableFuture<Boolean> future = cohort.canCommit();
138 final ActorRef sender = getSender();
139 final ActorRef self = getSelf();
140 Futures.addCallback(future, new FutureCallback<Boolean>() {
142 public void onSuccess(Boolean canCommit) {
143 sender.tell(new CanCommitTransactionReply(canCommit)
144 .toSerializable(), self);
148 public void onFailure(Throwable t) {
149 LOG.error(t, "An exception happened during canCommit");
151 .tell(new akka.actor.Status.Failure(t), self);
156 private static class ThreePhaseCommitCohortCreator implements Creator<ThreePhaseCommitCohort> {
157 final DOMStoreThreePhaseCommitCohort cohort;
158 final ActorRef shardActor;
159 final CompositeModification modification;
160 final ShardStats shardStats;
162 ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
163 ActorRef shardActor, CompositeModification modification, ShardStats shardStats) {
164 this.cohort = cohort;
165 this.shardActor = shardActor;
166 this.modification = modification;
167 this.shardStats = shardStats;
171 public ThreePhaseCommitCohort create() throws Exception {
172 return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardStats);