/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
import akka.actor.ActorRef;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Per-connection representation of a local history. This class handles state replication across a single connection.
 *
 * @author Robert Varga
 */
47 abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
48 private abstract static class AbstractLocal extends ProxyHistory {
49 private final DataTree dataTree;
51 AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
52 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
53 super(parent, connection, identifier);
54 this.dataTree = Preconditions.checkNotNull(dataTree);
57 final DataTreeSnapshot takeSnapshot() {
58 return dataTree.takeSnapshot();
62 private abstract static class AbstractRemote extends ProxyHistory {
63 AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
64 final LocalHistoryIdentifier identifier) {
65 super(parent, connection, identifier);
69 private static final class Local extends AbstractLocal {
70 private static final AtomicReferenceFieldUpdater<Local, LocalReadWriteProxyTransaction> LAST_SEALED_UPDATER =
71 AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed");
73 // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting
74 // the open one and attempts to create a new transaction again.
75 private LocalReadWriteProxyTransaction lastOpen;
77 private volatile LocalReadWriteProxyTransaction lastSealed;
79 Local(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
80 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
81 super(parent, connection, identifier, dataTree);
85 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
86 final TransactionIdentifier txId, final boolean snapshotOnly) {
87 Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
89 // onTransactionCompleted() runs concurrently
90 final LocalReadWriteProxyTransaction localSealed = lastSealed;
91 final DataTreeSnapshot baseSnapshot;
92 if (localSealed != null) {
93 baseSnapshot = localSealed.getSnapshot();
95 baseSnapshot = takeSnapshot();
99 return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot);
102 lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot);
103 LOG.debug("Proxy {} open transaction {}", this, lastOpen);
108 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
109 return createClient(parent(), connection, getIdentifier());
113 void onTransactionAborted(final AbstractProxyTransaction tx) {
114 if (tx.equals(lastOpen)) {
120 void onTransactionCompleted(final AbstractProxyTransaction tx) {
121 Verify.verify(tx instanceof LocalProxyTransaction);
122 if (tx instanceof LocalReadWriteProxyTransaction) {
123 if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
124 LOG.debug("Completed last sealed transaction {}", tx);
130 void onTransactionSealed(final AbstractProxyTransaction tx) {
131 Preconditions.checkState(tx.equals(lastOpen));
132 lastSealed = lastOpen;
137 private static final class LocalSingle extends AbstractLocal {
138 LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
139 final LocalHistoryIdentifier identifier, final DataTree dataTree) {
140 super(parent, connection, identifier, dataTree);
144 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
145 final TransactionIdentifier txId, final boolean snapshotOnly) {
146 final DataTreeSnapshot snapshot = takeSnapshot();
147 return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) :
148 new LocalReadWriteProxyTransaction(this, txId, snapshot);
152 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
153 return createSingle(parent(), connection, getIdentifier());
157 private static final class Remote extends AbstractRemote {
158 Remote(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
159 final LocalHistoryIdentifier identifier) {
160 super(parent, connection, identifier);
164 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
165 final TransactionIdentifier txId, final boolean snapshotOnly) {
166 return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
170 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
171 return createClient(parent(), connection, getIdentifier());
175 private static final class RemoteSingle extends AbstractRemote {
176 RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection<ShardBackendInfo> connection,
177 final LocalHistoryIdentifier identifier) {
178 super(parent, connection, identifier);
182 AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
183 final TransactionIdentifier txId, final boolean snapshotOnly) {
184 return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
188 ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
189 return createSingle(parent(), connection, getIdentifier());
// RequestException subtype used by this file when a forwarded transaction request has no matching proxy
// (see the "Failed to find proxy for %s" throw site).
// NOTE(review): this listing is an extract with dropped lines (closing braces, annotations, some statements);
// the class below is therefore incomplete as shown.
193 private static final class RequestReplayException extends RequestException {
194 private static final long serialVersionUID = 1L;
// Builds the exception message by applying String.format() to the supplied format and arguments.
196 RequestReplayException(final String format, final Object... args) {
197 super(String.format(format, args));
// NOTE(review): the body of isRetriable() is not present in this extract, so the value it returns cannot be
// documented from here -- confirm against the upstream file.
201 public boolean isRetriable() {
// Inner (non-static) reconnect cohort of the enclosing ProxyHistory: it works on the enclosing instance's
// identifier, proxies and successor fields to replay previously-queued requests and to route forwarded requests.
// NOTE(review): this listing is an extract with dropped lines (closing braces, annotations, some statements and
// call arguments); gaps are called out below where they affect understanding.
206 private final class ReconnectCohort extends ProxyReconnectCohort {
// NOTE(review): method body missing from the extract; presumably returns the enclosing history's identifier.
208 public LocalHistoryIdentifier getIdentifier() {
// Replays entries captured from the previous connection onto the successor history, in three passes.
214 void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
215 // First look for our Create message
216 for (ConnectionEntry e : previousEntries) {
217 final Request<?, ?> req = e.getRequest();
// Only requests targeting this very history are considered; they must all be LocalHistoryRequests.
218 if (identifier.equals(req.getTarget())) {
219 Verify.verify(req instanceof LocalHistoryRequest);
220 if (req instanceof CreateLocalHistoryRequest) {
// The create request is re-sent on the successor's connection with its original callback.
// NOTE(review): lines following this call are missing from the extract (possibly an early loop exit).
221 successor.connection.sendRequest(req, e.getCallback());
// Second pass: allocate a successor transaction proxy for every tracked proxy and replay its messages.
227 for (AbstractProxyTransaction t : proxies.values()) {
228 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
// NOTE(review): the second argument of this createTransactionProxy() call is missing from the extract.
229 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
231 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
232 t.replayMessages(newProxy, previousEntries);
235 // Now look for any finalizing messages
236 for (ConnectionEntry e : previousEntries) {
237 final Request<?, ?> req = e.getRequest();
238 if (identifier.equals(req.getTarget())) {
239 Verify.verify(req instanceof LocalHistoryRequest);
// As visible here, every request targeting this history is sent to the successor's connection in this pass.
240 successor.connection.sendRequest(req, e.getCallback());
// Completes the reconnect; verifies that a successor has been set.
247 ProxyHistory finishReconnect() {
248 final ProxyHistory ret = Verify.verifyNotNull(successor);
// NOTE(review): the body of this loop and the statements after the debug message are missing from the extract.
250 for (AbstractProxyTransaction t : proxies.values()) {
254 LOG.debug("Finished reconnecting proxy history {}", this);
// Routes a forwarded request: transaction requests go to the matching transaction proxy, local history
// requests are handed to forwardTo unchanged, anything else is rejected with IllegalArgumentException.
260 void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
261 final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
262 if (request instanceof TransactionRequest) {
263 forwardTransactionRequest((TransactionRequest<?>) request, callback);
264 } else if (request instanceof LocalHistoryRequest) {
265 forwardTo.accept(request, callback);
267 throw new IllegalArgumentException("Unhandled request " + request);
// Looks up the proxy registered under the request's target and delegates the request to it.
271 private void forwardTransactionRequest(final TransactionRequest<?> request,
272 final Consumer<Response<?, ?>> callback) throws RequestException {
274 final AbstractProxyTransaction proxy;
// NOTE(review): lines around this lookup are missing from the extract (presumably locking and a null check
// guarding the throw below) -- confirm against the upstream file.
277 proxy = proxies.get(request.getTarget());
282 throw new RequestReplayException("Failed to find proxy for %s", request);
285 proxy.forwardRequest(request, callback);
289 private static final Logger LOG = LoggerFactory.getLogger(ProxyHistory.class);
291 private final Lock lock = new ReentrantLock();
292 private final LocalHistoryIdentifier identifier;
293 private final AbstractClientConnection<ShardBackendInfo> connection;
294 private final AbstractClientHistory parent;
297 private final Map<TransactionIdentifier, AbstractProxyTransaction> proxies = new LinkedHashMap<>();
299 private ProxyHistory successor;
/**
 * Initializes the state shared by all proxy history flavours. Private, so only the nested subclasses in this
 * file can extend {@code ProxyHistory}.
 *
 * @param parent client history this proxy belongs to
 * @param connection connection this proxy is attached to
 * @param identifier identifier of the local history
 * @throws NullPointerException if any argument is null
 */
private ProxyHistory(final AbstractClientHistory parent,
        final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
    this.parent = Preconditions.checkNotNull(parent);
    this.connection = Preconditions.checkNotNull(connection);
    this.identifier = Preconditions.checkNotNull(identifier);
}
308 static ProxyHistory createClient(final AbstractClientHistory parent,
309 final AbstractClientConnection<ShardBackendInfo> connection, final LocalHistoryIdentifier identifier) {
310 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
311 return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get())
312 : new Remote(parent, connection, identifier);
315 static ProxyHistory createSingle(final AbstractClientHistory parent,
316 final AbstractClientConnection<ShardBackendInfo> connection,
317 final LocalHistoryIdentifier identifier) {
318 final Optional<DataTree> dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree);
319 return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get())
320 : new RemoteSingle(parent, connection, identifier);
324 public LocalHistoryIdentifier getIdentifier() {
328 final ActorRef localActor() {
329 return connection.localActor();
332 final AbstractClientHistory parent() {
// Allocates a transaction proxy for txId on this history, or on the successor history once one exists.
// NOTE(review): this extract drops lines before the successor check and after the final debug message
// (presumably lock handling and the return of the new proxy) -- confirm against the upstream file.
336 final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
337 final boolean snapshotOnly) {
// Once a successor has been installed, allocation is delegated to it.
340 if (successor != null) {
341 return successor.createTransactionProxy(txId, snapshotOnly);
// The proxy gets its own identifier: this history's identifier combined with the caller's transaction id.
344 final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
345 final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly);
// Track the proxy so abort/complete and reconnect handling can find it.
346 proxies.put(proxyId, ret);
347 LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
// Stops tracking the given transaction proxy and notifies the onTransactionAborted() hook.
// NOTE(review): lines between the signature and the removal, and after the hook call, are missing from this
// extract (presumably lock handling) -- confirm against the upstream file.
354 final void abortTransaction(final AbstractProxyTransaction tx) {
357 proxies.remove(tx.getIdentifier());
358 LOG.debug("Proxy {} aborting transaction {}", this, tx);
359 onTransactionAborted(tx);
// Stops tracking the given transaction proxy and notifies the onTransactionCompleted() hook.
// NOTE(review): lines between the signature and the removal, and after the hook call, are missing from this
// extract (presumably lock handling) -- confirm against the upstream file.
365 final void completeTransaction(final AbstractProxyTransaction tx) {
368 proxies.remove(tx.getIdentifier());
369 LOG.debug("Proxy {} completing transaction {}", this, tx);
370 onTransactionCompleted(tx);
// NOTE(review): the header of the method these statements belong to is missing from this extract, as is the
// body of the successor branch -- confirm both against the upstream file.
379 if (successor != null) {
// Visible part: a DestroyLocalHistoryRequest (sequence argument 1) is sent for this history, with
// onDestroyComplete() registered as the response callback.
384 LOG.debug("Proxy {} invoking destroy", this);
385 connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()),
386 this::onDestroyComplete);
392 final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
393 connection.sendRequest(request, callback);
397 abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection<ShardBackendInfo> connection,
398 TransactionIdentifier txId, boolean snapshotOnly);
400 abstract ProxyHistory createSuccessor(AbstractClientConnection<ShardBackendInfo> connection);
// Begins moving this history to a new connection: creates the successor history and hands back a
// ReconnectCohort that drives the rest of the reconnect.
// Throws IllegalStateException when a successor has already been created.
// NOTE(review): this extract drops lines here (per the suppression's justification a lock is taken in this
// method and released later via the cohort; the body of the loop over proxies is also missing) -- confirm
// against the upstream file.
402 @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort")
403 ProxyReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConnection) {
405 if (successor != null) {
407 throw new IllegalStateException("Proxy history " + this + " already has a successor");
// The flavour-specific subclass decides what kind of history replaces this one.
410 successor = createSuccessor(newConnection);
411 LOG.debug("History {} instantiated successor {}", this, successor);
413 for (AbstractProxyTransaction t : proxies.values()) {
417 return new ReconnectCohort();
// Response callback for the DestroyLocalHistoryRequest: notifies the parent history that this proxy has been
// destroyed and follows up with a PurgeLocalHistoryRequest (sequence argument 2), whose response is handled by
// onPurgeComplete().
// NOTE(review): lines between the debug message and the parent notification, and after the send, are missing
// from this extract (presumably lock handling) -- confirm against the upstream file.
420 private void onDestroyComplete(final Response<?, ?> response) {
421 LOG.debug("Proxy {} destroy completed with {}", this, response);
425 parent.onProxyDestroyed(this);
426 connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()),
427 this::onPurgeComplete);
433 private void onPurgeComplete(final Response<?, ?> response) {
434 LOG.debug("Proxy {} purge completed with {}", this, response);
438 void onTransactionAborted(final AbstractProxyTransaction tx) {
439 // No-op for most implementations
443 void onTransactionCompleted(final AbstractProxyTransaction tx) {
444 // No-op for most implementations
447 void onTransactionSealed(final AbstractProxyTransaction tx) {
448 // No-op on most implementations