d306d13e2e29f50063b89f42136a5dffd4fc03ed
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static com.google.common.base.Verify.verifyNotNull;
13 import static java.util.Objects.requireNonNull;
14
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
20 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
21 import java.util.concurrent.locks.StampedLock;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.checkerframework.checker.lock.qual.Holding;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
26 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
27 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
28 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
29 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
31 import org.opendaylight.controller.cluster.access.concepts.Response;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
34 import org.opendaylight.yangtools.concepts.Identifiable;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
41  * and the other for single transactions.
42  *
43  * @author Robert Varga
44  */
45 public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
46     enum State {
47         IDLE,
48         TX_OPEN,
49         CLOSED,
50     }
51
52     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
53     private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
54             AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
55     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
56             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
57
58     @GuardedBy("this")
59     private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
60     @GuardedBy("this")
61     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
62
63     @GuardedBy("lock")
64     private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
65     private final StampedLock lock = new StampedLock();
66
67     private final @NonNull AbstractDataStoreClientBehavior client;
68     private final @NonNull LocalHistoryIdentifier identifier;
69
70     // Used via NEXT_TX_UPDATER
71     @SuppressWarnings("unused")
72     private volatile long nextTx = 0;
73
74     private volatile State state = State.IDLE;
75
76     AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
77         this.client = requireNonNull(client);
78         this.identifier = requireNonNull(identifier);
79         checkArgument(identifier.getCookie() == 0);
80     }
81
82     final State state() {
83         return state;
84     }
85
86     final void updateState(final State expected, final State next) {
87         final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
88         checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
89         LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
90     }
91
92     final synchronized void doClose() {
93         final State local = state;
94         if (local != State.CLOSED) {
95             checkState(local == State.IDLE, "Local history %s has an open transaction", this);
96             histories.values().forEach(ProxyHistory::close);
97             updateState(local, State.CLOSED);
98         }
99     }
100
101     final synchronized void onProxyDestroyed(final ProxyHistory proxyHistory) {
102         histories.remove(proxyHistory.getIdentifier().getCookie());
103         LOG.debug("{}: removed destroyed proxy {}", this, proxyHistory);
104     }
105
106     @Override
107     public final LocalHistoryIdentifier getIdentifier() {
108         return identifier;
109     }
110
111     final long nextTx() {
112         return NEXT_TX_UPDATER.getAndIncrement(this);
113     }
114
115     final Long resolveShardForPath(final YangInstanceIdentifier path) {
116         return client.resolveShardForPath(path);
117     }
118
119     @Override
120     final void localAbort(final Throwable cause) {
121         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
122         if (oldState != State.CLOSED) {
123             LOG.debug("Force-closing history {}", getIdentifier(), cause);
124
125             synchronized (this) {
126                 for (AbstractClientHandle<?> t : openTransactions.values()) {
127                     t.localAbort(cause);
128                 }
129                 openTransactions.clear();
130                 readyTransactions.clear();
131             }
132         }
133     }
134
135     /**
136      * Create a new history proxy for a given shard.
137      *
138      * @param shard Shard cookie
139      * @throws InversibleLockException if the shard is being reconnected
140      */
141     @Holding("lock")
142     private ProxyHistory createHistoryProxy(final Long shard) {
143         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
144         final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
145             identifier.getHistoryId(), shard);
146         LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
147
148         final ProxyHistory ret = createHistoryProxy(proxyId, connection);
149
150         // Request creation of the history, if it is not the single history
151         if (ret.getIdentifier().getHistoryId() != 0) {
152             connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
153                 this::createHistoryCallback);
154         }
155         return ret;
156     }
157
158     abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId,
159             AbstractClientConnection<ShardBackendInfo> connection);
160
161     private void createHistoryCallback(final Response<?, ?> response) {
162         LOG.debug("Create history response {}", response);
163     }
164
165     private @NonNull ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
166         while (true) {
167             try {
168                 // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect,
169                 // see comments in startReconnect() for details.
170                 final long stamp = lock.readLock();
171                 try {
172                     return histories.computeIfAbsent(shard, this::createHistoryProxy);
173                 } finally {
174                     lock.unlockRead(stamp);
175                 }
176             } catch (InversibleLockException e) {
177                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
178                 e.awaitResolution();
179                 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
180             }
181         }
182     }
183
184     final @NonNull AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId,
185             final Long shard) {
186         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
187     }
188
189     final @NonNull AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId,
190             final Long shard) {
191         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
192     }
193
194     private void checkNotClosed() {
195         if (state == State.CLOSED) {
196             throw new DOMTransactionChainClosedException(String.format("Local history %s is closed", identifier));
197         }
198     }
199
200     /**
201      * Allocate a new {@link ClientTransaction}.
202      *
203      * @return A new {@link ClientTransaction}
204      * @throws DOMTransactionChainClosedException if this history is closed
205      * @throws IllegalStateException if a previous dependent transaction has not been closed
206      */
207     // Non-final for mocking
208     public @NonNull ClientTransaction createTransaction() {
209         checkNotClosed();
210
211         synchronized (this) {
212             final ClientTransaction ret = doCreateTransaction();
213             openTransactions.put(ret.getIdentifier(), ret);
214             return ret;
215         }
216     }
217
218     /**
219      * Create a new {@link ClientSnapshot}.
220      *
221      * @return A new {@link ClientSnapshot}
222      * @throws DOMTransactionChainClosedException if this history is closed
223      * @throws IllegalStateException if a previous dependent transaction has not been closed
224      */
225     // Non-final for mocking
226     public ClientSnapshot takeSnapshot() {
227         checkNotClosed();
228
229         synchronized (this) {
230             final ClientSnapshot ret = doCreateSnapshot();
231             openTransactions.put(ret.getIdentifier(), ret);
232             return ret;
233         }
234     }
235
236     @Holding("this")
237     abstract ClientSnapshot doCreateSnapshot();
238
239     @Holding("this")
240     abstract ClientTransaction doCreateTransaction();
241
242     /**
243      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
244      *
245      * @param tx Client transaction
246      * @param cohort Transaction commit cohort
247      */
248     synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
249             final AbstractTransactionCommitCohort cohort) {
250         final TransactionIdentifier txId = tx.getIdentifier();
251         if (openTransactions.remove(txId) == null) {
252             LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
253         }
254
255         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
256         checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", cohort, txId, previous);
257
258         LOG.debug("Local history {} readied transaction {}", this, txId);
259         return cohort;
260     }
261
262     /**
263      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
264      * backend.
265      *
266      * @param snapshot transaction identifier
267      */
268     synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
269         if (openTransactions.remove(snapshot.getIdentifier()) == null) {
270             LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
271         }
272     }
273
274     /**
275      * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
276      * and all its state can be removed.
277      *
278      * @param txId transaction identifier
279      */
280     // Non-final for mocking
281     synchronized void onTransactionComplete(final TransactionIdentifier txId) {
282         if (readyTransactions.remove(txId) == null) {
283             LOG.warn("Could not find completed transaction {}", txId);
284         }
285     }
286
287     final HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
288         /*
289          * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
290          *
291          * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied
292          * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does
293          * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated
294          * the history map.
295          *
296          * Hence we need to make sure no potential computation is happening concurrently with us looking at the history
297          * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation
298          * requests are established to happen either before or after the reconnect attempt.
299          */
300         final ProxyHistory oldProxy;
301         final long stamp = lock.writeLock();
302         try {
303             oldProxy = histories.get(newConn.cookie());
304         } finally {
305             lock.unlockWrite(stamp);
306         }
307
308         if (oldProxy == null) {
309             return null;
310         }
311
312         final ProxyReconnectCohort proxy = verifyNotNull(oldProxy.startReconnect(newConn));
313         return new HistoryReconnectCohort() {
314             @Override
315             ProxyReconnectCohort getProxy() {
316                 return proxy;
317             }
318
319             @Override
320             void replayRequests(final Collection<ConnectionEntry> previousEntries) {
321                 proxy.replayRequests(previousEntries);
322             }
323
324             @Override
325             public void close() {
326                 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
327                 final ProxyHistory newProxy = proxy.finishReconnect();
328                 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
329                     LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
330                         AbstractClientHistory.this);
331                 }
332             }
333         };
334     }
335 }