BUG-2138: Create DistributedShardFrontend
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import java.util.HashMap;
13 import java.util.Map;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
16 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
17 import java.util.concurrent.locks.StampedLock;
18 import javax.annotation.concurrent.GuardedBy;
19 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
20 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
21 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
22 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
23 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
24 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
25 import org.opendaylight.controller.cluster.access.concepts.Response;
26 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
27 import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
35  * and the other for single transactions.
36  *
37  * @author Robert Varga
38  */
39 public abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
40     enum State {
41         IDLE,
42         TX_OPEN,
43         CLOSED,
44     }
45
46     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
47     private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
48             AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
49     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
50             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
51
52     @GuardedBy("this")
53     private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
54     @GuardedBy("this")
55     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
56
57     @GuardedBy("lock")
58     private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
59     private final StampedLock lock = new StampedLock();
60
61     private final AbstractDataStoreClientBehavior client;
62     private final LocalHistoryIdentifier identifier;
63
64     // Used via NEXT_TX_UPDATER
65     @SuppressWarnings("unused")
66     private volatile long nextTx = 0;
67
68     private volatile State state = State.IDLE;
69
70     AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
71         this.client = Preconditions.checkNotNull(client);
72         this.identifier = Preconditions.checkNotNull(identifier);
73         Preconditions.checkArgument(identifier.getCookie() == 0);
74     }
75
76     final State state() {
77         return state;
78     }
79
80     final void updateState(final State expected, final State next) {
81         final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
82         Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
83         LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
84     }
85
86     final synchronized void doClose() {
87         final State local = state;
88         if (local != State.CLOSED) {
89             Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this);
90             histories.values().forEach(ProxyHistory::close);
91         }
92     }
93
94     final synchronized void onProxyDestroyed(final ProxyHistory proxyHistory) {
95         histories.remove(proxyHistory.getIdentifier().getCookie());
96         LOG.debug("{}: removed destroyed proxy {}", this, proxyHistory);
97     }
98
99     @Override
100     public final LocalHistoryIdentifier getIdentifier() {
101         return identifier;
102     }
103
104     final long nextTx() {
105         return NEXT_TX_UPDATER.getAndIncrement(this);
106     }
107
108     final Long resolveShardForPath(final YangInstanceIdentifier path) {
109         return client.resolveShardForPath(path);
110     }
111
112     @Override
113     final void localAbort(final Throwable cause) {
114         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
115         if (oldState != State.CLOSED) {
116             LOG.debug("Force-closing history {}", getIdentifier(), cause);
117
118             synchronized (this) {
119                 for (AbstractClientHandle<?> t : openTransactions.values()) {
120                     t.localAbort(cause);
121                 }
122                 openTransactions.clear();
123                 readyTransactions.clear();
124             }
125         }
126     }
127
128     /**
129      * Create a new history proxy for a given shard.
130      *
131      * @throws InversibleLockException if the shard is being reconnected
132      */
133     @GuardedBy("lock")
134     private ProxyHistory createHistoryProxy(final Long shard) {
135         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
136         final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
137             identifier.getHistoryId(), shard);
138         LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
139
140         final ProxyHistory ret = createHistoryProxy(proxyId, connection);
141
142         // Request creation of the history, if it is not the single history
143         if (ret.getIdentifier().getHistoryId() != 0) {
144             connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
145                 this::createHistoryCallback);
146         }
147         return ret;
148     }
149
150     abstract ProxyHistory createHistoryProxy(LocalHistoryIdentifier historyId,
151             AbstractClientConnection<ShardBackendInfo> connection);
152
153     private void createHistoryCallback(final Response<?, ?> response) {
154         LOG.debug("Create history response {}", response);
155     }
156
157     private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
158         while (true) {
159             try {
160                 // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect,
161                 // see comments in startReconnect() for details.
162                 final long stamp = lock.readLock();
163                 try {
164                     return histories.computeIfAbsent(shard, this::createHistoryProxy);
165                 } finally {
166                     lock.unlockRead(stamp);
167                 }
168             } catch (InversibleLockException e) {
169                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
170                 e.awaitResolution();
171                 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
172             }
173         }
174     }
175
176     final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) {
177         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
178     }
179
180     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
181         return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
182     }
183
184     private void checkNotClosed() {
185         if (state == State.CLOSED) {
186             throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
187         }
188     }
189
190     /**
191      * Allocate a new {@link ClientTransaction}.
192      *
193      * @return A new {@link ClientTransaction}
194      * @throws TransactionChainClosedException if this history is closed
195      * @throws IllegalStateException if a previous dependent transaction has not been closed
196      */
197     public ClientTransaction createTransaction() {
198         checkNotClosed();
199
200         synchronized (this) {
201             final ClientTransaction ret = doCreateTransaction();
202             openTransactions.put(ret.getIdentifier(), ret);
203             return ret;
204         }
205     }
206
207     /**
208      * Create a new {@link ClientSnapshot}.
209      *
210      * @return A new {@link ClientSnapshot}
211      * @throws TransactionChainClosedException if this history is closed
212      * @throws IllegalStateException if a previous dependent transaction has not been closed
213      */
214     public final ClientSnapshot takeSnapshot() {
215         checkNotClosed();
216
217         synchronized (this) {
218             final ClientSnapshot ret = doCreateSnapshot();
219             openTransactions.put(ret.getIdentifier(), ret);
220             return ret;
221         }
222     }
223
224     @GuardedBy("this")
225     abstract ClientSnapshot doCreateSnapshot();
226
227     @GuardedBy("this")
228     abstract ClientTransaction doCreateTransaction();
229
230     /**
231      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
232      *
233      * @param txId Transaction identifier
234      * @param cohort Transaction commit cohort
235      */
236     synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
237             final AbstractTransactionCommitCohort cohort) {
238         final TransactionIdentifier txId = tx.getIdentifier();
239         if (openTransactions.remove(txId) == null) {
240             LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
241         }
242
243         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
244         Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
245                 cohort, txId, previous);
246
247         LOG.debug("Local history {} readied transaction {}", this, txId);
248         return cohort;
249     }
250
251     /**
252      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
253      * backend.
254      *
255      * @param snapshot transaction identifier
256      */
257     synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
258         if (openTransactions.remove(snapshot.getIdentifier()) == null) {
259             LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
260         }
261     }
262
263     /**
264      * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
265      * and all its state can be removed.
266      *
267      * @param txId transaction identifier
268      */
269     synchronized void onTransactionComplete(final TransactionIdentifier txId) {
270         if (readyTransactions.remove(txId) == null) {
271             LOG.warn("Could not find completed transaction {}", txId);
272         }
273     }
274
275     HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
276         /*
277          * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
278          *
279          * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied
280          * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does
281          * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated
282          * the history map.
283          *
284          * Hence we need to make sure no potential computation is happening concurrently with us looking at the history
285          * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation
286          * requests are established to happen either before or after the reconnect attempt.
287          */
288         final ProxyHistory oldProxy;
289         final long stamp = lock.writeLock();
290         try {
291             oldProxy = histories.get(newConn.cookie());
292         } finally {
293             lock.unlockWrite(stamp);
294         }
295
296         if (oldProxy == null) {
297             return null;
298         }
299
300         final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn));
301         return new HistoryReconnectCohort() {
302             @Override
303             ProxyReconnectCohort getProxy() {
304                 return proxy;
305             }
306
307             @Override
308             void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
309                 proxy.replaySuccessfulRequests(previousEntries);
310             }
311
312             @Override
313             public void close() {
314                 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
315                 final ProxyHistory newProxy = proxy.finishReconnect();
316                 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
317                     LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
318                         AbstractClientHistory.this);
319                 }
320             }
321         };
322     }
323
324 }