2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
23 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
/**
 * Actor that reacts to the serialized forms of {@code CanCommitTransaction},
 * {@code PreCommitTransaction}, {@code CommitTransaction} and
 * {@code AbortTransaction} (see {@code handleReceive}).
 *
 * canCommit, preCommit and abort are invoked on the wrapped
 * {@link DOMStoreThreePhaseCommitCohort} and answered from a future callback;
 * commit is not executed here but forwarded to the shard actor as a
 * {@code ForwardedCommitTransaction}.
 */
34 public class ThreePhaseCommitCohort extends AbstractUntypedActor {
// Cohort on which canCommit()/preCommit()/abort() are called; also handed to the shard on commit.
35 private final DOMStoreThreePhaseCommitCohort cohort;
// Actor that receives the ForwardedCommitTransaction built in commit().
36 private final ActorRef shardActor;
// Modification packaged into the ForwardedCommitTransaction together with the cohort.
37 private final CompositeModification modification;
// Key used to look up the shard statistics MBean when an abort succeeds.
38 private final String shardName;
/**
 * Stores the collaborators this actor works with.
 *
 * @param cohort       cohort whose canCommit/preCommit/abort are driven by this actor
 * @param shardActor   actor the commit is forwarded to
 * @param modification modification sent along with the forwarded commit
 * @param shardName    name passed to ShardMBeanFactory when recording aborts
 */
40 public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
41 ActorRef shardActor, CompositeModification modification,String shardName) {
// NOTE(review): the assignment of the final field `cohort` is not visible in this
// excerpt; presumably it sits on a line just before the assignments below - confirm.
44 this.shardActor = shardActor;
45 this.modification = modification;
46 this.shardName = shardName;
// Logger obtained from this actor's ActorSystem; used by commit() for its debug message.
// NOTE(review): the future callbacks in this class log through `LOG` (upper case), which
// is not declared in the visible source - presumably inherited from AbstractUntypedActor.
// Confirm, and consider whether two loggers are really needed.
49 private final LoggingAdapter log =
50 Logging.getLogger(getContext().system(), this);
/**
 * Builds the {@link Props} for this actor by wrapping the arguments in a
 * {@code ThreePhaseCommitCohortCreator} and passing it to {@code Props.create}.
 *
 * NOTE(review): the end of the parameter list and of the creator call are not
 * visible in this excerpt; presumably a trailing {@code String shardName}
 * parameter is declared and passed through - confirm.
 *
 * @param cohort       cohort handed to the new actor
 * @param shardActor   shard actor handed to the new actor
 * @param modification modification handed to the new actor
 * @return Props created from a ThreePhaseCommitCohortCreator
 */
52 public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
53 final ActorRef shardActor, final CompositeModification modification,
55 return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification,
/**
 * Dispatches an incoming message to the matching phase handler.
 *
 * The message's runtime class is compared (exact equality, not instanceof)
 * against the SERIALIZABLE_CLASS constant of each transaction message type.
 * Each handler is given a freshly constructed message object; the received
 * instance itself is not passed on.
 *
 * @param message the received message
 * @throws Exception declared by the signature; no throw is visible in these lines
 */
60 public void handleReceive(Object message) throws Exception {
61 if (message.getClass()
62 .equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
63 canCommit(new CanCommitTransaction());
64 } else if (message.getClass()
65 .equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
66 preCommit(new PreCommitTransaction());
67 } else if (message.getClass()
68 .equals(CommitTransaction.SERIALIZABLE_CLASS)) {
69 commit(new CommitTransaction());
70 } else if (message.getClass()
71 .equals(AbortTransaction.SERIALIZABLE_CLASS)) {
72 abort(new AbortTransaction());
// NOTE(review): the line introducing this branch is not visible in this excerpt;
// presumably this is the final else, reached when no message type matched - confirm.
74 unknownMessage(message);
/**
 * Starts an abort on the cohort and answers once the returned future completes.
 *
 * On success the abort counter of the shard statistics MBean for shardName is
 * incremented and a serialized AbortTransactionReply is sent; on failure the
 * error is logged and an akka Status.Failure wrapping the throwable is sent.
 *
 * @param message the abort request; not read in the visible lines
 */
78 private void abort(AbortTransaction message) {
79 final ListenableFuture<Void> future = cohort.abort();
// Captured as finals so the anonymous callback below can use them.
// Presumably also because getSender()/getSelf() must not be called from the
// callback, which may run outside this actor's message handling - confirm.
80 final ActorRef sender = getSender();
81 final ActorRef self = getSelf();
83 Futures.addCallback(future, new FutureCallback<Void>() {
85 public void onSuccess(Void v) {
86 ShardMBeanFactory.getShardStatsMBean(shardName).incrementAbortTransactionsCount();
// NOTE(review): the receiver of this tell is not visible in this excerpt;
// presumably the captured `sender` - confirm.
88 .tell(new AbortTransactionReply().toSerializable(),
93 public void onFailure(Throwable t) {
94 LOG.error(t, "An exception happened during abort");
// Receiver not visible here either (presumably `sender`); `self` is given as the sending actor.
96 .tell(new akka.actor.Status.Failure(t), self);
/**
 * Hands the commit over to the shard instead of committing locally, then asks
 * this actor's parent to stop.
 *
 * @param message the commit request; not read in the visible lines
 */
101 private void commit(CommitTransaction message) {
102 // Forward the commit to the shard
103 log.debug("Forward commit transaction to Shard {} ", shardActor);
// The cohort and the modification travel together in one ForwardedCommitTransaction.
// NOTE(review): the second argument of forward(...) is not visible in this excerpt.
104 shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
// Sends a PoisonPill to the parent actor (not to this actor), with this actor as sender.
107 getContext().parent().tell(PoisonPill.getInstance(), getSelf());
/**
 * Starts the pre-commit phase on the cohort and answers once the returned
 * future completes.
 *
 * On success a serialized PreCommitTransactionReply is sent; on failure the
 * error is logged and an akka Status.Failure wrapping the throwable is sent.
 *
 * @param message the pre-commit request; not read in the visible lines
 */
111 private void preCommit(PreCommitTransaction message) {
112 final ListenableFuture<Void> future = cohort.preCommit();
// Captured as finals so the anonymous callback below can use them.
113 final ActorRef sender = getSender();
114 final ActorRef self = getSelf();
115 Futures.addCallback(future, new FutureCallback<Void>() {
117 public void onSuccess(Void v) {
// NOTE(review): the receiver of this tell is not visible in this excerpt;
// presumably the captured `sender` - confirm.
119 .tell(new PreCommitTransactionReply().toSerializable(),
124 public void onFailure(Throwable t) {
125 LOG.error(t, "An exception happened during pre-commit");
// Receiver not visible here either (presumably `sender`); `self` is given as the sending actor.
127 .tell(new akka.actor.Status.Failure(t), self);
/**
 * Asks the cohort whether the transaction can commit and answers once the
 * returned future completes.
 *
 * On success the Boolean result is wrapped in a CanCommitTransactionReply and
 * its serialized form is sent to the original sender; on failure the error is
 * logged and an akka Status.Failure wrapping the throwable is sent.
 *
 * @param message the can-commit request; not read in the visible lines
 */
133 private void canCommit(CanCommitTransaction message) {
134 final ListenableFuture<Boolean> future = cohort.canCommit();
// Captured as finals so the anonymous callback below can use them.
135 final ActorRef sender = getSender();
136 final ActorRef self = getSelf();
137 Futures.addCallback(future, new FutureCallback<Boolean>() {
139 public void onSuccess(Boolean canCommit) {
// The cohort's verdict is passed through unchanged to the requester.
140 sender.tell(new CanCommitTransactionReply(canCommit)
141 .toSerializable(), self);
145 public void onFailure(Throwable t) {
146 LOG.error(t, "An exception happened during canCommit");
// NOTE(review): the receiver of this tell is not visible in this excerpt;
// presumably the captured `sender` - confirm.
148 .tell(new akka.actor.Status.Failure(t), self);
/**
 * akka {@link Creator} that holds the four constructor arguments and builds a
 * {@code ThreePhaseCommitCohort} from them when {@code create()} is called.
 * It is a static nested class, so it holds no reference to an enclosing instance.
 */
153 private static class ThreePhaseCommitCohortCreator implements Creator<ThreePhaseCommitCohort> {
// Arguments held for the ThreePhaseCommitCohort constructor call in create().
154 final DOMStoreThreePhaseCommitCohort cohort;
155 final ActorRef shardActor;
156 final CompositeModification modification;
157 final String shardName;
// Stores the arguments unchanged; no validation or copying is performed.
159 ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
160 ActorRef shardActor, CompositeModification modification, String shardName) {
161 this.cohort = cohort;
162 this.shardActor = shardActor;
163 this.modification = modification;
164 this.shardName = shardName;
// Returns a new ThreePhaseCommitCohort on each call, built from the stored arguments.
168 public ThreePhaseCommitCohort create() throws Exception {
169 return new ThreePhaseCommitCohort(cohort, shardActor, modification, shardName);