Integrate MRI projects for Neon
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import java.util.Collection;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.StampedLock;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
21 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
22 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
23 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
24 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
25 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
26 import org.opendaylight.controller.cluster.access.concepts.Response;
27 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
28 import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
29 import org.opendaylight.yangtools.concepts.Identifiable;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
36  * and the other for single transactions.
37  *
38  * @author Robert Varga
39  */
40 public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
41     enum State {
42         IDLE,
43         TX_OPEN,
44         CLOSED,
45     }
46
47     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
48     private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
49             AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
50     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
51             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
52
53     @GuardedBy("this")
54     private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
55     @GuardedBy("this")
56     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
57
58     @GuardedBy("lock")
59     private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
60     private final StampedLock lock = new StampedLock();
61
62     private final AbstractDataStoreClientBehavior client;
63     private final LocalHistoryIdentifier identifier;
64
65     // Used via NEXT_TX_UPDATER
66     @SuppressWarnings("unused")
67     private volatile long nextTx = 0;
68
69     private volatile State state = State.IDLE;
70
71     AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
72         this.client = Preconditions.checkNotNull(client);
73         this.identifier = Preconditions.checkNotNull(identifier);
74         Preconditions.checkArgument(identifier.getCookie() == 0);
75     }
76
77     final State state() {
78         return state;
79     }
80
81     final void updateState(final State expected, final State next) {
82         final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
83         Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
84         LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
85     }
86
87     final synchronized void doClose() {
88         final State local = state;
89         if (local != State.CLOSED) {
90             Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this);
91             histories.values().forEach(ProxyHistory::close);
92             updateState(local, State.CLOSED);
93         }
94     }
95
96     final synchronized void onProxyDestroyed(final ProxyHistory proxyHistory) {
97         histories.remove(proxyHistory.getIdentifier().getCookie());
98         LOG.debug("{}: removed destroyed proxy {}", this, proxyHistory);
99     }
100
101     @Override
102     public LocalHistoryIdentifier getIdentifier() {
103         return identifier;
104     }
105
106     final long nextTx() {
107         return NEXT_TX_UPDATER.getAndIncrement(this);
108     }
109
110     final Long resolveShardForPath(final YangInstanceIdentifier path) {
111         return client.resolveShardForPath(path);
112     }
113
114     @Override
115     final void localAbort(final Throwable cause) {
116         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
117         if (oldState != State.CLOSED) {
118             LOG.debug("Force-closing history {}", getIdentifier(), cause);
119
120             synchronized (this) {
121                 for (AbstractClientHandle<?> t : openTransactions.values()) {
122                     t.localAbort(cause);
123                 }
124                 openTransactions.clear();
125                 readyTransactions.clear();
126             }
127         }
128     }
129
130     /**
131      * Create a new history proxy for a given shard.
132      *
133      * @throws InversibleLockException if the shard is being reconnected
134      */
135     @GuardedBy("lock")
136     private ProxyHistory createHistoryProxy(final Long shard) {
137         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
138         final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
139             identifier.getHistoryId(), shard);
140         LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
141
142         final ProxyHistory ret = createHistoryProxy(proxyId, connection);
143
144         // Request creation of the history, if it is not the single history
145         if (ret.getIdentifier().getHistoryId() != 0) {
146             connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
147                 this::createHistoryCallback);
148         }
149         return ret;
150     }
151
152     abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId,
153             AbstractClientConnection<ShardBackendInfo> connection);
154
155     private void createHistoryCallback(final Response<?, ?> response) {
156         LOG.debug("Create history response {}", response);
157     }
158
159     private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
160         while (true) {
161             try {
162                 // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect,
163                 // see comments in startReconnect() for details.
164                 final long stamp = lock.readLock();
165                 try {
166                     return histories.computeIfAbsent(shard, this::createHistoryProxy);
167                 } finally {
168                     lock.unlockRead(stamp);
169                 }
170             } catch (InversibleLockException e) {
171                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
172                 e.awaitResolution();
173                 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
174             }
175         }
176     }
177
178     final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) {
179         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
180     }
181
182     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
183         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
184     }
185
186     private void checkNotClosed() {
187         if (state == State.CLOSED) {
188             throw new DOMTransactionChainClosedException(String.format("Local history %s is closed", identifier));
189         }
190     }
191
192     /**
193      * Allocate a new {@link ClientTransaction}.
194      *
195      * @return A new {@link ClientTransaction}
196      * @throws DOMTransactionChainClosedException if this history is closed
197      * @throws IllegalStateException if a previous dependent transaction has not been closed
198      */
199     public ClientTransaction createTransaction() {
200         checkNotClosed();
201
202         synchronized (this) {
203             final ClientTransaction ret = doCreateTransaction();
204             openTransactions.put(ret.getIdentifier(), ret);
205             return ret;
206         }
207     }
208
209     /**
210      * Create a new {@link ClientSnapshot}.
211      *
212      * @return A new {@link ClientSnapshot}
213      * @throws DOMTransactionChainClosedException if this history is closed
214      * @throws IllegalStateException if a previous dependent transaction has not been closed
215      */
216     public ClientSnapshot takeSnapshot() {
217         checkNotClosed();
218
219         synchronized (this) {
220             final ClientSnapshot ret = doCreateSnapshot();
221             openTransactions.put(ret.getIdentifier(), ret);
222             return ret;
223         }
224     }
225
226     @GuardedBy("this")
227     abstract ClientSnapshot doCreateSnapshot();
228
229     @GuardedBy("this")
230     abstract ClientTransaction doCreateTransaction();
231
232     /**
233      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
234      *
235      * @param txId Transaction identifier
236      * @param cohort Transaction commit cohort
237      */
238     synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
239             final AbstractTransactionCommitCohort cohort) {
240         final TransactionIdentifier txId = tx.getIdentifier();
241         if (openTransactions.remove(txId) == null) {
242             LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
243         }
244
245         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
246         Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
247                 cohort, txId, previous);
248
249         LOG.debug("Local history {} readied transaction {}", this, txId);
250         return cohort;
251     }
252
253     /**
254      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
255      * backend.
256      *
257      * @param snapshot transaction identifier
258      */
259     synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
260         if (openTransactions.remove(snapshot.getIdentifier()) == null) {
261             LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
262         }
263     }
264
265     /**
266      * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
267      * and all its state can be removed.
268      *
269      * @param txId transaction identifier
270      */
271     synchronized void onTransactionComplete(final TransactionIdentifier txId) {
272         if (readyTransactions.remove(txId) == null) {
273             LOG.warn("Could not find completed transaction {}", txId);
274         }
275     }
276
277     HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
278         /*
279          * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
280          *
281          * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied
282          * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does
283          * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated
284          * the history map.
285          *
286          * Hence we need to make sure no potential computation is happening concurrently with us looking at the history
287          * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation
288          * requests are established to happen either before or after the reconnect attempt.
289          */
290         final ProxyHistory oldProxy;
291         final long stamp = lock.writeLock();
292         try {
293             oldProxy = histories.get(newConn.cookie());
294         } finally {
295             lock.unlockWrite(stamp);
296         }
297
298         if (oldProxy == null) {
299             return null;
300         }
301
302         final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn));
303         return new HistoryReconnectCohort() {
304             @Override
305             ProxyReconnectCohort getProxy() {
306                 return proxy;
307             }
308
309             @Override
310             void replayRequests(final Collection<ConnectionEntry> previousEntries) {
311                 proxy.replayRequests(previousEntries);
312             }
313
314             @Override
315             public void close() {
316                 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
317                 final ProxyHistory newProxy = proxy.finishReconnect();
318                 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
319                     LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
320                         AbstractClientHistory.this);
321                 }
322             }
323         };
324     }
325
326 }