/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.event.Logging;
15 import akka.event.LoggingAdapter;
16 import akka.japi.Creator;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
29 import java.util.concurrent.ExecutionException;
31 public class ThreePhaseCommitCohort extends UntypedActor{
32 private final DOMStoreThreePhaseCommitCohort cohort;
33 private final ActorRef shardActor;
34 private final CompositeModification modification;
36 public ThreePhaseCommitCohort(DOMStoreThreePhaseCommitCohort cohort, ActorRef shardActor, CompositeModification modification) {
38 this.shardActor = shardActor;
39 this.modification = modification;
42 private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
44 public static Props props(final DOMStoreThreePhaseCommitCohort cohort, final ActorRef shardActor, final CompositeModification modification) {
45 return Props.create(new Creator<ThreePhaseCommitCohort>(){
47 public ThreePhaseCommitCohort create() throws Exception {
48 return new ThreePhaseCommitCohort(cohort, shardActor, modification);
54 public void onReceive(Object message) throws Exception {
55 if(message instanceof CanCommitTransaction){
56 canCommit((CanCommitTransaction) message);
57 } else if(message instanceof PreCommitTransaction) {
58 preCommit((PreCommitTransaction) message);
59 } else if(message instanceof CommitTransaction){
60 commit((CommitTransaction) message);
61 } else if (message instanceof AbortTransaction){
62 abort((AbortTransaction) message);
66 private void abort(AbortTransaction message) {
67 final ListenableFuture<Void> future = cohort.abort();
68 final ActorRef sender = getSender();
69 final ActorRef self = getSelf();
71 future.addListener(new Runnable() {
76 sender.tell(new AbortTransactionReply(), self);
77 } catch (InterruptedException | ExecutionException e) {
78 log.error(e, "An exception happened when aborting");
81 }, getContext().dispatcher());
84 private void commit(CommitTransaction message) {
85 // Forward the commit to the shard
86 log.info("Commit transaction now + " + shardActor);
87 shardActor.forward(new ForwardedCommitTransaction(cohort, modification), getContext());
91 private void preCommit(PreCommitTransaction message) {
92 final ListenableFuture<Void> future = cohort.preCommit();
93 final ActorRef sender = getSender();
94 final ActorRef self = getSelf();
96 future.addListener(new Runnable() {
101 sender.tell(new PreCommitTransactionReply(), self);
102 } catch (InterruptedException | ExecutionException e) {
103 log.error(e, "An exception happened when preCommitting");
106 }, getContext().dispatcher());
110 private void canCommit(CanCommitTransaction message) {
111 final ListenableFuture<Boolean> future = cohort.canCommit();
112 final ActorRef sender = getSender();
113 final ActorRef self = getSelf();
115 future.addListener(new Runnable() {
119 Boolean canCommit = future.get();
120 sender.tell(new CanCommitTransactionReply(canCommit), self);
121 } catch (InterruptedException | ExecutionException e) {
122 log.error(e, "An exception happened when aborting");
125 }, getContext().dispatcher());