2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
29 import java.util.concurrent.ExecutionException;
31 public class ThreePhaseCommitCohort extends AbstractUntypedActor {
32 private final DOMStoreThreePhaseCommitCohort cohort;
33 private final ActorRef shardActor;
34 private final CompositeModification modification;
36 public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
37 ActorRef shardActor, CompositeModification modification) {
40 this.shardActor = shardActor;
41 this.modification = modification;
44 private final LoggingAdapter log =
45 Logging.getLogger(getContext().system(), this);
47 public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
48 final ActorRef shardActor, final CompositeModification modification) {
49 return Props.create(new Creator<ThreePhaseCommitCohort>() {
51 public ThreePhaseCommitCohort create() throws Exception {
52 return new ThreePhaseCommitCohort(cohort, shardActor,
60 public void handleReceive(Object message) throws Exception {
61 if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
62 canCommit(new CanCommitTransaction());
63 } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
64 preCommit(new PreCommitTransaction());
65 } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
66 commit(new CommitTransaction());
67 } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
68 abort(new AbortTransaction());
70 throw new Exception ("Not recognized message received,message="+message);
74 private void abort(AbortTransaction message) {
75 final ListenableFuture<Void> future = cohort.abort();
76 final ActorRef sender = getSender();
77 final ActorRef self = getSelf();
79 future.addListener(new Runnable() {
84 sender.tell(new AbortTransactionReply().toSerializable(), self);
85 } catch (InterruptedException | ExecutionException e) {
86 log.error(e, "An exception happened when aborting");
89 }, getContext().dispatcher());
92 private void commit(CommitTransaction message) {
93 // Forward the commit to the shard
94 log.debug("Forward commit transaction to Shard {} ", shardActor);
95 shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
98 getContext().parent().tell(PoisonPill.getInstance(), getSelf());
102 private void preCommit(PreCommitTransaction message) {
103 final ListenableFuture<Void> future = cohort.preCommit();
104 final ActorRef sender = getSender();
105 final ActorRef self = getSelf();
107 future.addListener(new Runnable() {
112 sender.tell(new PreCommitTransactionReply().toSerializable(), self);
113 } catch (InterruptedException | ExecutionException e) {
114 log.error(e, "An exception happened when preCommitting");
117 }, getContext().dispatcher());
121 private void canCommit(CanCommitTransaction message) {
122 final ListenableFuture<Boolean> future = cohort.canCommit();
123 final ActorRef sender = getSender();
124 final ActorRef self = getSelf();
126 future.addListener(new Runnable() {
130 Boolean canCommit = future.get();
131 sender.tell(new CanCommitTransactionReply(canCommit).toSerializable(), self);
132 } catch (InterruptedException | ExecutionException e) {
133 log.error(e, "An exception happened when aborting");
136 }, getContext().dispatcher());