2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.LinkedHashMap;
18 import java.util.Optional;
19 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
20 import java.util.concurrent.locks.Lock;
21 import java.util.concurrent.locks.ReentrantLock;
22 import java.util.function.Consumer;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
25 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
26 import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
27 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
28 import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
29 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
30 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
31 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
32 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
33 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
34 import org.opendaylight.controller.cluster.access.concepts.Request;
35 import org.opendaylight.controller.cluster.access.concepts.RequestException;
36 import org.opendaylight.controller.cluster.access.concepts.Response;
37 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
38 import org.opendaylight.yangtools.concepts.Identifiable;
39 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
40 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
45 * Per-connection representation of a local history. This class handles state replication across a single connection.
47 * @author Robert Varga
49 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
/**
 * Common base of the history flavours in this file that hold a {@link DataTree} and hand out snapshots of it.
 */
50 private abstract static class AbstractLocal extends ProxyHistory {
// Tree that takeSnapshot() reads from; the constructor rejects null.
51 private final DataTree dataTree;
// Forwards parent, connection and identifier to ProxyHistory and stores the null-checked data tree.
53 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
54 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
55 super(parent, connection, identifier);
56 this.dataTree = Preconditions.checkNotNull(dataTree);
// Returns the result of DataTree.takeSnapshot() on the held tree.
59 final DataTreeSnapshot takeSnapshot() {
60 return dataTree.takeSnapshot();
/**
 * Common base of the history flavours in this file that are constructed without a data tree. It adds no state
 * of its own in the visible code; the constructor only forwards its arguments to {@link ProxyHistory}.
 */
64 private abstract static class AbstractRemote extends ProxyHistory {
65 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
66 final LocalHistoryIdentifier identifier) {
67 super(parent, connection, identifier);
/**
 * Data-tree-backed history which keeps track of one open read-write transaction ({@code lastOpen}) and the most
 * recently sealed one ({@code lastSealed}). A new transaction is based on the sealed transaction's snapshot when
 * one is recorded, otherwise on a fresh snapshot of the data tree.
 */
71 private static final class Local extends AbstractLocal {
// Atomic accessor for the volatile lastSealed field; onTransactionCompleted() uses it for a compare-and-set.
72 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
73 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
75 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
76 // the open one and attempts to create a new transaction again.
77 private LocalReadWriteProxyTransaction lastOpen;
// Volatile because it is cleared through LAST_SEALED_UPDATER (see the "runs concurrently" note below).
79 private volatile LocalReadWriteProxyTransaction lastSealed;
81 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
82 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
83 super(parent, connection, identifier, dataTree);
// Creates the transaction proxy for txId. Fails the state check (Preconditions.checkState) when a read-write
// transaction is still recorded in lastOpen.
87 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
88 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
89 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
// NOTE(review): the condition guarding the next return is not visible in this listing -- presumably it is
// taken when isDone is true; confirm. These proxies are built without a snapshot.
92 // Done transactions do not register on our radar and should not have any state associated.
93 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId)
94 : new LocalReadWriteProxyTransaction(this, txId);
// Read lastSealed once into a local so the null check and the use below see the same value.
97 // onTransactionCompleted() runs concurrently
98 final LocalReadWriteProxyTransaction localSealed = lastSealed;
99 final DataTreeSnapshot baseSnapshot;
100 if (localSealed != null) {
// Base the new transaction on the sealed transaction's snapshot.
101 baseSnapshot = localSealed.getSnapshot();
// Otherwise (the else line itself is not visible here) take a fresh snapshot of the data tree.
103 baseSnapshot = takeSnapshot();
// NOTE(review): presumably guarded by "if (snapshotOnly)"; the guard is not visible. Read-only proxies are
// not recorded in lastOpen.
107 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
// Read-write case: remember the new transaction as the currently open one.
110 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
111 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
// The successor is built by createClient() with the same parent and identifier on the supplied connection.
116 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
117 return createClient(parent(), connection, getIdentifier());
// Reacts only when the aborted transaction is the one recorded in lastOpen; the body of the branch is not
// visible in this listing.
121 void onTransactionAborted(final AbstractProxyTransaction tx) {
122 if (tx.equals(lastOpen)) {
// Verifies tx is a LocalProxyTransaction. For a read-write transaction, lastSealed is atomically reset to null,
// but only if it still refers to tx.
128 void onTransactionCompleted(final AbstractProxyTransaction tx) {
129 Verify.verify(tx instanceof LocalProxyTransaction);
130 if (tx instanceof LocalReadWriteProxyTransaction) {
131 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
132 LOG.debug("Completed last sealed transaction {}", tx);
// Requires tx to be the currently open transaction and records it as the last sealed one.
138 void onTransactionSealed(final AbstractProxyTransaction tx) {
139 Preconditions.checkState(tx.equals(lastOpen));
140 lastSealed = lastOpen;
/**
 * Data-tree-backed history which keeps no per-transaction state: every transaction proxy is based on a fresh
 * snapshot of the data tree.
 */
145 private static final class LocalSingle extends AbstractLocal {
146 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
147 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
148 super(parent, connection, identifier, dataTree);
// Takes a new snapshot and wraps it in a read-only or read-write proxy depending on snapshotOnly. The
// connection and isDone arguments are not used by the visible code.
152 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
153 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
154 final DataTreeSnapshot snapshot = takeSnapshot();
155 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
156 new LocalReadWriteProxyTransaction(this, txId, snapshot);
// The successor is built by createSingle() with the same parent and identifier on the supplied connection.
160 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
161 return createSingle(parent(), connection, getIdentifier());
/**
 * History without a data tree whose transactions are {@link RemoteProxyTransaction} instances.
 */
165 private static final class Remote extends AbstractRemote {
166 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
167 final LocalHistoryIdentifier identifier) {
168 super(parent, connection, identifier);
// Always creates a RemoteProxyTransaction. The fourth constructor argument is true here and false in
// RemoteSingle; its meaning is defined by RemoteProxyTransaction, which is not visible in this file.
172 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
173 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
174 return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone);
// The successor is built by createClient() with the same parent and identifier on the supplied connection.
178 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
179 return createClient(parent(), connection, getIdentifier());
/**
 * History without a data tree whose successor is created through {@code createSingle()}; its transactions are
 * {@link RemoteProxyTransaction} instances.
 */
183 private static final class RemoteSingle extends AbstractRemote {
184 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
185 final LocalHistoryIdentifier identifier) {
186 super(parent, connection, identifier);
// Always creates a RemoteProxyTransaction. The fourth constructor argument is false here and true in Remote;
// its meaning is defined by RemoteProxyTransaction, which is not visible in this file.
190 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
191 final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) {
192 return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone);
// The successor is built by createSingle() with the same parent and identifier on the supplied connection.
196 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
197 return createSingle(parent(), connection, getIdentifier());
/**
 * Exception thrown by {@code ReconnectCohort.lookupProxy()} when no proxy is registered for a request's target.
 */
201 private static final class RequestReplayException extends RequestException {
202 private static final long serialVersionUID = 1L;
// The message is produced with String.format(format, args), i.e. it uses %s-style placeholders.
204 RequestReplayException(final String format, final Object... args) {
205 super(String.format(format, args));
// NOTE(review): the returned value is not visible in this listing; check the full source.
209 public boolean isRetriable() {
/**
 * Inner (non-static) cohort returned by {@code startReconnect()}. It works on the enclosing history's
 * {@code identifier}, {@code proxies} and {@code successor} fields to move queued requests over to the successor.
 *
 * NOTE(review): several statements of this class (loop exits, lock handling, return statements) are not visible
 * in this listing; the comments below describe only what is shown.
 */
214 private final class ReconnectCohort extends ProxyReconnectCohort {
// NOTE(review): body not visible; presumably returns the enclosing history's identifier -- confirm.
216 public LocalHistoryIdentifier getIdentifier() {
// Replays previously queued entries towards the successor in three visible steps: the history's create
// request, then each transaction proxy's messages, then the history's destroy request.
222 void replayRequests(final Collection<ConnectionEntry> previousEntries) {
223 // First look for our Create message
224 Iterator<ConnectionEntry> it = previousEntries.iterator();
225 while (it.hasNext()) {
226 final ConnectionEntry e = it.next();
227 final Request<?, ?> req = e.getRequest();
// Only requests addressed to this history itself are considered here.
228 if (identifier.equals(req.getTarget())) {
229 Verify.verify(req instanceof LocalHistoryRequest);
230 if (req instanceof CreateLocalHistoryRequest) {
// Re-enqueue on the successor's connection, keeping the original callback and enqueue time.
231 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
// Each transaction proxy replays its own messages from the previous entries.
238 for (AbstractProxyTransaction t : proxies.values()) {
239 LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
240 t.replayMessages(successor, previousEntries);
243 // Now look for any finalizing messages
244 it = previousEntries.iterator();
245 while (it.hasNext()) {
246 final ConnectionEntry e = it.next();
247 final Request<?, ?> req = e.getRequest();
248 if (identifier.equals(req.getTarget())) {
249 Verify.verify(req instanceof LocalHistoryRequest);
250 if (req instanceof DestroyLocalHistoryRequest) {
251 successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
// Completes the reconnect. Verifies a successor exists and iterates over all proxies (the loop body is not
// visible here). NOTE(review): presumably returns the verified successor -- confirm.
261 ProxyHistory finishReconnect() {
262 final ProxyHistory ret = Verify.verifyNotNull(successor);
264 for (AbstractProxyTransaction t : proxies.values()) {
268 LOG.debug("Finished reconnecting proxy history {}", this);
// Replays a single entry: transaction requests go to the proxy found by lookupProxy(), history requests go to
// the replayTo consumer. Any other request type is rejected with IllegalArgumentException (presumably in an
// else branch that is not visible here).
274 void replayEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> replayTo)
275 throws RequestException {
276 final Request<?, ?> request = entry.getRequest();
277 if (request instanceof TransactionRequest) {
278 lookupProxy(request).replayRequest((TransactionRequest<?>) request, entry.getCallback(),
279 entry.getEnqueuedTicks());
280 } else if (request instanceof LocalHistoryRequest) {
281 replayTo.accept(entry);
283 throw new IllegalArgumentException("Unhandled request " + request);
// Forwards a single entry; same dispatch as replayEntry(), but calls forwardRequest() on the proxy and does
// not pass the enqueue time.
288 void forwardEntry(final ConnectionEntry entry, final Consumer<ConnectionEntry> forwardTo)
289 throws RequestException {
290 final Request<?, ?> request = entry.getRequest();
291 if (request instanceof TransactionRequest) {
292 lookupProxy(request).forwardRequest((TransactionRequest<?>) request, entry.getCallback());
293 } else if (request instanceof LocalHistoryRequest) {
294 forwardTo.accept(entry);
296 throw new IllegalArgumentException("Unhandled request " + request);
// Looks up the proxy registered under the request's target in the enclosing history's proxies map.
// NOTE(review): the statements between the lookup and the throw are not visible; presumably the exception is
// raised only when no proxy was found -- confirm.
300 private AbstractProxyTransaction lookupProxy(final Request<?, ?> request)
301 throws RequestReplayException {
302 final AbstractProxyTransaction proxy;
305 proxy = proxies.get(request.getTarget());
313 throw new RequestReplayException("Failed to find proxy for %s", request);
317 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
// NOTE(review): the lock()/unlock() calls are not visible in this listing; the @SuppressFBWarnings
// justification on startReconnect() says the lock is released asynchronously via the cohort.
319 private final Lock lock = new ReentrantLock();
// The three values below are set once by the constructor, which rejects null for each of them.
320 private final LocalHistoryIdentifier identifier;
321 private final AbstractClientConnection<ShardBackendInfo> connection;
322 private final AbstractClientHistory parent;
// Transaction proxies keyed by their identifier; a LinkedHashMap, so iteration follows insertion order.
325 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
// Null until startReconnect() assigns it; once set, createTransactionProxy() delegates to it.
327 private ProxyHistory successor;
// Private constructor: only the nested subclasses in this file can instantiate a ProxyHistory. Each argument is
// null-checked with Preconditions.checkNotNull before being stored.
329 private ProxyHistory(final AbstractClientHistory parent,
330 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
331 this.parent = Preconditions.checkNotNull(parent);
332 this.connection = Preconditions.checkNotNull(connection);
333 this.identifier = Preconditions.checkNotNull(identifier);
/**
 * Factory which returns a {@code Local} history when the connection's backend info provides a data tree, and a
 * {@code Remote} history otherwise (including when no backend info is present).
 */
336 static ProxyHistory createClient(final AbstractClientHistory parent,
337 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
// Empty when either the backend info or its data tree is absent.
338 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
339 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
340 : new Remote(parent, connection, identifier);
/**
 * Factory which returns a {@code LocalSingle} history when the connection's backend info provides a data tree,
 * and a {@code RemoteSingle} history otherwise (including when no backend info is present).
 */
343 static ProxyHistory createSingle(final AbstractClientHistory parent,
344 final AbstractClientConnection<ShardBackendInfo> connection,
345 final LocalHistoryIdentifier identifier) {
// Empty when either the backend info or its data tree is absent.
346 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
347 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
348 : new RemoteSingle(parent, connection, identifier);
// Accessor for this history's LocalHistoryIdentifier (the class implements Identifiable of that type).
// NOTE(review): body not visible in this listing; presumably returns the identifier field -- confirm.
352 public LocalHistoryIdentifier getIdentifier() {
// Convenience accessor: returns the ClientActorContext of the underlying connection.
356 final ClientActorContext context() {
357 return connection.context();
// Convenience accessor: returns the underlying connection's current time; the unit is whatever
// AbstractClientConnection.currentTime() defines.
360 final long currentTime() {
361 return connection.currentTime();
// Convenience accessor: returns the underlying connection's local actor reference, which this class passes as
// the last argument of the history requests it creates.
364 final ActorRef localActor() {
365 return connection.localActor();
// Accessor for the owning AbstractClientHistory.
// NOTE(review): body not visible in this listing; presumably returns the parent field -- confirm.
368 final AbstractClientHistory parent() {
// Two-argument convenience overload: delegates to the three-argument variant with isDone set to false.
372 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
373 final boolean snapshotOnly) {
374 return createTransactionProxy(txId, snapshotOnly, false);
// Creates and registers a proxy for the given transaction. When a successor exists, the call is delegated to it
// instead. NOTE(review): locking and the final return statement are not visible in this listing; presumably the
// newly created proxy is returned -- confirm.
377 AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly,
378 final boolean isDone) {
381 if (successor != null) {
382 return successor.createTransactionProxy(txId, snapshotOnly, isDone);
// The proxy identifier combines this history's identifier with the transaction id taken from txId.
385 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
386 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone);
// Register under the proxy identifier so the proxy can later be found and purged.
387 proxies.put(proxyId, ret);
388 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
// Logs the abort and invokes the onTransactionAborted() hook. The proxy is not removed from the proxies map by
// the visible code; see the comment below.
395 final void abortTransaction(final AbstractProxyTransaction tx) {
398 // Removal will be completed once purge completes
399 LOG.debug("Proxy {} aborted transaction {}", this, tx);
400 onTransactionAborted(tx);
// Logs the completion and invokes the onTransactionCompleted() hook. The proxy is not removed from the proxies
// map by the visible code; see the comment below.
406 final void completeTransaction(final AbstractProxyTransaction tx) {
409 // Removal will be completed once purge completes
410 LOG.debug("Proxy {} completing transaction {}", this, tx);
411 onTransactionCompleted(tx);
// Removes the proxy registered under tx's identifier from the proxies map and logs the purge.
417 void purgeTransaction(final AbstractProxyTransaction tx) {
420 proxies.remove(tx.getIdentifier());
421 LOG.debug("Proxy {} purged transaction {}", this, tx);
// NOTE(review): the header of the enclosing method and the body of the successor check are not visible in this
// listing. The visible part sends a DestroyLocalHistoryRequest for this history over the connection and
// registers onDestroyComplete() as the response callback.
430 if (successor != null) {
435 LOG.debug("Proxy {} invoking destroy", this);
// The second constructor argument is 1 here and 2 for the purge request in onDestroyComplete() -- presumably
// a sequence number; confirm against the request classes.
436 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
437 this::onDestroyComplete);
// Pass-through to the connection's enqueueRequest() with the same request, callback and enqueue time.
443 final void enqueueRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback,
444 final long enqueuedTicks) {
445 connection.enqueueRequest(request, callback, enqueuedTicks);
// Pass-through to the connection's sendRequest() with the same request and callback.
448 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
449 connection.sendRequest(request, callback);
// Subclass hook called by createTransactionProxy() to instantiate the proxy. It receives this history's
// connection, the proxy-scoped transaction identifier, and the caller's snapshotOnly and isDone flags.
453 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
454 TransactionIdentifier txId, boolean snapshotOnly, boolean isDone);
// Subclass hook called by startReconnect() to build the history that takes over on the new connection.
456 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
// Begins moving this history to newConnection. Throws IllegalStateException if a successor already exists.
// Otherwise it creates the successor through createSuccessor(), iterates over the registered proxies (the loop
// body is not visible in this listing) and returns a new ReconnectCohort.
// NOTE(review): the lock handling referred to by the FindBugs justification is not visible here.
458 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
459 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
461 if (successor != null) {
463 throw new IllegalStateException("Proxy history " + this + " already has a successor");
466 successor = createSuccessor(newConnection);
467 LOG.debug("History {} instantiated successor {}", this, successor);
469 for (AbstractProxyTransaction t : proxies.values()) {
473 return new ReconnectCohort();
// Response callback registered with the DestroyLocalHistoryRequest. It logs the response, notifies the parent
// through onProxyDestroyed() and follows up with a PurgeLocalHistoryRequest whose callback is onPurgeComplete().
// NOTE(review): statements between the log call and the parent notification are not visible in this listing.
476 private void onDestroyComplete(final Response<?, ?> response) {
477 LOG.debug("Proxy {} destroy completed with {}", this, response);
481 parent.onProxyDestroyed(this);
482 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
483 this::onPurgeComplete);
// Response callback registered with the PurgeLocalHistoryRequest. The visible code only logs the response;
// any further statements are not shown in this listing.
489 private void onPurgeComplete(final Response<?, ?> response) {
490 LOG.debug("Proxy {} purge completed with {}", this, response);
// Hook invoked by abortTransaction(); the Local subclass provides its own version.
494 void onTransactionAborted(final AbstractProxyTransaction tx) {
495 // No-op for most implementations
// Hook invoked by completeTransaction(); the Local subclass provides its own version.
499 void onTransactionCompleted(final AbstractProxyTransaction tx) {
500 // No-op for most implementations
// Hook for a transaction being sealed; the Local subclass provides its own version. No caller of this hook is
// visible in this file.
503 void onTransactionSealed(final AbstractProxyTransaction tx) {
504 // No-op on most implementations