BUG-5280: fix problems identified by integration tests
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import java.util.HashMap;
13 import java.util.Map;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
16 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
19 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
20 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
21 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
22 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
23 import org.opendaylight.controller.cluster.access.concepts.Response;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
26 import org.opendaylight.yangtools.concepts.Identifiable;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
33  * and the other for single transactions.
34  *
35  * @author Robert Varga
36  */
37 abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
38     enum State {
39         IDLE,
40         TX_OPEN,
41         CLOSED,
42     }
43
44     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
45     private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
46             AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
47     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
48             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
49
50     @GuardedBy("this")
51     private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
52     @GuardedBy("this")
53     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
54
55     private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
56     private final AbstractDataStoreClientBehavior client;
57     private final LocalHistoryIdentifier identifier;
58
59     // Used via NEXT_TX_UPDATER
60     @SuppressWarnings("unused")
61     private volatile long nextTx = 0;
62
63     private volatile State state = State.IDLE;
64
65     AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
66         this.client = Preconditions.checkNotNull(client);
67         this.identifier = Preconditions.checkNotNull(identifier);
68         Preconditions.checkArgument(identifier.getCookie() == 0);
69     }
70
71     final State state() {
72         return state;
73     }
74
75     final void updateState(final State expected, final State next) {
76         final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
77         Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
78         LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
79     }
80
81     @Override
82     public final LocalHistoryIdentifier getIdentifier() {
83         return identifier;
84     }
85
86     final long nextTx() {
87         return NEXT_TX_UPDATER.getAndIncrement(this);
88     }
89
90     final Long resolveShardForPath(final YangInstanceIdentifier path) {
91         return client.resolveShardForPath(path);
92     }
93
94     @Override
95     final void localAbort(final Throwable cause) {
96         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
97         if (oldState != State.CLOSED) {
98             LOG.debug("Force-closing history {}", getIdentifier(), cause);
99
100             synchronized (this) {
101                 for (ClientTransaction t : openTransactions.values()) {
102                     t.localAbort(cause);
103                 }
104                 openTransactions.clear();
105                 readyTransactions.clear();
106             }
107         }
108     }
109
110     /**
111      * Create a new history proxy for a given shard.
112      *
113      * @throws InversibleLockException if the shard is being reconnected
114      */
115     private ProxyHistory createHistoryProxy(final Long shard) {
116         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
117         final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
118             identifier.getHistoryId(), shard);
119         LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
120
121         final ProxyHistory ret = createHistoryProxy(proxyId, connection);
122
123         // Request creation of the history, if it is not the single history
124         if (ret.getIdentifier().getHistoryId() != 0) {
125             connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
126                 this::createHistoryCallback);
127         }
128         return ret;
129     }
130
131     abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
132             final AbstractClientConnection<ShardBackendInfo> connection);
133
134     private void createHistoryCallback(final Response<?, ?> response) {
135         LOG.debug("Create history response {}", response);
136     }
137
138     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
139         while (true) {
140             final ProxyHistory history;
141             try {
142                 history = histories.computeIfAbsent(shard, this::createHistoryProxy);
143             } catch (InversibleLockException e) {
144                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
145                 e.awaitResolution();
146                 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
147                 continue;
148             }
149
150             return history.createTransactionProxy(transactionId);
151         }
152     }
153
154     /**
155      * Allocate a {@link ClientTransaction}.
156      *
157      * @return A new {@link ClientTransaction}
158      * @throws TransactionChainClosedException if this history is closed
159      */
160     public final ClientTransaction createTransaction() {
161         if (state == State.CLOSED) {
162             throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
163         }
164
165         synchronized (this) {
166             final ClientTransaction ret = doCreateTransaction();
167             openTransactions.put(ret.getIdentifier(), ret);
168             return ret;
169         }
170     }
171
172     @GuardedBy("this")
173     abstract ClientTransaction doCreateTransaction();
174
175     /**
176      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
177      *
178      * @param txId Transaction identifier
179      * @param cohort Transaction commit cohort
180      */
181     synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
182             final AbstractTransactionCommitCohort cohort) {
183         final ClientTransaction tx = openTransactions.remove(txId);
184         Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
185
186         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
187         Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
188                 cohort, txId, previous);
189
190         LOG.debug("Local history {} readied transaction {}", this, txId);
191         return cohort;
192     }
193
194     /**
195      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
196      * backend.
197      *
198      * @param txId transaction identifier
199      */
200     synchronized void onTransactionAbort(final TransactionIdentifier txId) {
201         if (openTransactions.remove(txId) == null) {
202             LOG.warn("Could not find aborting transaction {}", txId);
203         }
204     }
205
206     /**
207      * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
208      * and all its state can be removed.
209      *
210      * @param txId transaction identifier
211      */
212     synchronized void onTransactionComplete(final TransactionIdentifier txId) {
213         if (readyTransactions.remove(txId) == null) {
214             LOG.warn("Could not find completed transaction {}", txId);
215         }
216     }
217
218     HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
219         final ProxyHistory oldProxy = histories.get(newConn.cookie());
220         if (oldProxy == null) {
221             return null;
222         }
223
224         final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn));
225         return new HistoryReconnectCohort() {
226             @Override
227             ProxyReconnectCohort getProxy() {
228                 return proxy;
229             }
230
231             @Override
232             void replaySuccessfulRequests() {
233                 proxy.replaySuccessfulRequests();
234             }
235
236             @Override
237             public void close() {
238                 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
239                 final ProxyHistory newProxy = proxy.finishReconnect();
240                 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
241                     LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
242                         AbstractClientHistory.this);
243                 }
244             }
245         };
246     }
247 }