BUG-5280: add AbstractClientConnection
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import java.util.HashMap;
13 import java.util.Map;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
16 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
19 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
20 import org.opendaylight.controller.cluster.access.client.InversibleLockException;
21 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
22 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
23 import org.opendaylight.controller.cluster.access.concepts.Response;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.yangtools.concepts.Identifiable;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * Abstract base class for client view of a history. This class has two implementations, one for normal local histories
32  * and the other for single transactions.
33  *
34  * @author Robert Varga
35  */
36 abstract class AbstractClientHistory extends LocalAbortable implements Identifiable<LocalHistoryIdentifier> {
37     enum State {
38         IDLE,
39         TX_OPEN,
40         CLOSED,
41     }
42
43     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
44     private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
45             AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
46     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
47             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
48
49     @GuardedBy("this")
50     private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
51     @GuardedBy("this")
52     private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
53
54     private final Map<Long, ProxyHistory> histories = new ConcurrentHashMap<>();
55     private final AbstractDataStoreClientBehavior client;
56     private final LocalHistoryIdentifier identifier;
57
58     // Used via NEXT_TX_UPDATER
59     @SuppressWarnings("unused")
60     private volatile long nextTx = 0;
61
62     private volatile State state = State.IDLE;
63
64     AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
65         this.client = Preconditions.checkNotNull(client);
66         this.identifier = Preconditions.checkNotNull(identifier);
67         Preconditions.checkArgument(identifier.getCookie() == 0);
68     }
69
70     final State state() {
71         return state;
72     }
73
74     final void updateState(final State expected, final State next) {
75         final boolean success = STATE_UPDATER.compareAndSet(this, expected, next);
76         Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
77         LOG.debug("Client history {} changed state from {} to {}", this, expected, next);
78     }
79
80     @Override
81     public final LocalHistoryIdentifier getIdentifier() {
82         return identifier;
83     }
84
85     final long nextTx() {
86         return NEXT_TX_UPDATER.getAndIncrement(this);
87     }
88
89     final Long resolveShardForPath(final YangInstanceIdentifier path) {
90         return client.resolveShardForPath(path);
91     }
92
93     @Override
94     final void localAbort(final Throwable cause) {
95         final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
96         if (oldState != State.CLOSED) {
97             LOG.debug("Force-closing history {}", getIdentifier(), cause);
98
99             synchronized (this) {
100                 for (ClientTransaction t : openTransactions.values()) {
101                     t.localAbort(cause);
102                 }
103                 openTransactions.clear();
104                 readyTransactions.clear();
105             }
106         }
107     }
108
109     /**
110      * Create a new history proxy for a given shard.
111      *
112      * @throws InversibleLockException if the shard is being reconnected
113      */
114     private ProxyHistory createHistoryProxy(final Long shard) {
115         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
116         final ProxyHistory ret = createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
117             identifier.getHistoryId(), shard), connection);
118
119         // Request creation of the history.
120         connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
121             this::createHistoryCallback);
122         return ret;
123     }
124
125     abstract ProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
126             final AbstractClientConnection<ShardBackendInfo> connection);
127
128     private void createHistoryCallback(final Response<?, ?> response) {
129         LOG.debug("Create history response {}", response);
130     }
131
132     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
133         while (true) {
134             final ProxyHistory history;
135             try {
136                 history = histories.computeIfAbsent(shard, this::createHistoryProxy);
137             } catch (InversibleLockException e) {
138                 LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
139                 e.awaitResolution();
140                 LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
141                 continue;
142             }
143
144             return history.createTransactionProxy(transactionId);
145         }
146     }
147
148     public final ClientTransaction createTransaction() {
149         Preconditions.checkState(state != State.CLOSED);
150
151         synchronized (this) {
152             final ClientTransaction ret = doCreateTransaction();
153             openTransactions.put(ret.getIdentifier(), ret);
154             return ret;
155         }
156     }
157
158     @GuardedBy("this")
159     abstract ClientTransaction doCreateTransaction();
160
161     /**
162      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
163      *
164      * @param txId Transaction identifier
165      * @param cohort Transaction commit cohort
166      */
167     synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
168             final AbstractTransactionCommitCohort cohort) {
169         final ClientTransaction tx = openTransactions.remove(txId);
170         Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
171
172         final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
173         Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
174                 cohort, txId, previous);
175
176         LOG.debug("Local history {} readied transaction {}", this, txId);
177         return cohort;
178     }
179
180     /**
181      * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
182      * backend.
183      *
184      * @param txId transaction identifier
185      */
186     synchronized void onTransactionAbort(final TransactionIdentifier txId) {
187         if (openTransactions.remove(txId) == null) {
188             LOG.warn("Could not find aborting transaction {}", txId);
189         }
190     }
191
192     /**
193      * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
194      * and all its state can be removed.
195      *
196      * @param txId transaction identifier
197      */
198     synchronized void onTransactionComplete(final TransactionIdentifier txId) {
199         if (readyTransactions.remove(txId) == null) {
200             LOG.warn("Could not find completed transaction {}", txId);
201         }
202     }
203
204     HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
205         final ProxyHistory oldProxy = histories.get(newConn.cookie());
206         if (oldProxy == null) {
207             return null;
208         }
209
210         final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn));
211         return new HistoryReconnectCohort() {
212             @Override
213             ProxyReconnectCohort getProxy() {
214                 return proxy;
215             }
216
217             @Override
218             void replaySuccessfulRequests() {
219                 proxy.replaySuccessfulRequests();
220             }
221
222             @Override
223             public void close() {
224                 LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
225                 final ProxyHistory newProxy = proxy.finishReconnect();
226                 if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
227                     LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
228                         AbstractClientHistory.this);
229                 }
230             }
231         };
232     }
233 }