07fcbebad01a263d38a0bfabe839e8b02832ffe2
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.LinkedHashMap;
15 import java.util.Map;
16 import java.util.Optional;
17 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18 import java.util.concurrent.locks.Lock;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.BiConsumer;
21 import java.util.function.Consumer;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
24 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
25 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
26 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
27 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
28 import org.opendaylight.controller.cluster.access.concepts.Request;
29 import org.opendaylight.controller.cluster.access.concepts.RequestException;
30 import org.opendaylight.controller.cluster.access.concepts.Response;
31 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Per-connection representation of a local history. This class handles state replication across a single connection.
41  *
42  * @author Robert Varga
43  */
44 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
45     private abstract static class AbstractLocal extends ProxyHistory {
46         private final DataTree dataTree;
47
48         AbstractLocal(final AbstractClientConnection<ShardBackendInfo> connection,
49             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
50             super(connection, identifier);
51             this.dataTree = Preconditions.checkNotNull(dataTree);
52         }
53
54         final DataTreeSnapshot takeSnapshot() {
55             return dataTree.takeSnapshot();
56         }
57     }
58
59     private abstract static class AbstractRemote extends ProxyHistory {
60         AbstractRemote(final AbstractClientConnection<ShardBackendInfo> connection,
61             final LocalHistoryIdentifier identifier) {
62             super(connection, identifier);
63         }
64
65         @Override
66         final AbstractProxyTransaction doCreateTransactionProxy(
67                 final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId) {
68             return new RemoteProxyTransaction(this, txId);
69         }
70     }
71
72     private static final class Local extends AbstractLocal {
73         private static final AtomicReferenceFieldUpdater<Local, LocalProxyTransaction> LAST_SEALED_UPDATER =
74                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed");
75
76         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
77         // the open one and attempts to create a new transaction again.
78         private LocalProxyTransaction lastOpen;
79
80         private volatile LocalProxyTransaction lastSealed;
81
82         Local(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier,
83             final DataTree dataTree) {
84             super(connection, identifier, dataTree);
85         }
86
87         @Override
88         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
89                 final TransactionIdentifier txId) {
90             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
91
92             // onTransactionCompleted() runs concurrently
93             final LocalProxyTransaction localSealed = lastSealed;
94             final DataTreeSnapshot baseSnapshot;
95             if (localSealed != null) {
96                 baseSnapshot = localSealed.getSnapshot();
97             } else {
98                 baseSnapshot = takeSnapshot();
99             }
100
101             lastOpen = new LocalProxyTransaction(this, txId,
102                 (CursorAwareDataTreeModification) baseSnapshot.newModification());
103             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
104             return lastOpen;
105         }
106
107         @Override
108         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
109             return createClient(connection, getIdentifier());
110         }
111
112         @Override
113         void onTransactionAborted(final AbstractProxyTransaction tx) {
114             Preconditions.checkState(tx.equals(lastOpen));
115             lastOpen = null;
116         }
117
118         @Override
119         void onTransactionCompleted(final AbstractProxyTransaction tx) {
120             Verify.verify(tx instanceof LocalProxyTransaction);
121
122             if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) {
123                 LOG.debug("Completed last sealed transaction {}", tx);
124             }
125         }
126
127         @Override
128         void onTransactionSealed(final AbstractProxyTransaction tx) {
129             Preconditions.checkState(tx.equals(lastOpen));
130             lastSealed = lastOpen;
131             lastOpen = null;
132         }
133     }
134
135     private static final class LocalSingle extends AbstractLocal {
136         LocalSingle(final AbstractClientConnection<ShardBackendInfo> connection,
137             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
138             super(connection, identifier, dataTree);
139         }
140
141         @Override
142         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
143                 final TransactionIdentifier txId) {
144             return new LocalProxyTransaction(this, txId,
145                 (CursorAwareDataTreeModification) takeSnapshot().newModification());
146         }
147
148         @Override
149         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
150             return createSingle(connection, getIdentifier());
151         }
152     }
153
154     private static final class Remote extends AbstractRemote {
155         Remote(final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
156             super(connection, identifier);
157         }
158
159         @Override
160         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
161             return createClient(connection, getIdentifier());
162         }
163     }
164
165     private static final class RemoteSingle extends AbstractRemote {
166         RemoteSingle(final AbstractClientConnection<ShardBackendInfo> connection,
167             final LocalHistoryIdentifier identifier) {
168             super(connection, identifier);
169         }
170
171         @Override
172         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
173             return createSingle(connection, getIdentifier());
174         }
175     }
176
177     private static final class RequestReplayException extends RequestException {
178         private static final long serialVersionUID = 1L;
179
180         RequestReplayException(final String format, final Object... args) {
181             super(String.format(format, args));
182         }
183
184         @Override
185         public boolean isRetriable() {
186             return false;
187         }
188     }
189
190     private final class ReconnectCohort extends ProxyReconnectCohort {
191         @Override
192         public LocalHistoryIdentifier getIdentifier() {
193             return identifier;
194         }
195
196         @GuardedBy("lock")
197         @Override
198         void replaySuccessfulRequests() {
199             for (AbstractProxyTransaction t : proxies.values()) {
200                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
201                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
202                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
203                 t.startReconnect(newProxy);
204             }
205         }
206
207         @GuardedBy("lock")
208         @Override
209         ProxyHistory finishReconnect() {
210             final ProxyHistory ret = Verify.verifyNotNull(successor);
211
212             for (AbstractProxyTransaction t : proxies.values()) {
213                 t.finishReconnect();
214             }
215
216             LOG.debug("Finished reconnecting proxy history {}", this);
217             lock.unlock();
218             return ret;
219         }
220
221         @Override
222         void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
223                 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
224             if (request instanceof TransactionRequest) {
225                 replayTransactionRequest((TransactionRequest<?>) request, callback);
226             } else if (request instanceof LocalHistoryRequest) {
227                 replayTo.accept(request, callback);
228             } else {
229                 throw new IllegalArgumentException("Unhandled request " + request);
230             }
231         }
232
233         private void replayTransactionRequest(final TransactionRequest<?> request,
234                 final Consumer<Response<?, ?>> callback) throws RequestException {
235
236             final AbstractProxyTransaction proxy;
237             lock.lock();
238             try {
239                 proxy = proxies.get(request.getTarget());
240             } finally {
241                 lock.unlock();
242             }
243             if (proxy == null) {
244                 throw new RequestReplayException("Failed to find proxy for %s", request);
245             }
246
247             proxy.replayRequest(request, callback);
248         }
249     }
250
251     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
252
253     private final Lock lock = new ReentrantLock();
254     private final LocalHistoryIdentifier identifier;
255     private final AbstractClientConnection<ShardBackendInfo> connection;
256
257     @GuardedBy("lock")
258     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
259     @GuardedBy("lock")
260     private ProxyHistory successor;
261
262     private ProxyHistory(final AbstractClientConnection<ShardBackendInfo> connection,
263             final LocalHistoryIdentifier identifier) {
264         this.connection = Preconditions.checkNotNull(connection);
265         this.identifier = Preconditions.checkNotNull(identifier);
266     }
267
268     static ProxyHistory createClient(final AbstractClientConnection<ShardBackendInfo> connection,
269             final LocalHistoryIdentifier identifier) {
270         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
271         return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get())
272              : new Remote(connection, identifier);
273     }
274
275     static ProxyHistory createSingle(final AbstractClientConnection<ShardBackendInfo> connection,
276             final LocalHistoryIdentifier identifier) {
277         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
278         return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get())
279              : new RemoteSingle(connection, identifier);
280     }
281
282     @Override
283     public LocalHistoryIdentifier getIdentifier() {
284         return identifier;
285     }
286
287     final ActorRef localActor() {
288         return connection.localActor();
289     }
290
291     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
292         lock.lock();
293         try {
294             if (successor != null) {
295                 return successor.createTransactionProxy(txId);
296             }
297
298             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
299             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
300             proxies.put(proxyId, ret);
301             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
302             return ret;
303         } finally {
304             lock.unlock();
305         }
306     }
307
308     final void abortTransaction(final AbstractProxyTransaction tx) {
309         lock.lock();
310         try {
311             proxies.remove(tx.getIdentifier());
312             LOG.debug("Proxy {} aborting transaction {}", this, tx);
313             onTransactionAborted(tx);
314         } finally {
315             lock.unlock();
316         }
317     }
318
319     final void completeTransaction(final AbstractProxyTransaction tx) {
320         lock.lock();
321         try {
322             proxies.remove(tx.getIdentifier());
323             LOG.debug("Proxy {} completing transaction {}", this, tx);
324             onTransactionCompleted(tx);
325         } finally {
326             lock.unlock();
327         }
328     }
329
330     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
331         connection.sendRequest(request, callback);
332     }
333
334     @GuardedBy("lock")
335     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
336             TransactionIdentifier txId);
337
338     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
339
340     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
341     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
342         lock.lock();
343         if (successor != null) {
344             lock.unlock();
345             throw new IllegalStateException("Proxy history " + this + " already has a successor");
346         }
347
348         successor = createSuccessor(newConnection);
349         LOG.debug("History {} instantiated successor {}", this, successor);
350         return new ReconnectCohort();
351     }
352
353     @GuardedBy("lock")
354     void onTransactionAborted(final AbstractProxyTransaction tx) {
355         // No-op for most implementations
356     }
357
358     @GuardedBy("lock")
359     void onTransactionCompleted(final AbstractProxyTransaction tx) {
360         // No-op for most implementations
361     }
362
363     void onTransactionSealed(final AbstractProxyTransaction tx) {
364         // No-op on most implementations
365     }
366 }