@Override
public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerDataTreeChangeListener(
final DOMDataTreeIdentifier treeId, final L listener) {
- DOMStore store = getTxFactories().get(treeId.getDatastoreType());
- checkState(store != null, "Requested logical data store is not available.");
-
+ DOMStore store = getDOMStore(treeId.getDatastoreType());
return ((DOMStoreTreeChangePublisher) store).registerTreeChangeListener(
treeId.getRootIdentifier(), listener);
}
@Override
public <T extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<T> registerCommitCohort(
org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier path, T cohort) {
- DOMStore store = getTxFactories().get(toLegacy(path.getDatastoreType()));
- checkState(store != null, "Requested logical data store is not available.");
-
+ DOMStore store = getDOMStore(toLegacy(path.getDatastoreType()));
return ((org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry) store)
.registerCommitCohort(path, cohort);
}
public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
final YangInstanceIdentifier path, final DOMDataChangeListener listener,
final DataChangeScope triggeringScope) {
- DOMStore potentialStore = getTxFactories().get(store);
- checkState(potentialStore != null, "Requested logical data store is not available.");
+ DOMStore potentialStore = getDOMStore(store);
return potentialStore.registerChangeListener(path, listener, triggeringScope);
}
backingChains);
return new DOMBrokerTransactionChain(chainId, backingChains, this, listener);
}
+
+ private DOMStore getDOMStore(final LogicalDatastoreType type) {
+ DOMStore store = getTxFactories().get(type);
+ checkState(store != null, "Requested logical data store is not available.");
+ return store;
+ }
}
this.allocationContext = allocationContext;
}
- static @Nonnull <T extends AbstractClientHandle<?>> T recordTransaction(
+ @Nonnull
+ static <T extends AbstractClientHandle<?>> T recordTransaction(
@Nonnull final ClientBackedTransaction<T> referent, @Nonnull final T transaction,
@Nullable final Throwable allocationContext) {
FINALIZERS.add(new Finalizer(referent, transaction, allocationContext));
*/
package org.opendaylight.controller.cluster.databroker;
+import static org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER;
+import static org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER;
+import static org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER;
+
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractFuture;
doCanCommit(clientSubmitFuture, transaction, cohorts);
- return MappingCheckedFuture.create(clientSubmitFuture,
- TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
+ return MappingCheckedFuture.create(clientSubmitFuture, COMMIT_ERROR_MAPPER);
}
private void doCanCommit(final AsyncNotifyingSettableFuture clientSubmitFuture,
@Override
public void onSuccess(Boolean result) {
if (result == null || !result) {
- handleException(clientSubmitFuture, transaction, cohorts,
- CAN_COMMIT, TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER,
- new TransactionCommitFailedException(
- "Can Commit failed, no detailed cause available."));
+ handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER,
+ new TransactionCommitFailedException("Can Commit failed, no detailed cause available."));
+ } else if (!cohortIterator.hasNext()) {
+ // All cohorts completed successfully - we can move on to the preCommit phase
+ doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
} else {
- if (!cohortIterator.hasNext()) {
- // All cohorts completed successfully - we can move on to the preCommit phase
- doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
- } else {
- ListenableFuture<Boolean> canCommitFuture = cohortIterator.next().canCommit();
- Futures.addCallback(canCommitFuture, this, MoreExecutors.directExecutor());
- }
+ Futures.addCallback(cohortIterator.next().canCommit(), this, MoreExecutors.directExecutor());
}
}
@Override
public void onFailure(Throwable failure) {
- handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT,
- TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER, failure);
+ handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT, CAN_COMMIT_ERROR_MAPPER, failure);
}
};
@Override
public void onFailure(Throwable failure) {
- handleException(clientSubmitFuture, transaction, cohorts, PRE_COMMIT,
- TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER, failure);
+ handleException(clientSubmitFuture, transaction, cohorts, PRE_COMMIT, PRE_COMMIT_MAPPER, failure);
}
};
@Override
public void onFailure(Throwable throwable) {
- handleException(clientSubmitFuture, transaction, cohorts, COMMIT,
- TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER, throwable);
+ handleException(clientSubmitFuture, transaction, cohorts, COMMIT, COMMIT_ERROR_MAPPER, throwable);
}
};
import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
startReconnect(h, newConn, cohorts);
}
- return previousEntries -> {
+ return previousEntries -> finishReconnect(newConn, stamp, cohorts, previousEntries);
+ }
+
+ private ReconnectForwarder finishReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn,
+ final long stamp, final Collection<HistoryReconnectCohort> cohorts,
+ final Collection<ConnectionEntry> previousEntries) {
+ try {
+ // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
+ // the non-throttling interface to the connection, hence we use a wrapper consumer
+ for (HistoryReconnectCohort c : cohorts) {
+ c.replayRequests(previousEntries);
+ }
+
+ // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
+ // requests will be immediately sent to it and requests being sent concurrently will get
+ // forwarded once they hit the new connection.
+ return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
+ } finally {
try {
- // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
- // the non-throttling interface to the connection, hence we use a wrapper consumer
+ // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
for (HistoryReconnectCohort c : cohorts) {
- c.replayRequests(previousEntries);
+ c.close();
}
-
- // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
- // requests will be immediately sent to it and requests being sent concurrently will get
- // forwarded once they hit the new connection.
- return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
} finally {
- try {
- // Step 4: Complete switchover of the connection. The cohorts can resume normal operations.
- for (HistoryReconnectCohort c : cohorts) {
- c.close();
- }
- } finally {
- lock.unlockWrite(stamp);
- }
+ lock.unlockWrite(stamp);
}
- };
+ }
}
private static void startReconnect(final AbstractClientHistory history,
}
}
- final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
+ final void recordSuccessfulRequest(@Nonnull final TransactionRequest<?> req) {
successfulRequests.add(Verify.verifyNotNull(req));
}
} else if (t instanceof RequestFailure) {
ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
} else {
- ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+ ret.voteNo(unhandledResponseException(t));
}
// This is a terminal request, hence we do not need to record it
ret.setException(cause);
}
} else {
- ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
+ ret.setException(unhandledResponseException(t));
}
// This is a terminal request, hence we do not need to record it
} else if (t instanceof RequestFailure) {
ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
} else {
- ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+ ret.voteNo(unhandledResponseException(t));
}
recordSuccessfulRequest(req);
} else if (t instanceof RequestFailure) {
ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
} else {
- ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+ ret.voteNo(unhandledResponseException(t));
}
onPreCommitComplete(req);
} else if (t instanceof RequestFailure) {
ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
} else {
- ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+ ret.voteNo(unhandledResponseException(t));
}
LOG.debug("Transaction {} doCommit completed", this);
for (Object obj : successfulRequests) {
if (obj instanceof TransactionRequest) {
LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
- successor.doReplayRequest((TransactionRequest<?>) obj, resp -> { }, now);
+ successor.doReplayRequest((TransactionRequest<?>) obj, resp -> { /*NOOP*/ }, now);
} else {
Verify.verify(obj instanceof IncrementSequence);
final IncrementSequence increment = (IncrementSequence) obj;
successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
- increment.getSequence(), localActor(), isSnapshotOnly(), increment.getDelta()), resp -> { },
- now);
+ increment.getSequence(), localActor(), isSnapshotOnly(),
+ increment.getDelta()), resp -> { /*NOOP*/ }, now);
LOG.debug("Incrementing sequence {} to successor {}", obj, successor);
}
}
abstract void handleReplayedRemoteRequest(TransactionRequest<?> request,
@Nullable Consumer<Response<?, ?>> callback, long enqueuedTicks);
+ private static IllegalStateException unhandledResponseException(Response<?, ?> resp) {
+ return new IllegalStateException("Unhandled response " + resp.getClass());
+ }
+
@Override
public final String toString() {
return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();
FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, CONNECT_TIMEOUT))
.whenComplete((response, failure) -> {
- if (failure != null) {
- LOG.debug("Connect attempt to {} failed, will retry", shardName, failure);
- future.completeExceptionally(wrap("Connection attempt failed", failure));
- return;
- }
- if (response instanceof RequestFailure) {
- final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
- LOG.debug("Connect attempt to {} failed to process", shardName, cause);
- final Throwable result = cause instanceof NotLeaderException
- ? wrap("Leader moved during establishment", cause) : cause;
- future.completeExceptionally(result);
- return;
- }
-
- LOG.debug("Resolved backend information to {}", response);
- Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response %s",
- response);
- final ConnectClientSuccess success = (ConnectClientSuccess) response;
- future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
- success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
- success.getMaxMessages()));
+ onConnectResponse(shardName, cookie, future, response, failure);
});
}
+
+ private void onConnectResponse(final String shardName, final long cookie,
+ final CompletableFuture<ShardBackendInfo> future, final Object response, final Throwable failure) {
+ if (failure != null) {
+ LOG.debug("Connect attempt to {} failed, will retry", shardName, failure);
+ future.completeExceptionally(wrap("Connection attempt failed", failure));
+ return;
+ }
+ if (response instanceof RequestFailure) {
+ final Throwable cause = ((RequestFailure<?, ?>) response).getCause().unwrap();
+ LOG.debug("Connect attempt to {} failed to process", shardName, cause);
+ final Throwable result = cause instanceof NotLeaderException
+ ? wrap("Leader moved during establishment", cause) : cause;
+ future.completeExceptionally(result);
+ return;
+ }
+
+ LOG.debug("Resolved backend information to {}", response);
+ Preconditions.checkArgument(response instanceof ConnectClientSuccess, "Unhandled response %s",
+ response);
+ final ConnectClientSuccess success = (ConnectClientSuccess) response;
+ future.complete(new ShardBackendInfo(success.getBackend(), nextSessionId.getAndIncrement(),
+ success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie), success.getDataTree(),
+ success.getMaxMessages()));
+ }
}
@Override
public void exit() {
- final YangInstanceIdentifier parent = current.getParent();
- Preconditions.checkState(parent != null);
- current = parent;
+ final YangInstanceIdentifier currentParent = current.getParent();
+ Preconditions.checkState(currentParent != null);
+ current = currentParent;
}
@Override
return identifier;
}
- abstract @Nonnull DataTreeSnapshot readOnlyView();
+ @Nonnull
+ abstract DataTreeSnapshot readOnlyView();
abstract void applyForwardedModifyTransactionRequest(ModifyTransactionRequest request,
@Nullable Consumer<Response<?, ?>> callback);
}
private boolean handleReadRequest(final TransactionRequest<?> request,
- final @Nullable Consumer<Response<?, ?>> callback) {
+ @Nullable final Consumer<Response<?, ?>> callback) {
// Note we delay completion of read requests to limit the scope at which the client can run, as they have
// listeners, which we do not want to execute while we are reconnecting.
if (request instanceof ReadTransactionRequest) {
@Override
void handleReplayedRemoteRequest(final TransactionRequest<?> request,
- final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ @Nullable final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
if (request instanceof ModifyTransactionRequest) {
replayModifyTransactionRequest((ModifyTransactionRequest) request, callback, enqueuedTicks);
} else if (handleReadRequest(request, callback)) {
} else if (request instanceof ModifyTransactionRequest) {
successor.handleForwardedRequest(request, callback);
} else {
- throw new IllegalArgumentException("Unhandled request" + request);
+ throwUnhandledRequest(request);
}
}
} else if (request instanceof TransactionPurgeRequest) {
successor.enqueuePurge(callback);
} else {
- throw new IllegalArgumentException("Unhandled request" + request);
+ throwUnhandledRequest(request);
}
LOG.debug("Forwarded request {} to successor {}", request, successor);
}
+ private static void throwUnhandledRequest(final TransactionRequest<?> request) {
+ throw new IllegalArgumentException("Unhandled request" + request);
+ }
+
// Deliver an abort request through the normal request path.
void sendAbort(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
sendRequest(request, callback);
}
@Override
void doDelete(final YangInstanceIdentifier path) {
- throw new UnsupportedOperationException("Read-only snapshot");
+ throw new UnsupportedOperationException("doDelete");
}
@Override
void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- throw new UnsupportedOperationException("Read-only snapshot");
+ throw new UnsupportedOperationException("doMerge");
}
@Override
void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- throw new UnsupportedOperationException("Read-only snapshot");
+ throw new UnsupportedOperationException("doWrite");
}
@Override
CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
- throw new UnsupportedOperationException("Read-only snapshot");
+ throw new UnsupportedOperationException("commitRequest");
}
@Override
@Override
void applyForwardedModifyTransactionRequest(final ModifyTransactionRequest request,
- final @Nullable Consumer<Response<?, ?>> callback) {
+ @Nullable final Consumer<Response<?, ?>> callback) {
commonModifyTransactionRequest(request, callback, this::sendRequest);
}
@Override
void replayModifyTransactionRequest(final ModifyTransactionRequest request,
- final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ @Nullable final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
commonModifyTransactionRequest(request, callback, (req, cb) -> enqueueRequest(req, cb, enqueuedTicks));
}
private void commonModifyTransactionRequest(final ModifyTransactionRequest request,
- final @Nullable Consumer<Response<?, ?>> callback,
+ @Nullable final Consumer<Response<?, ?>> callback,
final BiConsumer<TransactionRequest<?>, Consumer<Response<?, ?>>> sendMethod) {
for (final TransactionModification mod : request.getModifications()) {
if (mod instanceof TransactionWrite) {
@Override
void handleReplayedRemoteRequest(final TransactionRequest<?> request,
- final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ @Nullable final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
LOG.debug("Applying replayed request {}", request);
if (request instanceof TransactionPreCommitRequest) {
closedException = this::abortedException;
}
- private @Nonnull CursorAwareDataTreeModification getModification() {
+ @Nonnull
+ private CursorAwareDataTreeModification getModification() {
if (closedException != null) {
throw closedException.get();
}
@Override
void onTransactionCompleted(final AbstractProxyTransaction tx) {
Verify.verify(tx instanceof LocalProxyTransaction);
- if (tx instanceof LocalReadWriteProxyTransaction) {
- if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
- LOG.debug("Completed last sealed transaction {}", tx);
- }
+ if (tx instanceof LocalReadWriteProxyTransaction
+ && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) {
+ LOG.debug("Completed last sealed transaction {}", tx);
}
}
void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
if (request instanceof ModifyTransactionRequest) {
- final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
-
- req.getModifications().forEach(this::appendModification);
-
- final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
- if (maybeProto.isPresent()) {
- // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
- // until we know what we are going to do.
- if (markSealed()) {
- sealOnly();
- }
-
- final TransactionRequest<?> tmp;
- switch (maybeProto.get()) {
- case ABORT:
- tmp = abortRequest();
- sendRequest(tmp, resp -> {
- completeModify(tmp, resp);
- callback.accept(resp);
- });
- break;
- case SIMPLE:
- tmp = commitRequest(false);
- sendRequest(tmp, resp -> {
- completeModify(tmp, resp);
- callback.accept(resp);
- });
- break;
- case THREE_PHASE:
- tmp = commitRequest(true);
- sendRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- callback.accept(resp);
- });
- break;
- case READY:
- tmp = readyRequest();
- sendRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- callback.accept(resp);
- });
- break;
- default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
- }
- }
+ handleForwardedModifyTransactionRequest(callback, (ModifyTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
ensureFlushedBuider();
sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
}
}
+ private void handleForwardedModifyTransactionRequest(final Consumer<Response<?, ?>> callback,
+ final ModifyTransactionRequest req) {
+ req.getModifications().forEach(this::appendModification);
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+ if (maybeProto.isPresent()) {
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
+
+ final TransactionRequest<?> tmp;
+ switch (maybeProto.get()) {
+ case ABORT:
+ tmp = abortRequest();
+ sendRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ callback.accept(resp);
+ });
+ break;
+ case SIMPLE:
+ tmp = commitRequest(false);
+ sendRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ callback.accept(resp);
+ });
+ break;
+ case THREE_PHASE:
+ tmp = commitRequest(true);
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
+ break;
+ case READY:
+ tmp = readyRequest();
+ sendRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ callback.accept(resp);
+ });
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ }
+ }
+ }
+
@Override
void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
final Consumer<Response<?, ?>> callback) {
@Override
void handleReplayedRemoteRequest(final TransactionRequest<?> request,
- final @Nullable Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
- final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { };
+ @Nullable final Consumer<Response<?, ?>> callback, final long enqueuedTicks) {
+ final Consumer<Response<?, ?>> cb = callback != null ? callback : resp -> { /* NOOP */ };
final Optional<Long> optTicks = Optional.of(Long.valueOf(enqueuedTicks));
if (request instanceof ModifyTransactionRequest) {
- final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
- for (TransactionModification mod : req.getModifications()) {
- appendModification(mod, optTicks);
- }
-
- final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
- if (maybeProto.isPresent()) {
- // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
- // until we know what we are going to do.
- if (markSealed()) {
- sealOnly();
- }
-
- final TransactionRequest<?> tmp;
- switch (maybeProto.get()) {
- case ABORT:
- tmp = abortRequest();
- enqueueRequest(tmp, resp -> {
- completeModify(tmp, resp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case SIMPLE:
- tmp = commitRequest(false);
- enqueueRequest(tmp, resp -> {
- completeModify(tmp, resp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case THREE_PHASE:
- tmp = commitRequest(true);
- enqueueRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- case READY:
- tmp = readyRequest();
- enqueueRequest(tmp, resp -> {
- recordSuccessfulRequest(tmp);
- cb.accept(resp);
- }, enqueuedTicks);
- break;
- default:
- throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
- }
- }
+ handleReplayedModifyTransactionRequest(enqueuedTicks, cb, (ModifyTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
ensureFlushedBuider(optTicks);
enqueueRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
throw new IllegalArgumentException("Unhandled request {}" + request);
}
}
+
+ private void handleReplayedModifyTransactionRequest(final long enqueuedTicks, final Consumer<Response<?, ?>> cb,
+ final ModifyTransactionRequest req) {
+ req.getModifications().forEach(this::appendModification);
+
+ final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
+ if (maybeProto.isPresent()) {
+ // Persistence protocol implies we are sealed, propagate the marker, but hold off doing other actions
+ // until we know what we are going to do.
+ if (markSealed()) {
+ sealOnly();
+ }
+
+ final TransactionRequest<?> tmp;
+ switch (maybeProto.get()) {
+ case ABORT:
+ tmp = abortRequest();
+ enqueueRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case SIMPLE:
+ tmp = commitRequest(false);
+ enqueueRequest(tmp, resp -> {
+ completeModify(tmp, resp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case THREE_PHASE:
+ tmp = commitRequest(true);
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ case READY:
+ tmp = readyRequest();
+ enqueueRequest(tmp, resp -> {
+ recordSuccessfulRequest(tmp);
+ cb.accept(resp);
+ }, enqueuedTicks);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
+ }
+ }
+ }
}
return tree.readTime();
}
- final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+ @Nullable
+ final TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
final RequestEnvelope envelope, final long now) throws RequestException {
- final TransactionIdentifier id = request.getTarget();
- final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId());
-
if (request instanceof TransactionPurgeRequest) {
- if (purgedTransactions.contains(ul)) {
- // Retransmitted purge request: nothing to do
- LOG.debug("{}: transaction {} already purged", persistenceId, id);
- return new TransactionPurgeResponse(id, request.getSequence());
- }
-
- // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it
- // to an ImmutableMap, which does not allow remove().
- if (closedTransactions.containsKey(ul)) {
- tree.purgeTransaction(id, () -> {
- closedTransactions.remove(ul);
- if (closedTransactions.isEmpty()) {
- closedTransactions = ImmutableMap.of();
- }
-
- purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
- LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id);
- envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
- });
- return null;
- }
-
- final FrontendTransaction tx = transactions.get(id);
- if (tx == null) {
- // This should never happen because the purge callback removes the transaction and puts it into
- // purged transactions in one go. If it does, we warn about the situation and
- LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId,
- id, purgedTransactions);
- purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
- return new TransactionPurgeResponse(id, request.getSequence());
- }
-
- tree.purgeTransaction(id, () -> {
- purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
- transactions.remove(id);
- LOG.debug("{}: finished purging transaction {}", persistenceId(), id);
- envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
- });
- return null;
+ return handleTransactionPurgeRequest(request, envelope, now);
}
+ final TransactionIdentifier id = request.getTarget();
+ final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId());
if (purgedTransactions.contains(ul)) {
LOG.warn("{}: Request {} is contained purged transactions {}", persistenceId, request, purgedTransactions);
throw new DeadTransactionException(purgedTransactions);
return tx.handleRequest(request, envelope, now);
}
+ private TransactionSuccess<?> handleTransactionPurgeRequest(final TransactionRequest<?> request,
+ final RequestEnvelope envelope, final long now) {
+ final TransactionIdentifier id = request.getTarget();
+ final UnsignedLong ul = UnsignedLong.fromLongBits(id.getTransactionId());
+ if (purgedTransactions.contains(ul)) {
+ // Retransmitted purge request: nothing to do
+ LOG.debug("{}: transaction {} already purged", persistenceId, id);
+ return new TransactionPurgeResponse(id, request.getSequence());
+ }
+
+ // We perform two lookups instead of a straight remove, because once the map becomes empty we switch it
+ // to an ImmutableMap, which does not allow remove().
+ if (closedTransactions.containsKey(ul)) {
+ tree.purgeTransaction(id, () -> {
+ closedTransactions.remove(ul);
+ if (closedTransactions.isEmpty()) {
+ closedTransactions = ImmutableMap.of();
+ }
+
+ purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
+ LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id);
+ envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
+ });
+ return null;
+ }
+
+ final FrontendTransaction tx = transactions.get(id);
+ if (tx == null) {
+ // This should never happen because the purge callback removes the transaction and puts it into
+ // purged transactions in one go. If it does, we warn about the situation and
+ LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId,
+ id, purgedTransactions);
+ purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
+ return new TransactionPurgeResponse(id, request.getSequence());
+ }
+
+ tree.purgeTransaction(id, () -> {
+ purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
+ transactions.remove(id);
+ LOG.debug("{}: finished purging transaction {}", persistenceId(), id);
+ envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
+ });
+
+ return null;
+ }
+
final void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
tree.closeTransactionChain(getIdentifier(),
tree.getStats().incrementReadWriteTransactionCount();
return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification());
}
- if (request instanceof AbstractReadTransactionRequest) {
- if (((AbstractReadTransactionRequest<?>) request).isSnapshotOnly()) {
- LOG.debug("{}: allocating new open snapshot {}", persistenceId(), id);
- tree.getStats().incrementReadOnlyTransactionCount();
- return createOpenSnapshot(id);
- }
+ if (request instanceof AbstractReadTransactionRequest
+ && ((AbstractReadTransactionRequest<?>) request).isSnapshotOnly()) {
+ LOG.debug("{}: allocating new open snapshot {}", persistenceId(), id);
+ tree.getStats().incrementReadOnlyTransactionCount();
+ return createOpenSnapshot(id);
}
LOG.debug("{}: allocating new open transaction {}", persistenceId(), id);
import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Abstract base for transactions running on SharrdDataTree.
@NotThreadSafe
abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
implements Identifiable<TransactionIdentifier> {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractShardDataTreeTransaction.class);
-
private final ShardDataTreeTransactionParent parent;
private final TransactionIdentifier id;
private final T snapshot;
this.listener = listener;
}
- @SuppressWarnings("checkstyle:IllegalCatch")
+ @SuppressWarnings({"checkstyle:IllegalCatch",
+ "squid:S1166" /* Exception handlers should preserve the original exception */})
private void overlaySettings(ServiceReference<ConfigurationAdmin> configAdminServiceReference) {
try {
ConfigurationAdmin configAdmin = bundleContext.getService(configAdminServiceReference);
LOG.debug("Overlaying settings: {}", properties);
- if (introspector.update(properties)) {
- if (listener != null) {
- listener.onDatastoreContextUpdated(introspector.newContextFactory());
- }
+ if (introspector.update(properties) && listener != null) {
+ listener.onDatastoreContextUpdated(introspector.newContextFactory());
}
} else {
LOG.debug("No Configuration found for {}", CONFIG_ID);
import java.beans.MethodDescriptor;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
* constructor that takes a single String argument. For primitive wrappers, this constructor
* converts from a String representation.
*/
- @SuppressWarnings("checkstyle:IllegalCatch")
+ // Disables "Either log or rethrow this exception" sonar warning
+ @SuppressWarnings("squid:S1166")
private static void introspectPrimitiveTypes() {
-
Set<Class<?>> primitives = ImmutableSet.<Class<?>>builder().addAll(
Primitives.allWrapperTypes()).add(String.class).build();
for (Class<?> primitive: primitives) {
try {
processPropertyType(primitive);
- } catch (Exception e) {
+ } catch (NoSuchMethodException e) {
// Ignore primitives that can't be constructed from a String, eg Character and Void.
+ } catch (SecurityException | IntrospectionException e) {
+ LOG.error("Error introspect primitive type {}", primitive, e);
}
}
}
* Finds the appropriate constructor for the specified type that we will use to construct
* instances.
*/
- private static void processPropertyType(Class<?> propertyType) throws Exception {
+ private static void processPropertyType(Class<?> propertyType) throws NoSuchMethodException, SecurityException,
+ IntrospectionException {
Class<?> wrappedType = Primitives.wrap(propertyType);
if (CONSTRUCTORS.containsKey(wrappedType)) {
return;
/**
* Finds the getter method on a yang-generated type for the specified property name.
*/
- private static void findYangTypeGetter(Class<?> type, String propertyName)
- throws Exception {
+ private static void findYangTypeGetter(Class<?> type, String propertyName) throws IntrospectionException {
for (PropertyDescriptor desc: Introspector.getBeanInfo(type).getPropertyDescriptors()) {
if (desc.getName().equals(propertyName)) {
YANG_TYPE_GETTERS.put(type, desc.getReadMethod());
}
}
- throw new IllegalArgumentException(String.format(
+ throw new IntrospectionException(String.format(
"Getter method for constructor property %s not found for YANG type %s",
propertyName, type));
}
Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
return true;
- } catch (Exception e) {
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
+ | InstantiationException e) {
LOG.error("Error converting value ({}) for property {}", inValue, key, e);
}
return StringUtils.uncapitalize(str);
}
- private Object convertValue(String name, Object from) throws Exception {
+ private Object convertValue(String name, Object from)
+ throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
Class<?> propertyType = DATA_STORE_PROP_TYPES.get(name);
if (propertyType == null) {
LOG.debug("Property not found for {}", name);
return converted;
}
- private Object constructorValueRecursively(Class<?> toType, Object fromValue) throws Exception {
+ private Object constructorValueRecursively(Class<?> toType, Object fromValue)
+ throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
LOG.trace("convertValueRecursively - toType: {}, fromValue {} ({})",
toType.getSimpleName(), fromValue, fromValue.getClass().getSimpleName());
final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
final DataChangeScope scope, final DataTreeCandidate initialState, String logContext) {
DefaultShardDataChangeListenerPublisher publisher = new DefaultShardDataChangeListenerPublisher(logContext);
- publisher.registerDataChangeListener(path, listener, scope, Optional.absent(), noop -> { });
+ publisher.registerDataChangeListener(path, listener, scope, Optional.absent(), noop -> { /* NOOP */ });
publisher.publishChanges(initialState);
}
}
DefaultShardDataTreeChangeListenerPublisher publisher =
new DefaultShardDataTreeChangeListenerPublisher(logContext);
publisher.logContext = logContext;
- publisher.registerTreeChangeListener(treeId, listener, Optional.absent(), noop -> { });
+ publisher.registerTreeChangeListener(treeId, listener, Optional.absent(), noop -> { /* NOOP */ });
publisher.publishChanges(state);
}
private static final String DEFAULT_MODULE_SHARDS_PATH = "./configuration/initial/module-shards.conf";
private static final String DEFAULT_MODULES_PATH = "./configuration/initial/modules.conf";
+ private DistributedDataStoreFactory() {
+ }
+
/**
* Create a data store instance.
*
case READY:
throw new IllegalStateException("Attempted to preCommit in stage " + ready.stage);
default:
- throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+ throwUnhandledCommitStage(ready);
}
}
case READY:
throw new IllegalStateException("Attempted to doCommit in stage " + ready.stage);
default:
- throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+ throwUnhandledCommitStage(ready);
}
}
case PRE_COMMIT_PENDING:
throw new IllegalStateException("Attempted to canCommit in stage " + ready.stage);
default:
- throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+ throwUnhandledCommitStage(ready);
}
}
});
break;
default:
- throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+ throwUnhandledCommitStage(ready);
}
}
}
}
- private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
+ @Nullable
+ private TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
// We need to examine the persistence protocol first to see if this is an idempotent request. If there is no
// protocol, there is nothing for us to do.
state);
return ((Sealed) state).sealedModification;
}
+
+ private static void throwUnhandledCommitStage(final Ready ready) {
+ throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+ }
}
// the cached remote leader actor is no longer available.
boolean retryCreateTransaction = primaryShardInfo != null
&& (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
- if (retryCreateTransaction) {
- // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may
- // be written by different threads however not concurrently, therefore decrementing it
- // non-atomically here is ok.
- if (totalCreateTxTimeout > 0) {
- long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS;
- if (failure instanceof AskTimeoutException) {
- // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed
- // out, subtract it from the total timeout. Also since the createTxMessageTimeout period
- // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate).
- totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis();
- scheduleInterval = 10;
- }
-
- totalCreateTxTimeout -= scheduleInterval;
-
- LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms",
- getIdentifier(), shardName, failure, scheduleInterval);
-
- getActorContext().getActorSystem().scheduler().scheduleOnce(
- FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
- this::tryFindPrimaryShard, getActorContext().getClientDispatcher());
- return;
+
+ // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may
+ // be written by different threads however not concurrently, therefore decrementing it
+ // non-atomically here is ok.
+ if (retryCreateTransaction && totalCreateTxTimeout > 0) {
+ long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS;
+ if (failure instanceof AskTimeoutException) {
+ // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed
+ // out, subtract it from the total timeout. Also since the createTxMessageTimeout period
+ // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate).
+ totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis();
+ scheduleInterval = 10;
}
+
+ totalCreateTxTimeout -= scheduleInterval;
+
+ LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms",
+ getIdentifier(), shardName, failure, scheduleInterval);
+
+ getActorContext().getActorSystem().scheduler().scheduleOnce(
+ FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
+ this::tryFindPrimaryShard, getActorContext().getClientDispatcher());
+ return;
}
createTransactionContext(failure, response);
throw new OutOfSequenceEnvelopeException(0);
}
- private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
+ @Nonnull
+ private static ABIVersion selectVersion(final ConnectClientRequest message) {
final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
if (clientRange.contains(v)) {
}
}
- private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+ @Nullable
+ private RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
throws RequestException {
// We are not the leader, hence we want to fail-fast.
if (!isLeader() || paused || !isLeaderActive()) {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(batched, getSender(),
- "Could not commit transaction " + batched.getTransactionId());
+ "Could not process BatchedModifications " + batched.getTransactionId());
} else {
// If this is not the first batch and leadership changed in between batched messages,
// we need to reconstruct previous BatchedModifications from the transaction
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(message, getSender(),
- "Could not commit transaction " + message.getTransactionId());
+ "Could not process ready local transaction " + message.getTransactionId());
} else {
LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
- "Could not commit transaction " + forwardedReady.getTransactionId());
+ "Could not process forwarded ready transaction " + forwardedReady.getTransactionId());
} else {
LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
* @param snapshot Snapshot that needs to be applied
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoverySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ void applyRecoverySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
applySnapshot(snapshot, this::wrapWithPruning);
}
* @throws IOException when the snapshot fails to deserialize
* @throws DataValidationFailedException when the snapshot fails to apply
*/
- void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
+ void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException, DataValidationFailedException {
if (payload instanceof CommitTransactionPayload) {
final Entry<TransactionIdentifier, DataTreeCandidate> e =
((CommitTransactionPayload) payload).getCandidate();
*
* @return Metadata type
*/
- abstract @Nonnull Class<T> getSupportedType();
+ @Nonnull
+ abstract Class<T> getSupportedType();
/**
* Take a snapshot of current metadata state.
*
* @return Metadata snapshot, or null if the metadata is empty.
*/
- abstract @Nullable T toSnapshot();
+ @Nullable
+ abstract T toSnapshot();
// Lifecycle events
protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats,
final TransactionIdentifier transactionId) {
- super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
+ // actor name override used for metering. This does not change the "real" actor name
+ super("shard-tx");
this.shardActor = shardActor;
this.shardStats = shardStats;
this.transactionId = Preconditions.checkNotNull(transactionId);
@Override
public Collection<MemberName> getMembersFromShardName(final String shardName) {
- Preconditions.checkNotNull(shardName, "shardName should not be null");
+ checkNotNullShardName(shardName);
for (ModuleConfig moduleConfig: moduleConfigMap.values()) {
ShardConfig shardConfig = moduleConfig.getShardConfig(shardName);
return Collections.emptyList();
}
+ private static void checkNotNullShardName(final String shardName) {
+ Preconditions.checkNotNull(shardName, "shardName should not be null");
+ }
+
@Override
public Set<String> getAllShardNames() {
return allShardNames;
@Override
public boolean isShardConfigured(String shardName) {
- Preconditions.checkNotNull(shardName, "shardName should not be null");
+ checkNotNullShardName(shardName);
return allShardNames.contains(shardName);
}
@Override
public void addMemberReplicaForShard(String shardName, MemberName newMemberName) {
- Preconditions.checkNotNull(shardName, "shardName should not be null");
+ checkNotNullShardName(shardName);
Preconditions.checkNotNull(newMemberName, "MemberName should not be null");
for (ModuleConfig moduleConfig: moduleConfigMap.values()) {
@Override
public void removeMemberReplicaForShard(String shardName, MemberName newMemberName) {
- Preconditions.checkNotNull(shardName, "shardName should not be null");
+ checkNotNullShardName(shardName);
Preconditions.checkNotNull(newMemberName, "MemberName should not be null");
for (ModuleConfig moduleConfig: moduleConfigMap.values()) {
@Override
public void readExternal(final ObjectInput objectInput) throws IOException, ClassNotFoundException {
- final DOMDataTreeIdentifier prefix = (DOMDataTreeIdentifier) objectInput.readObject();
- final String strategyName = (String) objectInput.readObject();
+ final DOMDataTreeIdentifier localPrefix = (DOMDataTreeIdentifier) objectInput.readObject();
+ final String localStrategyName = (String) objectInput.readObject();
final int size = objectInput.readInt();
- final Collection<MemberName> shardMemberNames = new ArrayList<>(size);
+ final Collection<MemberName> localShardMemberNames = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- shardMemberNames.add(MemberName.readFrom(objectInput));
+ localShardMemberNames.add(MemberName.readFrom(objectInput));
}
- prefixShardConfiguration = new PrefixShardConfiguration(prefix, strategyName, shardMemberNames);
+ prefixShardConfiguration = new PrefixShardConfiguration(localPrefix, localStrategyName,
+ localShardMemberNames);
}
private Object readResolve() {
.node(ENTITY_OWNER_QNAME).build();
void init(ShardDataTree shardDataTree) {
- shardDataTree.registerTreeChangeListener(EOS_PATH, this, Optional.absent(), noop -> { });
+ shardDataTree.registerTreeChangeListener(EOS_PATH, this, Optional.absent(), noop -> { /* NOOP */ });
}
protected static String extractOwner(LeafNode<?> ownerLeaf) {
void init(ShardDataTree shardDataTree) {
shardDataTree.registerTreeChangeListener(YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH)
- .node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME)
- .node(Candidate.QNAME).node(Candidate.QNAME).build(), this, Optional.absent(), noop -> { });
+ .node(EntityType.QNAME).node(EntityType.QNAME).node(ENTITY_QNAME).node(ENTITY_QNAME)
+ .node(Candidate.QNAME).node(Candidate.QNAME).build(), this, Optional.absent(), noop -> { /* NOOP */ });
}
@Override
static final YangInstanceIdentifier ENTITY_TYPES_PATH =
YangInstanceIdentifier.of(EntityOwners.QNAME).node(EntityType.QNAME);
+ private EntityOwnersModel() {
+ }
+
static YangInstanceIdentifier entityPath(String entityType, YangInstanceIdentifier entityId) {
return YangInstanceIdentifier.builder(ENTITY_OWNERS_PATH).node(EntityType.QNAME)
.nodeWithKey(EntityType.QNAME, ENTITY_TYPE_QNAME, entityType).node(ENTITY_QNAME)
LOG.debug("Could not read strategy configuration file, will use default configuration");
} catch (IOException e1) {
- LOG.warn("Failed to get configuration for {}, starting up empty", CONFIG_ID);
+ LOG.warn("Failed to get configuration for {}, starting up empty", CONFIG_ID, e1);
return builder.build();
} finally {
try {
try {
clazz = EntityOwnerSelectionStrategyConfigReader.class.getClassLoader().loadClass(strategyClassAndDelay);
} catch (ClassNotFoundException e) {
- throw new IllegalArgumentException("Failed to load strategy " + strategyClassAndDelay);
+ throw new IllegalArgumentException("Failed to load strategy " + strategyClassAndDelay, e);
}
Preconditions.checkArgument(EntityOwnerSelectionStrategy.class.isAssignableFrom(clazz),
*/
public class ShardMBeanFactory {
+ private ShardMBeanFactory() {
+ }
+
public static ShardStats getShardStatsMBean(final String shardName, final String mxBeanType,
@Nonnull final Shard shard) {
String finalMXBeanType = mxBeanType != null ? mxBeanType : "DistDataStore";
public class ActorInitialized implements Serializable {
private static final long serialVersionUID = 1L;
+
+ public ActorInitialized() {
+ }
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- in.readShort(); // Read the version
+ // Read the version
+ in.readShort();
NormalizedNodeDataInput streamReader = NormalizedNodeInputOutput.newDataInputWithoutValidation(in);
public class DataChangedReply {
public static final DataChangedReply INSTANCE = new DataChangedReply();
+
+ private DataChangedReply() {
+ }
}
this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
}
- public @Nonnull String getPrimaryPath() {
+ @Nonnull
+ public String getPrimaryPath() {
return primaryPath;
}
- public @Nonnull DataTree getLocalShardDataTree() {
+ @Nonnull
+ public DataTree getLocalShardDataTree() {
return localShardDataTree;
}
/**
* Returns an ActorSelection representing the primary shard actor.
*/
- public @Nonnull ActorSelection getPrimaryShardActor() {
+ @Nonnull
+ public ActorSelection getPrimaryShardActor() {
return primaryShardActor;
}
* Returns an Optional whose value contains the primary shard's DataTree if the primary shard is local
* to the caller. Otherwise the Optional value is absent.
*/
- public @Nonnull Optional<DataTree> getLocalShardDataTree() {
+ @Nonnull
+ public Optional<DataTree> getLocalShardDataTree() {
return Optional.ofNullable(localShardDataTree);
}
}
this.localShardDataTree = null;
}
- public @Nonnull Optional<DataTree> getLocalShardDataTree() {
+ @Nonnull
+ public Optional<DataTree> getLocalShardDataTree() {
return Optional.ofNullable(localShardDataTree);
}
}
private static final long serialVersionUID = 1L;
public static final SuccessReply INSTANCE = new SuccessReply();
+
+ private SuccessReply() {
+ }
}
*
* @return Closest supported {@link PayloadVersion}
*/
- public final @Nonnull PayloadVersion getClosestVersion() {
+ @Nonnull
+ public final PayloadVersion getClosestVersion() {
return closestVersion;
}
out.writeByte(UNMODIFIED);
break;
default:
- throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
+ throwUnhandledNodeType(node);
}
}
writer.writeNormalizedNode(node.getDataAfter().get());
break;
default:
- throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
+ throwUnhandledNodeType(node);
}
}
}
+
+ private static void throwUnhandledNodeType(final DataTreeCandidateNode node) {
+ throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
+ }
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- String type = (String)in.readObject();
- ShardManagerSnapshot shardManagerSnapshot = (ShardManagerSnapshot) in.readObject();
+ String localType = (String)in.readObject();
+ ShardManagerSnapshot localShardManagerSnapshot = (ShardManagerSnapshot) in.readObject();
int size = in.readInt();
- List<ShardSnapshot> shardSnapshots = new ArrayList<>(size);
+ List<ShardSnapshot> localShardSnapshots = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- shardSnapshots.add((ShardSnapshot) in.readObject());
+ localShardSnapshots.add((ShardSnapshot) in.readObject());
}
- datastoreSnapshot = new DatastoreSnapshot(type, shardManagerSnapshot, shardSnapshots);
+ datastoreSnapshot = new DatastoreSnapshot(localType, localShardManagerSnapshot, localShardSnapshots);
}
private Object readResolve() {
import java.io.ObjectOutput;
import java.util.Optional;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Abstract base class for snapshots of the ShardDataTree.
*/
@Beta
public abstract class ShardDataTreeSnapshot {
- private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeSnapshot.class);
-
ShardDataTreeSnapshot() {
// Hidden to prevent subclassing from outside of this package
}
*
* @return Externalizable proxy, may not be null
*/
- protected abstract @Nonnull Externalizable externalizableProxy();
+ @Nonnull
+ protected abstract Externalizable externalizableProxy();
public abstract Class<T> getType();
}
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
int size = in.readInt();
- List<String> shardList = new ArrayList<>(size);
+ List<String> localShardList = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
- shardList.add((String) in.readObject());
+ localShardList.add((String) in.readObject());
}
size = in.readInt();
- Map<DOMDataTreeIdentifier, PrefixShardConfiguration> prefixShardConfiguration = new HashMap<>(size);
+ Map<DOMDataTreeIdentifier, PrefixShardConfiguration> localPrefixShardConfiguration = new HashMap<>(size);
for (int i = 0; i < size; i++) {
- prefixShardConfiguration.put((DOMDataTreeIdentifier) in.readObject(),
+ localPrefixShardConfiguration.put((DOMDataTreeIdentifier) in.readObject(),
(PrefixShardConfiguration) in.readObject());
}
- snapshot = new ShardManagerSnapshot(shardList, prefixShardConfiguration);
+ snapshot = new ShardManagerSnapshot(localShardList, localPrefixShardConfiguration);
}
private Object readResolve() {
private void onInitConfigListener() {
LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
- final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+ final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType =
org.opendaylight.mdsal.common.api.LogicalDatastoreType
.valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
}
configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
- configUpdateHandler.initListener(dataStore, type);
+ configUpdateHandler.initListener(dataStore, datastoreType);
}
private void onShutDown() {
final ActorRef sender = getSender();
if (sender == null) {
- return; //why is a non-actor sending this message? Just ignore.
+ // why is a non-actor sending this message? Just ignore.
+ return;
}
String actorName = sender.path().name();
}
}
- restoreFromSnapshot = null; // null out to GC
+ // null out to GC
+ restoreFromSnapshot = null;
for (String shardName : memberShardNames) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
* org.opendaylight.controller.cluster.datastore.ShardManagerSnapshot is removed.
*/
@Deprecated
- public static ShardManagerSnapshot forShardList(final @Nonnull List<String> shardList) {
+ public static ShardManagerSnapshot forShardList(@Nonnull final List<String> shardList) {
return new ShardManagerSnapshot(shardList);
}
public static final YangInstanceIdentifier SHARD_LIST_PATH =
PREFIX_SHARDS_PATH.node(SHARD_LIST_QNAME).toOptimized();
+ private ClusterUtils() {
+ }
+
public static ShardIdentifier getShardIdentifier(final MemberName memberName, final DOMDataTreeIdentifier prefix) {
final String type;
switch (prefix.getDatastoreType()) {
onCompleteTasks.add(task);
}
- @SuppressWarnings("checkstyle:IllegalCatch")
+ @SuppressWarnings({ "checkstyle:IllegalCatch", "squid:S1181" /* Throwable and Error should not be caught */ })
protected void notifyOnCompleteTasks(Throwable failure, T result) {
for (OnComplete<T> task: onCompleteTasks) {
try {
public class PrimaryShardInfoFutureCache {
private final Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache = CacheBuilder.newBuilder().build();
- public @Nullable Future<PrimaryShardInfo> getIfPresent(@Nonnull String shardName) {
+ @Nullable
+ public Future<PrimaryShardInfo> getIfPresent(@Nonnull String shardName) {
return primaryShardInfoCache.getIfPresent(shardName);
}
public class DOMDataTreeShardCreationFailedException extends Exception {
private static final long serialVersionUID = 1L;
- public DOMDataTreeShardCreationFailedException(final @Nonnull String message) {
+ public DOMDataTreeShardCreationFailedException(@Nonnull final String message) {
super(message);
}
- public DOMDataTreeShardCreationFailedException(final @Nonnull String message, final @Nonnull Throwable cause) {
+ public DOMDataTreeShardCreationFailedException(@Nonnull final String message, @Nonnull final Throwable cause) {
super(message, cause);
}
}
private final AbstractDataStore distributedDataStore;
private final YangInstanceIdentifier shardPath;
- // This will be useful for signaling back pressure
- private final DataStoreClient client;
-
private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
@GuardedBy("this")
final AbstractDataStore distributedDataStore,
final DOMDataTreeIdentifier prefix,
final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
- this.client = client;
this.distributedDataStore = distributedDataStore;
// TODO keeping the whole dataTree that's contained in subshards doesn't seem like a good idea
// maybe the whole listener logic would be better in the backend shards where we have direct access to the
// data tree yet. Postpone processing of these changes till we
// receive changes from current shard.
LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
- pathFromRoot, changes, dataTree);
+ pathFromRoot, changes, dataTree, e);
stashedDataTreeCandidates.addAll(newCandidates);
}
}
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
-import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataTreeShard;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Proxy producer implementation that creates transactions that forward all calls to {@link DataStoreClient}.
*/
class ShardProxyProducer implements DOMDataTreeShardProducer {
-
- private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataTreeShard.class);
- private static final AtomicLong COUNTER = new AtomicLong();
-
private final DOMDataTreeIdentifier shardRoot;
private final Collection<DOMDataTreeIdentifier> prefixes;
private final ClientLocalHistory history;
public ListenableFuture<Void> submit() {
LOG.debug("Submitting transaction for shard {}", shardRoot);
- Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ checkTransactionReadied();
final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
final AsyncFunction<Void, Void> prepareFunction = input -> commit();
return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor());
}
+ private void checkTransactionReadied() {
+ Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ }
+
@Override
public ListenableFuture<Boolean> validate() {
LOG.debug("Validating transaction for shard {}", shardRoot);
- Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ checkTransactionReadied();
final List<ListenableFuture<Boolean>> futures =
cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
final SettableFuture<Boolean> ret = SettableFuture.create();
public ListenableFuture<Void> prepare() {
LOG.debug("Preparing transaction for shard {}", shardRoot);
- Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ checkTransactionReadied();
final List<ListenableFuture<Void>> futures =
cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
final SettableFuture<Void> ret = SettableFuture.create();
public ListenableFuture<Void> commit() {
LOG.debug("Committing transaction for shard {}", shardRoot);
- Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
+ checkTransactionReadied();
final List<ListenableFuture<Void>> futures =
cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
final SettableFuture<Void> ret = SettableFuture.create();
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Status.Success;
-import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberExited;
import akka.cluster.ClusterEvent.MemberRemoved;
private final int lookupTaskMaxRetries;
private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
- private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
-
- private final Cluster cluster;
-
- private final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
clusterWrapper.subscribeToMemberEvents(self());
- cluster = Cluster.get(actorSystem);
}
@Override