ad105c31f2f194ff610bac75ff8c3d432324b853
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ProxyHistory.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
17 import java.util.Map;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.Consumer;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
26 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
27 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
28 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
29 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
33 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
34 import org.opendaylight.controller.cluster.access.concepts.Request;
35 import org.opendaylight.controller.cluster.access.concepts.RequestException;
36 import org.opendaylight.controller.cluster.access.concepts.Response;
37 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
38 import org.opendaylight.yangtools.concepts.Identifiable;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Per-connection representation of a local history. This class handles state replication across a single connection.
46  *
47  * @author Robert Varga
48  */
49 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
50     private abstract static class AbstractLocal extends ProxyHistory {
51         private final DataTree dataTree;
52
53         AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
54             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
55             super(parent, connection, identifier);
56             this.dataTree = Preconditions.checkNotNull(dataTree);
57         }
58
59         final DataTreeSnapshot takeSnapshot() {
60             return dataTree.takeSnapshot();
61         }
62     }
63
64     private abstract static class AbstractRemote extends ProxyHistory {
65         AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
66             final LocalHistoryIdentifier identifier) {
67             super(parent, connection, identifier);
68         }
69     }
70
71     private static final class Local extends AbstractLocal {
72         private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
73                 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
74
75         // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
76         // the open one and attempts to create a new transaction again.
77         private LocalReadWriteProxyTransaction lastOpen;
78
79         private volatile LocalReadWriteProxyTransaction lastSealed;
80
81         Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
82             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
83             super(parent, connection, identifier, dataTree);
84         }
85
86         @Override
87         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
88                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
89             Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
90
91             if (isDone) {
92                 // Done transactions do not register on our radar on should not have any state associated.
93                 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
94                         : new LocalReadWriteProxyTransaction(this, txId);
95             }
96
97             // onTransactionCompleted() runs concurrently
98             final LocalReadWriteProxyTransaction localSealed = lastSealed;
99             final DataTreeSnapshot baseSnapshot;
100             if (localSealed != null) {
101                 baseSnapshot = localSealed.getSnapshot();
102             } else {
103                 baseSnapshot = takeSnapshot();
104             }
105
106             if (snapshotOnly) {
107                 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
108             }
109
110             lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
111             LOG.debug("Proxy {} open transaction {}", this, lastOpen);
112             return lastOpen;
113         }
114
115         @Override
116         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
117             return createClient(parent(), connection, getIdentifier());
118         }
119
120         @Override
121         void onTransactionAborted(final AbstractProxyTransaction tx) {
122             if (tx.equals(lastOpen)) {
123                 lastOpen = null;
124             }
125         }
126
127         @Override
128         void onTransactionCompleted(final AbstractProxyTransaction tx) {
129             Verify.verify(tx instanceof LocalProxyTransaction);
130             if (tx instanceof LocalReadWriteProxyTransaction
131                     && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
132                 LOG.debug("Completed last sealed transaction {}", tx);
133             }
134         }
135
136         @Override
137         void onTransactionSealed(final AbstractProxyTransaction tx) {
138             Preconditions.checkState(tx.equals(lastOpen));
139             lastSealed = lastOpen;
140             lastOpen = null;
141         }
142     }
143
144     private static final class LocalSingle extends AbstractLocal {
145         LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
146             final LocalHistoryIdentifier identifier, final DataTree dataTree) {
147             super(parent, connection, identifier, dataTree);
148         }
149
150         @Override
151         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
152                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
153             final DataTreeSnapshot snapshot = takeSnapshot();
154             return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
155                 new LocalReadWriteProxyTransaction(this, txId, snapshot);
156         }
157
158         @Override
159         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
160             return createSingle(parent(), connection, getIdentifier());
161         }
162     }
163
164     private static final class Remote extends AbstractRemote {
165         Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
166             final LocalHistoryIdentifier identifier) {
167             super(parent, connection, identifier);
168         }
169
170         @Override
171         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
172                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
173             return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
174         }
175
176         @Override
177         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
178             return createClient(parent(), connection, getIdentifier());
179         }
180     }
181
182     private static final class RemoteSingle extends AbstractRemote {
183         RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
184             final LocalHistoryIdentifier identifier) {
185             super(parent, connection, identifier);
186         }
187
188         @Override
189         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
190                 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
191             return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
192         }
193
194         @Override
195         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
196             return createSingle(parent(), connection, getIdentifier());
197         }
198     }
199
200     private static final class RequestReplayException extends RequestException {
201         private static final long serialVersionUID = 1L;
202
203         RequestReplayException(final String format, final Object... args) {
204             super(String.format(format, args));
205         }
206
207         @Override
208         public boolean isRetriable() {
209             return false;
210         }
211     }
212
213     private final class ReconnectCohort extends ProxyReconnectCohort {
214         @Override
215         public LocalHistoryIdentifier getIdentifier() {
216             return identifier;
217         }
218
219         @GuardedBy("lock")
220         @Override
221         void replayRequests(final Collection<ConnectionEntry> previousEntries) {
222             // First look for our Create message
223             Iterator<ConnectionEntry> it = previousEntries.iterator();
224             while (it.hasNext()) {
225                 final ConnectionEntry e = it.next();
226                 final Request<?, ?> req = e.getRequest();
227                 if (identifier.equals(req.getTarget())) {
228                     Verify.verify(req instanceof LocalHistoryRequest);
229                     if (req instanceof CreateLocalHistoryRequest) {
230                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
231                         it.remove();
232                         break;
233                     }
234                 }
235             }
236
237             for (AbstractProxyTransaction t : proxies.values()) {
238                 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
239                 t.replayMessages(successor, previousEntries);
240             }
241
242             // Now look for any finalizing messages
243             it = previousEntries.iterator();
244             while (it.hasNext()) {
245                 final ConnectionEntry e  = it.next();
246                 final Request<?, ?> req = e.getRequest();
247                 if (identifier.equals(req.getTarget())) {
248                     Verify.verify(req instanceof LocalHistoryRequest);
249                     if (req instanceof DestroyLocalHistoryRequest) {
250                         successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
251                         it.remove();
252                         break;
253                     }
254                 }
255             }
256         }
257
258         @GuardedBy("lock")
259         @Override
260         ProxyHistory finishReconnect() {
261             final ProxyHistory ret = Verify.verifyNotNull(successor);
262
263             for (AbstractProxyTransaction t : proxies.values()) {
264                 t.finishReconnect();
265             }
266
267             LOG.debug("Finished reconnecting proxy history {}", this);
268             lock.unlock();
269             return ret;
270         }
271
272         @Override
273         void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
274                 throws RequestException {
275             final Request<?, ?> request = entry.getRequest();
276             if (request instanceof TransactionRequest) {
277                 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
278                     entry.getEnqueuedTicks());
279             } else if (request instanceof LocalHistoryRequest) {
280                 replayTo.accept(entry);
281             } else {
282                 throw new IllegalArgumentException("Unhandled request " + request);
283             }
284         }
285
286         @Override
287         void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
288                 throws RequestException {
289             final Request<?, ?> request = entry.getRequest();
290             if (request instanceof TransactionRequest) {
291                 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
292             } else if (request instanceof LocalHistoryRequest) {
293                 forwardTo.accept(entry);
294             } else {
295                 throw new IllegalArgumentException("Unhandled request " + request);
296             }
297         }
298
299         private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
300                 throws RequestReplayException {
301             final AbstractProxyTransaction proxy;
302             lock.lock();
303             try {
304                 proxy = proxies.get(request.getTarget());
305             } finally {
306                 lock.unlock();
307             }
308             if (proxy != null) {
309                 return proxy;
310             }
311
312             throw new RequestReplayException("Failed to find proxy for %s", request);
313         }
314     }
315
316     private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
317
318     private final Lock lock = new ReentrantLock();
319     private final LocalHistoryIdentifier identifier;
320     private final AbstractClientConnection<ShardBackendInfo> connection;
321     private final AbstractClientHistory parent;
322
323     @GuardedBy("lock")
324     private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
325     @GuardedBy("lock")
326     private ProxyHistory successor;
327
328     private ProxyHistory(final AbstractClientHistory parent,
329             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
330         this.parent = Preconditions.checkNotNull(parent);
331         this.connection = Preconditions.checkNotNull(connection);
332         this.identifier = Preconditions.checkNotNull(identifier);
333     }
334
335     static ProxyHistory createClient(final AbstractClientHistory parent,
336             final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
337         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
338         return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
339              : new Remote(parent, connection, identifier);
340     }
341
342     static ProxyHistory createSingle(final AbstractClientHistory parent,
343             final AbstractClientConnection<ShardBackendInfo> connection,
344             final LocalHistoryIdentifier identifier) {
345         final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
346         return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
347              : new RemoteSingle(parent, connection, identifier);
348     }
349
350     @Override
351     public LocalHistoryIdentifier getIdentifier() {
352         return identifier;
353     }
354
355     final ClientActorContext context() {
356         return connection.context();
357     }
358
359     final long currentTime() {
360         return connection.currentTime();
361     }
362
363     final ActorRef localActor() {
364         return connection.localActor();
365     }
366
367     final AbstractClientHistory parent() {
368         return parent;
369     }
370
371     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
372             final boolean snapshotOnly) {
373         return createTransactionProxy(txId, snapshotOnly, false);
374     }
375
376     AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
377             final boolean isDone) {
378         lock.lock();
379         try {
380             if (successor != null) {
381                 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
382             }
383
384             final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
385             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
386             proxies.put(proxyId, ret);
387             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
388             return ret;
389         } finally {
390             lock.unlock();
391         }
392     }
393
394     final void abortTransaction(final AbstractProxyTransaction tx) {
395         lock.lock();
396         try {
397             // Removal will be completed once purge completes
398             LOG.debug("Proxy {} aborted transaction {}", this, tx);
399             onTransactionAborted(tx);
400         } finally {
401             lock.unlock();
402         }
403     }
404
405     final void completeTransaction(final AbstractProxyTransaction tx) {
406         lock.lock();
407         try {
408             // Removal will be completed once purge completes
409             LOG.debug("Proxy {} completing transaction {}", this, tx);
410             onTransactionCompleted(tx);
411         } finally {
412             lock.unlock();
413         }
414     }
415
416     void purgeTransaction(final AbstractProxyTransaction tx) {
417         lock.lock();
418         try {
419             proxies.remove(tx.getIdentifier());
420             LOG.debug("Proxy {} purged transaction {}", this, tx);
421         } finally {
422             lock.unlock();
423         }
424     }
425
426     final void close() {
427         lock.lock();
428         try {
429             if (successor != null) {
430                 successor.close();
431                 return;
432             }
433
434             LOG.debug("Proxy {} invoking destroy", this);
435             connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
436                 this::onDestroyComplete);
437         } finally {
438             lock.unlock();
439         }
440     }
441
442     final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
443             final long enqueuedTicks) {
444         connection.enqueueRequest(request, callback, enqueuedTicks);
445     }
446
447     final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
448         connection.sendRequest(request, callback);
449     }
450
451     @GuardedBy("lock")
452     @SuppressWarnings("checkstyle:hiddenField")
453     abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
454             TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
455
456     @SuppressWarnings("checkstyle:hiddenField")
457     abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
458
459     @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
460     ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
461         lock.lock();
462         if (successor != null) {
463             lock.unlock();
464             throw new IllegalStateException("Proxy history " + this + " already has a successor");
465         }
466
467         successor = createSuccessor(newConnection);
468         LOG.debug("History {} instantiated successor {}", this, successor);
469
470         for (AbstractProxyTransaction t : proxies.values()) {
471             t.startReconnect();
472         }
473
474         return new ReconnectCohort();
475     }
476
477     private void onDestroyComplete(final Response<?, ?> response) {
478         LOG.debug("Proxy {} destroy completed with {}", this, response);
479
480         lock.lock();
481         try {
482             parent.onProxyDestroyed(this);
483             connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
484                 this::onPurgeComplete);
485         } finally {
486             lock.unlock();
487         }
488     }
489
490     private void onPurgeComplete(final Response<?, ?> response) {
491         LOG.debug("Proxy {} purge completed with {}", this, response);
492     }
493
494     @GuardedBy("lock")
495     void onTransactionAborted(final AbstractProxyTransaction tx) {
496         // No-op for most implementations
497     }
498
499     @GuardedBy("lock")
500     void onTransactionCompleted(final AbstractProxyTransaction tx) {
501         // No-op for most implementations
502     }
503
504     void onTransactionSealed(final AbstractProxyTransaction tx) {
505         // No-op on most implementations
506     }
507 }