/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.UntypedActor;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Creator;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
20 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import java.util.concurrent.ExecutionException;
32 public class ThreePhaseCommitCohort extends UntypedActor {
33 private final DOMStoreThreePhaseCommitCohort cohort;
34 private final ActorRef shardActor;
35 private final CompositeModification modification;
37 public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort,
38 ActorRef shardActor, CompositeModification modification) {
41 this.shardActor = shardActor;
42 this.modification = modification;
45 private final LoggingAdapter log =
46 Logging.getLogger(getContext().system(), this);
48 public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
49 final ActorRef shardActor, final CompositeModification modification) {
50 return Props.create(new Creator<ThreePhaseCommitCohort>() {
52 public ThreePhaseCommitCohort create() throws Exception {
53 return new ThreePhaseCommitCohort(cohort, shardActor,
61 public void onReceive(Object message) throws Exception {
62 log.debug("Received message {}", message);
64 if (message instanceof CanCommitTransaction) {
65 canCommit((CanCommitTransaction) message);
66 } else if (message instanceof PreCommitTransaction) {
67 preCommit((PreCommitTransaction) message);
68 } else if (message instanceof CommitTransaction) {
69 commit((CommitTransaction) message);
70 } else if (message instanceof AbortTransaction) {
71 abort((AbortTransaction) message);
75 private void abort(AbortTransaction message) {
76 final ListenableFuture<Void> future = cohort.abort();
77 final ActorRef sender = getSender();
78 final ActorRef self = getSelf();
80 future.addListener(new Runnable() {
85 sender.tell(new AbortTransactionReply(), self);
86 } catch (InterruptedException | ExecutionException e) {
87 log.error(e, "An exception happened when aborting");
90 }, getContext().dispatcher());
93 private void commit(CommitTransaction message) {
94 // Forward the commit to the shard
95 log.debug("Forward commit transaction to Shard {} ", shardActor);
96 shardActor.forward(new ForwardedCommitTransaction(cohort, modification),
99 getContext().parent().tell(PoisonPill.getInstance(), getSelf());
103 private void preCommit(PreCommitTransaction message) {
104 final ListenableFuture<Void> future = cohort.preCommit();
105 final ActorRef sender = getSender();
106 final ActorRef self = getSelf();
108 future.addListener(new Runnable() {
113 sender.tell(new PreCommitTransactionReply(), self);
114 } catch (InterruptedException | ExecutionException e) {
115 log.error(e, "An exception happened when preCommitting");
118 }, getContext().dispatcher());
122 private void canCommit(CanCommitTransaction message) {
123 final ListenableFuture<Boolean> future = cohort.canCommit();
124 final ActorRef sender = getSender();
125 final ActorRef self = getSelf();
127 future.addListener(new Runnable() {
131 Boolean canCommit = future.get();
132 sender.tell(new CanCommitTransactionReply(canCommit), self);
133 } catch (InterruptedException | ExecutionException e) {
134 log.error(e, "An exception happened when aborting");
137 }, getContext().dispatcher());