Fix shard deadlock in 3 nodes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LocalTransactionContext.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.Optional;
17 import java.util.SortedSet;
18 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
20 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
21 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
22 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction;
23 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
24 import scala.concurrent.Future;
25
26 /**
27  * Processes front-end transaction operations locally before being committed to the destination shard.
28  * Instances of this class are used when the destination shard is local to the caller.
29  *
30  * @author Thomas Pantelis
31  */
32 abstract class LocalTransactionContext extends AbstractTransactionContext {
33     private final DOMStoreTransaction txDelegate;
34     private final LocalTransactionReadySupport readySupport;
35     private Exception operationError;
36
37     LocalTransactionContext(final DOMStoreTransaction txDelegate, final TransactionIdentifier identifier,
38             final LocalTransactionReadySupport readySupport) {
39         super(identifier);
40         this.txDelegate = Preconditions.checkNotNull(txDelegate);
41         this.readySupport = readySupport;
42     }
43
44     protected abstract DOMStoreWriteTransaction getWriteDelegate();
45
46     protected abstract DOMStoreReadTransaction getReadDelegate();
47
48     @Override
49     @SuppressWarnings("checkstyle:IllegalCatch")
50     public void executeModification(final AbstractModification modification, final Boolean havePermit) {
51         incrementModificationCount();
52         if (operationError == null) {
53             try {
54                 modification.apply(getWriteDelegate());
55             } catch (Exception e) {
56                 operationError = e;
57             }
58         }
59     }
60
61     @Override
62     public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
63             final Boolean havePermit) {
64         Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
65             @Override
66             public void onSuccess(final T result) {
67                 proxyFuture.set(result);
68             }
69
70             @Override
71             public void onFailure(final Throwable failure) {
72                 proxyFuture.setException(failure);
73             }
74         }, MoreExecutors.directExecutor());
75     }
76
77     private LocalThreePhaseCommitCohort ready() {
78         logModificationCount();
79         return readySupport.onTransactionReady(getWriteDelegate(), operationError);
80     }
81
82     @Override
83     public Future<ActorSelection> readyTransaction(final Boolean havePermit,
84             final Optional<SortedSet<String>> participatingShardNames) {
85         final LocalThreePhaseCommitCohort cohort = ready();
86         return cohort.initiateCoordinatedCommit(participatingShardNames);
87     }
88
89     @Override
90     public Future<Object> directCommit(final Boolean havePermit) {
91         final LocalThreePhaseCommitCohort cohort = ready();
92         return cohort.initiateDirectCommit();
93     }
94
95     @Override
96     public void closeTransaction() {
97         txDelegate.close();
98     }
99 }