Track skipped transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
14
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Set;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
21 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
22 import java.util.concurrent.locks.StampedLock;
23 import org.checkerframework.checker.lock.qual.GuardedBy;
24 import org.checkerframework.checker.lock.qual.Holding;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
27 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
28 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
29 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
30 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
32 import org.opendaylight.controller.cluster.access.concepts.Response;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
34 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
35 import org.opendaylight.yangtools.concepts.Identifiable;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
42  * and the other for single transactions.
43  *
44  * @author Robert Varga
45  */
46 public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
47     enum State {
48         IDLE,
49         TX_OPEN,
50         CLOSED,
51     }
52
53     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
54     private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
55             AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
56     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
57             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
58
59     @GuardedBy("this")
60     private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
61     @GuardedBy("this")
62     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
63
64     @GuardedBy("lock")
65     private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
66     private final StampedLock lock = new StampedLock();
67
68     private final @NonNull AbstractDataStoreClientBehavior client;
69     private final @NonNull LocalHistoryIdentifier identifier;
70
71     // Used via NEXT_TX_UPDATER
72     @SuppressWarnings("unused")
73     private volatile long nextTx = 0;
74
75     private volatile State state = State.IDLE;
76
77     AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
78         this.client = requireNonNull(client);
79         this.identifier = requireNonNull(identifier);
80         checkArgument(identifier.getCookie() == 0);
81     }
82
83     final State state() {
84         return state;
85     }
86
87     final void updateState(final State expected, final State next) {
88         final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
89         checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
90         LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
91     }
92
93     final synchronized void doClose() {
94         final State local = state;
95         if (local != State.CLOSED) {
96             checkState(local == State.IDLE, "Local history %s has an open transaction", this);
97             histories.values().forEach(ProxyHistory::close);
98             updateState(local, State.CLOSED);
99         }
100     }
101
102     final synchronized void onProxyDestroyed(final ProxyHistory proxyHistory) {
103         histories.remove(proxyHistory.getIdentifier().getCookie());
104         LOG.debug("{}: removed destroyed proxy {}", this, proxyHistory);
105     }
106
107     @Override
108     public final LocalHistoryIdentifier getIdentifier() {
109         return identifier;
110     }
111
112     final long nextTx() {
113         return NEXT_TX_UPDATER.getAndIncrement(this);
114     }
115
116     final Long resolveShardForPath(final YangInstanceIdentifier path) {
117         return client.resolveShardForPath(path);
118     }
119
120     @Override
121     final void localAbort(final Throwable cause) {
122         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
123         if (oldState != State.CLOSED) {
124             LOG.debug("Force-closing history {}", getIdentifier(), cause);
125
126             synchronized (this) {
127                 for (AbstractClientHandle<?> t : openTransactions.values()) {
128                     t.localAbort(cause);
129                 }
130                 openTransactions.clear();
131                 readyTransactions.clear();
132             }
133         }
134     }
135
136     /**
137      * Create a new history proxy for a given shard.
138      *
139      * @param shard Shard cookie
140      * @throws InversibleLockException if the shard is being reconnected
141      */
142     @Holding("lock")
143     private ProxyHistory createHistoryProxy(final Long shard) {
144         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
145         final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
146             identifier.getHistoryId(), shard);
147         LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
148
149         final ProxyHistory ret = createHistoryProxy(proxyId, connection);
150
151         // Request creation of the history, if it is not the single history
152         if (ret.getIdentifier().getHistoryId() != 0) {
153             connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
154                 this::createHistoryCallback);
155         }
156         return ret;
157     }
158
159     abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId,
160             AbstractClientConnection<ShardBackendInfo> connection);
161
162     private void createHistoryCallback(final Response<?, ?> response) {
163         LOG.debug("Create history response {}", response);
164     }
165
166     private @NonNull ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
167         while (true) {
168             try {
169                 // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect,
170                 // see comments in startReconnect() for details.
171                 final long stamp = lock.readLock();
172                 try {
173                     return histories.computeIfAbsent(shard, this::createHistoryProxy);
174                 } finally {
175                     lock.unlockRead(stamp);
176                 }
177             } catch (InversibleLockException e) {
178                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
179                 e.awaitResolution();
180                 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
181             }
182         }
183     }
184
185     final @NonNull AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId,
186             final Long shard) {
187         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
188     }
189
190     final @NonNull AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId,
191             final Long shard) {
192         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
193     }
194
195     private void checkNotClosed() {
196         if (state == State.CLOSED) {
197             throw new DOMTransactionChainClosedException(String.format("Local history %s is closed", identifier));
198         }
199     }
200
201     /**
202      * Allocate a new {@link ClientTransaction}.
203      *
204      * @return A new {@link ClientTransaction}
205      * @throws DOMTransactionChainClosedException if this history is closed
206      * @throws IllegalStateException if a previous dependent transaction has not been closed
207      */
208     // Non-final for mocking
209     public @NonNull ClientTransaction createTransaction() {
210         checkNotClosed();
211
212         synchronized (this) {
213             final ClientTransaction ret = doCreateTransaction();
214             openTransactions.put(ret.getIdentifier(), ret);
215             return ret;
216         }
217     }
218
219     /**
220      * Create a new {@link ClientSnapshot}.
221      *
222      * @return A new {@link ClientSnapshot}
223      * @throws DOMTransactionChainClosedException if this history is closed
224      * @throws IllegalStateException if a previous dependent transaction has not been closed
225      */
226     // Non-final for mocking
227     public ClientSnapshot takeSnapshot() {
228         checkNotClosed();
229
230         synchronized (this) {
231             final ClientSnapshot ret = doCreateSnapshot();
232             openTransactions.put(ret.getIdentifier(), ret);
233             return ret;
234         }
235     }
236
237     @Holding("this")
238     abstract ClientSnapshot doCreateSnapshot();
239
240     @Holding("this")
241     abstract ClientTransaction doCreateTransaction();
242
243     /**
244      * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is
245      * completing with a set of participating shards.
246      *
247      * @param txId Transaction identifier
248      * @param participatingShards Participating shard cookies
249      */
250     final void onTransactionShardsBound(final TransactionIdentifier txId, final Set<Long> participatingShards) {
251         // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those
252         // will not see the holes caused by this.
253         final long stamp = lock.readLock();
254         try {
255             for (var entry : histories.entrySet()) {
256                 if (!participatingShards.contains(entry.getKey())) {
257                     entry.getValue().skipTransaction(txId);
258                 }
259             }
260         } finally {
261             lock.unlockRead(stamp);
262         }
263     }
264
265     /**
266      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
267      *
268      * @param tx Client transaction
269      * @param cohort Transaction commit cohort
270      */
271     synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
272             final AbstractTransactionCommitCohort cohort) {
273         final TransactionIdentifier txId = tx.getIdentifier();
274         if (openTransactions.remove(txId) == null) {
275             LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
276         }
277
278         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
279         checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", cohort, txId, previous);
280
281         LOG.debug("Local history {} readied transaction {}", this, txId);
282         return cohort;
283     }
284
285     /**
286      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
287      * backend.
288      *
289      * @param snapshot transaction identifier
290      */
291     synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
292         if (openTransactions.remove(snapshot.getIdentifier()) == null) {
293             LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
294         }
295     }
296
297     /**
298      * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
299      * and all its state can be removed.
300      *
301      * @param txId transaction identifier
302      */
303     // Non-final for mocking
304     synchronized void onTransactionComplete(final TransactionIdentifier txId) {
305         if (readyTransactions.remove(txId) == null) {
306             LOG.warn("Could not find completed transaction {}", txId);
307         }
308     }
309
310     final HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
311         /*
312          * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
313          *
314          * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied
315          * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does
316          * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated
317          * the history map.
318          *
319          * Hence we need to make sure no potential computation is happening concurrently with us looking at the history
320          * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation
321          * requests are established to happen either before or after the reconnect attempt.
322          */
323         final ProxyHistory oldProxy;
324         final long stamp = lock.writeLock();
325         try {
326             oldProxy = histories.get(newConn.cookie());
327         } finally {
328             lock.unlockWrite(stamp);
329         }
330
331         if (oldProxy == null) {
332             return null;
333         }
334
335         final ProxyReconnectCohort proxy = verifyNotNull(oldProxy.startReconnect(newConn));
336         return new HistoryReconnectCohort() {
337             @Override
338             ProxyReconnectCohort getProxy() {
339                 return proxy;
340             }
341
342             @Override
343             void replayRequests(final Collection<ConnectionEntry> previousEntries) {
344                 proxy.replayRequests(previousEntries);
345             }
346
347             @Override
348             public void close() {
349                 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
350                 final ProxyHistory newProxy = proxy.finishReconnect();
351                 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
352                     LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
353                         AbstractClientHistory.this);
354                 }
355             }
356         };
357     }
358 }