Implement scatter/gather on module shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
14
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Set;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
21 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
22 import java.util.concurrent.locks.StampedLock;
23 import java.util.stream.Stream;
24 import org.checkerframework.checker.lock.qual.GuardedBy;
25 import org.checkerframework.checker.lock.qual.Holding;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
28 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
29 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
30 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
31 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
33 import org.opendaylight.controller.cluster.access.concepts.Response;
34 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
36 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
37 import org.opendaylight.yangtools.concepts.Identifiable;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
44  * and the other for single transactions.
45  *
46  * @author Robert Varga
47  */
48 public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
49     enum State {
50         IDLE,
51         TX_OPEN,
52         CLOSED,
53     }
54
55     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
56     private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
57             AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
58     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
59             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
60
61     @GuardedBy("this")
62     private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
63     @GuardedBy("this")
64     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
65
66     @GuardedBy("lock")
67     private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
68     private final StampedLock lock = new StampedLock();
69
70     private final @NonNull AbstractDataStoreClientBehavior client;
71     private final @NonNull LocalHistoryIdentifier identifier;
72
73     // Used via NEXT_TX_UPDATER
74     @SuppressWarnings("unused")
75     private volatile long nextTx = 0;
76
77     private volatile State state = State.IDLE;
78
79     AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
80         this.client = requireNonNull(client);
81         this.identifier = requireNonNull(identifier);
82         checkArgument(identifier.getCookie() == 0);
83     }
84
85     final State state() {
86         return state;
87     }
88
89     final void updateState(final State expected, final State next) {
90         final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
91         checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
92         LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
93     }
94
95     final synchronized void doClose() {
96         final State local = state;
97         if (local != State.CLOSED) {
98             checkState(local == State.IDLE, "Local history %s has an open transaction", this);
99             histories.values().forEach(ProxyHistory::close);
100             updateState(local, State.CLOSED);
101         }
102     }
103
104     final synchronized void onProxyDestroyed(final ProxyHistory proxyHistory) {
105         histories.remove(proxyHistory.getIdentifier().getCookie());
106         LOG.debug("{}: removed destroyed proxy {}", this, proxyHistory);
107     }
108
109     @Override
110     public final LocalHistoryIdentifier getIdentifier() {
111         return identifier;
112     }
113
114     final long nextTx() {
115         return NEXT_TX_UPDATER.getAndIncrement(this);
116     }
117
118     final Long resolveShardForPath(final YangInstanceIdentifier path) {
119         return client.resolveShardForPath(path);
120     }
121
122     final Stream<Long> resolveAllShards() {
123         return client.resolveAllShards();
124     }
125
126     final ActorUtils actorUtils() {
127         return client.actorUtils();
128     }
129
130     @Override
131     final void localAbort(final Throwable cause) {
132         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
133         if (oldState != State.CLOSED) {
134             LOG.debug("Force-closing history {}", getIdentifier(), cause);
135
136             synchronized (this) {
137                 for (AbstractClientHandle<?> t : openTransactions.values()) {
138                     t.localAbort(cause);
139                 }
140                 openTransactions.clear();
141                 readyTransactions.clear();
142             }
143         }
144     }
145
146     /**
147      * Create a new history proxy for a given shard.
148      *
149      * @param shard Shard cookie
150      * @throws InversibleLockException if the shard is being reconnected
151      */
152     @Holding("lock")
153     private ProxyHistory createHistoryProxy(final Long shard) {
154         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
155         final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
156             identifier.getHistoryId(), shard);
157         LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
158
159         final ProxyHistory ret = createHistoryProxy(proxyId, connection);
160
161         // Request creation of the history, if it is not the single history
162         if (ret.getIdentifier().getHistoryId() != 0) {
163             connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
164                 this::createHistoryCallback);
165         }
166         return ret;
167     }
168
169     abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId,
170             AbstractClientConnection<ShardBackendInfo> connection);
171
172     private void createHistoryCallback(final Response<?, ?> response) {
173         LOG.debug("Create history response {}", response);
174     }
175
176     private @NonNull ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
177         while (true) {
178             try {
179                 // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect,
180                 // see comments in startReconnect() for details.
181                 final long stamp = lock.readLock();
182                 try {
183                     return histories.computeIfAbsent(shard, this::createHistoryProxy);
184                 } finally {
185                     lock.unlockRead(stamp);
186                 }
187             } catch (InversibleLockException e) {
188                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
189                 e.awaitResolution();
190                 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
191             }
192         }
193     }
194
195     final @NonNull AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId,
196             final Long shard) {
197         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
198     }
199
200     final @NonNull AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId,
201             final Long shard) {
202         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
203     }
204
205     private void checkNotClosed() {
206         if (state == State.CLOSED) {
207             throw new DOMTransactionChainClosedException(String.format("Local history %s is closed", identifier));
208         }
209     }
210
211     /**
212      * Allocate a new {@link ClientTransaction}.
213      *
214      * @return A new {@link ClientTransaction}
215      * @throws DOMTransactionChainClosedException if this history is closed
216      * @throws IllegalStateException if a previous dependent transaction has not been closed
217      */
218     // Non-final for mocking
219     public @NonNull ClientTransaction createTransaction() {
220         checkNotClosed();
221
222         synchronized (this) {
223             final ClientTransaction ret = doCreateTransaction();
224             openTransactions.put(ret.getIdentifier(), ret);
225             return ret;
226         }
227     }
228
229     /**
230      * Create a new {@link ClientSnapshot}.
231      *
232      * @return A new {@link ClientSnapshot}
233      * @throws DOMTransactionChainClosedException if this history is closed
234      * @throws IllegalStateException if a previous dependent transaction has not been closed
235      */
236     // Non-final for mocking
237     public ClientSnapshot takeSnapshot() {
238         checkNotClosed();
239
240         synchronized (this) {
241             final ClientSnapshot ret = doCreateSnapshot();
242             openTransactions.put(ret.getIdentifier(), ret);
243             return ret;
244         }
245     }
246
247     @Holding("this")
248     abstract ClientSnapshot doCreateSnapshot();
249
250     @Holding("this")
251     abstract ClientTransaction doCreateTransaction();
252
253     /**
254      * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is
255      * completing with a set of participating shards.
256      *
257      * @param txId Transaction identifier
258      * @param participatingShards Participating shard cookies
259      */
260     final void onTransactionShardsBound(final TransactionIdentifier txId, final Set<Long> participatingShards) {
261         // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those
262         // will not see the holes caused by this.
263         final long stamp = lock.readLock();
264         try {
265             for (var entry : histories.entrySet()) {
266                 if (!participatingShards.contains(entry.getKey())) {
267                     entry.getValue().skipTransaction(txId);
268                 }
269             }
270         } finally {
271             lock.unlockRead(stamp);
272         }
273     }
274
275     /**
276      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
277      *
278      * @param tx Client transaction
279      * @param cohort Transaction commit cohort
280      */
281     synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
282             final AbstractTransactionCommitCohort cohort) {
283         final TransactionIdentifier txId = tx.getIdentifier();
284         if (openTransactions.remove(txId) == null) {
285             LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
286         }
287
288         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
289         checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", cohort, txId, previous);
290
291         LOG.debug("Local history {} readied transaction {}", this, txId);
292         return cohort;
293     }
294
295     /**
296      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
297      * backend.
298      *
299      * @param snapshot transaction identifier
300      */
301     synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
302         if (openTransactions.remove(snapshot.getIdentifier()) == null) {
303             LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
304         }
305     }
306
307     /**
308      * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
309      * and all its state can be removed.
310      *
311      * @param txId transaction identifier
312      */
313     // Non-final for mocking
314     synchronized void onTransactionComplete(final TransactionIdentifier txId) {
315         if (readyTransactions.remove(txId) == null) {
316             LOG.warn("Could not find completed transaction {}", txId);
317         }
318     }
319
320     final HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
321         /*
322          * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
323          *
324          * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied
325          * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does
326          * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated
327          * the history map.
328          *
329          * Hence we need to make sure no potential computation is happening concurrently with us looking at the history
330          * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation
331          * requests are established to happen either before or after the reconnect attempt.
332          */
333         final ProxyHistory oldProxy;
334         final long stamp = lock.writeLock();
335         try {
336             oldProxy = histories.get(newConn.cookie());
337         } finally {
338             lock.unlockWrite(stamp);
339         }
340
341         if (oldProxy == null) {
342             return null;
343         }
344
345         final ProxyReconnectCohort proxy = verifyNotNull(oldProxy.startReconnect(newConn));
346         return new HistoryReconnectCohort() {
347             @Override
348             ProxyReconnectCohort getProxy() {
349                 return proxy;
350             }
351
352             @Override
353             void replayRequests(final Collection<ConnectionEntry> previousEntries) {
354                 proxy.replayRequests(previousEntries);
355             }
356
357             @Override
358             public void close() {
359                 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
360                 final ProxyHistory newProxy = proxy.finishReconnect();
361                 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
362                     LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
363                         AbstractClientHistory.this);
364                 }
365             }
366         };
367     }
368 }