2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorRef;
14 import com.google.common.base.Verify;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.Collection;
17 import java.util.Iterator;
18 import java.util.LinkedHashMap;
20 import java.util.Optional;
21 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
22 import java.util.concurrent.locks.Lock;
23 import java.util.concurrent.locks.ReentrantLock;
24 import java.util.function.Consumer;
25 import org.checkerframework.checker.lock.qual.GuardedBy;
26 import org.checkerframework.checker.lock.qual.Holding;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
29 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
30 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
31 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
32 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
33 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
34 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
35 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
36 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
37 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
38 import org.opendaylight.controller.cluster.access.concepts.Request;
39 import org.opendaylight.controller.cluster.access.concepts.RequestException;
40 import org.opendaylight.controller.cluster.access.concepts.Response;
41 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
42 import org.opendaylight.yangtools.concepts.Identifiable;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
49 * Per-connection representation of a local history. This class handles state replication across a single connection.
51 * @author Robert Varga
53 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
54 private abstract static class AbstractLocal extends ProxyHistory {
55 private final ReadOnlyDataTree dataTree;
57 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
58 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
59 super(parent, connection, identifier);
60 this.dataTree = requireNonNull(dataTree);
63 final DataTreeSnapshot takeSnapshot() {
64 return dataTree.takeSnapshot();
68 private abstract static class AbstractRemote extends ProxyHistory {
69 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
70 final LocalHistoryIdentifier identifier) {
71 super(parent, connection, identifier);
75 private static final class Local extends AbstractLocal {
76 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
77 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
79 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
80 // the open one and attempts to create a new transaction again.
81 private LocalReadWriteProxyTransaction lastOpen;
83 private volatile LocalReadWriteProxyTransaction lastSealed;
85 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
86 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
87 super(parent, connection, identifier, dataTree);
91 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
92 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
93 checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
96 // Done transactions do not register on our radar and should not have any state associated.
97 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
98 : new LocalReadWriteProxyTransaction(this, txId);
101 // onTransactionCompleted() may run concurrently and reset lastSealed to null, so read the field only once
102 final LocalReadWriteProxyTransaction localSealed = lastSealed;
103 final DataTreeSnapshot baseSnapshot;
104 if (localSealed != null) {
105 baseSnapshot = localSealed.getSnapshot();
107 baseSnapshot = takeSnapshot();
111 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
114 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
115 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
120 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
121 return createClient(parent(), connection, getIdentifier());
125 void onTransactionAborted(final AbstractProxyTransaction tx) {
126 if (tx.equals(lastOpen)) {
132 void onTransactionCompleted(final AbstractProxyTransaction tx) {
133 Verify.verify(tx instanceof LocalProxyTransaction);
134 if (tx instanceof LocalReadWriteProxyTransaction
135 && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
136 LOG.debug("Completed last sealed transaction {}", tx);
141 void onTransactionSealed(final AbstractProxyTransaction tx) {
142 checkState(tx.equals(lastOpen));
143 lastSealed = lastOpen;
148 private static final class LocalSingle extends AbstractLocal {
149 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
150 final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) {
151 super(parent, connection, identifier, dataTree);
155 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
156 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
157 final DataTreeSnapshot snapshot = takeSnapshot();
158 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
159 new LocalReadWriteProxyTransaction(this, txId, snapshot);
163 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
164 return createSingle(parent(), connection, getIdentifier());
168 private static final class Remote extends AbstractRemote {
169 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
170 final LocalHistoryIdentifier identifier) {
171 super(parent, connection, identifier);
175 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
176 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
177 return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
181 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
182 return createClient(parent(), connection, getIdentifier());
186 private static final class RemoteSingle extends AbstractRemote {
187 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
188 final LocalHistoryIdentifier identifier) {
189 super(parent, connection, identifier);
193 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
194 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
195 return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
199 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
200 return createSingle(parent(), connection, getIdentifier());
204 private static final class RequestReplayException extends RequestException {
205 private static final long serialVersionUID = 1L;
207 RequestReplayException(final String format, final Object... args) {
208 super(String.format(format, args));
212 public boolean isRetriable() {
217 private final class ReconnectCohort extends ProxyReconnectCohort {
219 public LocalHistoryIdentifier getIdentifier() {
225 void replayRequests(final Collection<ConnectionEntry> previousEntries) {
226 // First look for our Create message
227 Iterator<ConnectionEntry> it = previousEntries.iterator();
228 while (it.hasNext()) {
229 final ConnectionEntry e = it.next();
230 final Request<?, ?> req = e.getRequest();
231 if (identifier.equals(req.getTarget())) {
232 Verify.verify(req instanceof LocalHistoryRequest);
233 if (req instanceof CreateLocalHistoryRequest) {
234 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
241 for (AbstractProxyTransaction t : proxies.values()) {
242 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
243 t.replayMessages(successor, previousEntries);
246 // Now look for any finalizing messages
247 it = previousEntries.iterator();
248 while (it.hasNext()) {
249 final ConnectionEntry e = it.next();
250 final Request<?, ?> req = e.getRequest();
251 if (identifier.equals(req.getTarget())) {
252 Verify.verify(req instanceof LocalHistoryRequest);
253 if (req instanceof DestroyLocalHistoryRequest) {
254 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
264 ProxyHistory finishReconnect() {
265 final ProxyHistory ret = Verify.verifyNotNull(successor);
267 for (AbstractProxyTransaction t : proxies.values()) {
271 LOG.debug("Finished reconnecting proxy history {}", this);
277 void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
278 throws RequestException {
279 final Request<?, ?> request = entry.getRequest();
280 if (request instanceof TransactionRequest) {
281 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
282 entry.getEnqueuedTicks());
283 } else if (request instanceof LocalHistoryRequest) {
284 replayTo.accept(entry);
286 throw new IllegalArgumentException("Unhandled request " + request);
291 void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
292 throws RequestException {
293 final Request<?, ?> request = entry.getRequest();
294 if (request instanceof TransactionRequest) {
295 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
296 } else if (request instanceof LocalHistoryRequest) {
297 forwardTo.accept(entry);
299 throw new IllegalArgumentException("Unhandled request " + request);
303 private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
304 throws RequestReplayException {
305 final AbstractProxyTransaction proxy;
308 proxy = proxies.get(request.getTarget());
316 throw new RequestReplayException("Failed to find proxy for %s", request);
320 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
322 private final Lock lock = new ReentrantLock();
323 private final @NonNull LocalHistoryIdentifier identifier;
324 private final @NonNull AbstractClientConnection<ShardBackendInfo> connection;
325 private final @NonNull AbstractClientHistory parent;
328 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
330 private ProxyHistory successor;
332 private ProxyHistory(final AbstractClientHistory parent,
333 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
334 this.parent = requireNonNull(parent);
335 this.connection = requireNonNull(connection);
336 this.identifier = requireNonNull(identifier);
339 static ProxyHistory createClient(final AbstractClientHistory parent,
340 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
341 final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
342 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
343 : new Remote(parent, connection, identifier);
346 static ProxyHistory createSingle(final AbstractClientHistory parent,
347 final AbstractClientConnection<ShardBackendInfo> connection,
348 final LocalHistoryIdentifier identifier) {
349 final Optional<ReadOnlyDataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
350 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
351 : new RemoteSingle(parent, connection, identifier);
355 public LocalHistoryIdentifier getIdentifier() {
359 final ClientActorContext context() {
360 return connection.context();
363 final long currentTime() {
364 return connection.currentTime();
367 final ActorRef localActor() {
368 return connection.localActor();
371 final AbstractClientHistory parent() {
375 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
376 final boolean snapshotOnly) {
377 return createTransactionProxy(txId, snapshotOnly, false);
380 AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
381 final boolean isDone) {
384 if (successor != null) {
385 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
388 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
389 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
390 proxies.put(proxyId, ret);
391 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
398 final void abortTransaction(final AbstractProxyTransaction tx) {
401 // Removal will be completed once purge completes
402 LOG.debug("Proxy {} aborted transaction {}", this, tx);
403 onTransactionAborted(tx);
409 final void completeTransaction(final AbstractProxyTransaction tx) {
412 // Removal will be completed once purge completes
413 LOG.debug("Proxy {} completing transaction {}", this, tx);
414 onTransactionCompleted(tx);
420 void purgeTransaction(final AbstractProxyTransaction tx) {
423 proxies.remove(tx.getIdentifier());
424 LOG.debug("Proxy {} purged transaction {}", this, tx);
433 if (successor != null) {
438 LOG.debug("Proxy {} invoking destroy", this);
439 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
440 this::onDestroyComplete);
446 final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
447 final long enqueuedTicks) {
448 connection.enqueueRequest(request, callback, enqueuedTicks);
451 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
452 connection.sendRequest(request, callback);
456 @SuppressWarnings("checkstyle:hiddenField")
457 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
458 TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
461 @SuppressWarnings("checkstyle:hiddenField")
462 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
464 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
465 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
467 if (successor != null) {
469 throw new IllegalStateException("Proxy history " + this + " already has a successor");
472 successor = createSuccessor(newConnection);
473 LOG.debug("History {} instantiated successor {}", this, successor);
475 for (AbstractProxyTransaction t : proxies.values()) {
479 return new ReconnectCohort();
482 private void onDestroyComplete(final Response<?, ?> response) {
483 LOG.debug("Proxy {} destroy completed with {}", this, response);
487 parent.onProxyDestroyed(this);
488 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
489 this::onPurgeComplete);
495 private void onPurgeComplete(final Response<?, ?> response) {
496 LOG.debug("Proxy {} purge completed with {}", this, response);
500 void onTransactionAborted(final AbstractProxyTransaction tx) {
501 // No-op for most implementations
505 void onTransactionCompleted(final AbstractProxyTransaction tx) {
506 // No-op for most implementations
509 void onTransactionSealed(final AbstractProxyTransaction tx) {
510 // No-op for most implementations