package org.opendaylight.controller.cluster.databroker;
import static com.google.common.base.Preconditions.checkState;
+
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.EnumMap;
}
if (treeChange) {
- extensions = ImmutableMap.<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension>of(DOMDataTreeChangeService.class, new DOMDataTreeChangeService() {
+ extensions = ImmutableMap.of(DOMDataTreeChangeService.class, new DOMDataTreeChangeService() {
@Override
- public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerDataTreeChangeListener(final DOMDataTreeIdentifier treeId, final L listener) {
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerDataTreeChangeListener(
+ final DOMDataTreeIdentifier treeId, final L listener) {
DOMStore publisher = getTxFactories().get(treeId.getDatastoreType());
checkState(publisher != null, "Requested logical data store is not available.");
- return ((DOMStoreTreeChangePublisher) publisher).registerTreeChangeListener(treeId.getRootIdentifier(), listener);
+ return ((DOMStoreTreeChangePublisher) publisher).registerTreeChangeListener(
+ treeId.getRootIdentifier(), listener);
}
});
} else {
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void close() {
super.close();
@Override
public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
- final YangInstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
-
+ final YangInstanceIdentifier path, final DOMDataChangeListener listener,
+ final DataChangeScope triggeringScope) {
DOMStore potentialStore = getTxFactories().get(store);
checkState(potentialStore != null, "Requested logical data store is not available.");
return potentialStore.registerChangeListener(path, listener, triggeringScope);
public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
checkNotClosed();
- final Map<LogicalDatastoreType, DOMStoreTransactionChain> backingChains = new EnumMap<>(LogicalDatastoreType.class);
+ final Map<LogicalDatastoreType, DOMStoreTransactionChain> backingChains =
+ new EnumMap<>(LogicalDatastoreType.class);
for (Map.Entry<LogicalDatastoreType, DOMStore> entry : getTxFactories().entrySet()) {
backingChains.put(entry.getKey(), entry.getValue().createTransactionChain());
}
public abstract class AbstractDOMBrokerTransaction<T extends DOMStoreTransaction> implements
AsyncTransaction<YangInstanceIdentifier, NormalizedNode<?, ?>> {
- private EnumMap<LogicalDatastoreType, T> backingTxs;
+ private final EnumMap<LogicalDatastoreType, T> backingTxs;
private final Object identifier;
private final Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories;
/**
- *
* Creates new composite Transactions.
*
- * @param identifier
- * Identifier of transaction.
+ * @param identifier Identifier of transaction.
*/
- protected AbstractDOMBrokerTransaction(final Object identifier, Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories) {
+ protected AbstractDOMBrokerTransaction(final Object identifier,
+ Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories) {
this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
- this.storeTxFactories = Preconditions.checkNotNull(storeTxFactories, "Store Transaction Factories should not be null");
+ this.storeTxFactories = Preconditions.checkNotNull(storeTxFactories,
+ "Store Transaction Factories should not be null");
this.backingTxs = new EnumMap<>(LogicalDatastoreType.class);
}
/**
* Returns subtransaction associated with supplied key.
*
- * @param key
- * @return
+ * @param key the data store type key
+ * @return the subtransaction
* @throws NullPointerException
* if key is null
* @throws IllegalArgumentException
Preconditions.checkNotNull(key, "key must not be null.");
T ret = backingTxs.get(key);
- if(ret == null){
+ if (ret == null) {
ret = createTransaction(key);
backingTxs.put(key, ret);
}
return identifier;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected void closeSubtransactions() {
/*
* We share one exception for all failures, which are added
}
}
- protected DOMStoreTransactionFactory getTxFactory(LogicalDatastoreType type){
+ protected DOMStoreTransactionFactory getTxFactory(LogicalDatastoreType type) {
return storeTxFactories.get(type);
}
}
extends AbstractDOMBrokerTransaction<T> implements DOMDataWriteTransaction {
@SuppressWarnings("rawtypes")
- private static final AtomicReferenceFieldUpdater<AbstractDOMBrokerWriteTransaction, AbstractDOMTransactionFactory> IMPL_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(AbstractDOMBrokerWriteTransaction.class, AbstractDOMTransactionFactory.class, "commitImpl");
+ private static final AtomicReferenceFieldUpdater<AbstractDOMBrokerWriteTransaction, AbstractDOMTransactionFactory>
+ IMPL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractDOMBrokerWriteTransaction.class,
+ AbstractDOMTransactionFactory.class, "commitImpl");
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<AbstractDOMBrokerWriteTransaction, Future> FUTURE_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(AbstractDOMBrokerWriteTransaction.class, Future.class, "commitFuture");
+ AtomicReferenceFieldUpdater.newUpdater(AbstractDOMBrokerWriteTransaction.class, Future.class,
+ "commitFuture");
private static final Logger LOG = LoggerFactory.getLogger(AbstractDOMBrokerWriteTransaction.class);
private static final Future<?> CANCELLED_FUTURE = Futures.immediateCancelledFuture();
* Future task of transaction commit. It starts off as null, but is
* set appropriately on {@link #submit()} and {@link #cancel()} via
* {@link AtomicReferenceFieldUpdater#lazySet(Object, Object)}.
- *
+ * <p/>
* Lazy set is safe for use because it is only referenced to in the
* {@link #cancel()} slow path, where we will busy-wait for it. The
* fast path gets the benefit of a store-store barrier instead of the
private volatile Future<?> commitFuture;
protected AbstractDOMBrokerWriteTransaction(final Object identifier,
- final Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories, final AbstractDOMTransactionFactory<?> commitImpl) {
+ final Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories,
+ final AbstractDOMTransactionFactory<?> commitImpl) {
super(identifier, storeTxFactories);
this.commitImpl = Preconditions.checkNotNull(commitImpl, "commitImpl must not be null.");
}
@Override
- public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
checkRunning(commitImpl);
checkInstanceIdentifierReferencesData(path,data);
getSubtransaction(store).write(path, data);
final NormalizedNode<?, ?> data) {
final PathArgument lastArg = path.getLastPathArgument();
Preconditions.checkArgument(
- (lastArg == data.getIdentifier()) || (lastArg != null && lastArg.equals(data.getIdentifier())),
+ lastArg == data.getIdentifier() || lastArg != null && lastArg.equals(data.getIdentifier()),
"Instance identifier references %s but data identifier is %s", lastArg, data);
}
}
@Override
- public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
+ final NormalizedNode<?, ?> data) {
checkRunning(commitImpl);
checkInstanceIdentifierReferencesData(path, data);
getSubtransaction(store).merge(path, data);
/**
* Implementations must return unique identifier for each and every call of
- * this method;
+ * this method.
*
* @return new Unique transaction identifier.
*/
protected abstract Object newTransactionIdentifier();
/**
+ * Submits a transaction asynchronously for commit.
*
- * @param transaction
- * @param cohorts
- * @return
+ * @param transaction the transaction to submit
+ * @param cohorts the associated cohorts
+ * @return a resulting Future
*/
- protected abstract CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
- final Collection<DOMStoreThreePhaseCommitCohort> cohorts);
+ protected abstract CheckedFuture<Void,TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction,
+ Collection<DOMStoreThreePhaseCommitCohort> cohorts);
/**
+ * Creates a new read-only transaction.
*
- * @return
+ * @return the transaction instance
*/
public final DOMDataReadOnlyTransaction newReadOnlyTransaction() {
checkNotClosed();
/**
+ * Creates a new write-only transaction.
*
- * @return
+ * @return the transaction instance
*/
public final DOMDataWriteTransaction newWriteOnlyTransaction() {
checkNotClosed();
/**
+ * Creates a new read-write transaction.
*
- * @return
+ * @return the transaction instance
*/
public final DOMDataReadWriteTransaction newReadWriteTransaction() {
checkNotClosed();
*/
private final Executor clientFutureCallbackExecutor;
- public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, Executor listenableFutureExecutor) {
+ public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores,
+ Executor listenableFutureExecutor) {
this(datastores, listenableFutureExecutor, DurationStatisticsTracker.createConcurrent());
}
- public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores, Executor listenableFutureExecutor,
- DurationStatisticsTracker commitStatsTracker) {
+ public ConcurrentDOMDataBroker(final Map<LogicalDatastoreType, DOMStore> datastores,
+ Executor listenableFutureExecutor, DurationStatisticsTracker commitStatsTracker) {
super(datastores);
this.clientFutureCallbackExecutor = Preconditions.checkNotNull(listenableFutureExecutor);
this.commitStatsTracker = Preconditions.checkNotNull(commitStatsTracker);
Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
- if(cohorts.isEmpty()){
+ if (cohorts.isEmpty()) {
return Futures.immediateCheckedFuture(null);
}
new TransactionCommitFailedException(
"Can Commit failed, no detailed cause available."));
} else {
- if(!cohortIterator.hasNext()) {
+ if (!cohortIterator.hasNext()) {
// All cohorts completed successfully - we can move on to the preCommit phase
doPreCommit(startTime, clientSubmitFuture, transaction, cohorts);
} else {
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(Throwable failure) {
handleException(clientSubmitFuture, transaction, cohorts, CAN_COMMIT,
- TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER, t);
+ TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER, failure);
}
};
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
- if(!cohortIterator.hasNext()) {
+ if (!cohortIterator.hasNext()) {
// All cohorts completed successfully - we can move on to the commit phase
doCommit(startTime, clientSubmitFuture, transaction, cohorts);
} else {
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(Throwable failure) {
handleException(clientSubmitFuture, transaction, cohorts, PRE_COMMIT,
- TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER, t);
+ TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER, failure);
}
};
FutureCallback<Void> futureCallback = new FutureCallback<Void>() {
@Override
public void onSuccess(Void notUsed) {
- if(!cohortIterator.hasNext()) {
+ if (!cohortIterator.hasNext()) {
// All cohorts completed successfully - we're done.
commitStatsTracker.addDuration(System.nanoTime() - startTime);
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(Throwable throwable) {
handleException(clientSubmitFuture, transaction, cohorts, COMMIT,
- TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER, t);
+ TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER, throwable);
}
};
final DOMDataWriteTransaction transaction,
final Collection<DOMStoreThreePhaseCommitCohort> cohorts,
final String phase, final TransactionCommitFailedExceptionMapper exMapper,
- final Throwable t) {
+ final Throwable throwable) {
if (clientSubmitFuture.isDone()) {
// We must have had failures from multiple cohorts.
return;
}
- LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, t);
+ LOG.warn("Tx: {} Error during phase {}, starting Abort", transaction.getIdentifier(), phase, throwable);
final Exception e;
- if(t instanceof NoShardLeaderException || t instanceof ShardLeaderNotRespondingException) {
- e = new DataStoreUnavailableException(t.getMessage(), t);
- } else if (t instanceof Exception) {
- e = (Exception)t;
+ if (throwable instanceof NoShardLeaderException || throwable instanceof ShardLeaderNotRespondingException) {
+ e = new DataStoreUnavailableException(throwable.getMessage(), throwable);
+ } else if (throwable instanceof Exception) {
+ e = (Exception)throwable;
} else {
- e = new RuntimeException("Unexpected error occurred", t);
+ e = new RuntimeException("Unexpected error occurred", throwable);
}
final TransactionCommitFailedException clientException = exMapper.apply(e);
@SuppressWarnings("unchecked")
ListenableFuture<Void>[] canCommitFutures = new ListenableFuture[cohorts.size()];
- int i = 0;
+ int index = 0;
for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- canCommitFutures[i++] = cohort.abort();
+ canCommitFutures[index++] = cohort.abort();
}
ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(canCommitFutures);
}
@Override
- public void onFailure(Throwable t) {
- LOG.error("Tx: {} Error during Abort.", transaction.getIdentifier(), t);
+ public void onFailure(Throwable failure) {
+ LOG.error("Tx: {} Error during Abort.", transaction.getIdentifier(), failure);
// Propagate the original exception as that is what caused the Tx to fail and is
// what's interesting to the client.
* the thread that completed this future, as a common use case is to pass an executor that runs
* tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor)
* to {@link #addListener}.
- *
* FIXME: This class should probably be moved to yangtools common utils for re-usability and
* unified with AsyncNotifyingListenableFutureTask.
*/
*
* @param identifier Identifier of transaction.
*/
- protected DOMBrokerReadOnlyTransaction(Object identifier, Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories) {
+ protected DOMBrokerReadOnlyTransaction(Object identifier,
+ Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories) {
super(identifier, storeTxFactories);
}
public class DOMBrokerReadWriteTransaction
extends AbstractDOMBrokerWriteTransaction<DOMStoreReadWriteTransaction> implements DOMDataReadWriteTransaction {
+
/**
- * Creates new composite Transactions.
+ * Constructs an instance.
*
- * @param identifier Identifier of transaction.
- * @param storeTxFactories
+ * @param identifier identifier of transaction.
+ * @param storeTxFactories the backing transaction store factories
*/
- protected DOMBrokerReadWriteTransaction(Object identifier, Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories, final AbstractDOMTransactionFactory<?> commitImpl) {
+ protected DOMBrokerReadWriteTransaction(Object identifier,
+ Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories,
+ final AbstractDOMTransactionFactory<?> commitImpl) {
super(identifier, storeTxFactories, commitImpl);
}
protected DOMStoreReadWriteTransaction createTransaction(LogicalDatastoreType key) {
return getTxFactory(key).newReadWriteTransaction();
}
-
-
}
private volatile int counter = 0;
/**
+ * Constructs an instance.
*
* @param chainId
* ID of transaction chain
* @throws NullPointerException
* If any of arguments is null.
*/
- public DOMBrokerTransactionChain(final long chainId,
- final Map<LogicalDatastoreType, DOMStoreTransactionChain> chains,
- AbstractDOMBroker broker, final TransactionChainListener listener) {
+ DOMBrokerTransactionChain(final long chainId, final Map<LogicalDatastoreType, DOMStoreTransactionChain> chains,
+ AbstractDOMBroker broker, final TransactionChainListener listener) {
super(chains);
this.chainId = chainId;
this.broker = Preconditions.checkNotNull(broker);
}
@Override
- public void onFailure(final Throwable t) {
- transactionFailed(transaction, t);
+ public void onFailure(final Throwable failure) {
+ transactionFailed(transaction, failure);
}
});
LOG.debug("Transaction chain {}Â failed.", this, cause);
listener.onTransactionChainFailed(this, tx, cause);
}
-}
\ No newline at end of file
+}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
public class DOMBrokerWriteOnlyTransaction extends AbstractDOMBrokerWriteTransaction<DOMStoreWriteTransaction> {
+
/**
- * Creates new composite Transactions.
+ * Constructs an instance.
*
- * @param identifier
- * Identifier of transaction.
- * @param storeTxFactories
+ * @param identifier identifier of transaction.
+ * @param storeTxFactories the backing transaction store factories
*/
public DOMBrokerWriteOnlyTransaction(Object identifier,
Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories,
}
@Override
- public final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+ public final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+ final YangInstanceIdentifier path) {
return transaction().read(path);
}
*
* @author Robert Varga
*/
-final class ShardedDOMStoreReadWriteTransaction extends ShardedDOMStoreReadTransaction implements DOMStoreReadWriteTransaction {
-
+final class ShardedDOMStoreReadWriteTransaction extends ShardedDOMStoreReadTransaction
+ implements DOMStoreReadWriteTransaction {
ShardedDOMStoreReadWriteTransaction(final ClientTransaction tx) {
super(tx);
}
}
@Override
- public final void close() {
+ public void close() {
transaction().abort();
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-abstract class AbstractDataListenerSupport<L extends EventListener, R extends ListenerRegistrationMessage,
- D extends DelayedListenerRegistration<L, R>, LR extends ListenerRegistration<L>>
- extends LeaderLocalDelegateFactory<R, LR, Optional<DataTreeCandidate>> {
+abstract class AbstractDataListenerSupport<L extends EventListener, M extends ListenerRegistrationMessage,
+ D extends DelayedListenerRegistration<L, M>, R extends ListenerRegistration<L>>
+ extends LeaderLocalDelegateFactory<M, R, Optional<DataTreeCandidate>> {
private final Logger log = LoggerFactory.getLogger(getClass());
private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
log.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader);
final EnableNotification msg = new EnableNotification(isLeader);
- for(ActorSelection dataChangeListener : actors) {
+ for (ActorSelection dataChangeListener : actors) {
dataChangeListener.tell(msg, getSelf());
}
- if(hasLeader) {
- for(D reg : delayedListenerOnAllRegistrations) {
+ if (hasLeader) {
+ for (D reg : delayedListenerOnAllRegistrations) {
reg.createDelegate(this);
}
delayedListenerOnAllRegistrations.trimToSize();
}
- if(isLeader) {
- for(D reg : delayedListenerRegistrations) {
+ if (isLeader) {
+ for (D reg : delayedListenerRegistrations) {
reg.createDelegate(this);
}
}
@Override
- void onMessage(R message, boolean isLeader, boolean hasLeader) {
+ void onMessage(M message, boolean isLeader, boolean hasLeader) {
log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader);
final ListenerRegistration<L> registration;
- if((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
- final Entry<LR, Optional<DataTreeCandidate>> res = createDelegate(message);
+ if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
+ final Entry<R, Optional<DataTreeCandidate>> res = createDelegate(message);
registration = res.getKey();
} else {
log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
D delayedReg = newDelayedListenerRegistration(message);
- if(message.isRegisterOnAllInstances()) {
+ if (message.isRegisterOnAllInstances()) {
delayedListenerOnAllRegistrations.add(delayedReg);
} else {
delayedListenerRegistrations.add(delayedReg);
actors.add(actor);
}
- protected abstract D newDelayedListenerRegistration(R message);
+ protected abstract D newDelayedListenerRegistration(M message);
protected abstract ActorRef newRegistrationActor(ListenerRegistration<L> registration);
*/
@NotThreadSafe
abstract class AbstractShardDataTreeNotificationPublisherActorProxy implements ShardDataTreeNotificationPublisher {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractShardDataTreeNotificationPublisherActorProxy.class);
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbstractShardDataTreeNotificationPublisherActorProxy.class);
private final ActorContext actorContext;
private final String actorName;
}
private ActorRef getNotifierActor() {
- if(notifierActor == null) {
+ if (notifierActor == null) {
LOG.debug("Creating actor {}", actorName);
String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Notification);
notifierActor = actorContext.actorOf(ShardDataTreeNotificationPublisherActor.props(actorName)
.withDispatcher(dispatcher).withMailbox(
- org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), actorName);
+ org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX),
+ actorName);
}
return notifierActor;
@Override
public final String toString() {
- return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot).toString();
+ return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot)
+ .toString();
}
abstract void abort();
handOffComplete = true;
}
- protected boolean isOperationHandOffComplete(){
+ protected boolean isOperationHandOffComplete() {
return handOffComplete;
}
return historyId;
}
- private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
+ final String shardName) {
final LocalTransactionFactory local = knownLocal.get(shardName);
if (local != null) {
LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
try {
return createLocalTransactionContext(local, parent);
- } catch(Exception e) {
+ } catch (Exception e) {
return new NoOpTransactionContext(e, parent.getIdentifier());
}
}
private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
String shardName, TransactionContextWrapper transactionContextWrapper) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
- primaryShardInfo.getPrimaryShardActor(), shardName);
- }
+ LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
+ primaryShardInfo.getPrimaryShardActor(), shardName);
updateShardInfo(shardName, primaryShardInfo);
try {
TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
- if(localContext != null) {
+ if (localContext != null) {
transactionContextWrapper.executePriorTransactionOperations(localContext);
} else {
RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
}
}
- final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent, final String shardName) {
+ final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
+ final String shardName) {
final TransactionContextWrapper transactionContextWrapper =
new TransactionContextWrapper(parent.getIdentifier(), actorContext);
Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
- if(findPrimaryFuture.isCompleted()) {
+ if (findPrimaryFuture.isCompleted()) {
Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
- if(maybe.isSuccess()) {
+ if (maybe.isSuccess()) {
onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
} else {
onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
if (maybeDataTree.isPresent()) {
- if(!knownLocal.containsKey(shardName)) {
+ if (!knownLocal.containsKey(shardName)) {
LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get());
knownLocal.putIfAbsent(shardName, factory);
}
- } else if(knownLocal.containsKey(shardName)) {
+ } else if (knownLocal.containsKey(shardName)) {
LOG.debug("Shard {} invalidating local data tree", shardName);
knownLocal.remove(shardName);
* Create local transaction factory for specified shard, backed by specified shard leader
* and data tree instance.
*
- * @param shardName
- * @param shardLeader
+ * @param shardName the shard name
+ * @param shardLeader the shard leader
* @param dataTree Backing data tree instance. The data tree may only be accessed in
* read-only manner.
* @return Transaction factory for local use.
* be waited for before the next transaction is allocated.
* @param cohortFutures Collection of futures
*/
- protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
+ protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction,
+ @Nonnull Collection<Future<T>> cohortFutures);
/**
* Callback invoked when the internal TransactionContext has been created for a transaction.
private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
final TransactionProxy parent) {
- switch(parent.getType()) {
+ switch (parent.getType()) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
throw new UnsupportedOperationException();
}
};
- default:
- throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
+ default:
+ throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
}
}
}
private final ShardDataTreeTransactionChain chain;
private final ShardDataTreeCohort delegate;
- ChainedCommitCohort(final ShardDataTreeTransactionChain chain, final ReadWriteShardDataTreeTransaction transaction, final ShardDataTreeCohort delegate) {
+ ChainedCommitCohort(final ShardDataTreeTransactionChain chain, final ReadWriteShardDataTreeTransaction transaction,
+ final ShardDataTreeCohort delegate) {
this.transaction = Preconditions.checkNotNull(transaction);
this.delegate = Preconditions.checkNotNull(delegate);
this.chain = Preconditions.checkNotNull(chain);
}
@Override
- public void onFailure(final Throwable t) {
- LOG.error("Transaction {} commit failed, cannot recover", transaction, t);
- callback.onFailure(t);
+ public void onFailure(final Throwable failure) {
+ LOG.error("Transaction {} commit failed, cannot recover", transaction, failure);
+ callback.onFailure(failure);
}
});
}
public State getState() {
return delegate.getState();
}
-}
\ No newline at end of file
+}
public interface ClusterWrapper {
void subscribeToMemberEvents(ActorRef actorRef);
+
MemberName getCurrentMemberName();
+
Address getSelfAddress();
}
private final MemberName currentMemberName;
private final Address selfAddress;
- public ClusterWrapperImpl(ActorSystem actorSystem){
+ public ClusterWrapperImpl(ActorSystem actorSystem) {
Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
cluster = Cluster.get(actorSystem);
Preconditions.checkState(cluster.getSelfRoles().size() > 0,
- "No akka roles were specified\n" +
- "One way to specify the member name is to pass a property on the command line like so\n" +
- " -Dakka.cluster.roles.0=member-3\n" +
- "member-3 here would be the name of the member"
- );
+ "No akka roles were specified.\n"
+ + "One way to specify the member name is to pass a property on the command line like so\n"
+ + " -Dakka.cluster.roles.0=member-3\n"
+ + "member-3 here would be the name of the member");
currentMemberName = MemberName.forName(cluster.getSelfRoles().iterator().next());
selfAddress = cluster.selfAddress();
}
@Override
- public void subscribeToMemberEvents(ActorRef actorRef){
+ public void subscribeToMemberEvents(ActorRef actorRef) {
Preconditions.checkNotNull(actorRef, "actorRef should not be null");
cluster.subscribe(actorRef, ClusterEvent.initialStateAsEvents(),
return lastBatchedModificationsException;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
void applyModifications(final Iterable<Modification> modifications) {
totalBatchedModificationsReceived++;
- if(lastBatchedModificationsException == null) {
+ if (lastBatchedModificationsException == null) {
for (Modification modification : modifications) {
- try {
- modification.apply(transaction.getSnapshot());
- } catch (RuntimeException e) {
- lastBatchedModificationsException = e;
- throw e;
- }
+ try {
+ modification.apply(transaction.getSnapshot());
+ } catch (RuntimeException e) {
+ lastBatchedModificationsException = e;
+ throw e;
+ }
}
}
}
cohort = transaction.ready();
- if(cohortDecorator != null) {
+ if (cohortDecorator != null) {
// Call the hook for unit tests.
cohort = cohortDecorator.decorate(transactionID, cohort);
}
.append(doImmediateCommit).append("]");
return builder.toString();
}
-}
\ No newline at end of file
+}
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
import akka.dispatch.Recover;
-import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import scala.concurrent.Future;
/**
- *
* Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
- *
+ * <p/>
* It tracks current operation and list of cohorts which successfuly finished previous phase in
* case, if abort is necessary to invoke it only on cohort steps which are still active.
*
COMMITED,
/**
* Some of cohorts responsed back with unsuccessful message.
- *
*/
FAILED,
/**
- *
* Abort message was send to all cohorts which responded with success previously.
- *
*/
ABORTED
}
void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
// FIXME: Optimize empty collection list with pre-created futures, containing success.
- Future<Iterable<Object>> canCommitsFuture =
- Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
- @Override
- public Future<Object> apply(final CanCommit input) {
- return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
- ExecutionContexts.global());
- }
- }, ExecutionContexts.global());
+ Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
+ input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
+ ExecutionContexts.global()), ExecutionContexts.global());
changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
}
}
private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
- return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
-
- @Override
- public Future<Object> apply(final DataTreeCohortActor.Success cohortResponse) throws Exception {
- return Patterns.ask(cohortResponse.getCohort(), message, timeout);
- }
-
- }, ExecutionContexts.global());
+ return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(
+ cohortResponse.getCohort(), message, timeout), ExecutionContexts.global());
}
- private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState, final State afterState)
- throws TimeoutException, ExecutionException {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
+ final State afterState) throws TimeoutException, ExecutionException {
final Iterable<Object> results;
try {
results = Await.result(resultsFuture, timeout.duration());
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
+ * Actor for a DataChangeListener.
+ *
* @Deprecated Replaced by {@link DataTreeChangeListener}
*/
@Deprecated
public class DataChangeListener extends AbstractUntypedActor {
- private static final Logger LOG = LoggerFactory.getLogger(DataChangeListener.class);
-
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
private boolean notificationsEnabled = false;
@Override
public void handleReceive(Object message) {
- if (message instanceof DataChanged){
+ if (message instanceof DataChanged) {
dataChanged(message);
- } else if (message instanceof EnableNotification){
+ } else if (message instanceof EnableNotification) {
enableNotification((EnableNotification) message);
} else {
unknownMessage(message);
private void enableNotification(EnableNotification message) {
notificationsEnabled = message.isEnabled();
- LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"),
+ LOG.debug("{} notifications for listener {}", notificationsEnabled ? "Enabled" : "Disabled",
listener);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void dataChanged(Object message) {
// Do nothing if notifications are not enabled
- if(!notificationsEnabled) {
+ if (!notificationsEnabled) {
LOG.debug("Notifications not enabled for listener {} - dropping change notification", listener);
return;
}
LOG.error( String.format( "Error notifying listener %s", this.listener ), e );
}
- if(isValidSender(getSender())) {
+ if (isValidSender(getSender())) {
getSender().tell(DataChangedReply.INSTANCE, getSelf());
}
}
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
- * DataChangeListenerProxy represents a single remote DataChangeListener
+ * DataChangeListenerProxy represents a single remote DataChangeListener.
*/
-public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>{
+public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
private final ActorSelection dataChangeListenerActor;
public DataChangeListenerProxy(ActorSelection dataChangeListenerActor) {
}
}
- public static Props props(
- final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+ public static Props props(final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+ NormalizedNode<?, ?>>> registration) {
return Props.create(new DataChangeListenerRegistrationCreator(registration));
}
private void closeListenerRegistration() {
registration.close();
- if(isValidSender(getSender())) {
+ if (isValidSender(getSender())) {
getSender().tell(CloseDataChangeListenerRegistrationReply.INSTANCE, getSelf());
}
private boolean closed = false;
public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- DataChangeListenerRegistrationProxy (
- String shardName, ActorContext actorContext, L listener) {
+ DataChangeListenerRegistrationProxy(String shardName, ActorContext actorContext, L listener) {
this.shardName = shardName;
this.actorContext = actorContext;
this.listener = listener;
}
private void setListenerRegistrationActor(ActorSelection listenerRegistrationActor) {
- if(listenerRegistrationActor == null) {
+ if (listenerRegistrationActor == null) {
return;
}
boolean sendCloseMessage = false;
- synchronized(this) {
- if(closed) {
+ synchronized (this) {
+ if (closed) {
sendCloseMessage = true;
} else {
this.listenerRegistrationActor = listenerRegistrationActor;
}
}
- if(sendCloseMessage) {
+ if (sendCloseMessage) {
listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, null);
}
}
findFuture.onComplete(new OnComplete<ActorRef>() {
@Override
public void onComplete(Throwable failure, ActorRef shard) {
- if(failure instanceof LocalShardNotFoundException) {
- LOG.debug("No local shard found for {} - DataChangeListener {} at path {} " +
- "cannot be registered", shardName, listener, path);
- } else if(failure != null) {
- LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} " +
- "cannot be registered: {}", shardName, listener, path, failure);
+ if (failure instanceof LocalShardNotFoundException) {
+ LOG.debug("No local shard found for {} - DataChangeListener {} at path {} "
+ + "cannot be registered", shardName, listener, path);
+ } else if (failure != null) {
+ LOG.error("Failed to find local shard {} - DataChangeListener {} at path {} "
+ + "cannot be registered: {}", shardName, listener, path, failure);
} else {
doRegistration(shard, path, scope);
}
listener instanceof ClusteredDOMDataChangeListener),
actorContext.getDatastoreContext().getShardInitializationTimeout());
- future.onComplete(new OnComplete<Object>(){
+ future.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object result) {
- if(failure != null) {
+ if (failure != null) {
LOG.error("Failed to register DataChangeListener {} at path {}",
listener, path.toString(), failure);
} else {
public void close() {
boolean sendCloseMessage;
- synchronized(this) {
+ synchronized (this) {
sendCloseMessage = !closed && listenerRegistrationActor != null;
closed = true;
}
- if(sendCloseMessage) {
+ if (sendCloseMessage) {
listenerRegistrationActor.tell(CloseDataChangeListenerRegistration.INSTANCE, ActorRef.noSender());
listenerRegistrationActor = null;
}
- if(dataChangeListenerActor != null) {
+ if (dataChangeListenerActor != null) {
dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
dataChangeListenerActor = null;
}
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
- if(!message.isRegisterOnAllInstances()) {
+ if (!message.isRegisterOnAllInstances()) {
addListenerActor(dataChangeListenerPath);
}
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
/**
+ * Payload wrapper for a DataTreeCandidatePayload.
+ *
* @deprecated Deprecated in Boron in favor of CommitTransactionPayload
*/
@Deprecated
private transient byte[] serialized;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public DataTreeCandidatePayload() {
// Required by Externalizable
}
}
/**
+ * Creates a DataTreeCandidatePayload.
+ *
* @deprecated Use CommitTransactionPayload instead
*/
@Deprecated
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating
* DataTreeChanged messages and dispatching their context to the user.
*/
final class DataTreeChangeListenerActor extends AbstractUntypedActor {
- private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerActor.class);
private final DOMDataTreeChangeListener listener;
private boolean notificationsEnabled = false;
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void dataChanged(final DataTreeChanged message) {
// Do nothing if notifications are not enabled
if (!notificationsEnabled) {
private void enableNotification(final EnableNotification message) {
notificationsEnabled = message.isEnabled();
- LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"),
+ LOG.debug("{} notifications for listener {}", notificationsEnabled ? "Enabled" : "Disabled",
listener);
}
@GuardedBy("this")
private ActorSelection listenerRegistrationActor;
- public DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
+ DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
super(listener);
this.actorContext = Preconditions.checkNotNull(actorContext);
this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
- DataTreeChangeListenerActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+ DataTreeChangeListenerActor.props(getInstance())
+ .withDispatcher(actorContext.getNotificationDispatcherPath()));
}
@Override
@Override
public void onComplete(final Throwable failure, final ActorRef shard) {
if (failure instanceof LocalShardNotFoundException) {
- LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " +
- "cannot be registered", shardName, getInstance(), treeId);
+ LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
+ + "cannot be registered", shardName, getInstance(), treeId);
} else if (failure != null) {
- LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " +
- "cannot be registered: {}", shardName, getInstance(), treeId, failure);
+ LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
+ + "cannot be registered: {}", shardName, getInstance(), treeId, failure);
} else {
doRegistration(shard, treeId);
}
getInstance() instanceof ClusteredDOMDataTreeChangeListener),
actorContext.getDatastoreContext().getShardInitializationTimeout());
- future.onComplete(new OnComplete<Object>(){
+ future.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(final Throwable failure, final Object result) {
if (failure != null) {
protected void handleReceive(Object message) throws Exception {
if (message instanceof CloseDataTreeChangeListenerRegistration) {
registration.close();
- if(isValidSender(getSender())) {
+ if (isValidSender(getSender())) {
getSender().tell(CloseDataTreeChangeListenerRegistrationReply.getInstance(), getSelf());
}
return Props.create(new DataTreeChangeListenerRegistrationCreator(registration));
}
- private static final class DataTreeChangeListenerRegistrationCreator implements Creator<DataTreeChangeListenerRegistrationActor> {
+ private static final class DataTreeChangeListenerRegistrationCreator
+ implements Creator<DataTreeChangeListenerRegistrationActor> {
private static final long serialVersionUID = 1L;
final ListenerRegistration<DOMDataTreeChangeListener> registration;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
- RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration, ListenerRegistration<DOMDataTreeChangeListener>> {
+ RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration,
+ ListenerRegistration<DOMDataTreeChangeListener>> {
DataTreeChangeListenerSupport(final Shard shard) {
super(shard);
}
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
- if(!message.isRegisterOnAllInstances()) {
+ if (!message.isRegisterOnAllInstances()) {
addListenerActor(dataChangeListenerPath);
}
}
@Override
- protected DelayedDataTreeListenerRegistration newDelayedListenerRegistration(RegisterDataTreeChangeListener message) {
+ protected DelayedDataTreeListenerRegistration newDelayedListenerRegistration(
+ RegisterDataTreeChangeListener message) {
return new DelayedDataTreeListenerRegistration(message);
}
import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
* decapsulating DataTreeChanged messages and dispatching their context to the user.
*/
final class DataTreeCohortActor extends AbstractUntypedActor {
- private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class);
private final CohortBehaviour<?> idleState = new Idle();
private final DOMDataTreeCommitCohort cohort;
private CohortBehaviour<?> currentState = idleState;
*
* @param <R> Reply message type
*/
- static abstract class CommitProtocolCommand<R extends CommitReply> {
+ abstract static class CommitProtocolCommand<R extends CommitReply> {
private final TransactionIdentifier txId;
}
- static abstract class CommitReply {
+ abstract static class CommitReply {
private final ActorRef cohortRef;
private final TransactionIdentifier txId;
static final class Success extends CommitReply {
- public Success(ActorRef cohortRef, TransactionIdentifier txId) {
+ Success(ActorRef cohortRef, TransactionIdentifier txId) {
super(cohortRef, txId);
}
static final class PreCommit extends CommitProtocolCommand<Success> {
- public PreCommit(TransactionIdentifier txId) {
+ PreCommit(TransactionIdentifier txId) {
super(txId);
}
}
static final class Abort extends CommitProtocolCommand<Success> {
- public Abort(TransactionIdentifier txId) {
+ Abort(TransactionIdentifier txId) {
super(txId);
}
}
static final class Commit extends CommitProtocolCommand<Success> {
- public Commit(TransactionIdentifier txId) {
+ Commit(TransactionIdentifier txId) {
super(txId);
}
}
- private static abstract class CohortBehaviour<E> {
+ private abstract static class CohortBehaviour<E> {
abstract Class<E> getHandledMessageType();
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
CohortBehaviour<?> process(CanCommit message) {
final PostCanCommitStep nextStep;
try {
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
final CohortBehaviour<?> abort() {
try {
getStep().abort().get();
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
CohortBehaviour<?> process(PreCommit message) {
final PostPreCommitStep nextStep;
try {
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
CohortBehaviour<?> process(Commit message) {
try {
getStep().commit().get();
private final Map<ActorRef, RegistrationTreeNode<ActorRef>> cohortToNode = new HashMap<>();
-
+ @SuppressWarnings("checkstyle:IllegalCatch")
void registerCohort(final ActorRef sender, final RegisterCohort cohort) {
takeLock();
try {
}
}
- static abstract class CohortRegistryCommand {
+ abstract static class CohortRegistryCommand {
private final ActorRef cohort;
private final Collection<DataTreeCohortActor.CanCommit> messages =
new ArrayList<>();
- CanCommitMessageBuilder(final TransactionIdentifier txId, final DataTreeCandidate candidate, final SchemaContext schema) {
+ CanCommitMessageBuilder(final TransactionIdentifier txId, final DataTreeCandidate candidate,
+ final SchemaContext schema) {
this.txId = Preconditions.checkNotNull(txId);
this.candidate = Preconditions.checkNotNull(candidate);
this.schema = schema;
}
}
- private void lookupAndCreateCanCommits(final YangInstanceIdentifier path, final RegistrationTreeNode<ActorRef> regNode,
- final DataTreeCandidateNode candNode) {
+ private void lookupAndCreateCanCommits(final YangInstanceIdentifier path,
+ final RegistrationTreeNode<ActorRef> regNode, final DataTreeCandidateNode candNode) {
if (candNode.getModificationType() == ModificationType.UNMODIFIED) {
LOG.debug("Skipping unmodified candidate {}", path);
return;
private void createCanCommits(final Collection<ActorRef> regs, final YangInstanceIdentifier path,
final DataTreeCandidateNode node) {
- final DOMDataTreeCandidate candidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node);
+ final DOMDataTreeCandidate domCandidate = DOMDataTreeCandidateTO.create(treeIdentifier(path), node);
for (final ActorRef reg : regs) {
- final CanCommit message = new DataTreeCohortActor.CanCommit(txId, candidate, schema, reg);
+ final CanCommit message = new DataTreeCohortActor.CanCommit(txId, domCandidate, schema, reg);
messages.add(message);
}
}
future.onComplete(new OnComplete<Object>() {
@Override
- public void onComplete(Throwable e, Object val) throws Throwable {
- if (e != null) {
- LOG.error("Unable to register {} as commit cohort", getInstance(), e);
+ public void onComplete(Throwable failure, Object val) {
+ if (failure != null) {
+ LOG.error("Unable to register {} as commit cohort", getInstance(), failure);
}
if (isClosed()) {
removeRegistration();
public static final int DEFAULT_JOURNAL_RECOVERY_BATCH_SIZE = 1;
public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
- public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
+ public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS =
+ DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 1000;
- public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
+ public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS =
+ TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
public static final int DEFAULT_SHARD_SNAPSHOT_CHUNK_SIZE = 2048000;
- private static final Set<String> globalDatastoreNames = Sets.newConcurrentHashSet();
+ private static final Set<String> GLOBAL_DATASTORE_NAMES = Sets.newConcurrentHashSet();
private InMemoryDOMDataStoreConfigProperties dataStoreProperties;
private Duration shardTransactionIdleTimeout = DatastoreContext.DEFAULT_SHARD_TRANSACTION_IDLE_TIMEOUT;
private String shardManagerPersistenceId;
public static Set<String> getGlobalDatastoreNames() {
- return globalDatastoreNames;
+ return GLOBAL_DATASTORE_NAMES;
}
private DatastoreContext() {
return configurationReader;
}
- public long getShardElectionTimeoutFactor(){
+ public long getShardElectionTimeoutFactor() {
return raftConfig.getElectionTimeoutFactor();
}
- public String getDataStoreName(){
+ public String getDataStoreName() {
return dataStoreName;
}
raftConfig.setPeerAddressResolver(resolver);
}
- private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis){
+ private void setHeartbeatInterval(long shardHeartbeatIntervalInMillis) {
raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
TimeUnit.MILLISECONDS));
}
- private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize){
+ private void setShardJournalRecoveryLogBatchSize(int shardJournalRecoveryLogBatchSize) {
raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
}
private Builder(DatastoreContext datastoreContext) {
this.datastoreContext = datastoreContext;
- if(datastoreContext.getDataStoreProperties() != null) {
+ if (datastoreContext.getDataStoreProperties() != null) {
maxShardDataChangeExecutorPoolSize =
datastoreContext.getDataStoreProperties().getMaxDataChangeExecutorPoolSize();
maxShardDataChangeExecutorQueueSize =
return shardLeaderElectionTimeout(timeout, TimeUnit.SECONDS);
}
- public Builder configurationReader(AkkaConfigurationReader configurationReader){
+ public Builder configurationReader(AkkaConfigurationReader configurationReader) {
datastoreContext.configurationReader = configurationReader;
return this;
}
- public Builder persistent(boolean persistent){
+ public Builder persistent(boolean persistent) {
datastoreContext.persistent = persistent;
return this;
}
return this;
}
- public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor){
+ public Builder shardElectionTimeoutFactor(long shardElectionTimeoutFactor) {
datastoreContext.setElectionTimeoutFactor(shardElectionTimeoutFactor);
return this;
}
- public Builder transactionCreationInitialRateLimit(long initialRateLimit){
+ public Builder transactionCreationInitialRateLimit(long initialRateLimit) {
datastoreContext.transactionCreationInitialRateLimit = initialRateLimit;
return this;
}
- public Builder logicalStoreType(LogicalDatastoreType logicalStoreType){
+ public Builder logicalStoreType(LogicalDatastoreType logicalStoreType) {
datastoreContext.logicalStoreType = Preconditions.checkNotNull(logicalStoreType);
// Retain compatible naming
switch (logicalStoreType) {
- case CONFIGURATION:
- dataStoreName("config");
- break;
- case OPERATIONAL:
- dataStoreName("operational");
- break;
- default:
- dataStoreName(logicalStoreType.name());
+ case CONFIGURATION:
+ dataStoreName("config");
+ break;
+ case OPERATIONAL:
+ dataStoreName("operational");
+ break;
+ default:
+ dataStoreName(logicalStoreType.name());
}
return this;
}
- public Builder dataStoreName(String dataStoreName){
+ public Builder dataStoreName(String dataStoreName) {
datastoreContext.dataStoreName = Preconditions.checkNotNull(dataStoreName);
datastoreContext.dataStoreMXBeanType = "Distributed" + WordUtils.capitalize(dataStoreName) + "Datastore";
return this;
maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
maxShardDataChangeListenerQueueSize, maxShardDataStoreExecutorQueueSize);
- if(datastoreContext.dataStoreName != null) {
- globalDatastoreNames.add(datastoreContext.dataStoreName);
+ if (datastoreContext.dataStoreName != null) {
+ GLOBAL_DATASTORE_NAMES.add(datastoreContext.dataStoreName);
}
return datastoreContext;
ServiceReference<ConfigurationAdmin> configAdminServiceReference =
bundleContext.getServiceReference(ConfigurationAdmin.class);
- if(configAdminServiceReference == null) {
+ if (configAdminServiceReference == null) {
LOG.warn("No ConfigurationAdmin service found");
} else {
overlaySettings(configAdminServiceReference);
this.listener = listener;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void overlaySettings(ServiceReference<ConfigurationAdmin> configAdminServiceReference) {
try {
ConfigurationAdmin configAdmin = bundleContext.getService(configAdminServiceReference);
Configuration config = configAdmin.getConfiguration(CONFIG_ID);
- if(config != null) {
+ if (config != null) {
Dictionary<String, Object> properties = config.getProperties();
LOG.debug("Overlaying settings: {}", properties);
- if(introspector.update(properties)) {
- if(listener != null) {
+ if (introspector.update(properties)) {
+ if (listener != null) {
listener.onDatastoreContextUpdated(introspector.newContextFactory());
}
}
}
} catch (IOException e) {
LOG.error("Error obtaining Configuration for pid {}", CONFIG_ID, e);
- } catch(IllegalStateException e) {
+ } catch (IllegalStateException e) {
// Ignore - indicates the bundleContext has been closed.
} finally {
try {
public void close() {
listener = null;
- if(configListenerServiceRef != null) {
+ if (configListenerServiceRef != null) {
configListenerServiceRef.unregister();
}
}
private class DatastoreConfigurationListener implements ConfigurationListener {
@Override
public void configurationEvent(ConfigurationEvent event) {
- if(CONFIG_ID.equals(event.getPid()) && event.getType() == ConfigurationEvent.CM_UPDATED) {
+ if (CONFIG_ID.equals(event.getPid()) && event.getType() == ConfigurationEvent.CM_UPDATED) {
LOG.debug("configurationEvent: config {} was updated", CONFIG_ID);
overlaySettings(event.getReference());
}
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.List;
public class DatastoreContextIntrospector {
private static final Logger LOG = LoggerFactory.getLogger(DatastoreContextIntrospector.class);
- private static final Map<String, Class<?>> dataStorePropTypes = new HashMap<>();
+ private static final Map<String, Class<?>> DATA_STORE_PROP_TYPES = new HashMap<>();
- private static final Map<Class<?>, Constructor<?>> constructors = new HashMap<>();
+ private static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = new HashMap<>();
- private static final Map<Class<?>, Method> yangTypeGetters = new HashMap<>();
+ private static final Map<Class<?>, Method> YANG_TYPE_GETTERS = new HashMap<>();
- private static final Map<String, Method> builderSetters = new HashMap<>();
+ private static final Map<String, Method> BUILDER_SETTERS = new HashMap<>();
static {
try {
* constructor that takes a single String argument. For primitive wrappers, this constructor
* converts from a String representation.
*/
+ @SuppressWarnings("checkstyle:IllegalCatch")
private static void introspectPrimitiveTypes() {
Set<Class<?>> primitives = ImmutableSet.<Class<?>>builder().addAll(
Primitives.allWrapperTypes()).add(String.class).build();
- for(Class<?> primitive: primitives) {
+ for (Class<?> primitive: primitives) {
try {
processPropertyType(primitive);
} catch (Exception e) {
* the methods that return Builder.
*/
private static void introspectDatastoreContextBuilder() {
- for(Method method: Builder.class.getMethods()) {
- if(Builder.class.equals(method.getReturnType())) {
- builderSetters.put(method.getName(), method);
+ for (Method method: Builder.class.getMethods()) {
+ if (Builder.class.equals(method.getReturnType())) {
+ BUILDER_SETTERS.put(method.getName(), method);
}
}
}
*/
private static void introspectDataStoreProperties() throws IntrospectionException {
BeanInfo beanInfo = Introspector.getBeanInfo(DataStoreProperties.class);
- for(PropertyDescriptor desc: beanInfo.getPropertyDescriptors()) {
+ for (PropertyDescriptor desc: beanInfo.getPropertyDescriptors()) {
processDataStoreProperty(desc.getName(), desc.getPropertyType());
}
// properties and thus aren't returned from getPropertyDescriptors. A getter starting with
// "is" is only supported if it returns primitive boolean. So we'll check for these via
// getMethodDescriptors.
- for(MethodDescriptor desc: beanInfo.getMethodDescriptors()) {
+ for (MethodDescriptor desc: beanInfo.getMethodDescriptors()) {
String methodName = desc.getName();
- if(Boolean.class.equals(desc.getMethod().getReturnType()) && methodName.startsWith("is")) {
+ if (Boolean.class.equals(desc.getMethod().getReturnType()) && methodName.startsWith("is")) {
String propertyName = WordUtils.uncapitalize(methodName.substring(2));
processDataStoreProperty(propertyName, Boolean.class);
}
/**
* Processes a property defined on the DataStoreProperties interface.
*/
+ @SuppressWarnings("checkstyle:IllegalCatch")
private static void processDataStoreProperty(String name, Class<?> propertyType) {
- Preconditions.checkArgument(builderSetters.containsKey(name), String.format(
- "DataStoreProperties property \"%s\" does not have corresponding setter in DatastoreContext.Builder", name));
+ Preconditions.checkArgument(BUILDER_SETTERS.containsKey(name), String.format(
+ "DataStoreProperties property \"%s\" does not have corresponding setter in DatastoreContext.Builder",
+ name));
try {
processPropertyType(propertyType);
- dataStorePropTypes.put(name, propertyType);
+ DATA_STORE_PROP_TYPES.put(name, propertyType);
} catch (Exception e) {
LOG.error("Error finding constructor for type {}", propertyType, e);
}
*/
private static void processPropertyType(Class<?> propertyType) throws Exception {
Class<?> wrappedType = Primitives.wrap(propertyType);
- if(constructors.containsKey(wrappedType)) {
+ if (CONSTRUCTORS.containsKey(wrappedType)) {
return;
}
// If the type is a primitive (or String type), we look for the constructor that takes a
// single String argument, which, for primitives, validates and converts from a String
// representation which is the form we get on ingress.
- if(propertyType.isPrimitive() || Primitives.isWrapperType(propertyType) ||
- propertyType.equals(String.class))
- {
- constructors.put(wrappedType, propertyType.getConstructor(String.class));
+ if (propertyType.isPrimitive() || Primitives.isWrapperType(propertyType) || propertyType.equals(String.class)) {
+ CONSTRUCTORS.put(wrappedType, propertyType.getConstructor(String.class));
} else {
// This must be a yang-defined type. We need to find the constructor that takes a
// primitive as the only argument. This will be used to construct instances to perform
// validation (eg range checking). The yang-generated types have a couple single-argument
// constructors but the one we want has the bean ConstructorProperties annotation.
- for(Constructor<?> ctor: propertyType.getConstructors()) {
+ for (Constructor<?> ctor: propertyType.getConstructors()) {
ConstructorProperties ctorPropsAnnotation = ctor.getAnnotation(ConstructorProperties.class);
- if(ctor.getParameterTypes().length == 1 && ctorPropsAnnotation != null) {
+ if (ctor.getParameterTypes().length == 1 && ctorPropsAnnotation != null) {
findYangTypeGetter(propertyType, ctorPropsAnnotation.value()[0]);
- constructors.put(propertyType, ctor);
+ CONSTRUCTORS.put(propertyType, ctor);
break;
}
}
*/
private static void findYangTypeGetter(Class<?> type, String propertyName)
throws Exception {
- for(PropertyDescriptor desc: Introspector.getBeanInfo(type).getPropertyDescriptors()) {
- if(desc.getName().equals(propertyName)) {
- yangTypeGetters.put(type, desc.getReadMethod());
+ for (PropertyDescriptor desc: Introspector.getBeanInfo(type).getPropertyDescriptors()) {
+ if (desc.getName().equals(propertyName)) {
+ YANG_TYPE_GETTERS.put(type, desc.getReadMethod());
return;
}
}
}
public synchronized DatastoreContext getShardDatastoreContext(String forShardName) {
- if(currentProperties == null) {
+ if (currentProperties == null) {
return context;
}
List<String> keys = getSortedKeysByDatastoreType(currentProperties.keySet(), dataStoreTypePrefix);
- for(String key: keys) {
+ for (String key: keys) {
Object value = currentProperties.get(key);
- if(key.startsWith(dataStoreTypePrefix)) {
+ if (key.startsWith(dataStoreTypePrefix)) {
key = key.replaceFirst(dataStoreTypePrefix, "");
}
- if(key.startsWith(shardNamePrefix)) {
+ if (key.startsWith(shardNamePrefix)) {
key = key.replaceFirst(shardNamePrefix, "");
convertValueAndInvokeSetter(key, value, builder);
}
*/
public synchronized boolean update(Dictionary<String, Object> properties) {
currentProperties = null;
- if(properties == null || properties.isEmpty()) {
+ if (properties == null || properties.isEmpty()) {
return false;
}
List<String> keys = getSortedKeysByDatastoreType(Collections.list(properties.keys()), dataStoreTypePrefix);
boolean updated = false;
- for(String key: keys) {
+ for (String key: keys) {
Object value = properties.get(key);
mapBuilder.put(key, value);
// If the key is prefixed with the data store type, strip it off.
- if(key.startsWith(dataStoreTypePrefix)) {
+ if (key.startsWith(dataStoreTypePrefix)) {
key = key.replaceFirst(dataStoreTypePrefix, "");
}
- if(convertValueAndInvokeSetter(key, value, builder)) {
+ if (convertValueAndInvokeSetter(key, value, builder)) {
updated = true;
}
}
currentProperties = mapBuilder.build();
- if(updated) {
+ if (updated) {
context = builder.build();
}
// Sort the property keys by putting the names prefixed with the data store type last. This
// is done so data store specific settings are applied after global settings.
ArrayList<String> keys = new ArrayList<>(inKeys);
- Collections.sort(keys, new Comparator<String>() {
- @Override
- public int compare(String key1, String key2) {
- return key1.startsWith(dataStoreTypePrefix) ? 1 :
- key2.startsWith(dataStoreTypePrefix) ? -1 : key1.compareTo(key2);
- }
- });
+ Collections.sort(keys, (key1, key2) -> key1.startsWith(dataStoreTypePrefix) ? 1 :
+ key2.startsWith(dataStoreTypePrefix) ? -1 : key1.compareTo(key2));
return keys;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private boolean convertValueAndInvokeSetter(String inKey, Object inValue, Builder builder) {
String key = convertToCamelCase(inKey);
try {
// Convert the value to the right type.
Object value = convertValue(key, inValue);
- if(value == null) {
+ if (value == null) {
return false;
}
key, value, value.getClass().getSimpleName());
// Call the setter method on the Builder instance.
- Method setter = builderSetters.get(key);
+ Method setter = BUILDER_SETTERS.get(key);
setter.invoke(builder, constructorValueRecursively(
Primitives.wrap(setter.getParameterTypes()[0]), value.toString()));
private static String convertToCamelCase(String inString) {
String str = inString.trim();
- if(StringUtils.contains(str, '-') || StringUtils.contains(str, ' ')) {
+ if (StringUtils.contains(str, '-') || StringUtils.contains(str, ' ')) {
str = inString.replace('-', ' ');
str = WordUtils.capitalizeFully(str);
str = StringUtils.deleteWhitespace(str);
}
private Object convertValue(String name, Object from) throws Exception {
- Class<?> propertyType = dataStorePropTypes.get(name);
- if(propertyType == null) {
+ Class<?> propertyType = DATA_STORE_PROP_TYPES.get(name);
+ if (propertyType == null) {
LOG.debug("Property not found for {}", name);
return null;
}
Object converted = constructorValueRecursively(propertyType, from.toString());
// If the converted type is a yang-generated type, call the getter to obtain the actual value.
- Method getter = yangTypeGetters.get(converted.getClass());
- if(getter != null) {
+ Method getter = YANG_TYPE_GETTERS.get(converted.getClass());
+ if (getter != null) {
converted = getter.invoke(converted);
}
LOG.trace("convertValueRecursively - toType: {}, fromValue {} ({})",
toType.getSimpleName(), fromValue, fromValue.getClass().getSimpleName());
- Constructor<?> ctor = constructors.get(toType);
+ Constructor<?> ctor = CONSTRUCTORS.get(toType);
LOG.trace("Found {}", ctor);
- if(ctor == null) {
+ if (ctor == null) {
throw new IllegalArgumentException(String.format("Constructor not found for type %s", toType));
}
// Since the original input type is a String, once we find a constructor that takes a String
// argument, we're done recursing.
- if(!ctor.getParameterTypes()[0].equals(String.class)) {
+ if (!ctor.getParameterTypes()[0].equals(String.class)) {
value = constructorValueRecursively(ctor.getParameterTypes()[0], fromValue);
}
// synchronize this method so that, in case of concurrent access to getAndRemove(),
// no one ends up with partially initialized data
+ @SuppressWarnings("checkstyle:IllegalCatch")
private synchronized void initialize() {
File restoreDirectoryFile = new File(restoreDirectoryPath);
String[] files = restoreDirectoryFile.list();
- if(files == null || files.length == 0) {
+ if (files == null || files.length == 0) {
LOG.debug("Restore directory {} does not exist or is empty", restoreDirectoryFile);
return;
}
- if(files.length > 1) {
- LOG.error("Found {} files in clustered datastore restore directory {} - expected 1. No restore will be attempted",
- files.length, restoreDirectoryFile);
+ if (files.length > 1) {
+ LOG.error(
+ "Found {} files in clustered datastore restore directory {} - expected 1. No restore will be attempted",
+ files.length, restoreDirectoryFile);
return;
}
LOG.info("Clustered datastore will be restored from file {}", restoreFile);
- try(FileInputStream fis = new FileInputStream(restoreFile)) {
+ try (FileInputStream fis = new FileInputStream(restoreFile)) {
DatastoreSnapshotList snapshots = deserialize(fis);
LOG.debug("Deserialized {} snapshots", snapshots.size());
- for(DatastoreSnapshot snapshot: snapshots) {
+ for (DatastoreSnapshot snapshot: snapshots) {
datastoreSnapshots.put(snapshot.getType(), snapshot);
}
} catch (Exception e) {
LOG.error("Error reading clustered datastore restore file {}", restoreFile, e);
} finally {
- if(!restoreFile.delete()) {
+ if (!restoreFile.delete()) {
LOG.error("Could not delete clustered datastore restore file {}", restoreFile);
}
}
}
- private static DatastoreSnapshotList deserialize(InputStream inputStream) throws IOException, ClassNotFoundException {
- try(ObjectInputStream ois = new ObjectInputStream(inputStream)) {
+ private static DatastoreSnapshotList deserialize(InputStream inputStream)
+ throws IOException, ClassNotFoundException {
+ try (ObjectInputStream ois = new ObjectInputStream(inputStream)) {
return (DatastoreSnapshotList) ois.readObject();
}
}
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(Throwable failure) {
log.warn("Transaction {} failed with error \"{}\" - was allocated in the following context",
- transactionId, t, debugContext);
+ transactionId, failure, debugContext);
}
});
private final ListenerTree dataChangeListenerTree = ListenerTree.create();
@Override
- public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
+ public void submitNotification(final DataChangeListenerRegistration<?> listener,
+ final DOMImmutableDataChangeEvent notification) {
LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification);
listener.getInstance().onDataChanged(notification);
}
@Override
- public void submitNotifications(final DataChangeListenerRegistration<?> listener, final Iterable<DOMImmutableDataChangeEvent> notifications) {
+ public void submitNotifications(final DataChangeListenerRegistration<?> listener,
+ final Iterable<DOMImmutableDataChangeEvent> notifications) {
final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
LOG.debug("Notifying listener {} about {}", instance, notifications);
}
@Override
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L>
- registerDataChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope) {
+ public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ DataChangeListenerRegistration<L> registerDataChangeListener(YangInstanceIdentifier path, L listener,
+ DataChangeScope scope) {
return dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
}
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-abstract class DelayedListenerRegistration<L extends EventListener, R> implements ListenerRegistration<L> {
- private final R registrationMessage;
+abstract class DelayedListenerRegistration<L extends EventListener, M> implements ListenerRegistration<L> {
+ private final M registrationMessage;
private volatile ListenerRegistration<L> delegate;
@GuardedBy("this")
private boolean closed;
- protected DelayedListenerRegistration(R registrationMessage) {
+ protected DelayedListenerRegistration(M registrationMessage) {
this.registrationMessage = registrationMessage;
}
- R getRegistrationMessage() {
+ M getRegistrationMessage() {
return registrationMessage;
}
return delegate;
}
- synchronized <LR extends ListenerRegistration<L>> void createDelegate(
- final LeaderLocalDelegateFactory<R, LR, Optional<DataTreeCandidate>> factory) {
+ synchronized <R extends ListenerRegistration<L>> void createDelegate(
+ final LeaderLocalDelegateFactory<M, R, Optional<DataTreeCandidate>> factory) {
if (!closed) {
- final Entry<LR, Optional<DataTreeCandidate>> res = factory.createDelegate(registrationMessage);
+ final Entry<R, Optional<DataTreeCandidate>> res = factory.createDelegate(registrationMessage);
this.delegate = res.getKey();
}
}
/**
* Base class for factories instantiating delegates.
*
- * <D> delegate type
- * <M> message type
- * <I> initial state type
+ * @param <M> message type
+ * @param <D> delegate type
+ * @param <I> initial state type
*/
abstract class DelegateFactory<M, D, I> {
abstract Entry<D, I> createDelegate(M message);
import org.slf4j.LoggerFactory;
/**
- *
+ * Implements a distributed DOMStore.
*/
public class DistributedDataStore implements DistributedDataStoreInterface, SchemaContextListener,
- DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry, AutoCloseable {
+ DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher,
+ DOMDataTreeCommitCohortRegistry, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
private final TransactionContextFactory txContextFactory;
+ @SuppressWarnings("checkstyle:IllegalCatch")
public DistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
final DatastoreSnapshot restoreFromSnapshot) {
PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
- ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration).
- datastoreContextFactory(datastoreContextFactory).waitTillReadyCountdownLatch(waitTillReadyCountDownLatch).
- primaryShardInfoCache(primaryShardInfoCache).restoreFromSnapshot(restoreFromSnapshot);
+ ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration)
+ .datastoreContextFactory(datastoreContextFactory)
+ .waitTillReadyCountdownLatch(waitTillReadyCountDownLatch)
+ .primaryShardInfoCache(primaryShardInfoCache)
+ .restoreFromSnapshot(restoreFromSnapshot);
actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
- shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(), primaryShardInfoCache);
+ shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
+ primaryShardInfoCache);
final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext);
identifier = client.getIdentifier();
LOG.debug("Distributed data store client {} started", identifier);
- this.waitTillReadyTimeInMillis =
- actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
+ this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
+ .duration().toMillis() * READY_WAIT_FACTOR;
this.txContextFactory = new TransactionContextFactory(actorContext, identifier);
datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
datastoreConfigMXBean.registerMBean();
- datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext().
- getDataStoreMXBeanType(), actorContext);
+ datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext()
+ .getDataStoreMXBeanType(), actorContext);
datastoreInfoMXBean.registerMBean();
}
this.client = null;
this.identifier = Preconditions.checkNotNull(identifier);
this.txContextFactory = new TransactionContextFactory(actorContext, identifier);
- this.waitTillReadyTimeInMillis =
- actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
+ this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
+ .duration().toMillis() * READY_WAIT_FACTOR;
}
public void setCloseable(final AutoCloseable closeable) {
}
@Override
- public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+ final YangInstanceIdentifier treeId, final L listener) {
Preconditions.checkNotNull(treeId, "treeId should not be null");
Preconditions.checkNotNull(listener, "listener should not be null");
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new TransactionProxy(txContextFactory, TransactionType.READ_ONLY);
+ return new TransactionProxy(txContextFactory, TransactionType.READ_ONLY);
}
@Override
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void close() {
LOG.info("Closing data store {}", identifier);
return actorContext;
}
- public void waitTillReady(){
+ public void waitTillReady() {
LOG.info("Beginning to wait for data store to become ready : {}", identifier);
try {
if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
LOG.debug("Data store {} is now ready", identifier);
} else {
- LOG.error("Shared leaders failed to settle in {} seconds, giving up", TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
+ LOG.error("Shard leaders failed to settle in {} seconds, giving up",
+ TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for shards to settle", e);
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
final String shardDispatcher, final String shardManagerId) {
Exception lastException = null;
- for(int i=0;i<100;i++) {
+ for (int i = 0; i < 100; i++) {
try {
return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox(
ActorContext.BOUNDED_MAILBOX), shardManagerId);
- } catch (Exception e){
+ } catch (Exception e) {
lastException = e;
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying (retry count = {})",
- shardManagerId, e.getMessage(), i);
+ LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying "
+ + "(retry count = {})", shardManagerId, e.getMessage(), i);
}
}
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Preconditions;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import com.google.common.base.Preconditions;
import java.util.Collection;
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.concepts.Builder;
import org.opendaylight.yangtools.concepts.Identifiable;
-final class FrontendHistoryMetadataBuilder implements Builder<FrontendHistoryMetadata>, Identifiable<LocalHistoryIdentifier> {
+final class FrontendHistoryMetadataBuilder implements Builder<FrontendHistoryMetadata>,
+ Identifiable<LocalHistoryIdentifier> {
private final LocalHistoryIdentifier identifier;
private long nextTransaction;
* Base class for factories instantiating delegates which are local to the
* shard leader.
*
- * <D> delegate type
- * <M> message type
- * <I> initial state type
+ * @param <D> delegate type
+ * @param <M> message type
+ * @param <I> initial state type
*/
abstract class LeaderLocalDelegateFactory<M, D, I> extends DelegateFactory<M, D, I> {
private final Shard shard;
private final Exception operationError;
protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
- final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final DataTreeModification modification) {
+ final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
+ final DataTreeModification modification) {
this.actorContext = Preconditions.checkNotNull(actorContext);
this.leader = Preconditions.checkNotNull(leader);
this.transaction = Preconditions.checkNotNull(transaction);
}
private Future<Object> initiateCommit(final boolean immediate) {
- if(operationError != null) {
+ if (operationError != null) {
return Futures.failed(operationError);
}
throw new UnsupportedOperationException();
}
- protected void transactionAborted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+ protected void transactionAborted(SnapshotBackedWriteTransaction<TransactionIdentifier> aborted) {
}
- protected void transactionCommitted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+ protected void transactionCommitted(SnapshotBackedWriteTransaction<TransactionIdentifier> committed) {
}
}
return super.newWriteOnlyTransaction(identifier);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch"})
@Override
public LocalThreePhaseCommitCohort onTransactionReady(@Nonnull DOMStoreWriteTransaction tx,
@Nullable Exception operationError) {
- if(operationError != null) {
+ if (operationError != null) {
return new LocalChainThreePhaseCommitCohort((SnapshotBackedWriteTransaction<TransactionIdentifier>)tx,
operationError);
}
protected abstract DOMStoreReadTransaction getReadDelegate();
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void executeModification(AbstractModification modification) {
incrementModificationCount();
- if(operationError == null) {
+ if (operationError == null) {
try {
modification.apply(getWriteDelegate());
} catch (Exception e) {
}
@Override
- public void onFailure(final Throwable t) {
- proxyFuture.setException(t);
+ public void onFailure(final Throwable failure) {
+ proxyFuture.setException(failure);
}
});
}
}
@Override
- protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<TransactionIdentifier> tx,
- final DataTreeModification tree) {
+ protected DOMStoreThreePhaseCommitCohort transactionReady(
+ final SnapshotBackedWriteTransaction<TransactionIdentifier> tx, final DataTreeModification tree) {
return new LocalThreePhaseCommitCohort(actorContext, leader, tx, tree);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch"})
@Override
public LocalThreePhaseCommitCohort onTransactionReady(@Nonnull DOMStoreWriteTransaction tx,
@Nullable Exception operationError) {
- if(operationError != null) {
+ if (operationError != null) {
return new LocalThreePhaseCommitCohort(actorContext, leader,
(SnapshotBackedWriteTransaction<TransactionIdentifier>)tx, operationError);
}
private final Throwable failure;
- public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) {
+ NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) {
super(identifier);
this.failure = failure;
}
@Override
public void executeModification(AbstractModification modification) {
- LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass().getSimpleName(),
- modification.getPath());
+ LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
+ modification.getClass().getSimpleName(), modification.getPath());
}
@Override
class Reference extends AtomicReference<OperationCallback> {
private static final long serialVersionUID = 1L;
- public Reference(OperationCallback initialValue) {
+ Reference(OperationCallback initialValue) {
super(initialValue);
}
}
void run();
+
void pause();
+
void resume();
+
void success();
+
void failure();
}
}
@VisibleForTesting
- int availablePermits(){
+ int availablePermits() {
return semaphore.availablePermits();
}
/**
- * Release all the permits
+ * Release all the permits.
*/
public void releaseAll() {
- this.semaphore.release(maxPermits-availablePermits());
+ this.semaphore.release(maxPermits - availablePermits());
}
}
this.actorContext = actorContext;
}
- private Future<Object> completeOperation(Future<Object> operationFuture){
+ private Future<Object> completeOperation(Future<Object> operationFuture) {
operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
return operationFuture;
}
private void batchModification(Modification modification) {
incrementModificationCount();
- if(batchedModifications == null) {
+ if (batchedModifications == null) {
batchedModifications = newBatchedModifications();
}
batchedModifications.addModification(modification);
- if(batchedModifications.getModifications().size() >=
- actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
+ if (batchedModifications.getModifications().size()
+ >= actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
sendBatchedModifications();
}
}
protected Future<Object> sendBatchedModifications(boolean ready, boolean doCommitOnReady) {
Future<Object> sent = null;
- if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
- if(batchedModifications == null) {
+ if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
+ if (batchedModifications == null) {
batchedModifications = newBatchedModifications();
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
- batchedModifications.getModifications().size(), ready);
- }
+ LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
+ batchedModifications.getModifications().size(), ready);
batchedModifications.setReady(ready);
batchedModifications.setDoCommitOnReady(doCommitOnReady);
batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
sent = executeOperationAsync(batchedModifications, actorContext.getTransactionCommitOperationTimeout());
- if(ready) {
+ if (ready) {
batchedModifications = null;
} else {
batchedModifications = newBatchedModifications();
@Override
public void executeModification(AbstractModification modification) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass()
- .getSimpleName(), modification.getPath());
- }
+ LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
+ modification.getClass().getSimpleName(), modification.getPath());
acquireOperation();
batchModification(modification);
@Override
public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
- readCmd.getPath());
- }
+ LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+ readCmd.getPath());
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
- if(failure != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(),
- failure);
- }
- returnFuture.setException(new ReadFailedException("Error checking " + readCmd.getClass().getSimpleName()
- + " for path " + readCmd.getPath(), failure));
+ if (failure != null) {
+ LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+ failure);
+
+ returnFuture.setException(new ReadFailedException("Error checking "
+ + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
} else {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName());
- }
+ LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName());
readCmd.processResponse(response, returnFuture);
}
}
* Acquire operation from the limiter if the hand-off has completed. If
* the hand-off is still ongoing, this method does nothing.
*/
- private final void acquireOperation() {
+ private void acquireOperation() {
if (isOperationHandOffComplete()) {
limiter.acquire();
}
* Handles creation of TransactionContext instances for remote transactions. This class creates
* remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit,
* if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay.
- * <p>
+ * <p/>
* The end result from a completed CreateTransaction message is a TransactionContext that is
* used to perform transaction operations. Transaction operations that occur before the
* CreateTransaction completes are cache via a TransactionContextWrapper and executed once the
private final TransactionContextWrapper transactionContextWrapper;
- RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextWrapper, final TransactionProxy parent,
- final String shardName) {
+ RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextWrapper,
+ final TransactionProxy parent, final String shardName) {
this.parent = Preconditions.checkNotNull(parent);
this.shardName = shardName;
this.transactionContextWrapper = transactionContextWrapper;
void setPrimaryShard(PrimaryShardInfo primaryShardInfo) {
this.primaryShardInfo = primaryShardInfo;
- if (getTransactionType() == TransactionType.WRITE_ONLY &&
- getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ if (getTransactionType() == TransactionType.WRITE_ONLY
+ && getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
ActorSelection primaryShard = primaryShardInfo.getPrimaryShardActor();
LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(),
- primaryShardInfo.getPrimaryShardActor());
- }
+ LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(),
+ primaryShardInfo.getPrimaryShardActor());
Object serializedCreateMessage = new CreateTransaction(getIdentifier(), getTransactionType().ordinal(),
primaryShardInfo.getPrimaryShardVersion()).toSerializable();
Future<PrimaryShardInfo> findPrimaryFuture = getActorContext().findPrimaryShardAsync(shardName);
findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
- public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
- onFindPrimaryShardComplete(failure, primaryShardInfo);
+ public void onComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) {
+ onFindPrimaryShardComplete(failure, newPrimaryShardInfo);
}
}, getActorContext().getClientDispatcher());
}
- private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
+ private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) {
if (failure == null) {
- this.primaryShardInfo = primaryShardInfo;
+ this.primaryShardInfo = newPrimaryShardInfo;
tryCreateTransaction();
} else {
LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure);
private void onCreateTransactionComplete(Throwable failure, Object response) {
// An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or
// the cached remote leader actor is no longer available.
- boolean retryCreateTransaction = primaryShardInfo != null &&
- (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
- if(retryCreateTransaction) {
+ boolean retryCreateTransaction = primaryShardInfo != null
+ && (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
+ if (retryCreateTransaction) {
// Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may
// be written by different threads however not concurrently, therefore decrementing it
// non-atomically here is ok.
- if(totalCreateTxTimeout > 0) {
+ if (totalCreateTxTimeout > 0) {
long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS;
- if(failure instanceof AskTimeoutException) {
+ if (failure instanceof AskTimeoutException) {
// Since we use the createTxMessageTimeout for the CreateTransaction request and it timed
// out, subtract it from the total timeout. Also since the createTxMessageTimeout period
// has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate).
getIdentifier(), shardName, failure, scheduleInterval);
getActorContext().getActorSystem().scheduler().scheduleOnce(
- FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
- new Runnable() {
- @Override
- public void run() {
- tryFindPrimaryShard();
- }
- }, getActorContext().getClientDispatcher());
+ FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
+ () -> tryFindPrimaryShard(), getActorContext().getClientDispatcher());
return;
}
}
// TransactionOperations. So to avoid thus timing, we don't publish the
// TransactionContext until after we've executed all cached TransactionOperations.
TransactionContext localTransactionContext;
- if(failure != null) {
+ if (failure != null) {
LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
Throwable resultingEx = failure;
- if(failure instanceof AskTimeoutException) {
+ if (failure instanceof AskTimeoutException) {
resultingEx = new ShardLeaderNotRespondingException(String.format(
"Could not create a %s transaction on shard %s. The shard leader isn't responding.",
parent.getType(), shardName), failure);
- } else if(!(failure instanceof NoShardLeaderException)) {
+ } else if (!(failure instanceof NoShardLeaderException)) {
resultingEx = new Exception(String.format(
"Error creating %s transaction on shard %s", parent.getType(), shardName), failure);
}
final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(),
transactionActor, getActorContext(), remoteTransactionVersion, transactionContextWrapper.getLimiter());
- if(parent.getType() == TransactionType.READ_ONLY) {
+ if (parent.getType() == TransactionType.READ_ONLY) {
TransactionContextCleanup.track(parent, ret);
}
import scala.concurrent.duration.FiniteDuration;
/**
- * A Shard represents a portion of the logical data tree <br/>
- * <p>
+ * A Shard represents a portion of the logical data tree.
+ * <p/>
* Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
- * </p>
*/
public class Shard extends RaftActor {
new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
- if(builder.getDataTree() != null) {
+ if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
treeChangeListenerPublisher, dataChangeListenerPublisher, name);
} else {
commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
} else if (message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
- } else if (message instanceof RegisterRoleChangeListener){
+ } else if (message instanceof RegisterRoleChangeListener) {
roleChangeNotifier.get().forward(message, context());
} else if (message instanceof FollowerInitialSyncUpStatus) {
shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
context().parent().tell(message, self());
- } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)){
+ } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)) {
sender().tell(getShardMBean(), self());
} else if (message instanceof GetShardDataTree) {
sender().tell(store.getDataTree(), self());
- } else if (message instanceof ServerRemoved){
+ } else if (message instanceof ServerRemoved) {
context().parent().forward(message, context());
} else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
}
@Override
- protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId, final short leaderPayloadVersion) {
+ protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+ final short leaderPayloadVersion) {
return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
: new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
}
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
- commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
try {
commitCoordinator.handleBatchedModifications(batched, sender, this);
// we need to reconstruct previous BatchedModifications from the transaction
// DataTreeModification, honoring the max batched modification count, and forward all the
// previous BatchedModifications to the new leader.
- Collection<BatchedModifications> newModifications = commitCoordinator.createForwardedBatchedModifications(
- batched, datastoreContext.getShardBatchedModificationCount());
+ Collection<BatchedModifications> newModifications = commitCoordinator
+ .createForwardedBatchedModifications(batched,
+ datastoreContext.getShardBatchedModificationCount());
LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
newModifications.size(), leader);
private boolean failIfIsolatedLeader(final ActorRef sender) {
if (isIsolatedLeader()) {
sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
- "Shard %s was the leader but has lost contact with all of its followers. Either all" +
- " other follower nodes are down or this node is isolated by a network partition.",
+ "Shard %s was the leader but has lost contact with all of its followers. Either all"
+ + " other follower nodes are down or this node is isolated by a network partition.",
persistenceId()))), getSelf());
return true;
}
return getRaftState() == RaftState.IsolatedLeader;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), message.getTransactionID());
store.closeTransactionChain(closeTransactionChain.getIdentifier());
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void createTransaction(final CreateTransaction createTransaction) {
try {
- if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY &&
- failIfIsolatedLeader(getSender())) {
+ if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY
+ && failIfIsolatedLeader(getSender())) {
return;
}
}
}
} else {
- commitCoordinator.abortPendingTransactions(
- "The transacton was aborted due to inflight leadership change and the leader address isn't available.",
- this);
+ commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
+ + "change and the leader address isn't available.", this);
}
}
* @return the converted messages
*/
public Collection<?> convertPendingTransactionsToMessages() {
- return commitCoordinator.convertPendingTransactionsToMessages(datastoreContext.getShardBatchedModificationCount());
+ return commitCoordinator.convertPendingTransactionsToMessages(
+ datastoreContext.getShardBatchedModificationCount());
}
@Override
return new Builder();
}
- public static abstract class AbstractBuilder<T extends AbstractBuilder<T, S>, S extends Shard> {
+ public abstract static class AbstractBuilder<T extends AbstractBuilder<T, S>, S extends Shard> {
private final Class<S> shardClass;
private ShardIdentifier id;
private Map<String, String> peerAddresses = Collections.emptyMap();
return (T) this;
}
- public T id(final ShardIdentifier id) {
+ public T id(final ShardIdentifier newId) {
checkSealed();
- this.id = id;
+ this.id = newId;
return self();
}
- public T peerAddresses(final Map<String, String> peerAddresses) {
+ public T peerAddresses(final Map<String, String> newPeerAddresses) {
checkSealed();
- this.peerAddresses = peerAddresses;
+ this.peerAddresses = newPeerAddresses;
return self();
}
- public T datastoreContext(final DatastoreContext datastoreContext) {
+ public T datastoreContext(final DatastoreContext newDatastoreContext) {
checkSealed();
- this.datastoreContext = datastoreContext;
+ this.datastoreContext = newDatastoreContext;
return self();
}
- public T schemaContext(final SchemaContext schemaContext) {
+ public T schemaContext(final SchemaContext newSchemaContext) {
checkSealed();
- this.schemaContext = schemaContext;
+ this.schemaContext = newSchemaContext;
return self();
}
- public T restoreFromSnapshot(final DatastoreSnapshot.ShardSnapshot restoreFromSnapshot) {
+ public T restoreFromSnapshot(final DatastoreSnapshot.ShardSnapshot newRestoreFromSnapshot) {
checkSealed();
- this.restoreFromSnapshot = restoreFromSnapshot;
+ this.restoreFromSnapshot = newRestoreFromSnapshot;
return self();
}
- public T dataTree(final TipProducingDataTree dataTree) {
+ public T dataTree(final TipProducingDataTree newDataTree) {
checkSealed();
- this.dataTree = dataTree;
+ this.dataTree = newDataTree;
return self();
}
public TreeType getTreeType() {
switch (datastoreContext.getLogicalStoreType()) {
- case CONFIGURATION:
- return TreeType.CONFIGURATION;
- case OPERATIONAL:
- return TreeType.OPERATIONAL;
+ case CONFIGURATION:
+ return TreeType.CONFIGURATION;
+ case OPERATIONAL:
+ return TreeType.OPERATIONAL;
+ default:
+ throw new IllegalStateException("Unhandled logical store type "
+ + datastoreContext.getLogicalStoreType());
}
-
- throw new IllegalStateException("Unhandled logical store type " + datastoreContext.getLogicalStoreType());
}
protected void verify() {
* @param ready the ForwardedReadyTransaction message to process
* @param sender the sender of the message
* @param shard the transaction's shard actor
- * @param schema
*/
void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender,
final Shard shard) {
cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
@Override
protected BatchedModifications getModifications() {
- if (newModifications.isEmpty() ||
- newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+ if (newModifications.isEmpty()
+ || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
}
}
@Override
- public void onFailure(final Throwable t) {
+ public void onFailure(final Throwable failure) {
log.debug("{}: An exception occurred during canCommit for {}: {}", name,
- cohortEntry.getTransactionID(), t);
+ cohortEntry.getTransactionID(), failure);
cohortCache.remove(cohortEntry.getTransactionID());
- cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self());
+ cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
}
});
}
}
@Override
- public void onFailure(final Throwable t) {
+ public void onFailure(final Throwable failure) {
log.error("{} An exception occurred while preCommitting transaction {}", name,
- cohortEntry.getTransactionID(), t);
+ cohortEntry.getTransactionID(), failure);
cohortCache.remove(cohortEntry.getTransactionID());
- cohortEntry.getReplySender().tell(new Failure(t), cohortEntry.getShard().self());
+ cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
}
});
}
}
@Override
- public void onFailure(final Throwable t) {
+ public void onFailure(final Throwable failure) {
log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
- cohortEntry.getTransactionID(), t);
+ cohortEntry.getTransactionID(), failure);
cohortCache.remove(cohortEntry.getTransactionID());
- sender.tell(new Failure(t), cohortEntry.getShard().self());
+ sender.tell(new Failure(failure), cohortEntry.getShard().self());
}
});
}
doCommit(cohortEntry);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
CohortEntry cohortEntry = cohortCache.remove(transactionID);
if (cohortEntry == null) {
Iterator<CohortEntry> iter = cohortCache.values().iterator();
while (iter.hasNext()) {
CohortEntry cohortEntry = iter.next();
- if(cohortEntry.isFailed()) {
+ if (cohortEntry.isFailed()) {
iter.remove();
}
}
}
@Override
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L> registerDataChangeListener(
- YangInstanceIdentifier path, L listener, DataChangeScope scope) {
+ public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ DataChangeListenerRegistration<L> registerDataChangeListener(YangInstanceIdentifier path, L listener,
+ DataChangeScope scope) {
return delegatePublisher.registerDataChangeListener(path, listener, scope);
}
* Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
* e.g. it does not expose public interfaces and assumes it is only ever called from a
* single thread.
- *
+ * <p/>
* This class is not part of the API contract and is subject to change at any time.
*/
@NotThreadSafe
return schemaContext;
}
- void updateSchemaContext(final SchemaContext schemaContext) {
- dataTree.setSchemaContext(schemaContext);
- this.schemaContext = Preconditions.checkNotNull(schemaContext);
+ void updateSchemaContext(final SchemaContext newSchemaContext) {
+ dataTree.setSchemaContext(newSchemaContext);
+ this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
}
/**
return new MetadataShardDataTreeSnapshot(rootNode, metaBuilder.build());
}
- private void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot,
+ private void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot,
final UnaryOperator<DataTreeModification> wrapper) throws DataValidationFailedException {
final Stopwatch elapsed = Stopwatch.createStarted();
LOG.debug("{}: state snapshot applied in %s", logContext, elapsed);
}
+ /**
+ * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
+ * does not perform any pruning.
+ *
+ * @param snapshot Snapshot that needs to be applied
+ * @throws DataValidationFailedException when the snapshot fails to apply
+ */
+ void applySnapshot(@Nonnull final ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
+ applySnapshot(snapshot, UnaryOperator.identity());
+ }
+
private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
return new PruningDataTreeModification(delegate, dataTree, schemaContext);
}
applySnapshot(snapshot, this::wrapWithPruning);
}
-
- /**
- * Apply a snapshot coming from the leader. This method assumes the leader and follower SchemaContexts match and
- * does not perform any pruning.
- *
- * @param snapshot Snapshot that needs to be applied
- * @throws DataValidationFailedException when the snapshot fails to apply
- */
- void applySnapshot(final @Nonnull ShardDataTreeSnapshot snapshot) throws DataValidationFailedException {
- applySnapshot(snapshot, UnaryOperator.identity());
- }
-
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
DataTreeCandidates.applyToModification(mod, candidate);
*/
void applyRecoveryPayload(final @Nonnull Payload payload) throws IOException, DataValidationFailedException {
if (payload instanceof CommitTransactionPayload) {
- final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+ final Entry<TransactionIdentifier, DataTreeCandidate> e =
+ ((CommitTransactionPayload) payload).getCandidate();
applyRecoveryCandidate(e.getValue());
allMetadataCommittedTransaction(e.getKey());
} else if (payload instanceof DataTreeCandidatePayload) {
*/
if (payload instanceof CommitTransactionPayload) {
if (identifier == null) {
- final Entry<TransactionIdentifier, DataTreeCandidate> e = ((CommitTransactionPayload) payload).getCandidate();
+ final Entry<TransactionIdentifier, DataTreeCandidate> e =
+ ((CommitTransactionPayload) payload).getCandidate();
applyReplicatedCandidate(e.getKey(), e.getValue());
allMetadataCommittedTransaction(e.getKey());
} else {
Optional<DataTreeCandidate>> registerChangeListener(final YangInstanceIdentifier path,
final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
final DataChangeScope scope) {
- final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
+ DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope);
return new SimpleEntry<>(reg, readCurrentData());
}
private Optional<DataTreeCandidate> readCurrentData() {
- final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
+ final Optional<NormalizedNode<?, ?>> currentState =
+ dataTree.takeSnapshot().readNode(YangInstanceIdentifier.EMPTY);
return currentState.isPresent() ? Optional.of(DataTreeCandidates.fromNormalizedNode(
YangInstanceIdentifier.EMPTY, currentState.get())) : Optional.<DataTreeCandidate>absent();
}
- public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> registerTreeChangeListener(
- final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
- final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangeListenerPublisher.registerTreeChangeListener(
- path, listener);
+ public Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>>
+ registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener) {
+ final ListenerRegistration<DOMDataTreeChangeListener> reg =
+ treeChangeListenerPublisher.registerTreeChangeListener(path, listener);
return new SimpleEntry<>(reg, readCurrentData());
}
}
/**
+ * Commits a modification.
+ *
* @deprecated This method violates DataTree containment and will be removed.
*/
@VisibleForTesting
public Collection<ShardDataTreeCohort> getAndClearPendingTransactions() {
Collection<ShardDataTreeCohort> ret = new ArrayList<>(pendingTransactions.size());
- for(CommitEntry entry: pendingTransactions) {
+ for (CommitEntry entry: pendingTransactions) {
ret.add(entry.cohort);
}
return ret;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void processNextTransaction() {
while (!pendingTransactions.isEmpty()) {
final CommitEntry entry = pendingTransactions.peek();
final SimpleShardDataTreeCohort cohort = entry.cohort;
final DataTreeModification modification = cohort.getDataTreeModification();
- if(cohort.getState() != State.CAN_COMMIT_PENDING) {
+ if (cohort.getState() != State.CAN_COMMIT_PENDING) {
break;
}
// For debugging purposes, allow dumping of the modification. Coupled with the above
// precondition log, it should allow us to understand what went on.
- LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification, dataTree);
+ LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
+ dataTree);
cause = new TransactionCommitFailedException("Data did not pass validation.", e);
} catch (Exception e) {
LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
processNextTransaction();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
void startPreCommit(final SimpleShardDataTreeCohort cohort) {
final CommitEntry entry = pendingTransactions.peek();
Preconditions.checkState(entry != null, "Attempted to pre-commit of %s when no transactions pending", cohort);
processNextTransaction();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void finishCommit(final SimpleShardDataTreeCohort cohort) {
final TransactionIdentifier txId = cohort.getIdentifier();
final DataTreeCandidate candidate = cohort.getCandidate();
}
private void maybeRunOperationOnPendingTransactionsComplete() {
- if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
- LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
- runOnPendingTransactionsComplete);
-
- runOnPendingTransactionsComplete.run();
- runOnPendingTransactionsComplete = null;
- }
- }
+ if (runOnPendingTransactionsComplete != null && pendingTransactions.isEmpty()) {
+ LOG.debug("{}: Pending transactions complete - running operation {}", logContext,
+ runOnPendingTransactionsComplete);
+
+ runOnPendingTransactionsComplete.run();
+ runOnPendingTransactionsComplete = null;
+ }
+ }
}
class ShardDataTreeChangeListenerPublisherActorProxy extends AbstractShardDataTreeNotificationPublisherActorProxy
implements ShardDataTreeChangeListenerPublisher {
- private final ShardDataTreeChangeListenerPublisher delegatePublisher = new DefaultShardDataTreeChangeListenerPublisher();
+ private final ShardDataTreeChangeListenerPublisher delegatePublisher =
+ new DefaultShardDataTreeChangeListenerPublisher();
ShardDataTreeChangeListenerPublisherActorProxy(ActorContext actorContext, String actorName) {
super(actorContext, actorName);
// Lifecycle events
abstract void onTransactionCommitted(TransactionIdentifier txId);
+
abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
+
abstract void onHistoryPurged(LocalHistoryIdentifier historyId);
}
@Override
protected void handleReceive(Object message) {
- if(message instanceof PublishNotifications) {
+ if (message instanceof PublishNotifications) {
PublishNotifications publisher = (PublishNotifications)message;
timer.start();
} finally {
long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
- if(elapsedTime >= ShardDataTreeNotificationPublisher.PUBLISH_DELAY_THRESHOLD_IN_MS) {
+ if (elapsedTime >= ShardDataTreeNotificationPublisher.PUBLISH_DELAY_THRESHOLD_IN_MS) {
LOG.warn("{}: Generation of change events for {} took longer than expected. Elapsed time: {}",
publisher.logContext, name, timer);
} else {
@Override
protected void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
if (transaction instanceof ReadWriteShardDataTreeTransaction) {
- Preconditions.checkState(openTransaction != null, "Attempted to abort transaction %s while none is outstanding", transaction);
+ Preconditions.checkState(openTransaction != null,
+ "Attempted to abort transaction %s while none is outstanding", transaction);
LOG.debug("Aborted transaction {}", transaction);
openTransaction = null;
}
@Override
protected ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
- Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction);
+ Preconditions.checkState(openTransaction != null,
+ "Attempted to finish transaction %s while none is outstanding", transaction);
// dataTree is finalizing ready the transaction, we just record it for the next
// transaction in chain
abstract class ShardDataTreeTransactionParent {
abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
+
abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
}
import java.util.Set;
/**
+ * Manages shards.
+ *
* @deprecated This is a deprecated placeholder to keep its inner class present. It serves no other purpose.
*/
@Deprecated
private final Set<String> modules;
- public SchemaContextModules(Set<String> modules){
+ public SchemaContextModules(Set<String> modules) {
this.modules = modules;
}
import javax.annotation.Nonnull;
/**
- * Persisted data of the ShardManager
+ * Persisted data of the ShardManager.
*
* @deprecated Use {@link org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerSnapshot} instead.
* This class is scheduled for removal once persistence migration from Beryllium is no longer needed.
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
/**
+ * Actor for a shard read transaction.
+ *
* @author: syedbahm
- * Date: 8/6/14
*/
public class ShardReadTransaction extends ShardTransaction {
private final AbstractShardDataTreeTransaction<?> transaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
/**
+ * Actor for a shard read/write transaction.
+ *
* @author: syedbahm
- * Date: 8/6/14
*/
public class ShardReadWriteTransaction extends ShardWriteTransaction {
public ShardReadWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
@Override
public void handleReceive(Object message) {
- if(ReadData.isSerializedType(message)) {
+ if (ReadData.isSerializedType(message)) {
readData(ReadData.fromSerializable(message));
- } else if(DataExists.isSerializedType(message)) {
+ } else if (DataExists.isSerializedType(message)) {
dataExists((DataExists) message);
} else {
super.handleReceive(message);
private boolean open;
- ShardRecoveryCoordinator(final ShardDataTree store, final byte[] restoreFromSnapshot, final String shardName, final Logger log) {
+ ShardRecoveryCoordinator(final ShardDataTree store, final byte[] restoreFromSnapshot, final String shardName,
+ final Logger log) {
this.store = Preconditions.checkNotNull(store);
this.shardName = Preconditions.checkNotNull(shardName);
this.log = Preconditions.checkNotNull(log);
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void appendRecoveredLogEntry(final Payload payload) {
Preconditions.checkState(open, "call startLogRecovery before calling appendRecoveredLogEntry");
* @param snapshotBytes the serialized snapshot
*/
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void applyRecoverySnapshot(final byte[] snapshotBytes) {
log.debug("{}: Applying recovered snapshot", shardName);
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void applySnapshot(final byte[] snapshotBytes) {
// Since this will be done only on Recovery or when this actor is a Follower
// we can safely commit everything in here. We not need to worry about event notifications
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
- * The ShardTransaction Actor represents a remote transaction
- * <p>
- * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
- * </p>
- * <p>
- * Handles Messages <br/>
- * ---------------- <br/>
- * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
- * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
- * </p>
+ * The ShardTransaction Actor represents a remote transaction that delegates all actions to DOMDataReadWriteTransaction.
*/
public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
private final ActorRef shardActor;
this.transactionID = Preconditions.checkNotNull(transactionID);
}
- public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
- DatastoreContext datastoreContext, ShardStats shardStats) {
+ public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction,
+ ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats) {
return Props.create(new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
}
private void closeTransaction(boolean sendReply) {
getDOMStoreTransaction().abort();
- if(sendReply && returnCloseTransactionReply()) {
+ if (sendReply && returnCloseTransactionReply()) {
getSender().tell(new CloseTransactionReply(), getSelf());
}
final boolean ret = transaction.isClosed();
if (ret) {
shardStats.incrementFailedReadTransactionsCount();
- getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf());
+ getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")),
+ getSelf());
}
return ret;
}
final ShardStats shardStats;
final TransactionType type;
- ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
- DatastoreContext datastoreContext, ShardStats shardStats) {
+ ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction,
+ ActorRef shardActor, DatastoreContext datastoreContext, ShardStats shardStats) {
this.transaction = Preconditions.checkNotNull(transaction);
this.shardActor = shardActor;
this.shardStats = shardStats;
public ShardTransaction create() throws Exception {
final ShardTransaction tx;
switch (type) {
- case READ_ONLY:
- tx = new ShardReadTransaction(transaction, shardActor, shardStats);
- break;
- case READ_WRITE:
- tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor, shardStats);
- break;
- case WRITE_ONLY:
- tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor, shardStats);
- break;
- default:
- throw new IllegalArgumentException("Unhandled transaction type " + type);
+ case READ_ONLY:
+ tx = new ShardReadTransaction(transaction, shardActor, shardStats);
+ break;
+ case READ_WRITE:
+ tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
+ shardStats);
+ break;
+ case WRITE_ONLY:
+ tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction, shardActor,
+ shardStats);
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled transaction type " + type);
}
tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
ActorRef newShardTransaction(TransactionType type, TransactionIdentifier transactionID) {
final AbstractShardDataTreeTransaction<?> transaction;
switch (type) {
- case READ_ONLY:
- transaction = dataTree.newReadOnlyTransaction(transactionID);
- shardMBean.incrementReadOnlyTransactionCount();
- break;
- case READ_WRITE:
- transaction = dataTree.newReadWriteTransaction(transactionID);
- shardMBean.incrementReadWriteTransactionCount();
- break;
- case WRITE_ONLY:
- transaction = dataTree.newReadWriteTransaction(transactionID);
- shardMBean.incrementWriteOnlyTransactionCount();
- break;
- default:
- throw new IllegalArgumentException("Unsupported transaction type " + type);
+ case READ_ONLY:
+ transaction = dataTree.newReadOnlyTransaction(transactionID);
+ shardMBean.incrementReadOnlyTransactionCount();
+ break;
+ case READ_WRITE:
+ transaction = dataTree.newReadWriteTransaction(transactionID);
+ shardMBean.incrementReadWriteTransactionCount();
+ break;
+ case WRITE_ONLY:
+ transaction = dataTree.newReadWriteTransaction(transactionID);
+ shardMBean.incrementWriteOnlyTransactionCount();
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported transaction type " + type);
}
return actorContext.actorOf(ShardTransaction.props(type, transaction, shardActor, datastoreContext, shardMBean)
}
void retryMessages() {
- if(messagesToRetry.isEmpty()) {
+ if (messagesToRetry.isEmpty()) {
return;
}
MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]);
messagesToRetry.clear();
- for(MessageInfo info: copy) {
+ for (MessageInfo info: copy) {
LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message);
info.retry(shard);
}
@Override
public void close() {
- for(MessageInfo info: messagesToRetry) {
+ for (MessageInfo info: messagesToRetry) {
info.timedOut(shard);
}
}
void timedOut(Shard shard) {
- replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())), shard.getSelf());
+ replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())),
+ shard.getSelf());
}
}
}
import org.opendaylight.controller.cluster.datastore.modification.Modification;
/**
+ * Actor for a shard write-only transaction.
+ *
* @author: syedbahm
- * Date: 8/6/14
*/
public class ShardWriteTransaction extends ShardTransaction {
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void batchedModifications(BatchedModifications batched) {
if (checkClosed()) {
if (batched.isReady()) {
}
try {
- for(Modification modification: batched.getModifications()) {
+ for (Modification modification: batched.getModifications()) {
modification.apply(transaction.getSnapshot());
}
totalBatchedModificationsReceived++;
- if(batched.isReady()) {
- if(lastBatchedModificationsException != null) {
+ if (batched.isReady()) {
+ if (lastBatchedModificationsException != null) {
throw lastBatchedModificationsException;
}
- if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
+ if (totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
throw new IllegalStateException(String.format(
"The total number of batched messages received %d does not match the number sent %d",
totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
lastBatchedModificationsException = e;
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
- if(batched.isReady()) {
+ if (batched.isReady()) {
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
}
private boolean checkClosed() {
if (transaction.isClosed()) {
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException("Transaction is closed, no modifications allowed")), getSelf());
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
+ "Transaction is closed, no modifications allowed")), getSelf());
return true;
} else {
return false;
}
@Override
- public void canCommit(final FutureCallback<Void> callback) {
- if(state == State.CAN_COMMIT_PENDING) {
+ public void canCommit(final FutureCallback<Void> newCallback) {
+ if (state == State.CAN_COMMIT_PENDING) {
return;
}
checkState(State.READY);
- this.callback = Preconditions.checkNotNull(callback);
+ this.callback = Preconditions.checkNotNull(newCallback);
state = State.CAN_COMMIT_PENDING;
dataTree.startCanCommit(this);
}
@Override
- public void preCommit(final FutureCallback<DataTreeCandidate> callback) {
+ public void preCommit(final FutureCallback<DataTreeCandidate> newCallback) {
checkState(State.CAN_COMMIT_COMPLETE);
- this.callback = Preconditions.checkNotNull(callback);
+ this.callback = Preconditions.checkNotNull(newCallback);
state = State.PRE_COMMIT_PENDING;
if (nextFailure == null) {
}
@Override
- public void commit(final FutureCallback<UnsignedLong> callback) {
+ public void commit(final FutureCallback<UnsignedLong> newCallback) {
checkState(State.PRE_COMMIT_COMPLETE);
- this.callback = Preconditions.checkNotNull(callback);
+ this.callback = Preconditions.checkNotNull(newCallback);
state = State.COMMIT_PENDING;
dataTree.startCommit(this, candidate);
}
* Run user-defined canCommit and preCommit hooks. We want to run these before we initiate persistence so that
* any failure to validate is propagated before we record the transaction.
*
- * @param candidate {@link DataTreeCandidate} under consideration
- * @throws ExecutionException
- * @throws TimeoutException
+ * @param dataTreeCandidate {@link DataTreeCandidate} under consideration
+ * @throws ExecutionException if the operation fails
+ * @throws TimeoutException if the operation times out
*/
// FIXME: this should be asynchronous
- void userPreCommit(final DataTreeCandidate candidate) throws ExecutionException, TimeoutException {
- userCohorts.canCommit(candidate);
+ void userPreCommit(final DataTreeCandidate dataTreeCandidate) throws ExecutionException, TimeoutException {
+ userCohorts.canCommit(dataTreeCandidate);
userCohorts.preCommit();
}
- void successfulPreCommit(final DataTreeCandidateTip candidate) {
- LOG.trace("Transaction {} prepared candidate {}", transaction, candidate);
- this.candidate = Verify.verifyNotNull(candidate);
- switchState(State.PRE_COMMIT_COMPLETE).onSuccess(candidate);
+ void successfulPreCommit(final DataTreeCandidateTip dataTreeCandidate) {
+ LOG.trace("Transaction {} prepared candidate {}", transaction, dataTreeCandidate);
+ this.candidate = Verify.verifyNotNull(dataTreeCandidate);
+ switchState(State.PRE_COMMIT_COMPLETE).onSuccess(dataTreeCandidate);
}
void failedPreCommit(final Exception cause) {
cohortFuture.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object cohortResponse) {
- if(failure != null) {
+ if (failure != null) {
operationCallbackRef.get().failure();
returnFuture.setException(failure);
return;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TerminationMonitor extends UntypedActor{
+public class TerminationMonitor extends UntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class);
public static final String ADDRESS = "termination-monitor";
- public TerminationMonitor(){
+ public TerminationMonitor() {
LOG.debug("Created TerminationMonitor");
}
@Override
public void onReceive(Object message) throws Exception {
- if(message instanceof Terminated){
+ if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
LOG.debug("Actor terminated : {}", terminated.actor());
- } else if(message instanceof Monitor){
+ } else if (message instanceof Monitor) {
Monitor monitor = (Monitor) message;
getContext().watch(monitor.getActorRef());
}
import scala.concurrent.Future;
/**
- * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
+ * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
*/
public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
this.cohorts = cohorts;
this.transactionId = Preconditions.checkNotNull(transactionId);
- if(cohorts.isEmpty()) {
+ if (cohorts.isEmpty()) {
cohortsResolvedFuture.set(null);
}
}
private ListenableFuture<Void> resolveCohorts() {
- if(cohortsResolvedFuture.isDone()) {
+ if (cohortsResolvedFuture.isDone()) {
return cohortsResolvedFuture;
}
final AtomicInteger completed = new AtomicInteger(cohorts.size());
- for(final CohortInfo info: cohorts) {
+ for (final CohortInfo info: cohorts) {
info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
@Override
public void onComplete(Throwable failure, ActorSelection actor) {
- synchronized(completed) {
+ synchronized (completed) {
boolean done = completed.decrementAndGet() == 0;
- if(failure != null) {
+ if (failure != null) {
LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
cohortsResolvedFuture.setException(failure);
- } else if(!cohortsResolvedFuture.isDone()) {
+ } else if (!cohortsResolvedFuture.isDone()) {
LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
info.setResolvedActor(actor);
- if(done) {
+ if (done) {
LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
cohortsResolvedFuture.set(null);
}
LOG.debug("Tx {} finishCanCommit", transactionId);
// For empty transactions return immediately
- if(cohorts.size() == 0){
+ if (cohorts.size() == 0) {
LOG.debug("Tx {}: canCommit returning result true", transactionId);
returnFuture.set(Boolean.TRUE);
return;
return;
}
- if(iterator.hasNext() && result) {
+ if (iterator.hasNext() && result) {
sendCanCommitTransaction(iterator.next(), this);
} else {
LOG.debug("Tx {}: canCommit returning result: {}", transactionId, result);
private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete<Object> onComplete) {
CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
- }
+ LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
Future<Object> future = actorContext.executeOperationAsync(toCohortInfo.getResolvedActor(),
message.toSerializable(), actorContext.getTransactionCommitOperationTimeout());
private Future<Iterable<Object>> invokeCohorts(MessageSupplier messageSupplier) {
List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
- for(CohortInfo cohort : cohorts) {
+ for (CohortInfo cohort : cohorts) {
Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort);
- }
+ LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
futureList.add(actorContext.executeOperationAsync(cohort.getResolvedActor(), message,
actorContext.getTransactionCommitOperationTimeout()));
CommitTransactionReply.class, true, operationCallback);
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private static boolean successfulFuture(ListenableFuture<Void> future) {
- if(!future.isDone()) {
+ if (!future.isDone()) {
return false;
}
try {
future.get();
return true;
- } catch(Exception e) {
+ } catch (Exception e) {
return false;
}
}
// if not for some reason, we'll try to build it here.
ListenableFuture<Void> future = resolveCohorts();
- if(successfulFuture(future)) {
+ if (successfulFuture(future)) {
finishVoidOperation(operationName, messageSupplier, expectedResponseClass, propagateException,
returnFuture, callback);
} else {
public void onFailure(Throwable failure) {
LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
- if(propagateException) {
+ if (propagateException) {
returnFuture.setException(failure);
} else {
returnFuture.set(null);
@Override
public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
Throwable exceptionToPropagate = failure;
- if(exceptionToPropagate == null) {
- for(Object response: responses) {
- if(!response.getClass().equals(expectedResponseClass)) {
+ if (exceptionToPropagate == null) {
+ for (Object response: responses) {
+ if (!response.getClass().equals(expectedResponseClass)) {
exceptionToPropagate = new IllegalArgumentException(
String.format("Unexpected response type %s", response.getClass()));
break;
}
}
- if(exceptionToPropagate != null) {
+ if (exceptionToPropagate != null) {
LOG.debug("Tx {}: a {} cohort Future failed", transactionId, operationName, exceptionToPropagate);
- if(propagateException) {
+ if (propagateException) {
// We don't log the exception here to avoid redundant logging since we're
// propagating to the caller in MD-SAL core who will log it.
returnFuture.setException(exceptionToPropagate);
@Override
List<Future<ActorSelection>> getCohortFutures() {
List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
- for(CohortInfo info: cohorts) {
+ for (CohortInfo info: cohorts) {
cohortFutures.add(info.getActorFuture());
}
private interface MessageSupplier {
Object newMessage(TransactionIdentifier transactionId, short version);
+
boolean isSerializedReplyType(Object reply);
}
}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import java.util.function.Function;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
* at a time. For remote transactions, it also tracks the outstanding readiness requests
* towards the shard and unblocks operations only after all have completed.
*/
-final class TransactionChainProxy extends AbstractTransactionContextFactory<LocalTransactionChain> implements DOMStoreTransactionChain {
- private static abstract class State {
+final class TransactionChainProxy extends AbstractTransactionContextFactory<LocalTransactionChain>
+ implements DOMStoreTransactionChain {
+ private abstract static class State {
/**
* Check if it is okay to allocate a new transaction.
* @throws IllegalStateException if a transaction may not be allocated.
abstract Future<?> previousFuture();
}
- private static abstract class Pending extends State {
+ private abstract static class Pending extends State {
private final TransactionIdentifier transaction;
private final Future<?> previousFuture;
}
}
- private static abstract class DefaultState extends State {
+ private abstract static class DefaultState extends State {
@Override
final Future<?> previousFuture() {
return null;
* This map holds Promise instances for each read-only tx. It is used to maintain ordering of tx creates
* wrt to read-only tx's between this class and a LocalTransactionChain since they're bridged by
* asynchronous futures. Otherwise, in the following scenario, eg:
- *
+ * <p/>
* 1) Create write tx1 on chain
* 2) do write and submit
* 3) Create read-only tx2 on chain and issue read
* 4) Create write tx3 on chain, do write but do not submit
- *
+ * <p/>
* if the sequence/timing is right, tx3 may create its local tx on the LocalTransactionChain before tx2,
* which results in tx2 failing b/c tx3 isn't ready yet. So maintaining ordering prevents this issue
* (see Bug 4774).
- * <p>
+ * <p/>
* A Promise is added via newReadOnlyTransaction. When the parent class completes the primary shard
* lookup and creates the TransactionContext (either success or failure), onTransactionContextCreated is
* called which completes the Promise. A write tx that is created prior to completion will wait on the
* Promise's Future via findPrimaryShard.
*/
- private final ConcurrentMap<TransactionIdentifier, Promise<Object>> priorReadOnlyTxPromises = new ConcurrentHashMap<>();
+ private final ConcurrentMap<TransactionIdentifier, Promise<Object>> priorReadOnlyTxPromises =
+ new ConcurrentHashMap<>();
TransactionChainProxy(final TransactionContextFactory parent, final LocalHistoryIdentifier historyId) {
super(parent.getActorContext(), historyId);
// Send a close transaction chain request to each and every shard
- getActorContext().broadcast(new Function<Short, Object>() {
- @Override
- public Object apply(Short version) {
- return new CloseTransactionChain(getHistoryId(), version).toSerializable();
- }
- }, CloseTransactionChain.class);
+ getActorContext().broadcast(version -> new CloseTransactionChain(getHistoryId(), version).toSerializable(),
+ CloseTransactionChain.class);
}
private TransactionProxy allocateWriteTransaction(final TransactionType type) {
}
@Override
- protected LocalTransactionChain factoryForShard(final String shardName, final ActorSelection shardLeader, final DataTree dataTree) {
+ protected LocalTransactionChain factoryForShard(final String shardName, final ActorSelection shardLeader,
+ final DataTree dataTree) {
final LocalTransactionChain ret = new LocalTransactionChain(this, shardLeader, dataTree);
LOG.debug("Allocated transaction chain {} for shard {} leader {}", ret, shardName, shardLeader);
return ret;
final String previousTransactionId;
- if(localState instanceof Pending){
+ if (localState instanceof Pending) {
previousTransactionId = ((Pending) localState).getIdentifier().toString();
LOG.debug("Tx: {} - waiting for ready futures with pending Tx {}", txId, previousTransactionId);
} else {
private <T> Future<T> combineFutureWithPossiblePriorReadOnlyTxFutures(final Future<T> future,
final TransactionIdentifier txId) {
- if(!priorReadOnlyTxPromises.containsKey(txId) && !priorReadOnlyTxPromises.isEmpty()) {
+ if (!priorReadOnlyTxPromises.containsKey(txId) && !priorReadOnlyTxPromises.isEmpty()) {
Collection<Entry<TransactionIdentifier, Promise<Object>>> priorReadOnlyTxPromiseEntries =
new ArrayList<>(priorReadOnlyTxPromises.entrySet());
- if(priorReadOnlyTxPromiseEntries.isEmpty()) {
+ if (priorReadOnlyTxPromiseEntries.isEmpty()) {
return future;
}
List<Future<Object>> priorReadOnlyTxFutures = new ArrayList<>(priorReadOnlyTxPromiseEntries.size());
- for(Entry<TransactionIdentifier, Promise<Object>> entry: priorReadOnlyTxPromiseEntries) {
+ for (Entry<TransactionIdentifier, Promise<Object>> entry: priorReadOnlyTxPromiseEntries) {
LOG.debug("Tx: {} - waiting on future for prior read-only Tx {}", txId, entry.getKey());
priorReadOnlyTxFutures.add(entry.getValue().future());
}
}
@Override
- protected <T> void onTransactionReady(final TransactionIdentifier transaction, final Collection<Future<T>> cohortFutures) {
+ protected <T> void onTransactionReady(final TransactionIdentifier transaction,
+ final Collection<Future<T>> cohortFutures) {
final State localState = currentState;
- Preconditions.checkState(localState instanceof Allocated, "Readying transaction %s while state is %s", transaction, localState);
+ Preconditions.checkState(localState instanceof Allocated, "Readying transaction %s while state is %s",
+ transaction, localState);
final TransactionIdentifier currentTx = ((Allocated)localState).getIdentifier();
- Preconditions.checkState(transaction.equals(currentTx), "Readying transaction %s while %s is allocated", transaction, currentTx);
+ Preconditions.checkState(transaction.equals(currentTx), "Readying transaction %s while %s is allocated",
+ transaction, currentTx);
// Transaction ready and we are not waiting for futures -- go to idle
if (cohortFutures.isEmpty()) {
@Override
protected void onTransactionContextCreated(@Nonnull TransactionIdentifier transactionId) {
Promise<Object> promise = priorReadOnlyTxPromises.remove(transactionId);
- if(promise != null) {
+ if (promise != null) {
promise.success(null);
}
}
* Invoked by {@link TransactionContextWrapper} when it has finished handing
* off operations to this context. From this point on, the context is responsible
* for throttling operations.
- *
+ * <p/>
* Implementations can rely on the wrapper calling this operation in a synchronized
* block, so they do not need to ensure visibility of this state transition themselves.
*/
void operationHandOffComplete();
/**
- * A TransactionContext that uses Operation limiting should return true else false
- * @return
+ * A TransactionContext that uses operation limiting should return true else false.
+ *
+ * @return true if operation limiting is used, false otherwise
*/
boolean usesOperationLimiting();
}
@Override
- protected LocalTransactionFactoryImpl factoryForShard(final String shardName, final ActorSelection shardLeader, final DataTree dataTree) {
+ protected LocalTransactionFactoryImpl factoryForShard(final String shardName, final ActorSelection shardLeader,
+ final DataTree dataTree) {
return new LocalTransactionFactoryImpl(getActorContext(), shardLeader, dataTree);
}
}
@Override
- protected <T> void onTransactionReady(final TransactionIdentifier transaction, final Collection<Future<T>> cohortFutures) {
+ protected <T> void onTransactionReady(final TransactionIdentifier transaction,
+ final Collection<Future<T>> cohortFutures) {
// Transactions are disconnected, this is a no-op
}
TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext) {
this.identifier = Preconditions.checkNotNull(identifier);
this.limiter = new OperationLimiter(identifier,
- actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation
+ // 1 extra permit for the ready operation
+ actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
}
}
void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
- while(true) {
+ while (true) {
// Access to queuedTxOperations and transactionContext must be protected and atomic
// (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
// issues and ensure no TransactionOperation is missed and that they are processed
// We're done invoking the TransactionOperations so we can now publish the
// TransactionContext.
localTransactionContext.operationHandOffComplete();
- if(!localTransactionContext.usesOperationLimiting()){
+ if (!localTransactionContext.usesOperationLimiting()) {
limiter.releaseAll();
}
transactionContext = localTransactionContext;
final Promise<ActorSelection> promise = Futures.promise();
enqueueTransactionOperation(new TransactionOperation() {
@Override
- public void invoke(TransactionContext transactionContext) {
- promise.completeWith(transactionContext.readyTransaction());
+ public void invoke(TransactionContext newTransactionContext) {
+ promise.completeWith(newTransactionContext.readyTransaction());
}
});
/**
* Execute the delayed operation.
*
- * @param transactionContext
+ * @param transactionContext the TransactionContext
*/
protected abstract void invoke(TransactionContext transactionContext);
}
/**
* A transaction potentially spanning multiple backend shards.
*/
-public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
+public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier>
+ implements DOMStoreReadWriteTransaction {
private enum TransactionState {
OPEN,
READY,
CLOSED,
}
+
private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
private final Map<String, TransactionContextWrapper> txContextWrappers = new HashMap<>();
}
private <T> CheckedFuture<T, ReadFailedException> executeRead(String shardName, final AbstractRead<T> readCmd) {
- Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
+ Preconditions.checkState(type != TransactionType.WRITE_ONLY,
+ "Reads from write-only transactions are not allowed");
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
- }
+ LOG.debug("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
final SettableFuture<T> proxyFuture = SettableFuture.create();
TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
- Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
+ Preconditions.checkState(type != TransactionType.WRITE_ONLY,
+ "Reads from write-only transactions are not allowed");
LOG.debug("Tx {} read {}", getIdentifier(), path);
private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readAllData() {
final Set<String> allShardNames = txContextFactory.getActorContext().getConfiguration().getAllShardNames();
- final Collection<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>> futures = new ArrayList<>(allShardNames.size());
+ final Collection<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>> futures =
+ new ArrayList<>(allShardNames.size());
for (String shardName : allShardNames) {
futures.add(singleShardRead(shardName, YangInstanceIdentifier.EMPTY));
final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> listFuture = Futures.allAsList(futures);
final ListenableFuture<Optional<NormalizedNode<?, ?>>> aggregateFuture;
- aggregateFuture = Futures.transform(listFuture, new Function<List<Optional<NormalizedNode<?, ?>>>, Optional<NormalizedNode<?, ?>>>() {
- @Override
- public Optional<NormalizedNode<?, ?>> apply(final List<Optional<NormalizedNode<?, ?>>> input) {
+ aggregateFuture = Futures.transform(listFuture,
+ (Function<List<Optional<NormalizedNode<?, ?>>>, Optional<NormalizedNode<?, ?>>>) input -> {
try {
return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.EMPTY, input,
txContextFactory.getActorContext().getSchemaContext(),
} catch (DataValidationFailedException e) {
throw new IllegalArgumentException("Failed to aggregate", e);
}
- }
- });
+ });
return MappingCheckedFuture.create(aggregateFuture, ReadFailedException.MAPPER);
}
private void executeModification(final AbstractModification modification) {
checkModificationState();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} executeModification {} {}", getIdentifier(), modification.getClass().getSimpleName(),
- modification.getPath());
- }
+ LOG.debug("Tx {} executeModification {} {}", getIdentifier(), modification.getClass().getSimpleName(),
+ modification.getPath());
TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath());
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
final AbstractThreePhaseCommitCohort<?> ret;
switch (txContextWrappers.size()) {
- case 0:
- ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
- break;
- case 1:
- final Entry<String, TransactionContextWrapper> e = Iterables.getOnlyElement(txContextWrappers.entrySet());
- ret = createSingleCommitCohort(e.getKey(), e.getValue());
- break;
- default:
- ret = createMultiCommitCohort(txContextWrappers.entrySet());
+ case 0:
+ ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
+ break;
+ case 1:
+ final Entry<String, TransactionContextWrapper> e = Iterables.getOnlyElement(
+ txContextWrappers.entrySet());
+ ret = createSingleCommitCohort(e.getKey(), e.getValue());
+ break;
+ default:
+ ret = createMultiCommitCohort(txContextWrappers.entrySet());
}
txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
final Promise promise = akka.dispatch.Futures.promise();
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
- public void invoke(TransactionContext transactionContext) {
- promise.completeWith(getDirectCommitFuture(transactionContext, operationCallbackRef));
+ public void invoke(TransactionContext newTransactionContext) {
+ promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef));
}
});
future = promise.future();
// The remote tx version is obtained the via TransactionContext which may not be available yet so
// we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
// TransactionContext is available.
- Supplier<Short> txVersionSupplier = new Supplier<Short>() {
- @Override
- public Short get() {
- return wrapper.getTransactionContext().getTransactionVersion();
- }
- };
+ Supplier<Short> txVersionSupplier = () -> wrapper.getTransactionContext().getTransactionVersion();
cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(), txVersionSupplier));
}
/**
* TransactionRateLimitingCallback computes the new transaction rate limit on the successful completion of a
- * transaction
+ * transaction.
*/
-public class TransactionRateLimitingCallback implements OperationCallback{
+public class TransactionRateLimitingCallback implements OperationCallback {
private static Ticker TICKER = Ticker.systemTicker();
private enum State {
private long elapsedTime;
private volatile State state = State.STOPPED;
- TransactionRateLimitingCallback(ActorContext actorContext){
+ TransactionRateLimitingCallback(ActorContext actorContext) {
commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT);
}
@Override
public void pause() {
- if(state == State.RUNNING) {
+ if (state == State.RUNNING) {
elapsedTime += TICKER.read() - startTime;
state = State.PAUSED;
}
@Override
public void resume() {
- if(state != State.RUNNING) {
+ if (state != State.RUNNING) {
startTime = TICKER.read();
state = State.RUNNING;
}
static void setTicker(Ticker ticker) {
TICKER = ticker;
}
-}
\ No newline at end of file
+}
/**
* A {@link Mapper} extracting the {@link ActorSelection} pointing to the actor which
* is backing a particular transaction.
- *
+ * <p/>
* This class is not for general consumption. It is public only to support the pre-lithium compatibility
* package.
- *
* TODO: once we remove compatibility, make this class package-private and final.
*/
public class TransactionReadyReplyMapper extends Mapper<Object, ActorSelection> {
boolean isShardConfigured(String shardName);
/**
- * Adds the given member as the new replica for the given shardName
+ * Adds the given member as the new replica for the given shardName.
*/
- void addMemberReplicaForShard (String shardName, MemberName memberName);
+ void addMemberReplicaForShard(String shardName, MemberName memberName);
/**
- * Removes the given member as a replica for the given shardName
+ * Removes the given member as a replica for the given shardName.
*/
- void removeMemberReplicaForShard (String shardName, MemberName memberName);
+ void removeMemberReplicaForShard(String shardName, MemberName memberName);
}
public ConfigurationImpl(final ModuleShardConfigProvider provider) {
ImmutableMap.Builder<String, ModuleConfig> mapBuilder = ImmutableMap.builder();
- for(Map.Entry<String, ModuleConfig.Builder> e: provider.retrieveModuleConfigs(this).entrySet()) {
+ for (Map.Entry<String, ModuleConfig.Builder> e: provider.retrieveModuleConfigs(this).entrySet()) {
mapBuilder.put(e.getKey(), e.getValue().build());
}
private static Set<String> createAllShardNames(Iterable<ModuleConfig> moduleConfigs) {
final ImmutableSet.Builder<String> builder = ImmutableSet.builder();
- for(ModuleConfig moduleConfig : moduleConfigs) {
+ for (ModuleConfig moduleConfig : moduleConfigs) {
builder.addAll(moduleConfig.getShardNames());
}
private static Map<String, String> createNamespaceToModuleName(Iterable<ModuleConfig> moduleConfigs) {
final ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
- for(ModuleConfig moduleConfig : moduleConfigs) {
- if(moduleConfig.getNameSpace() != null) {
+ for (ModuleConfig moduleConfig : moduleConfigs) {
+ if (moduleConfig.getNameSpace() != null) {
builder.put(moduleConfig.getNameSpace(), moduleConfig.getName());
}
}
}
@Override
- public Collection<String> getMemberShardNames(final MemberName memberName){
+ public Collection<String> getMemberShardNames(final MemberName memberName) {
Preconditions.checkNotNull(memberName, "memberName should not be null");
List<String> shards = new ArrayList<>();
for (ModuleConfig moduleConfig: moduleConfigMap.values()) {
for (ShardConfig shardConfig: moduleConfig.getShardConfigs()) {
- if(shardConfig.getReplicas().contains(memberName)) {
+ if (shardConfig.getReplicas().contains(memberName)) {
shards.add(shardConfig.getName());
}
}
Preconditions.checkNotNull(moduleName, "moduleName should not be null");
ModuleConfig moduleConfig = moduleConfigMap.get(moduleName);
- return moduleConfig != null ? moduleConfig.getShardStrategy(): null;
+ return moduleConfig != null ? moduleConfig.getShardStrategy() : null;
}
@Override
ModuleConfig moduleConfig = moduleConfigMap.get(moduleName);
Collection<ShardConfig> shardConfigs = moduleConfig != null ? moduleConfig.getShardConfigs() :
Collections.<ShardConfig>emptySet();
- return !shardConfigs.isEmpty() ? shardConfigs.iterator().next().getName(): null;
+ return !shardConfigs.isEmpty() ? shardConfigs.iterator().next().getName() : null;
}
@Override
public Collection<MemberName> getMembersFromShardName(final String shardName) {
Preconditions.checkNotNull(shardName, "shardName should not be null");
- for(ModuleConfig moduleConfig: moduleConfigMap.values()) {
+ for (ModuleConfig moduleConfig: moduleConfigMap.values()) {
ShardConfig shardConfig = moduleConfig.getShardConfig(shardName);
- if(shardConfig != null) {
+ if (shardConfig != null) {
return shardConfig.getReplicas();
}
}
@Override
public Collection<MemberName> getUniqueMemberNamesForAllShards() {
Set<MemberName> allNames = new HashSet<>();
- for(String shardName: getAllShardNames()) {
+ for (String shardName: getAllShardNames()) {
allNames.addAll(getMembersFromShardName(shardName));
}
public synchronized void addModuleShardConfiguration(ModuleShardConfiguration config) {
Preconditions.checkNotNull(config, "ModuleShardConfiguration should not be null");
- ModuleConfig moduleConfig = ModuleConfig.builder(config.getModuleName()).
- nameSpace(config.getNamespace().toASCIIString()).
- shardStrategy(createShardStrategy(config.getModuleName(), config.getShardStrategyName())).
- shardConfig(config.getShardName(), config.getShardMemberNames()).build();
+ ModuleConfig moduleConfig = ModuleConfig.builder(config.getModuleName())
+ .nameSpace(config.getNamespace().toASCIIString())
+ .shardStrategy(createShardStrategy(config.getModuleName(), config.getShardStrategyName()))
+ .shardConfig(config.getShardName(), config.getShardMemberNames()).build();
updateModuleConfigMap(moduleConfig);
- namespaceToModuleName = ImmutableMap.<String, String>builder().putAll(namespaceToModuleName).
- put(moduleConfig.getNameSpace(), moduleConfig.getName()).build();
+ namespaceToModuleName = ImmutableMap.<String, String>builder().putAll(namespaceToModuleName)
+ .put(moduleConfig.getNameSpace(), moduleConfig.getName()).build();
allShardNames = ImmutableSet.<String>builder().addAll(allShardNames).add(config.getShardName()).build();
}
}
@Override
- public void addMemberReplicaForShard (String shardName, MemberName newMemberName) {
+ public void addMemberReplicaForShard(String shardName, MemberName newMemberName) {
Preconditions.checkNotNull(shardName, "shardName should not be null");
Preconditions.checkNotNull(newMemberName, "MemberName should not be null");
- for(ModuleConfig moduleConfig: moduleConfigMap.values()) {
+ for (ModuleConfig moduleConfig: moduleConfigMap.values()) {
ShardConfig shardConfig = moduleConfig.getShardConfig(shardName);
- if(shardConfig != null) {
+ if (shardConfig != null) {
Set<MemberName> replicas = new HashSet<>(shardConfig.getReplicas());
replicas.add(newMemberName);
updateModuleConfigMap(ModuleConfig.builder(moduleConfig).shardConfig(shardName, replicas).build());
}
@Override
- public void removeMemberReplicaForShard (String shardName, MemberName newMemberName) {
+ public void removeMemberReplicaForShard(String shardName, MemberName newMemberName) {
Preconditions.checkNotNull(shardName, "shardName should not be null");
Preconditions.checkNotNull(newMemberName, "MemberName should not be null");
- for(ModuleConfig moduleConfig: moduleConfigMap.values()) {
+ for (ModuleConfig moduleConfig: moduleConfigMap.values()) {
ShardConfig shardConfig = moduleConfig.getShardConfig(shardName);
- if(shardConfig != null) {
+ if (shardConfig != null) {
Set<MemberName> replicas = new HashSet<>(shardConfig.getReplicas());
replicas.remove(newMemberName);
updateModuleConfigMap(ModuleConfig.builder(moduleConfig).shardConfig(shardName, replicas).build());
File modulesFile = new File("./configuration/initial/" + modulesConfigPath);
Config moduleShardsConfig = null;
- if(moduleShardsFile.exists()) {
+ if (moduleShardsFile.exists()) {
LOG.info("module shards config file exists - reading config from it");
moduleShardsConfig = ConfigFactory.parseFile(moduleShardsFile);
} else {
}
Config modulesConfig = null;
- if(modulesFile.exists()) {
+ if (modulesFile.exists()) {
LOG.info("modules config file exists - reading config from it");
modulesConfig = ConfigFactory.parseFile(modulesFile);
} else {
Configuration configuration) {
List<? extends ConfigObject> modulesConfigObjectList = modulesConfig.getObjectList("modules");
- for(ConfigObject o : modulesConfigObjectList){
- ConfigObjectWrapper w = new ConfigObjectWrapper(o);
+ for (ConfigObject o : modulesConfigObjectList) {
+ ConfigObjectWrapper wrapper = new ConfigObjectWrapper(o);
- String moduleName = w.stringValue("name");
+ String moduleName = wrapper.stringValue("name");
ModuleConfig.Builder builder = moduleConfigMap.get(moduleName);
- if(builder == null) {
+ if (builder == null) {
builder = ModuleConfig.builder(moduleName);
moduleConfigMap.put(moduleName, builder);
}
- builder.nameSpace(w.stringValue("namespace"));
+ builder.nameSpace(wrapper.stringValue("namespace"));
builder.shardStrategy(ShardStrategyFactory.newShardStrategyInstance(moduleName,
- w.stringValue("shard-strategy"), configuration));
+ wrapper.stringValue("shard-strategy"), configuration));
}
}
moduleShardsConfig.getObjectList("module-shards");
Map<String, ModuleConfig.Builder> moduleConfigMap = new HashMap<>();
- for(ConfigObject moduleShardConfigObject : moduleShardsConfigObjectList){
+ for (ConfigObject moduleShardConfigObject : moduleShardsConfigObjectList) {
String moduleName = moduleShardConfigObject.get("name").unwrapped().toString();
ModuleConfig.Builder builder = ModuleConfig.builder(moduleName);
List<? extends ConfigObject> shardsConfigObjectList =
moduleShardConfigObject.toConfig().getObjectList("shards");
- for(ConfigObject shard : shardsConfigObjectList){
+ for (ConfigObject shard : shardsConfigObjectList) {
String shardName = shard.get("name").unwrapped().toString();
List<MemberName> replicas = shard.toConfig().getStringList("replicas").stream()
.map(MemberName::forName).collect(Collectors.toList());
return moduleConfigMap;
}
- private static class ConfigObjectWrapper{
+ private static class ConfigObjectWrapper {
private final ConfigObject configObject;
- ConfigObjectWrapper(final ConfigObject configObject){
+ ConfigObjectWrapper(final ConfigObject configObject) {
this.configObject = configObject;
}
- public String stringValue(final String name){
+ public String stringValue(final String name) {
return configObject.get(name).unwrapped().toString();
}
}
}
@Nullable
- public ShardConfig getShardConfig(String name) {
- return shardConfigs.get(name);
+ public ShardConfig getShardConfig(String forName) {
+ return shardConfigs.get(forName);
}
@Nonnull
}
}
- public Builder name(String name) {
- this.name = name;
+ public Builder name(String newName) {
+ this.name = newName;
return this;
}
- public Builder nameSpace(String nameSpace) {
- this.nameSpace = nameSpace;
+ public Builder nameSpace(String newNameSpace) {
+ this.nameSpace = newNameSpace;
return this;
}
- public Builder shardStrategy(ShardStrategy shardStrategy) {
- this.shardStrategy = shardStrategy;
+ public Builder shardStrategy(ShardStrategy newShardStrategy) {
+ this.shardStrategy = newShardStrategy;
return this;
}
- public Builder shardConfig(String name, Collection<MemberName> replicas) {
- shardConfigs.put(name, new ShardConfig(name, replicas));
+ public Builder shardConfig(String shardName, Collection<MemberName> replicas) {
+ shardConfigs.put(shardName, new ShardConfig(shardName, replicas));
return this;
}
public class LocalShardNotFoundException extends RuntimeException {
private static final long serialVersionUID = 1L;
- public LocalShardNotFoundException(String message){
+ public LocalShardNotFoundException(String message) {
super(message);
}
}
public class NotInitializedException extends RuntimeException {
private static final long serialVersionUID = 1L;
+
public NotInitializedException(String message) {
super(message);
}
public class PrimaryNotFoundException extends RuntimeException {
private static final long serialVersionUID = 1L;
- public PrimaryNotFoundException(String message){
+
+ public PrimaryNotFoundException(String message) {
super(message);
}
}
public class TimeoutException extends RuntimeException {
private static final long serialVersionUID = 1L;
- public TimeoutException(String message, Exception e){
- super(message, e);
+
+ public TimeoutException(String message, Exception cause) {
+ super(message, cause);
}
}
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
+ public boolean equals(Object obj) {
+ if (this == obj) {
return true;
}
- if (o == null || getClass() != o.getClass()) {
+ if (obj == null || getClass() != obj.getClass()) {
return false;
}
- ShardIdentifier that = (ShardIdentifier) o;
+ ShardIdentifier that = (ShardIdentifier) obj;
if (!memberName.equals(that.memberName)) {
return false;
private MemberName memberName;
private String type;
- public ShardIdentifier build(){
+ public ShardIdentifier build() {
return new ShardIdentifier(shardName, memberName, type);
}
- public Builder shardName(String shardName){
- this.shardName = shardName;
+ public Builder shardName(String newShardName) {
+ this.shardName = newShardName;
return this;
}
- public Builder memberName(MemberName memberName){
- this.memberName = memberName;
+ public Builder memberName(MemberName newMemberName) {
+ this.memberName = newMemberName;
return this;
}
- public Builder type(String type){
- this.type = type;
+ public Builder type(String newType) {
+ this.type = newType;
return this;
}
- public Builder fromShardIdString(String shardId){
+ public Builder fromShardIdString(String shardId) {
Matcher matcher = PATTERN.matcher(shardId);
if (matcher.matches()) {
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
+ public boolean equals(Object obj) {
+ if (this == obj) {
return true;
}
- if (o == null || getClass() != o.getClass()) {
+ if (obj == null || getClass() != obj.getClass()) {
return false;
}
- ShardManagerIdentifier that = (ShardManagerIdentifier) o;
+ ShardManagerIdentifier that = (ShardManagerIdentifier) obj;
if (!type.equals(that.type)) {
return false;
return builder.toString();
}
- public static Builder builder(){
+ public static Builder builder() {
return new Builder();
}
public static class Builder {
private String type;
- public Builder type(String type){
- this.type = type;
+ public Builder type(String newType) {
+ this.type = newType;
return this;
}
- public ShardManagerIdentifier build(){
+ public ShardManagerIdentifier build() {
return new ShardManagerIdentifier(this.type);
}
/**
- * @author Basheeruddin syedbahm@cisco.com
+ * Factory for creating ShardStats mbeans.
*
+ * @author Basheeruddin syedbahm@cisco.com
*/
public class ShardMBeanFactory {
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- private static final Cache<String, OnDemandRaftState> onDemandRaftStateCache =
+ private static final Cache<String, OnDemandRaftState> ONDEMAND_RAFT_STATE_CACHE =
CacheBuilder.newBuilder().expireAfterWrite(2, TimeUnit.SECONDS).build();
private long committedTransactionsCount;
this.shard = shard;
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private OnDemandRaftState getOnDemandRaftState() {
String name = getShardName();
- OnDemandRaftState state = onDemandRaftStateCache.getIfPresent(name);
- if(state == null) {
+ OnDemandRaftState state = ONDEMAND_RAFT_STATE_CACHE.getIfPresent(name);
+ if (state == null) {
statRetrievalError = null;
statRetrievalTime = null;
- if(shard != null) {
+ if (shard != null) {
Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
try {
Stopwatch timer = Stopwatch.createStarted();
GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
statRetrievalTime = timer.stop().toString();
- onDemandRaftStateCache.put(name, state);
+ ONDEMAND_RAFT_STATE_CACHE.put(name, state);
} catch (Exception e) {
statRetrievalError = e.toString();
}
public String getVotedFor() {
return getOnDemandRaftState().getVotedFor();
}
+
@Override
public boolean isVoting() {
return getOnDemandRaftState().isVoting();
return failedReadTransactionsCount.incrementAndGet();
}
- public long incrementAbortTransactionsCount ()
- {
+ public long incrementAbortTransactionsCount() {
return ++abortTransactionsCount;
}
}
@Override
- public long getInMemoryJournalDataSize(){
+ public long getInMemoryJournalDataSize() {
return getOnDemandRaftState().getInMemoryJournalDataSize();
}
}
/**
- * resets the counters related to transactions
+ * Resets the counters related to transactions.
*/
@Override
- public void resetTransactionCounters(){
+ public void resetTransactionCounters() {
committedTransactionsCount = 0;
readOnlyTransactionCount = 0;
@Override
public void captureSnapshot() {
- if(shard != null) {
+ if (shard != null) {
shard.getSelf().tell(new InitiateCaptureSnapshot(), ActorRef.noSender());
}
}
import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
/**
+ * MXBean interface for shard stats.
+ *
 * @author syedbahm
*/
public interface ShardStatsMXBean {
- String getShardName();
+ String getShardName();
- String getStatRetrievalTime();
+ String getStatRetrievalTime();
- String getStatRetrievalError();
+ String getStatRetrievalError();
- long getCommittedTransactionsCount();
+ long getCommittedTransactionsCount();
- long getReadOnlyTransactionCount();
+ long getReadOnlyTransactionCount();
- long getWriteOnlyTransactionCount();
+ long getWriteOnlyTransactionCount();
- long getReadWriteTransactionCount();
+ long getReadWriteTransactionCount();
- long getLastLogIndex();
+ long getLastLogIndex();
- long getLastLogTerm();
+ long getLastLogTerm();
- long getCurrentTerm();
+ long getCurrentTerm();
- long getCommitIndex();
+ long getCommitIndex();
- long getLastApplied();
+ long getLastApplied();
- long getLastIndex();
+ long getLastIndex();
- long getLastTerm();
+ long getLastTerm();
- long getSnapshotIndex();
+ long getSnapshotIndex();
- long getSnapshotTerm();
+ long getSnapshotTerm();
- long getReplicatedToAllIndex();
+ long getReplicatedToAllIndex();
- String getLastCommittedTransactionTime();
+ String getLastCommittedTransactionTime();
- long getFailedTransactionsCount();
+ long getFailedTransactionsCount();
- long getAbortTransactionsCount();
+ long getAbortTransactionsCount();
- long getFailedReadTransactionsCount();
+ long getFailedReadTransactionsCount();
- String getLeader();
+ String getLeader();
- String getRaftState();
+ String getRaftState();
- String getVotedFor();
+ String getVotedFor();
- boolean isSnapshotCaptureInitiated();
+ boolean isSnapshotCaptureInitiated();
- boolean isVoting();
+ boolean isVoting();
- void resetTransactionCounters();
+ void resetTransactionCounters();
- long getInMemoryJournalDataSize();
+ long getInMemoryJournalDataSize();
- long getInMemoryJournalLogSize();
+ long getInMemoryJournalLogSize();
- boolean getFollowerInitialSyncStatus();
+ boolean getFollowerInitialSyncStatus();
- List<FollowerInfo> getFollowerInfo();
+ List<FollowerInfo> getFollowerInfo();
- String getPeerAddresses();
+ String getPeerAddresses();
- String getPeerVotingStates();
+ String getPeerVotingStates();
- long getLeadershipChangeCount();
+ long getLeadershipChangeCount();
- String getLastLeadershipChangeTime();
+ String getLastLeadershipChangeTime();
- int getPendingTxCommitQueueSize();
+ int getPendingTxCommitQueueSize();
- int getTxCohortCacheSize();
+ int getTxCohortCacheSize();
- void captureSnapshot();
+ void captureSnapshot();
}
public static AbortTransaction fromSerializable(Object serializable) {
Preconditions.checkArgument(serializable instanceof AbortTransaction);
- return (AbortTransaction)serializable;
+ return (AbortTransaction)serializable;
}
public static boolean isSerializedType(Object message) {
package org.opendaylight.controller.cluster.datastore.messages;
-import javax.annotation.Nonnull;
import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
/**
 * A message sent to the ShardManager to dynamically add a new local shard.
* @param shardName name of the shard that is to be locally replicated.
*/
- public AddShardReplica (@Nonnull String shardName){
+ public AddShardReplica(@Nonnull String shardName) {
this.shardName = Preconditions.checkNotNull(shardName, "ShardName should not be null");
}
- public String getShardName(){
+ public String getShardName() {
return this.shardName;
}
@Override
- public String toString(){
+ public String toString() {
return "AddShardReplica[ShardName=" + shardName + "]";
}
}
public class CloseDataChangeListenerRegistrationReply {
- public static final CloseDataChangeListenerRegistrationReply INSTANCE = new CloseDataChangeListenerRegistrationReply();
+ public static final CloseDataChangeListenerRegistrationReply INSTANCE =
+ new CloseDataChangeListenerRegistrationReply();
private CloseDataChangeListenerRegistrationReply() {
}
public final class CloseDataTreeChangeListenerRegistration implements Serializable {
private static final long serialVersionUID = 1L;
- private static final CloseDataTreeChangeListenerRegistration INSTANCE = new CloseDataTreeChangeListenerRegistration();
+ private static final CloseDataTreeChangeListenerRegistration INSTANCE =
+ new CloseDataTreeChangeListenerRegistration();
private CloseDataTreeChangeListenerRegistration() {
}
public final class CloseDataTreeChangeListenerRegistrationReply implements Serializable {
private static final long serialVersionUID = 1L;
- private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE = new CloseDataTreeChangeListenerRegistrationReply();
+ private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE =
+ new CloseDataTreeChangeListenerRegistrationReply();
private CloseDataTreeChangeListenerRegistrationReply() {
// Use getInstance() instead
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
-public class CloseTransactionChain extends VersionedExternalizableMessage implements Identifiable<LocalHistoryIdentifier> {
+public class CloseTransactionChain extends VersionedExternalizableMessage
+ implements Identifiable<LocalHistoryIdentifier> {
private static final long serialVersionUID = 1L;
private LocalHistoryIdentifier transactionChainId;
transactionChainId.writeTo(out);
}
- public static CloseTransactionChain fromSerializable(final Object serializable){
+ public static CloseTransactionChain fromSerializable(final Object serializable) {
Preconditions.checkArgument(serializable instanceof CloseTransactionChain);
return (CloseTransactionChain)serializable;
}
// Read created data
int size = in.readInt();
- for(int i = 0; i < size; i++) {
+ for (int i = 0; i < size; i++) {
YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
builder.addCreated(path, node);
// Read updated data
size = in.readInt();
- for(int i = 0; i < size; i++) {
+ for (int i = 0; i < size; i++) {
YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
NormalizedNode<?, ?> before = streamReader.readNormalizedNode();
NormalizedNode<?, ?> after = streamReader.readNormalizedNode();
// Read removed data
size = in.readInt();
- for(int i = 0; i < size; i++) {
+ for (int i = 0; i < size; i++) {
YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
builder.addRemoved(path, node);
// Read original subtree
boolean present = in.readBoolean();
- if(present) {
+ if (present) {
builder.setBefore(streamReader.readNormalizedNode());
}
// Read updated subtree
present = in.readBoolean();
- if(present) {
+ if (present) {
builder.setAfter(streamReader.readNormalizedNode());
}
Map<YangInstanceIdentifier, NormalizedNode<?, ?>> createdData = change.getCreatedData();
out.writeInt(createdData.size());
- for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: createdData.entrySet()) {
+ for (Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: createdData.entrySet()) {
streamWriter.writeYangInstanceIdentifier(e.getKey());
streamWriter.writeNormalizedNode(e.getValue());
}
Map<YangInstanceIdentifier, NormalizedNode<?, ?>> originalData = change.getOriginalData();
Map<YangInstanceIdentifier, NormalizedNode<?, ?>> updatedData = change.getUpdatedData();
out.writeInt(updatedData.size());
- for(Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: updatedData.entrySet()) {
+ for (Map.Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> e: updatedData.entrySet()) {
streamWriter.writeYangInstanceIdentifier(e.getKey());
streamWriter.writeNormalizedNode(originalData.get(e.getKey()));
streamWriter.writeNormalizedNode(e.getValue());
Set<YangInstanceIdentifier> removed = change.getRemovedPaths();
out.writeInt(removed.size());
- for(YangInstanceIdentifier path: removed) {
+ for (YangInstanceIdentifier path: removed) {
streamWriter.writeYangInstanceIdentifier(path);
streamWriter.writeNormalizedNode(originalData.get(path));
}
NormalizedNode<?, ?> originalSubtree = change.getOriginalSubtree();
out.writeBoolean(originalSubtree != null);
- if(originalSubtree != null) {
+ if (originalSubtree != null) {
streamWriter.writeNormalizedNode(originalSubtree);
}
NormalizedNode<?, ?> updatedSubtree = change.getUpdatedSubtree();
out.writeBoolean(updatedSubtree != null);
- if(updatedSubtree != null) {
+ if (updatedSubtree != null) {
streamWriter.writeNormalizedNode(updatedSubtree);
}
}
@Override
public void processResponse(Object response, SettableFuture<Boolean> returnFuture) {
- if(DataExistsReply.isSerializedType(response)) {
+ if (DataExistsReply.isSerializedType(response)) {
returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
} else {
- returnFuture.setException(new ReadFailedException("Invalid response checking exists for path " + getPath()));
+ returnFuture.setException(new ReadFailedException("Invalid response checking exists for path "
+ + getPath()));
}
}
return new DataExists(getPath(), withVersion);
}
- public static DataExists fromSerializable(final Object serializable){
+ public static DataExists fromSerializable(final Object serializable) {
Preconditions.checkArgument(serializable instanceof DataExists);
return (DataExists)serializable;
}
package org.opendaylight.controller.cluster.datastore.messages;
/**
- * FindLocalShard is a message that should be sent to the {@link org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager}
- * when we need to find a reference to a LocalShard
+ * FindLocalShard is a message that should be sent to the
+ * {@link org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager} when we need to find a reference
+ * to a LocalShard.
*/
public class FindLocalShard {
private final String shardName;
import java.io.Serializable;
/**
- * The FindPrimary message is used to locate the primary of any given shard
- *
+ * The FindPrimary message is used to locate the primary of any given shard.
*/
public class FindPrimary implements Serializable {
private static final long serialVersionUID = 1L;
import akka.actor.ActorRef;
/**
- * LocalShardFound is a message that is sent by the {@link org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager}
- * when it finds a shard with the specified name in it's local shard registry
+ * LocalShardFound is a message that is sent by the
+ * {@link org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager}
+ * when it finds a shard with the specified name in its local shard registry.
*/
public class LocalShardFound {
private final ActorRef path;
package org.opendaylight.controller.cluster.datastore.messages;
/**
- * LocalShardNotFound is a message that is sent by the {@link org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager}
- * when it cannot locate a shard in it's local registry with the shardName specified
+ * LocalShardNotFound is a message that is sent by the
+ * {@link org.opendaylight.controller.cluster.datastore.shardmanager.ShardManager}
+ * when it cannot locate a shard in its local registry with the shardName specified.
*/
public class LocalShardNotFound {
private final String shardName;
/**
+ * Constructs an instance.
*
* @param shardName the name of the shard that could not be found
*/
}
@Override
- public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> apply(DOMStoreReadTransaction readDelegate) {
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> apply(
+ DOMStoreReadTransaction readDelegate) {
return readDelegate.read(getPath());
}
@Override
public void processResponse(Object readResponse, SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
- if(ReadDataReply.isSerializedType(readResponse)) {
+ if (ReadDataReply.isSerializedType(readResponse)) {
ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
- returnFuture.set(Optional.<NormalizedNode<?, ?>> fromNullable(reply.getNormalizedNode()));
+ returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
} else {
returnFuture.setException(new ReadFailedException("Invalid response reading data for path " + getPath()));
}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
/**
- * Request a {@link org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener} registration be made on the shard
- * leader.
+ * Request a {@link org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener} registration be made on the
+ * shard leader.
*/
public final class RegisterDataTreeChangeListener implements Externalizable, ListenerRegistrationMessage {
private static final long serialVersionUID = 1L;
*
* @param shardName name of the local shard that is to be dynamically removed.
*/
- public RemoveShardReplica (@Nonnull String shardName, @Nonnull MemberName memberName) {
+ public RemoveShardReplica(@Nonnull String shardName, @Nonnull MemberName memberName) {
this.shardName = Preconditions.checkNotNull(shardName, "shardName should not be null");
this.memberName = Preconditions.checkNotNull(memberName, "memberName should not be null");
}
- public String getShardName(){
+ public String getShardName() {
return shardName;
}
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class UpdateSchemaContext {
- private final SchemaContext schemaContext;
+ private final SchemaContext schemaContext;
- public UpdateSchemaContext(SchemaContext schemaContext) {
- this.schemaContext = schemaContext;
- }
+ public UpdateSchemaContext(SchemaContext schemaContext) {
+ this.schemaContext = schemaContext;
+ }
- public SchemaContext getSchemaContext() {
- return schemaContext;
- }
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
}
}
public VersionedExternalizableMessage(short version) {
- this.version = version <= DataStoreVersions.CURRENT_VERSION ? version: DataStoreVersions.CURRENT_VERSION;
+ this.version = version <= DataStoreVersions.CURRENT_VERSION ? version : DataStoreVersions.CURRENT_VERSION;
}
public short getVersion() {
@Override
public final Object toSerializable() {
if (getVersion() < DataStoreVersions.BORON_VERSION) {
- throw new UnsupportedOperationException("Versions prior to " + DataStoreVersions.BORON_VERSION + " are not supported");
+ throw new UnsupportedOperationException("Versions prior to " + DataStoreVersions.BORON_VERSION
+ + " are not supported");
}
return this;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
/**
- * Base class to be used for all simple modifications that can be applied to a DOMStoreTransaction
+ * Base class to be used for all simple modifications that can be applied to a DOMStoreTransaction.
*/
public abstract class AbstractModification implements Modification {
*/
public interface CompositeModification extends Modification {
/**
- * Get a list of Modifications contained by this Composite
- * @return
+ * Get a list of modifications contained by this composite.
+ *
+ * @return list of modifications
*/
List<Modification> getModifications();
}
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
- * DeleteModification store all the parameters required to delete a path from the data tree
+ * DeleteModification store all the parameters required to delete a path from the data tree.
*/
public class DeleteModification extends AbstractModification {
private static final long serialVersionUID = 1L;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
- * MergeModification stores all the parameters required to merge data into the specified path
+ * MergeModification stores all the parameters required to merge data into the specified path.
*/
public class MergeModification extends WriteModification {
private static final long serialVersionUID = 1L;
* </p>
*
* <p>
- * Modifications can in turn be lumped into a single {@link org.opendaylight.controller.cluster.datastore.modification.CompositeModification}
- * which can then be applied to a write transaction
+ * Modifications can in turn be lumped into a single
+ * {@link org.opendaylight.controller.cluster.datastore.modification.CompositeModification}
+ * which can then be applied to a write transaction.
* </p>
*/
public interface Modification extends Externalizable {
byte DELETE = 4;
/**
- * Apply the modification to the specified transaction
+ * Apply the modification to the specified transaction.
*
- * @param transaction
+ * @param transaction the transaction
*/
void apply(DOMStoreWriteTransaction transaction);
/**
- * Apply the modification to the specified transaction
+ * Apply the modification to the specified transaction.
*
- * @param transaction
+ * @param transaction the transaction
*/
void apply(DataTreeModification transaction);
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
- * MutableCompositeModification is just a mutable version of a
- * CompositeModification {@link org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification#addModification(Modification)}
+ * MutableCompositeModification is just a mutable version of a CompositeModification.
*/
public class MutableCompositeModification extends VersionedExternalizableMessage implements CompositeModification {
private static final long serialVersionUID = 1L;
}
/**
- * Add a new Modification to the list of Modifications represented by this
- * composite
+ * Add a new Modification to the list of Modifications represented by this composite.
*
- * @param modification
+ * @param modification the modification to add.
*/
public void addModification(Modification modification) {
modifications.add(modification);
int size = in.readInt();
- if(size > 1) {
+ if (size > 1) {
SerializationUtils.REUSABLE_READER_TL.set(new NormalizedNodeInputStreamReader(in));
}
try {
- for(int i = 0; i < size; i++) {
+ for (int i = 0; i < size; i++) {
byte type = in.readByte();
- switch(type) {
- case Modification.WRITE:
- modifications.add(WriteModification.fromStream(in, getVersion()));
- break;
-
- case Modification.MERGE:
- modifications.add(MergeModification.fromStream(in, getVersion()));
- break;
-
- case Modification.DELETE:
- modifications.add(DeleteModification.fromStream(in, getVersion()));
- break;
+ switch (type) {
+ case Modification.WRITE:
+ modifications.add(WriteModification.fromStream(in, getVersion()));
+ break;
+
+ case Modification.MERGE:
+ modifications.add(MergeModification.fromStream(in, getVersion()));
+ break;
+
+ case Modification.DELETE:
+ modifications.add(DeleteModification.fromStream(in, getVersion()));
+ break;
+ default:
+ break;
}
}
} finally {
out.writeInt(modifications.size());
- if(modifications.size() > 1) {
+ if (modifications.size() > 1) {
SerializationUtils.REUSABLE_WRITER_TL.set(NormalizedNodeInputOutput.newDataOutput(out));
}
try {
- for(Modification mod: modifications) {
+ for (Modification mod: modifications) {
out.writeByte(mod.getType());
mod.writeExternal(out);
}
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
- * WriteModification stores all the parameters required to write data to the specified path
+ * WriteModification stores all the parameters required to write data to the specified path.
*/
public class WriteModification extends AbstractModification {
private static final long serialVersionUID = 1L;
return mod;
}
- private static final Applier<WriteModification> APPLIER = new Applier<WriteModification>() {
- @Override
- public void apply(WriteModification instance, YangInstanceIdentifier path,
- NormalizedNode<?, ?> node) {
- instance.setPath(path);
- instance.data = node;
- }
+ private static final Applier<WriteModification> APPLIER = (instance, path, node) -> {
+ instance.setPath(path);
+ instance.data = node;
};
}
private static final class Proxy extends AbstractProxy<TransactionIdentifier> {
private static final long serialVersionUID = 1L;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
// For Externalizable
}
*
* @author Robert Varga
*/
-public abstract class AbstractIdentifiablePayload<T extends Identifier> extends Payload implements Identifiable<T>, Serializable {
+public abstract class AbstractIdentifiablePayload<T extends Identifier>
+ extends Payload implements Identifiable<T>, Serializable {
protected abstract static class AbstractProxy<T extends Identifier> implements Externalizable {
private static final long serialVersionUID = 1L;
private byte[] serialized;
return Verify.verifyNotNull(createObject(identifier, serialized));
}
- protected abstract @Nonnull T readIdentifier(@Nonnull DataInput in) throws IOException;
- protected abstract @Nonnull Identifiable<T> createObject(@Nonnull T identifier, @Nonnull byte[] serialized);
+ @Nonnull
+ protected abstract T readIdentifier(@Nonnull DataInput in) throws IOException;
+
+ @Nonnull
+ protected abstract Identifiable<T> createObject(@Nonnull T identifier, @Nonnull byte[] serialized);
}
private static final long serialVersionUID = 1L;
private final byte[] serialized;
private final T identifier;
- AbstractIdentifiablePayload(final @Nonnull T identifier, final @Nonnull byte[] serialized) {
+ AbstractIdentifiablePayload(@Nonnull final T identifier, @Nonnull final byte[] serialized) {
this.identifier = Preconditions.checkNotNull(identifier);
this.serialized = Preconditions.checkNotNull(serialized);
}
return Verify.verifyNotNull(externalizableProxy(serialized));
}
- protected abstract @Nonnull AbstractProxy<T> externalizableProxy(@Nonnull byte[] serialized);
+ @Nonnull
+ protected abstract AbstractProxy<T> externalizableProxy(@Nonnull byte[] serialized);
}
abstract class AbstractVersionedShardDataTreeSnapshot extends ShardDataTreeSnapshot {
private static final Logger LOG = LoggerFactory.getLogger(AbstractVersionedShardDataTreeSnapshot.class);
+ @SuppressWarnings("checkstyle:FallThrough")
static ShardDataTreeSnapshot deserialize(final DataInputStream is) throws IOException {
final PayloadVersion version = PayloadVersion.readFrom(is);
switch (version) {
case TEST_PAST_VERSION:
// These versions are never returned and this code is effectively dead
break;
+ default:
+ throw new IOException("Invalid payload version in snapshot");
}
// Not included as default in above switch to ensure we get warnings when new versions are added
*
* @return The root node.
*/
- abstract @Nonnull NormalizedNode<?, ?> rootNode();
+ @Nonnull
+ abstract NormalizedNode<?, ?> rootNode();
/**
* Return the snapshot payload version. Implementations of this method should return a constant.
*
* @return Snapshot payload version
*/
- abstract @Nonnull PayloadVersion version();
+ @Nonnull
+ abstract PayloadVersion version();
private void versionedSerialize(final DataOutputStream dos, final PayloadVersion version) throws IOException {
switch (version) {
case TEST_FUTURE_VERSION:
case TEST_PAST_VERSION:
break;
-
+ default:
+ throw new IOException("Invalid payload version in snapshot");
}
throw new IOException("Encountered unhandled version" + version);
private static final long serialVersionUID = 1L;
private byte[] serialized;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
// For Externalizable
}
private static DataTreeCandidateNode readNode(final NormalizedNodeDataInput in) throws IOException {
final byte type = in.readByte();
switch (type) {
- case APPEARED:
- return readModifiedNode(ModificationType.APPEARED, in);
- case DELETE:
- return DeletedDataTreeCandidateNode.create(in.readPathArgument());
- case DISAPPEARED:
- return readModifiedNode(ModificationType.DISAPPEARED, in);
- case SUBTREE_MODIFIED:
- return readModifiedNode(ModificationType.SUBTREE_MODIFIED, in);
- case UNMODIFIED:
- return null;
- case WRITE:
- return DataTreeCandidateNodes.fromNormalizedNode(in.readNormalizedNode());
- default:
- throw new IllegalArgumentException("Unhandled node type " + type);
+ case APPEARED:
+ return readModifiedNode(ModificationType.APPEARED, in);
+ case DELETE:
+ return DeletedDataTreeCandidateNode.create(in.readPathArgument());
+ case DISAPPEARED:
+ return readModifiedNode(ModificationType.DISAPPEARED, in);
+ case SUBTREE_MODIFIED:
+ return readModifiedNode(ModificationType.SUBTREE_MODIFIED, in);
+ case UNMODIFIED:
+ return null;
+ case WRITE:
+ return DataTreeCandidateNodes.fromNormalizedNode(in.readNormalizedNode());
+ default:
+ throw new IllegalArgumentException("Unhandled node type " + type);
}
}
final DataTreeCandidateNode rootNode;
switch (type) {
- case DELETE:
- rootNode = DeletedDataTreeCandidateNode.create();
- break;
- case SUBTREE_MODIFIED:
- rootNode = ModifiedDataTreeCandidateNode.create(readChildren(reader));
- break;
- case WRITE:
- rootNode = DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode());
- break;
- default:
- throw new IllegalArgumentException("Unhandled node type " + type);
+ case DELETE:
+ rootNode = DeletedDataTreeCandidateNode.create();
+ break;
+ case SUBTREE_MODIFIED:
+ rootNode = ModifiedDataTreeCandidateNode.create(readChildren(reader));
+ break;
+ case WRITE:
+ rootNode = DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode());
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled node type " + type);
}
return DataTreeCandidates.newDataTreeCandidate(rootPath, rootNode);
private static void writeNode(final NormalizedNodeDataOutput out, final DataTreeCandidateNode node)
throws IOException {
switch (node.getModificationType()) {
- case APPEARED:
- out.writeByte(APPEARED);
- out.writePathArgument(node.getIdentifier());
- writeChildren(out, node.getChildNodes());
- break;
- case DELETE:
- out.writeByte(DELETE);
- out.writePathArgument(node.getIdentifier());
- break;
- case DISAPPEARED:
- out.writeByte(DISAPPEARED);
- out.writePathArgument(node.getIdentifier());
- writeChildren(out, node.getChildNodes());
- break;
- case SUBTREE_MODIFIED:
- out.writeByte(SUBTREE_MODIFIED);
- out.writePathArgument(node.getIdentifier());
- writeChildren(out, node.getChildNodes());
- break;
- case WRITE:
- out.writeByte(WRITE);
- out.writeNormalizedNode(node.getDataAfter().get());
- break;
- case UNMODIFIED:
- out.writeByte(UNMODIFIED);
- break;
- default:
- throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
- }
- }
-
- public static void writeDataTreeCandidate(final DataOutput out, DataTreeCandidate candidate) throws IOException {
- try (final NormalizedNodeDataOutput writer = NormalizedNodeInputOutput.newDataOutput(out)) {
- writer.writeYangInstanceIdentifier(candidate.getRootPath());
-
- final DataTreeCandidateNode node = candidate.getRootNode();
- switch (node.getModificationType()) {
case APPEARED:
- writer.writeByte(APPEARED);
- writeChildren(writer, node.getChildNodes());
+ out.writeByte(APPEARED);
+ out.writePathArgument(node.getIdentifier());
+ writeChildren(out, node.getChildNodes());
break;
case DELETE:
- writer.writeByte(DELETE);
+ out.writeByte(DELETE);
+ out.writePathArgument(node.getIdentifier());
break;
case DISAPPEARED:
- writer.writeByte(DISAPPEARED);
- writeChildren(writer, node.getChildNodes());
+ out.writeByte(DISAPPEARED);
+ out.writePathArgument(node.getIdentifier());
+ writeChildren(out, node.getChildNodes());
break;
case SUBTREE_MODIFIED:
- writer.writeByte(SUBTREE_MODIFIED);
- writeChildren(writer, node.getChildNodes());
- break;
- case UNMODIFIED:
- writer.writeByte(UNMODIFIED);
+ out.writeByte(SUBTREE_MODIFIED);
+ out.writePathArgument(node.getIdentifier());
+ writeChildren(out, node.getChildNodes());
break;
case WRITE:
- writer.writeByte(WRITE);
- writer.writeNormalizedNode(node.getDataAfter().get());
+ out.writeByte(WRITE);
+ out.writeNormalizedNode(node.getDataAfter().get());
+ break;
+ case UNMODIFIED:
+ out.writeByte(UNMODIFIED);
break;
default:
throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
+ }
+ }
+
+ public static void writeDataTreeCandidate(final DataOutput out, DataTreeCandidate candidate) throws IOException {
+ try (final NormalizedNodeDataOutput writer = NormalizedNodeInputOutput.newDataOutput(out)) {
+ writer.writeYangInstanceIdentifier(candidate.getRootPath());
+
+ final DataTreeCandidateNode node = candidate.getRootNode();
+ switch (node.getModificationType()) {
+ case APPEARED:
+ writer.writeByte(APPEARED);
+ writeChildren(writer, node.getChildNodes());
+ break;
+ case DELETE:
+ writer.writeByte(DELETE);
+ break;
+ case DISAPPEARED:
+ writer.writeByte(DISAPPEARED);
+ writeChildren(writer, node.getChildNodes());
+ break;
+ case SUBTREE_MODIFIED:
+ writer.writeByte(SUBTREE_MODIFIED);
+ writeChildren(writer, node.getChildNodes());
+ break;
+ case UNMODIFIED:
+ writer.writeByte(UNMODIFIED);
+ break;
+ case WRITE:
+ writer.writeByte(WRITE);
+ writer.writeNormalizedNode(node.getDataAfter().get());
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
}
}
}
static DataTreeCandidateNode create(final PathArgument identifier) {
return new DeletedDataTreeCandidateNode() {
@Override
- public final PathArgument getIdentifier() {
+ public PathArgument getIdentifier() {
return identifier;
}
};
private List<FrontendClientMetadata> clients;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
// For Externalizable
}
* @author Robert Varga
*/
@Beta
-public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardDataTreeSnapshot implements Serializable {
+public final class MetadataShardDataTreeSnapshot extends AbstractVersionedShardDataTreeSnapshot
+ implements Serializable {
private static final class Proxy implements Externalizable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(MetadataShardDataTreeSnapshot.class);
private Map<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metadata;
private NormalizedNode<?, ?> rootNode;
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
public Proxy() {
// For Externalizable
}
Preconditions.checkArgument(metaSize >= 0, "Invalid negative metadata map length %s", metaSize);
// Default pre-allocate is 4, which should be fine
- final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>> metaBuilder =
- ImmutableMap.builder();
+ final Builder<Class<? extends ShardDataTreeSnapshotMetadata<?>>, ShardDataTreeSnapshotMetadata<?>>
+ metaBuilder = ImmutableMap.builder();
for (int i = 0; i < metaSize; ++i) {
final ShardDataTreeSnapshotMetadata<?> m = (ShardDataTreeSnapshotMetadata<?>) in.readObject();
if (m != null) {
abstract class ModifiedDataTreeCandidateNode extends AbstractDataTreeCandidateNode {
private final Collection<DataTreeCandidateNode> children;
- private ModifiedDataTreeCandidateNode(final ModificationType type, final Collection<DataTreeCandidateNode> children) {
+ private ModifiedDataTreeCandidateNode(final ModificationType type,
+ final Collection<DataTreeCandidateNode> children) {
super(type);
this.children = Preconditions.checkNotNull(children);
}
};
}
- static DataTreeCandidateNode create(final PathArgument identifier, final ModificationType type, final Collection<DataTreeCandidateNode> children) {
+ static DataTreeCandidateNode create(final PathArgument identifier, final ModificationType type,
+ final Collection<DataTreeCandidateNode> children) {
return new ModifiedDataTreeCandidateNode(type, children) {
@Override
- public final PathArgument getIdentifier() {
+ public PathArgument getIdentifier() {
return identifier;
}
};
* whenever:
* - a new event is defined
* - serialization format is changed
- *
+ * <p>
* This version effectively defines the protocol version between actors participating on a particular shard. A shard
* participant instance should oppose RAFT candidates which produce persistence of an unsupported version. If a follower
* encounters an unsupported version it must not become fully-operational, as it does not have an accurate view
*
* @return Current {@link PayloadVersion}
*/
- public static @Nonnull PayloadVersion current() {
+ @Nonnull
+ public static PayloadVersion current() {
return BORON;
}
* Return the {@link PayloadVersion} corresponding to an unsigned short integer. This method is provided for callers
* which provide their own recovery strategy in case of version incompatibility.
*
- * @param s Short integer as returned from {@link #shortValue()}
+ * @param version Short integer as returned from {@link #shortValue()}
* @return {@link PayloadVersion}
* @throws FutureVersionException if the specified integer identifies a future version
* @throws PastVersionException if the specified integer identifies a past version which is no longer supported
*/
- public static @Nonnull PayloadVersion valueOf(final short s) throws FutureVersionException, PastVersionException {
- switch (Short.toUnsignedInt(s)) {
+ @Nonnull
+ public static PayloadVersion valueOf(final short version) throws FutureVersionException, PastVersionException {
+ switch (Short.toUnsignedInt(version)) {
case 0:
case 1:
case 2:
case 3:
case 4:
- throw new PastVersionException(s, BORON);
+ throw new PastVersionException(version, BORON);
case 5:
return BORON;
default:
- throw new FutureVersionException(s, BORON);
+ throw new FutureVersionException(version, BORON);
}
}
* @return An {@link PayloadVersion}
* @throws IOException If read fails or an unsupported version is encountered
*/
- public static @Nonnull PayloadVersion readFrom(final @Nonnull DataInput in) throws IOException {
+ @Nonnull
+ public static PayloadVersion readFrom(@Nonnull final DataInput in) throws IOException {
final short s = in.readShort();
try {
return valueOf(s);
* Base class for various bits of metadata attached to a {@link MetadataShardDataTreeSnapshot}. This class is not
* an interface because we want to make sure all subclasses implement the externalizable proxy pattern, for which
* we need to force {@link #readResolve()} to be abstract.
- *
+ * <p>
* All concrete subclasses of this class should be final so as to form a distinct set of possible metadata. Since
* metadata is serialized along with {@link MetadataShardDataTreeSnapshot}, this set is part of the serialization format
* guarded by {@link PayloadVersion}.
- *
+ * <p>
* If a new metadata type is introduced or a type is removed, {@link PayloadVersion} needs to be bumped to ensure
* compatibility.
*
* @author Robert Varga
*/
-public abstract class ShardDataTreeSnapshotMetadata<T extends ShardDataTreeSnapshotMetadata<T>> implements Serializable {
+public abstract class ShardDataTreeSnapshotMetadata<T extends ShardDataTreeSnapshotMetadata<T>>
+ implements Serializable {
private static final long serialVersionUID = 1L;
ShardDataTreeSnapshotMetadata() {
}
/**
- * Return an Externalizable proxy
+ * Return an Externalizable proxy.
*
* @return Externalizable proxy, may not be null
*/
return cluster;
}
- public T cluster(ClusterWrapper cluster) {
+ public T cluster(ClusterWrapper newCluster) {
checkSealed();
- this.cluster = cluster;
+ this.cluster = newCluster;
return self();
}
return configuration;
}
- public T configuration(Configuration configuration) {
+ public T configuration(Configuration newConfiguration) {
checkSealed();
- this.configuration = configuration;
+ this.configuration = newConfiguration;
return self();
}
return datastoreContextFactory;
}
- public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
+ public T datastoreContextFactory(DatastoreContextFactory newDatastoreContextFactory) {
checkSealed();
- this.datastoreContextFactory = datastoreContextFactory;
+ this.datastoreContextFactory = newDatastoreContextFactory;
return self();
}
return waitTillReadyCountdownLatch;
}
- public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
+ public T waitTillReadyCountdownLatch(CountDownLatch newWaitTillReadyCountdownLatch) {
checkSealed();
- this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+ this.waitTillReadyCountdownLatch = newWaitTillReadyCountdownLatch;
return self();
}
return primaryShardInfoCache;
}
- public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
+ public T primaryShardInfoCache(PrimaryShardInfoFutureCache newPrimaryShardInfoCache) {
checkSealed();
- this.primaryShardInfoCache = primaryShardInfoCache;
+ this.primaryShardInfoCache = newPrimaryShardInfoCache;
return self();
}
return restoreFromSnapshot;
}
- public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
+ public T restoreFromSnapshot(DatastoreSnapshot newRestoreFromSnapshot) {
checkSealed();
- this.restoreFromSnapshot = restoreFromSnapshot;
+ this.restoreFromSnapshot = newRestoreFromSnapshot;
return self();
}
verify();
return Props.create(ShardManager.class, this);
}
-}
\ No newline at end of file
+}
Props newProps(SchemaContext schemaContext) {
Preconditions.checkNotNull(builder);
- Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
- schemaContext(schemaContext).props();
+ Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext)
+ .schemaContext(schemaContext).props();
builder = null;
return props;
}
}
@Nullable
- ActorRef getActor(){
+ ActorRef getActor() {
return actor;
}
}
}
- void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
+ void updatePeerAddress(String peerId, String peerAddress, ActorRef sender) {
LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
- if(actor != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
- peerId, peerAddress, actor.path());
- }
+ if (actor != null) {
+ LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", peerId,
+ peerAddress, actor.path());
actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
}
}
void peerDown(MemberName memberName, String peerId, ActorRef sender) {
- if(actor != null) {
+ if (actor != null) {
actor.tell(new PeerDown(memberName, peerId), sender);
}
}
void peerUp(MemberName memberName, String peerId, ActorRef sender) {
- if(actor != null) {
+ if (actor != null) {
actor.tell(new PeerUp(memberName, peerId), sender);
}
}
}
boolean isShardReadyWithLeaderId() {
- return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
- !RaftState.PreLeader.name().equals(role) &&
- (isLeader() || addressResolver.resolve(leaderId) != null);
+ return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role)
+ && !RaftState.PreLeader.name().equals(role)
+ && (isLeader() || addressResolver.resolve(leaderId) != null);
}
boolean isShardInitialized() {
}
String getSerializedLeaderActor() {
- if(isLeader()) {
+ if (isLeader()) {
return Serialization.serializedActorPath(getActor());
} else {
return addressResolver.resolve(leaderId);
}
private void notifyOnShardInitializedCallbacks() {
- if(onShardInitializedSet.isEmpty()) {
+ if (onShardInitializedSet.isEmpty()) {
return;
}
ready ? "ready" : "initialized", onShardInitializedSet.size());
Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
- while(iter.hasNext()) {
+ while (iter.hasNext()) {
OnShardInitialized onShardInitialized = iter.next();
if (!(onShardInitialized instanceof OnShardReady) || ready) {
iter.remove();
notifyOnShardInitializedCallbacks();
}
- void setFollowerSyncStatus(boolean syncStatus){
+ void setFollowerSyncStatus(boolean syncStatus) {
this.followerSyncStatus = syncStatus;
}
- boolean isInSync(){
- if(RaftState.Follower.name().equals(this.role)){
+ boolean isInSync() {
+ if (RaftState.Follower.name().equals(this.role)) {
return followerSyncStatus;
- } else if(RaftState.Leader.name().equals(this.role)){
+ } else if (RaftState.Leader.name().equals(this.role)) {
return true;
}
}
boolean setLeaderId(String leaderId) {
- boolean changed = !Objects.equals(this.leaderId, leaderId);
+ final boolean changed = !Objects.equals(this.leaderId, leaderId);
this.leaderId = leaderId;
- if(leaderId != null) {
+ if (leaderId != null) {
this.leaderAvailable = true;
}
notifyOnShardInitializedCallbacks();
void setLeaderAvailable(boolean leaderAvailable) {
this.leaderAvailable = leaderAvailable;
- if(leaderAvailable) {
+ if (leaderAvailable) {
notifyOnShardInitializedCallbacks();
}
}
void setActiveMember(boolean isActiveMember) {
this.isActiveMember = isActiveMember;
}
-}
\ No newline at end of file
+}
package org.opendaylight.controller.cluster.datastore.shardmanager;
import static akka.pattern.Patterns.ask;
+
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Cancellable;
import scala.concurrent.duration.FiniteDuration;
/**
- * The ShardManager has the following jobs,
+ * Manages the shards for a data store. The ShardManager has the following jobs:
* <ul>
* <li> Create all the local shard replicas that belong on this cluster member
* <li> Find the address of the local shard
* <li> Find the primary replica for any given shard
* <li> Monitor the cluster members and store their addresses
- * <ul>
+ * </ul>
*/
class ShardManager extends AbstractUntypedPersistentActorWithMetering {
-
private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
// Stores a mapping between a shard name and it's corresponding information
private final String shardDispatcherPath;
- private final ShardManagerInfo mBean;
+ private final ShardManagerInfo shardManagerMBean;
private DatastoreContextFactory datastoreContextFactory;
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
+ shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
+ "shard-manager-" + this.type,
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
- mBean.registerMBean();
+ shardManagerMBean.registerMBean();
}
@Override
public void postStop() {
LOG.info("Stopping ShardManager {}", persistenceId());
- mBean.unregisterMBean();
+ shardManagerMBean.unregisterMBean();
}
@Override
public void handleCommand(Object message) throws Exception {
if (message instanceof FindPrimary) {
findPrimary((FindPrimary)message);
- } else if(message instanceof FindLocalShard){
+ } else if (message instanceof FindLocalShard) {
findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext(message);
- } else if(message instanceof ActorInitialized) {
+ } else if (message instanceof ActorInitialized) {
onActorInitialized(message);
- } else if (message instanceof ClusterEvent.MemberUp){
+ } else if (message instanceof ClusterEvent.MemberUp) {
memberUp((ClusterEvent.MemberUp) message);
- } else if (message instanceof ClusterEvent.MemberWeaklyUp){
+ } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
- } else if (message instanceof ClusterEvent.MemberExited){
+ } else if (message instanceof ClusterEvent.MemberExited) {
memberExited((ClusterEvent.MemberExited) message);
- } else if(message instanceof ClusterEvent.MemberRemoved) {
+ } else if (message instanceof ClusterEvent.MemberRemoved) {
memberRemoved((ClusterEvent.MemberRemoved) message);
- } else if(message instanceof ClusterEvent.UnreachableMember) {
- memberUnreachable((ClusterEvent.UnreachableMember)message);
- } else if(message instanceof ClusterEvent.ReachableMember) {
+ } else if (message instanceof ClusterEvent.UnreachableMember) {
+ memberUnreachable((ClusterEvent.UnreachableMember) message);
+ } else if (message instanceof ClusterEvent.ReachableMember) {
memberReachable((ClusterEvent.ReachableMember) message);
- } else if(message instanceof DatastoreContextFactory) {
- onDatastoreContextFactory((DatastoreContextFactory)message);
- } else if(message instanceof RoleChangeNotification) {
+ } else if (message instanceof DatastoreContextFactory) {
+ onDatastoreContextFactory((DatastoreContextFactory) message);
+ } else if (message instanceof RoleChangeNotification) {
onRoleChangeNotification((RoleChangeNotification) message);
- } else if(message instanceof FollowerInitialSyncUpStatus){
+ } else if (message instanceof FollowerInitialSyncUpStatus) {
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
- } else if(message instanceof ShardNotInitializedTimeout) {
- onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
- } else if(message instanceof ShardLeaderStateChanged) {
+ } else if (message instanceof ShardNotInitializedTimeout) {
+ onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
+ } else if (message instanceof ShardLeaderStateChanged) {
onLeaderStateChanged((ShardLeaderStateChanged) message);
- } else if(message instanceof SwitchShardBehavior){
+ } else if (message instanceof SwitchShardBehavior) {
onSwitchShardBehavior((SwitchShardBehavior) message);
- } else if(message instanceof CreateShard) {
- onCreateShard((CreateShard)message);
- } else if(message instanceof AddShardReplica){
- onAddShardReplica((AddShardReplica)message);
- } else if(message instanceof ForwardedAddServerReply) {
- ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
- onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
- msg.removeShardOnFailure);
- } else if(message instanceof ForwardedAddServerFailure) {
- ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
+ } else if (message instanceof CreateShard) {
+ onCreateShard((CreateShard) message);
+ } else if (message instanceof AddShardReplica) {
+ onAddShardReplica((AddShardReplica) message);
+ } else if (message instanceof ForwardedAddServerReply) {
+ ForwardedAddServerReply msg = (ForwardedAddServerReply) message;
+ onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure);
+ } else if (message instanceof ForwardedAddServerFailure) {
+ ForwardedAddServerFailure msg = (ForwardedAddServerFailure) message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if(message instanceof RemoveShardReplica) {
+ } else if (message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
- } else if(message instanceof WrappedShardResponse){
+ } else if (message instanceof WrappedShardResponse) {
onWrappedShardResponse((WrappedShardResponse) message);
- } else if(message instanceof GetSnapshot) {
+ } else if (message instanceof GetSnapshot) {
onGetSnapshot();
- } else if(message instanceof ServerRemoved){
+ } else if (message instanceof ServerRemoved) {
onShardReplicaRemoved((ServerRemoved) message);
- } else if(message instanceof ChangeShardMembersVotingStatus){
+ } else if (message instanceof ChangeShardMembersVotingStatus) {
onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
- } else if(message instanceof FlipShardMembersVotingStatus){
+ } else if (message instanceof FlipShardMembersVotingStatus) {
onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
- } else if(message instanceof SaveSnapshotSuccess) {
- onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
- } else if(message instanceof SaveSnapshotFailure) {
- LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
- persistenceId(), ((SaveSnapshotFailure) message).cause());
- } else if(message instanceof Shutdown) {
+ } else if (message instanceof SaveSnapshotSuccess) {
+ onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+ } else if (message instanceof SaveSnapshotFailure) {
+ LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
+ ((SaveSnapshotFailure) message).cause());
+ } else if (message instanceof Shutdown) {
onShutDown();
} else if (message instanceof GetLocalShardIds) {
onGetLocalShardIds();
- } else if(message instanceof RunnableMessage) {
+ } else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
} else {
unknownMessage(message);
if (info.getActor() != null) {
LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
- FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+ FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(2);
stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
}
}
LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
- ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+ ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
+ .getDispatcher(Dispatchers.DispatcherType.Client);
Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
self().tell(PoisonPill.getInstance(), self());
- if(failure != null) {
+ if (failure != null) {
LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
} else {
int nfailed = 0;
- for(Boolean r: results) {
- if(!r) {
+ for (Boolean result: results) {
+ if (!result) {
nfailed++;
}
}
- if(nfailed > 0) {
+ if (nfailed > 0) {
LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
}
}
String leaderPath) {
shardReplicaOperationsInProgress.remove(shardId.getShardName());
- LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+ LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+ LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
shardId.getShardName());
originalSender.tell(new Status.Success(null), getSelf());
} else {
- LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+ LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
persistenceId(), shardId, replyMsg.getStatus());
Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
final ActorRef sender) {
- if(isShardReplicaOperationInProgress(shardName, sender)) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
}
final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
//inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
- LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
primaryPath, shardId);
- Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
- duration());
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
new RemoveServer(shardId.toString()), removeServerTimeout);
String msg = String.format("RemoveServer request to leader %s for shard %s failed",
primaryPath, shardName);
- LOG.debug ("{}: {}", persistenceId(), msg, failure);
+ LOG.debug("{}: {}", persistenceId(), msg, failure);
// FAILURE
sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
private void onShardReplicaRemoved(ServerRemoved message) {
final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
- if(shardInformation == null) {
+ if (shardInformation == null) {
LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
return;
- } else if(shardInformation.getActor() != null) {
+ } else if (shardInformation.getActor() != null) {
LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
shardInformation.getActor().tell(Shutdown.INSTANCE, self());
}
LOG.debug("{}: onGetSnapshot", persistenceId());
List<String> notInitialized = null;
- for(ShardInformation shardInfo: localShards.values()) {
- if(!shardInfo.isShardInitialized()) {
- if(notInitialized == null) {
+ for (ShardInformation shardInfo : localShards.values()) {
+ if (!shardInfo.isShardInitialized()) {
+ if (notInitialized == null) {
notInitialized = new ArrayList<>();
}
}
}
- if(notInitialized != null) {
+ if (notInitialized != null) {
getSender().tell(new Status.Failure(new IllegalStateException(String.format(
"%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
return;
}
byte[] shardManagerSnapshot = null;
- if(currentSnapshot != null) {
+ if (currentSnapshot != null) {
shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
}
new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
- for(ShardInformation shardInfo: localShards.values()) {
+ for (ShardInformation shardInfo: localShards.values()) {
shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void onCreateShard(CreateShard createShard) {
LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
Object reply;
try {
String shardName = createShard.getModuleShardConfig().getShardName();
- if(localShards.containsKey(shardName)) {
+ if (localShards.containsKey(shardName)) {
LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
reply = new Status.Failure(e);
}
- if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
getSender().tell(reply, getSelf());
}
}
configuration.addModuleShardConfiguration(moduleShardConfig);
DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
- if(shardDatastoreContext == null) {
+ if (shardDatastoreContext == null) {
shardDatastoreContext = newShardDatastoreContext(shardName);
} else {
shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
- currentSnapshot.getShardList().contains(shardName);
+ boolean shardWasInRecoveredSnapshot = currentSnapshot != null
+ && currentSnapshot.getShardList().contains(shardName);
Map<String, String> peerAddresses;
boolean isActiveMember;
- if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
- contains(cluster.getCurrentMemberName())) {
+ if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
+ .contains(cluster.getCurrentMemberName())) {
peerAddresses = getPeerAddresses(shardName);
isActiveMember = true;
} else {
// subsequent AddServer request will be needed to make it an active member.
isActiveMember = false;
peerAddresses = Collections.emptyMap();
- shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
- customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
}
LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
info.setActiveMember(isActiveMember);
localShards.put(info.getShardName(), info);
- if(schemaContext != null) {
+ if (schemaContext != null) {
info.setActor(newShardActor(schemaContext, info));
}
}
+ /**
+  * Returns a DatastoreContext.Builder seeded from this shard's per-shard context and wired
+  * with the ShardManager's peer-address resolver.
+  */
private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
- return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
- shardPeerAddressResolver(peerAddressResolver);
+ return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
+ .shardPeerAddressResolver(peerAddressResolver);
}
+ /**
+  * Builds the effective DatastoreContext for the named shard.
+  */
private DatastoreContext newShardDatastoreContext(String shardName) {
return newShardDatastoreContextBuilder(shardName).build();
}
- private void checkReady(){
+ private void checkReady() {
if (isReadyWithLeaderId()) {
LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
persistenceId(), type, waitTillReadyCountdownLatch.getCount());
LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
- if(shardInformation != null) {
+ if (shardInformation != null) {
shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
- if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+ if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
primaryShardInfoCache.remove(shardInformation.getShardName());
}
shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
- if(!shardInfo.isShardInitialized()) {
+ if (!shardInfo.isShardInitialized()) {
LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
} else {
ShardInformation shardInformation = findShardInformation(status.getName());
- if(shardInformation != null) {
+ if (shardInformation != null) {
shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
- mBean.setSyncStatus(isInSync());
+ shardManagerMBean.setSyncStatus(isInSync());
}
}
roleChanged.getOldRole(), roleChanged.getNewRole());
ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
- if(shardInformation != null) {
+ if (shardInformation != null) {
shardInformation.setRole(roleChanged.getNewRole());
checkReady();
- mBean.setSyncStatus(isInSync());
+ shardManagerMBean.setSyncStatus(isInSync());
}
}
private ShardInformation findShardInformation(String memberId) {
- for(ShardInformation info : localShards.values()){
- if(info.getShardId().toString().equals(memberId)){
+ for (ShardInformation info : localShards.values()) {
+ if (info.getShardId().toString().equals(memberId)) {
return info;
}
}
private boolean isReadyWithLeaderId() {
boolean isReady = true;
for (ShardInformation info : localShards.values()) {
- if(!info.isShardReadyWithLeaderId()){
+ if (!info.isShardReadyWithLeaderId()) {
isReady = false;
break;
}
return isReady;
}
- private boolean isInSync(){
+ private boolean isInSync() {
for (ShardInformation info : localShards.values()) {
- if(!info.isInSync()){
+ if (!info.isInSync()) {
return false;
}
}
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void onRecoveryCompleted() {
LOG.info("Recovery complete : {}", persistenceId());
// journal on upgrade from Helium.
deleteMessages(lastSequenceNr());
- if(currentSnapshot == null && restoreFromSnapshot != null &&
- restoreFromSnapshot.getShardManagerSnapshot() != null) {
- try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
+ if (currentSnapshot == null && restoreFromSnapshot != null
+ && restoreFromSnapshot.getShardManagerSnapshot() != null) {
+ try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
restoreFromSnapshot.getShardManagerSnapshot()))) {
ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
applyShardManagerSnapshot(snapshot);
- } catch(Exception e) {
+ } catch (Exception e) {
LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
}
}
createLocalShards();
}
- private void findLocalShard(FindLocalShard message) {
- final ShardInformation shardInformation = localShards.get(message.getShardName());
-
- if(shardInformation == null){
- getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
- return;
- }
-
- sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor()));
- }
-
private void sendResponse(ShardInformation shardInformation, boolean doWait,
boolean wantShardReady, final Supplier<Object> messageSupplier) {
- if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
- if(doWait) {
+ if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
+ if (doWait) {
final ActorRef sender = getSender();
final ActorRef self = self();
shardInformation.addOnShardInitialized(onShardInitialized);
- FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
- if(shardInformation.isShardInitialized()) {
+ FiniteDuration timeout = shardInformation.getDatastoreContext()
+ .getShardInitializationTimeout().duration();
+ if (shardInformation.isShardInitialized()) {
// If the shard is already initialized then we'll wait enough time for the shard to
// elect a leader, ie 2 times the election timeout.
timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
peerAddressResolver.removePeerAddress(memberName);
- for(ShardInformation info : localShards.values()){
+ for (ShardInformation info : localShards.values()) {
info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
}
}
peerAddressResolver.removePeerAddress(memberName);
- for(ShardInformation info : localShards.values()){
+ for (ShardInformation info : localShards.values()) {
info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
}
}
private void addPeerAddress(MemberName memberName, Address address) {
peerAddressResolver.addPeerAddress(memberName, address);
- for(ShardInformation info : localShards.values()){
+ for (ShardInformation info : localShards.values()) {
String shardName = info.getShardName();
String peerId = getShardIdentifier(memberName, shardName).toString();
info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
final ActorRef actor = info.getActor();
if (actor != null) {
actor.tell(switchBehavior, getSelf());
- } else {
+ } else {
LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
info.getShardName(), switchBehavior.getNewState());
}
}
/**
- * Notifies all the local shards of a change in the schema context
+ * Notifies all the local shards of a change in the schema context.
*
- * @param message
+ * @param message the message to send
*/
private void updateSchemaContext(final Object message) {
schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
if (info != null && info.isActiveMember()) {
sendResponse(info, message.isWaitUntilReady(), true, () -> {
String primaryPath = info.getSerializedLeaderActor();
- Object found = canReturnLocalShardState && info.isLeader() ?
- new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+ Object found = canReturnLocalShardState && info.isLeader()
+ ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get())
- new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
+ : new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
- }
-
- return found;
+ LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+ return found;
});
return;
}
final Collection<String> visitedAddresses;
- if(message instanceof RemoteFindPrimary) {
+ if (message instanceof RemoteFindPrimary) {
visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
} else {
visitedAddresses = new ArrayList<>(1);
visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
- for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
- if(visitedAddresses.contains(address)) {
+ for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
+ if (visitedAddresses.contains(address)) {
continue;
}
String.format("No primary shard found for %s.", shardName)), getSelf());
}
+ /**
+  * Sends a FindPrimary message to self for the given shard and routes the asynchronous
+  * reply to the supplied handler on the client dispatcher.
+  *
+  * @param shardName the name of the shard whose primary replica is sought
+  * @param handler callback invoked with the success, failure or unknown-response outcome
+  */
+ private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
+ // Twice the shard initialization timeout, to allow for leader election as well.
+ Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
+ .getShardInitializationTimeout().duration().$times(2));
+
+ Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ handler.onFailure(failure);
+ } else {
+ if (response instanceof RemotePrimaryShardFound) {
+ handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
+ } else if (response instanceof LocalPrimaryShardFound) {
+ handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
+ } else {
+ handler.onUnknownResponse(response);
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
/**
* Construct the name of the shard actor given the name of the member on
- * which the shard resides and the name of the shard
+ * which the shard resides and the name of the shard.
*
- * @param memberName
- * @param shardName
- * @return
+ * @param memberName the member name
+ * @param shardName the shard name
+ * @return the ShardIdentifier for the given member and shard name
*/
- private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
+ private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
return peerAddressResolver.getShardIdentifier(memberName, shardName);
}
/**
- * Create shards that are local to the member on which the ShardManager
- * runs
- *
+ * Create shards that are local to the member on which the ShardManager runs.
*/
private void createLocalShards() {
MemberName memberName = this.cluster.getCurrentMemberName();
Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
- if(restoreFromSnapshot != null)
- {
- for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+ if (restoreFromSnapshot != null) {
+ for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
shardSnapshots.put(snapshot.getName(), snapshot);
}
}
restoreFromSnapshot = null; // null out to GC
- for(String shardName : memberShardNames){
+ for (String shardName : memberShardNames) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
}
/**
- * Given the name of the shard find the addresses of all it's peers
+ * Given the name of the shard find the addresses of all it's peers.
*
- * @param shardName
+ * @param shardName the shard name
*/
private Map<String, String> getPeerAddresses(String shardName) {
Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
(Function<Throwable, Directive>) t -> {
LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
return SupervisorStrategy.resume();
- }
- );
-
+ });
}
@Override
}
@VisibleForTesting
- ShardManagerInfoMBean getMBean(){
- return mBean;
+ ShardManagerInfoMBean getMBean() {
+ return shardManagerMBean;
}
+ /**
+  * Checks whether a replica add/remove operation is already in flight for the shard; if so,
+  * replies to the sender with a Status.Failure and returns true so the caller can bail out.
+  */
private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
if (shardReplicaOperationsInProgress.contains(shardName)) {
String msg = String.format("A shard replica operation for %s is already in progress", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
+ LOG.debug("{}: {}", persistenceId(), msg);
sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return true;
}
return false;
}
- private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
+ private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
// verify the shard with the specified name is present in the cluster configuration
- if (!(this.configuration.isShardConfigured(shardName))) {
+ if (!this.configuration.isShardConfigured(shardName)) {
String msg = String.format("No module configuration exists for shard %s", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
+ LOG.debug("{}: {}", persistenceId(), msg);
getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
return;
}
if (schemaContext == null) {
String msg = String.format(
"No SchemaContext is available in order to create a local shard instance for %s", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
+ LOG.debug("{}: {}", persistenceId(), msg);
getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return;
}
- findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+ getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), getTargetActor());
+ getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
+ getTargetActor());
}
@Override
+ /**
+  * Replies to the sender with an AlreadyExistsException failure indicating the local shard
+  * replica is already present.
+  */
private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
String msg = String.format("Local shard %s already exists", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
+ LOG.debug("{}: {}", persistenceId(), msg);
sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
}
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
- if(isShardReplicaOperationInProgress(shardName, sender)) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
}
final ShardInformation shardInfo;
final boolean removeShardOnFailure;
ShardInformation existingShardInfo = localShards.get(shardName);
- if(existingShardInfo == null) {
+ if (existingShardInfo == null) {
removeShardOnFailure = true;
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
- DisableElectionsRaftPolicy.class.getName()).build();
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
Shard.builder(), peerAddressResolver);
String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
//inform ShardLeader to add this shard as a replica by sending an AddServer message
- LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+ LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
response.getPrimaryPath(), shardInfo.getShardId());
- Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
- duration());
+ Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
+ .getShardLeaderElectionTimeout().duration());
Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
@Override
public void onComplete(Throwable failure, Object addServerResponse) {
if (failure != null) {
- LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+ LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
String msg = String.format("AddServer request to leader %s for shard %s failed",
boolean removeShardOnFailure) {
shardReplicaOperationsInProgress.remove(shardName);
- if(removeShardOnFailure) {
+ if (removeShardOnFailure) {
ShardInformation shardInfo = localShards.remove(shardName);
if (shardInfo.getActor() != null) {
shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
String shardName = shardInfo.getShardName();
shardReplicaOperationsInProgress.remove(shardName);
- LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+ LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+ LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
// Make the local shard voting capable
shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
persistShardList();
sender.tell(new Status.Success(null), getSelf());
- } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+ } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
sendLocalReplicaAlreadyExistsReply(shardName, sender);
} else {
- LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
+ LOG.warn("{}: Leader failed to add shard replica {} with status {}",
persistenceId(), shardName, replyMsg.getStatus());
- Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
+ Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
+ shardInfo.getShardId());
onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
}
switch (serverChangeStatus) {
case TIMEOUT:
failure = new TimeoutException(String.format(
- "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
- "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
- leaderPath, shardId.getShardName()));
+ "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
+ + "Possible causes - there was a problem replicating the data or shard leadership changed "
+ + "while replicating the shard data", leaderPath, shardId.getShardName()));
break;
case NO_LEADER:
failure = createNoShardLeaderException(shardId);
return failure;
}
- private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+ private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
}
private void doRemoveShardReplicaAsync(final String primaryPath) {
- getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()), getTargetActor());
+ getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
+ primaryPath, getSender()), getTargetActor());
}
});
}
shardList.remove(shardInfo.getShardName());
}
}
- LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+ LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
saveSnapshot(updateShardManagerSnapshot(shardList));
}
private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
currentSnapshot = snapshot;
- LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
+ LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
final MemberName currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
for (String shard : currentSnapshot.getShardList()) {
if (!configuredShardList.contains(shard)) {
// add the current member as a replica for the shard
- LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+ LOG.debug("{}: adding shard {}", persistenceId(), shard);
configuration.addMemberReplicaForShard(shard, currentMember);
} else {
configuredShardList.remove(shard);
}
for (String shard : configuredShardList) {
// remove the member as a replica for the shard
- LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+ LOG.debug("{}: removing shard {}", persistenceId(), shard);
configuration.removeMemberReplicaForShard(shard, currentMember);
}
}
- private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
- LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+ private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
+ LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
persistenceId());
deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
0, 0));
String shardName = changeMembersVotingStatus.getShardName();
Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
- for(Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+ for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
e.getValue());
}
ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
findLocalShard(shardName, getSender(),
- localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
- localShardFound.getPath(), getSender()));
+ localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+ localShardFound.getPath(), getSender()));
}
private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
OnDemandRaftState raftState = (OnDemandRaftState) response;
Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
- for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+ for (Entry<String, Boolean> e : raftState.getPeerVotingStates().entrySet()) {
serverVotingStatusMap.put(e.getKey(), !e.getValue());
}
- serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName).
- toString(), !raftState.isVoting());
+ serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
+ .toString(), !raftState.isVoting());
changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
shardName, localShardFound.getPath(), sender);
}
+ /**
+  * Handles a FindLocalShard message: replies with LocalShardNotFound if the shard is not
+  * hosted here, otherwise responds (waiting for initialization if requested) with
+  * LocalShardFound carrying the shard's actor reference.
+  */
+ private void findLocalShard(FindLocalShard message) {
+ final ShardInformation shardInformation = localShards.get(message.getShardName());
+
+ if (shardInformation == null) {
+ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+ return;
+ }
+
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
+ () -> new LocalShardFound(shardInformation.getActor()));
+ }
+
private void findLocalShard(final String shardName, final ActorRef sender,
final Consumer<LocalShardFound> onLocalShardFound) {
- Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
- getShardInitializationTimeout().duration().$times(2));
+ Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
+ .getShardInitializationTimeout().duration().$times(2));
Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object response) {
if (failure != null) {
- LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure);
+ LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
+ failure);
sender.tell(new Status.Failure(new RuntimeException(
String.format("Failed to find local shard %s", shardName), failure)), self());
} else {
- if(response instanceof LocalShardFound) {
- getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender);
- } else if(response instanceof LocalShardNotFound) {
+ if (response instanceof LocalShardFound) {
+ getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
+ sender);
+ } else if (response instanceof LocalShardNotFound) {
String msg = String.format("Local shard %s does not exist", shardName);
- LOG.debug ("{}: {}", persistenceId, msg);
+ LOG.debug("{}: {}", persistenceId, msg);
sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
} else {
String msg = String.format("Failed to find local shard %s: received response: %s",
shardName, response);
- LOG.debug ("{}: {}", persistenceId, msg);
+ LOG.debug("{}: {}", persistenceId, msg);
sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
new RuntimeException(msg)), self());
}
private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
- if(isShardReplicaOperationInProgress(shardName, sender)) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
}
if (failure != null) {
String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
shardActorRef.path());
- LOG.debug ("{}: {}", persistenceId(), msg, failure);
+ LOG.debug("{}: {}", persistenceId(), msg, failure);
sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
} else {
- LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+ LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
ServerChangeReply replyMsg = (ServerChangeReply) response;
- if(replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
sender.tell(new Status.Success(null), getSelf());
- } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+ } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
- "The requested voting state change for shard %s is invalid. At least one member must be voting",
- shardId.getShardName()))), getSelf());
+ "The requested voting state change for shard %s is invalid. At least one member "
+ + "must be voting", shardId.getShardName()))), getSelf());
} else {
- LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+ LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
persistenceId(), shardName, replyMsg.getStatus());
Exception error = getServerChangeException(ChangeServersVotingStatus.class,
}
}
- private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
- Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
- getShardInitializationTimeout().duration().$times(2));
-
-
- Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
- futureObj.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object response) {
- if (failure != null) {
- handler.onFailure(failure);
- } else {
- if(response instanceof RemotePrimaryShardFound) {
- handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
- } else if(response instanceof LocalPrimaryShardFound) {
- handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
- } else {
- handler.onUnknownResponse(response);
- }
- }
- }
- }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
- }
-
private interface RunnableMessage extends Runnable {
}
/**
* The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
- * a remote or local find primary message is processed
+ * a remote or local find primary message is processed.
*/
private interface FindPrimaryResponseHandler {
/**
- * Invoked when a Failure message is received as a response
+ * Invoked when a Failure message is received as a response.
*
- * @param failure
+ * @param failure the failure exception
*/
void onFailure(Throwable failure);
/**
- * Invoked when a RemotePrimaryShardFound response is received
+ * Invoked when a RemotePrimaryShardFound response is received.
*
- * @param response
+ * @param response the response
*/
void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
/**
- * Invoked when a LocalPrimaryShardFound response is received
- * @param response
+ * Invoked when a LocalPrimaryShardFound response is received.
+ *
+ * @param response the response
*/
void onLocalPrimaryFound(LocalPrimaryShardFound response);
/**
* Invoked when an unknown response is received. This is another type of failure.
*
- * @param response
+ * @param response the response
*/
void onUnknownResponse(Object response);
}
/**
* The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
- * replica and sends a wrapped Failure response to some targetActor
+ * replica and sends a wrapped Failure response to some targetActor.
*/
- private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
+ private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
private final ActorRef targetActor;
private final String shardName;
private final String persistenceId;
private final ActorRef shardManagerActor;
/**
+ * Constructs an instance.
+ *
* @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
* @param shardName The name of the shard for which the primary replica had to be found
* @param persistenceId The persistenceId for the ShardManager
* @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
*/
- protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
+ protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
+ ActorRef shardManagerActor) {
this.targetActor = Preconditions.checkNotNull(targetActor);
this.shardName = Preconditions.checkNotNull(shardName);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
@Override
public void onFailure(Throwable failure) {
- LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
+ LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
targetActor.tell(new Status.Failure(new RuntimeException(
String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
}
public void onUnknownResponse(Object response) {
String msg = String.format("Failed to find leader for shard %s: received response: %s",
shardName, response);
- LOG.debug ("{}: {}", persistenceId, msg);
+ LOG.debug("{}: {}", persistenceId, msg);
targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
new RuntimeException(msg)), shardManagerActor);
}
@Override
public void onReceive(Object message) {
- if(message instanceof GetSnapshotReply) {
+ if (message instanceof GetSnapshotReply) {
onGetSnapshotReply((GetSnapshotReply)message);
- } else if(message instanceof Failure) {
+ } else if (message instanceof Failure) {
LOG.debug("{}: Received {}", params.id, message);
params.replyToActor.tell(message, getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
} else if (message instanceof ReceiveTimeout) {
String msg = String.format(
- "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s did not respond.",
- params.receiveTimeout.toMillis(), params.shardNames.size(), remainingShardNames.size(),
- remainingShardNames);
+ "Timed out after %s ms while waiting for snapshot replies from %d shard(s). %d shard(s) %s "
+ + "did not respond.", params.receiveTimeout.toMillis(), params.shardNames.size(),
+ remainingShardNames.size(), remainingShardNames);
LOG.warn("{}: {}", params.id, msg);
params.replyToActor.tell(new Failure(new TimeoutException(msg)), getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
shardSnapshots.add(new ShardSnapshot(shardId.getShardName(), getSnapshotReply.getSnapshot()));
remainingShardNames.remove(shardId.getShardName());
- if(remainingShardNames.isEmpty()) {
+ if (remainingShardNames.isEmpty()) {
LOG.debug("{}: All shard snapshots received", params.id);
- DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType, params.shardManagerSnapshot,
- shardSnapshots);
+ DatastoreSnapshot datastoreSnapshot = new DatastoreSnapshot(params.datastoreType,
+ params.shardManagerSnapshot, shardSnapshots);
params.replyToActor.tell(datastoreSnapshot, getSelf());
getSelf().tell(PoisonPill.getInstance(), getSelf());
}
this.memberName = Preconditions.checkNotNull(memberName);
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch"})
@Override
public List<String> getLocalShards() {
try {
return memberName.getName();
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void requestSwitchShardState(final ShardIdentifier shardId, final String newState, final long term) {
// Validates strings argument
final RaftState state = RaftState.valueOf(newState);
public interface ShardManagerInfoMBean {
/**
+ * Returns the list of all the local shard names.
*
* @return a list of all the local shard names
*/
List<String> getLocalShards();
/**
+ * Returns the overall sync status for all shards.
*
* @return true if all local shards are in sync with their corresponding leaders
*/
boolean getSyncStatus();
/**
- * Get the name of of the current member
+ * Returns the name of the local member.
*
- * @return
+ * @return the local member name
*/
String getMemberName();
/**
- * Switch the Raft Behavior of all the local shards to the newBehavior
+ * Switches the raft behavior of all the local shards to the newBehavior.
*
* @param newBehavior should be either Leader/Follower only
- * @param term when switching to the Leader specifies for which term the Shard would be the Leader. Any modifications
- * made to state will be written with this term. This term will then be used by the Raft replication
- * implementation to decide which modifications should stay and which ones should be removed. Ideally
- * the term provided when switching to a new Leader should always be higher than the previous term.
+ * @param term when switching to the Leader specifies for which term the Shard would be the Leader. Any
+ * modifications made to state will be written with this term. This term will then be used by the Raft
+ * replication implementation to decide which modifications should stay and which ones should be
+ * removed. Ideally the term provided when switching to a new Leader should always be higher than the
+ * previous term.
*/
void switchAllLocalShardsState(String newBehavior, long term);
/**
- * Switch the Raft Behavior of the shard specified by shardName to the newBehavior
+ * Switches the raft behavior of the shard specified by shardName to the newBehavior.
*
* @param shardName a shard that is local to this shard manager
* @param newBehavior should be either Leader/Follower only
- * @param term when switching to the Leader specifies for which term the Shard would be the Leader. Any modifications
- * made to state will be written with this term. This term will then be used by the Raft replication
- * implementation to decide which modifications should stay and which ones should be removed. Ideally
- * the term provided when switching to a new Leader should always be higher than the previous term.
+ * @param term when switching to the Leader specifies for which term the Shard would be the Leader. Any
+ * modifications made to state will be written with this term. This term will then be used by the Raft
+ * replication implementation to decide which modifications should stay and which ones should be
+ * removed. Ideally the term provided when switching to a new Leader should always be higher than the
+ * previous term.
*/
void switchShardState(String shardName, String newBehavior, long term);
}
import javax.annotation.Nonnull;
/**
- * Persisted data of the ShardManager
+ * Persisted data of the ShardManager.
*/
// FIXME: make this package-protected once forShardList is removed.
public final class ShardManagerSnapshot implements Serializable {
}
/**
+ * Creates a ShardManagerSnapshot.
+ *
* @deprecated This method is for migration only and should me removed once
* org.opendaylight.controller.cluster.datastore.ShardManagerSnapshot is removed.
*/
private final String shardManagerType;
private final MemberName localMemberName;
- public ShardPeerAddressResolver(String shardManagerType, MemberName localMemberName) {
+ ShardPeerAddressResolver(String shardManagerType, MemberName localMemberName) {
this.shardManagerIdentifier = ShardManagerIdentifier.builder().type(shardManagerType).build().toString();
this.shardManagerType = shardManagerType;
this.localMemberName = Preconditions.checkNotNull(localMemberName);
Collection<String> getShardManagerPeerActorAddresses() {
Collection<String> peerAddresses = new ArrayList<>();
- for(Map.Entry<MemberName, Address> entry: memberNameToAddress.entrySet()) {
- if(!localMemberName.equals(entry.getKey())) {
+ for (Map.Entry<MemberName, Address> entry: memberNameToAddress.entrySet()) {
+ if (!localMemberName.equals(entry.getKey())) {
peerAddresses.add(getShardManagerActorPathBuilder(entry.getValue()).toString());
}
}
String getShardActorAddress(String shardName, MemberName memberName) {
Address memberAddress = memberNameToAddress.get(memberName);
- if(memberAddress != null) {
+ if (memberAddress != null) {
return getShardManagerActorPathBuilder(memberAddress).append("/").append(
getShardIdentifier(memberName, shardName)).toString();
}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
/**
- * The DefaultShardStrategy basically puts all data into the default Shard
- * <p>
- * The default shard stores data for all modules for which a specific set of shards has not been configured
- * </p>
+ * The DefaultShardStrategy basically puts all data into the default shard. The default shard stores data for all
+ * modules for which a specific set of shards has not been configured. This is only intended for testing.
*/
public final class DefaultShardStrategy implements ShardStrategy {
public static final String NAME = "default";
private final String moduleName;
private final Configuration configuration;
- public ModuleShardStrategy(String moduleName, Configuration configuration){
+ public ModuleShardStrategy(String moduleName, Configuration configuration) {
this.moduleName = moduleName;
this.configuration = configuration;
}
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
/**
- * The role of ShardStrategy is to figure out which Shards a given piece of data belongs to
+ * The role of ShardStrategy is to figure out which shards a given piece of data belongs to.
*/
public interface ShardStrategy {
- /**
- * Find the name of the shard in which the data pointed to by the specified path belongs in
- * <p>
- * Should return the name of the default shard DefaultShardStrategy.DEFAULT_SHARD
- * if no matching shard was found
- *
- * @param path The location of the data in the logical tree
- * @return
- */
- String findShard(YangInstanceIdentifier path);
+ /**
+     * Find the name of the shard in which the data pointed to by the specified path belongs.
+ * <p/>
+ * Should return the name of the default shard DefaultShardStrategy.DEFAULT_SHARD
+     * if no matching shard was found.
+ *
+ * @param path the location of the data in the logical tree
+ * @return the corresponding shard name.
+ */
+ String findShard(YangInstanceIdentifier path);
}
public static ShardStrategy newShardStrategyInstance(String moduleName, String strategyName,
Configuration configuration) {
- if(ModuleShardStrategy.NAME.equals(strategyName)){
+ if (ModuleShardStrategy.NAME.equals(strategyName)) {
return new ModuleShardStrategy(moduleName, configuration);
}
package org.opendaylight.controller.cluster.datastore.utils;
import static akka.pattern.Patterns.ask;
+
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
@Override
public Throwable apply(Throwable failure) {
Throwable actualFailure = failure;
- if(failure instanceof AskTimeoutException) {
+ if (failure instanceof AskTimeoutException) {
// A timeout exception most likely means the shard isn't initialized.
actualFailure = new NotInitializedException(
- "Timed out trying to find the primary shard. Most likely cause is the " +
- "shard is not initialized yet.");
+ "Timed out trying to find the primary shard. Most likely cause is the "
+ + "shard is not initialized yet.");
}
return actualFailure;
private final Dispatchers dispatchers;
private volatile SchemaContext schemaContext;
+
+ // Used as a write memory barrier.
+ @SuppressWarnings("unused")
private volatile boolean updated;
- private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
+
+ private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
+ .getMetricsRegistry();
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
private final ShardStrategyFactory shardStrategyFactory;
public void setSchemaContext(SchemaContext schemaContext) {
this.schemaContext = schemaContext;
- if(shardManager != null) {
+ if (shardManager != null) {
shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
}
}
updated = true;
- if(shardManager != null) {
+ if (shardManager != null) {
shardManager.tell(contextFactory, ActorRef.noSender());
}
}
public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
- if(ret != null){
+ if (ret != null) {
return ret;
}
Future<Object> future = executeOperationAsync(shardManager,
return future.transform(new Mapper<Object, PrimaryShardInfo>() {
@Override
public PrimaryShardInfo checkedApply(Object response) throws UnknownMessageException {
- if(response instanceof RemotePrimaryShardFound) {
+ if (response instanceof RemotePrimaryShardFound) {
LOG.debug("findPrimaryShardAsync received: {}", response);
RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
- } else if(response instanceof LocalPrimaryShardFound) {
+ } else if (response instanceof LocalPrimaryShardFound) {
LOG.debug("findPrimaryShardAsync received: {}", response);
LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
found.getLocalShardDataTree());
- } else if(response instanceof NotInitializedException) {
+ } else if (response instanceof NotInitializedException) {
throw (NotInitializedException)response;
- } else if(response instanceof PrimaryNotFoundException) {
+ } else if (response instanceof PrimaryNotFoundException) {
throw (PrimaryNotFoundException)response;
- } else if(response instanceof NoShardLeaderException) {
+ } else if (response instanceof NoShardLeaderException) {
throw (NoShardLeaderException)response;
}
}
/**
- * Finds a local shard given its shard name and return it's ActorRef
+ * Finds a local shard given its shard name and returns its ActorRef.
*
* @param shardName the name of the local shard that needs to be found
* @return a reference to a local shard actor which represents the shard
return future.map(new Mapper<Object, ActorRef>() {
@Override
public ActorRef checkedApply(Object response) throws Throwable {
- if(response instanceof LocalShardFound) {
+ if (response instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound)response;
LOG.debug("Local shard found {}", found.getPath());
return found.getPath();
- } else if(response instanceof NotInitializedException) {
+ } else if (response instanceof NotInitializedException) {
throw (NotInitializedException)response;
- } else if(response instanceof LocalShardNotFound) {
+ } else if (response instanceof LocalShardNotFound) {
throw new LocalShardNotFoundException(
String.format("Local shard for %s does not exist.", shardName));
}
}
/**
- * Executes an operation on a local actor and wait for it's response
+ * Executes an operation on a local actor and waits for its response.
*
- * @param actor
- * @param message
+ * @param actor the actor
+ * @param message the message to send
* @return The response of the operation
*/
+ @SuppressWarnings("checkstyle:IllegalCatch")
public Object executeOperation(ActorRef actor, Object message) {
Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
try {
return Await.result(future, operationDuration);
} catch (Exception e) {
- throw new TimeoutException("Sending message " + message.getClass().toString() +
- " to actor " + actor.toString() + " failed. Try again later.", e);
+ throw new TimeoutException("Sending message " + message.getClass().toString()
+ + " to actor " + actor.toString() + " failed. Try again later.", e);
}
}
- public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
- Preconditions.checkArgument(actor != null, "actor must not be null");
- Preconditions.checkArgument(message != null, "message must not be null");
-
- LOG.debug("Sending message {} to {}", message.getClass(), actor);
- return doAsk(actor, message, timeout);
- }
-
/**
- * Execute an operation on a remote actor and wait for it's response
+ * Executes an operation on a remote actor and waits for its response.
*
- * @param actor
- * @param message
- * @return
+ * @param actor the actor
+ * @param message the message
+ * @return the response message
*/
+ @SuppressWarnings("checkstyle:IllegalCatch")
public Object executeOperation(ActorSelection actor, Object message) {
Future<Object> future = executeOperationAsync(actor, message);
try {
return Await.result(future, operationDuration);
} catch (Exception e) {
- throw new TimeoutException("Sending message " + message.getClass().toString() +
- " to actor " + actor.toString() + " failed. Try again later.", e);
+ throw new TimeoutException("Sending message " + message.getClass().toString()
+ + " to actor " + actor.toString() + " failed. Try again later.", e);
}
}
+ public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
+ Preconditions.checkArgument(actor != null, "actor must not be null");
+ Preconditions.checkArgument(message != null, "message must not be null");
+
+ LOG.debug("Sending message {} to {}", message.getClass(), actor);
+ return doAsk(actor, message, timeout);
+ }
+
/**
* Execute an operation on a remote actor asynchronously.
*
actor.tell(message, ActorRef.noSender());
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void shutdown() {
FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
try {
Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
- } catch(Exception e) {
+ } catch (Exception e) {
LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
}
}
return clusterWrapper;
}
- public MemberName getCurrentMemberName(){
+ public MemberName getCurrentMemberName() {
return clusterWrapper.getCurrentMemberName();
}
/**
- * Send the message to each and every shard
+ * Send the message to each and every shard.
*/
- public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass){
- for(final String shardName : configuration.getAllShardNames()){
+ public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass) {
+ for (final String shardName : configuration.getAllShardNames()) {
Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
- if(failure != null) {
+ if (failure != null) {
LOG.warn("broadcast failed to send message {} to shard {}: {}",
messageClass.getSimpleName(), shardName, failure);
} else {
* This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
* us to create a timer for pretty much anything.
*
- * @param operationName
- * @return
+ * @param operationName the name of the operation
+ * @return the Timer instance
*/
- public Timer getOperationTimer(String operationName){
+ public Timer getOperationTimer(String operationName) {
return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
}
- public Timer getOperationTimer(String dataStoreType, String operationName){
+ public Timer getOperationTimer(String dataStoreType, String operationName) {
final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
operationName, METRIC_RATE);
return metricRegistry.timer(rate);
}
/**
- * Get the name of the data store to which this ActorContext belongs
+ * Get the name of the data store to which this ActorContext belongs.
*
- * @return
+ * @return the data store name
*/
public String getDataStoreName() {
return datastoreContext.getDataStoreName();
}
/**
- * Get the current transaction creation rate limit
- * @return
+ * Get the current transaction creation rate limit.
+ *
+ * @return the rate limit
*/
- public double getTxCreationLimit(){
+ public double getTxCreationLimit() {
return txRateLimiter.getTxCreationLimit();
}
/**
* Try to acquire a transaction creation permit. Will block if no permits are available.
*/
- public void acquireTxCreationPermit(){
+ public void acquireTxCreationPermit() {
txRateLimiter.acquire();
}
/**
- * Return the operation timeout to be used when committing transactions
- * @return
+ * Returns the operation timeout to be used when committing transactions.
+ *
+ * @return the operation timeout
*/
- public Timeout getTransactionCommitOperationTimeout(){
+ public Timeout getTransactionCommitOperationTimeout() {
return transactionCommitOperationTimeout;
}
/**
* An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
- * code on the datastore
- * @return
+ * code on the datastore.
+ *
+ * @return the dispatcher
*/
public ExecutionContext getClientDispatcher() {
return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
}
- public String getNotificationDispatcherPath(){
+ public String getNotificationDispatcherPath() {
return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
}
return shardStrategyFactory;
}
- protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
+ protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
return ask(actorRef, message, timeout);
}
- protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
+ protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout) {
return ask(actorRef, message, timeout);
}
private DataTreeModificationOutput() {
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
public static void toFile(File file, DataTreeModification modification) {
- try(FileOutputStream outStream = new FileOutputStream(file)) {
+ try (FileOutputStream outStream = new FileOutputStream(file)) {
modification.applyToCursor(new DataTreeModificationOutputCursor(new DataOutputStream(outStream)));
- } catch(Exception e) {
+ } catch (Exception e) {
LOG.error("Error writing DataTreeModification to file {}", file, e);
}
}
output.write("\nDELETE -> ".getBytes());
output.write(current().node(child).toString().getBytes());
output.writeByte('\n');
- } catch(IOException e) {
+ } catch (IOException e) {
Throwables.propagate(e);
}
}
output.write(": \n".getBytes());
NormalizedNodeXMLOutput.toStream(output, data);
output.writeByte('\n');
- } catch(IOException | XMLStreamException e) {
+ } catch (IOException | XMLStreamException e) {
Throwables.propagate(e);
}
}
Notification(NOTIFICATION_DISPATCHER_PATH);
private final String path;
- DispatcherType(String path){
+ DispatcherType(String path) {
this.path = path;
}
- private String path(akka.dispatch.Dispatchers dispatchers){
- if(dispatchers.hasDispatcher(path)){
+
+ private String path(akka.dispatch.Dispatchers dispatchers) {
+ if (dispatchers.hasDispatcher(path)) {
return path;
}
return DEFAULT_DISPATCHER_PATH;
}
- private ExecutionContext dispatcher(akka.dispatch.Dispatchers dispatchers){
- if(dispatchers.hasDispatcher(path)){
+ private ExecutionContext dispatcher(akka.dispatch.Dispatchers dispatchers) {
+ if (dispatchers.hasDispatcher(path)) {
return dispatchers.lookup(path);
}
return dispatchers.defaultGlobalDispatcher();
}
}
- public Dispatchers(akka.dispatch.Dispatchers dispatchers){
+ public Dispatchers(akka.dispatch.Dispatchers dispatchers) {
Preconditions.checkNotNull(dispatchers, "dispatchers should not be null");
this.dispatchers = dispatchers;
}
- public ExecutionContext getDispatcher(DispatcherType dispatcherType){
+ public ExecutionContext getDispatcher(DispatcherType dispatcherType) {
return dispatcherType.dispatcher(this.dispatchers);
}
- public String getDispatcherPath(DispatcherType dispatcherType){
+ public String getDispatcherPath(DispatcherType dispatcherType) {
return dispatcherType.path(this.dispatchers);
}
}
private final List<Optional<NormalizedNode<?, ?>>> nodes;
private final DataTree dataTree;
- private NormalizedNodeAggregator(final YangInstanceIdentifier rootIdentifier, final List<Optional<NormalizedNode<?, ?>>> nodes,
- final SchemaContext schemaContext, LogicalDatastoreType logicalDatastoreType) {
+ private NormalizedNodeAggregator(final YangInstanceIdentifier rootIdentifier,
+ final List<Optional<NormalizedNode<?, ?>>> nodes, final SchemaContext schemaContext,
+ LogicalDatastoreType logicalDatastoreType) {
this.rootIdentifier = rootIdentifier;
this.nodes = nodes;
this.dataTree = InMemoryDataTreeFactory.getInstance().create(
}
/**
- * Combine data from all the nodes in the list into a tree with root as rootIdentifier
- *
- * @param nodes
- * @param schemaContext
- * @param logicalDatastoreType
- * @return
- * @throws DataValidationFailedException
+ * Combine data from all the nodes in the list into a tree with root as rootIdentifier.
*/
public static Optional<NormalizedNode<?,?>> aggregate(final YangInstanceIdentifier rootIdentifier,
- final List<Optional<NormalizedNode<?, ?>>> nodes,
- final SchemaContext schemaContext,
- LogicalDatastoreType logicalDatastoreType) throws DataValidationFailedException {
+ final List<Optional<NormalizedNode<?, ?>>> nodes, final SchemaContext schemaContext,
+ LogicalDatastoreType logicalDatastoreType) throws DataValidationFailedException {
return new NormalizedNodeAggregator(rootIdentifier, nodes, schemaContext, logicalDatastoreType).aggregate();
}
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import javanet.staxutils.IndentingXMLStreamWriter;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javanet.staxutils.IndentingXMLStreamWriter;
/**
* Utility class to output NormalizedNodes as XML.
XMLStreamWriter xmlWriter = xmlFactory.createXMLStreamWriter(outStream);
IndentingXMLStreamWriter indenting = new IndentingXMLStreamWriter(xmlWriter);
- try(NormalizedNodeStreamWriter streamWriter = XMLStreamNormalizedNodeStreamWriter.createSchemaless(
+ try (NormalizedNodeStreamWriter streamWriter = XMLStreamNormalizedNodeStreamWriter.createSchemaless(
indenting)) {
NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(streamWriter);
nodeWriter.write(node);
}
public static void toFile(File file, NormalizedNode<?, ?> node) {
- try(FileOutputStream outStream = new FileOutputStream(file)) {
+ try (FileOutputStream outStream = new FileOutputStream(file)) {
toStream(outStream, node);
- } catch(IOException | XMLStreamException e) {
+ } catch (IOException | XMLStreamException e) {
LOG.error("Error writing NormalizedNode to file {}", file, e);
}
}
/**
* The PruningDataTreeModification first removes all entries from the data which do not belong in the schemaContext
- * before delegating it to the actual DataTreeModification
+ * before delegating it to the actual DataTreeModification.
*/
public class PruningDataTreeModification extends ForwardingObject implements DataTreeModification {
public void delete(YangInstanceIdentifier yangInstanceIdentifier) {
try {
delegate.delete(yangInstanceIdentifier);
- } catch(SchemaValidationFailedException e){
+ } catch (SchemaValidationFailedException e) {
LOG.warn("Node at path : {} does not exist ignoring delete", yangInstanceIdentifier);
}
}
@Override
public void merge(YangInstanceIdentifier yangInstanceIdentifier, NormalizedNode<?, ?> normalizedNode) {
try {
- if(YangInstanceIdentifier.EMPTY.equals(yangInstanceIdentifier)){
+ if (YangInstanceIdentifier.EMPTY.equals(yangInstanceIdentifier)) {
pruneAndMergeNode(yangInstanceIdentifier, normalizedNode);
} else {
delegate.merge(yangInstanceIdentifier, normalizedNode);
}
- } catch (SchemaValidationFailedException e){
+ } catch (SchemaValidationFailedException e) {
LOG.warn("Node at path {} was pruned during merge due to validation error: {}",
yangInstanceIdentifier, e.getMessage());
private void pruneAndMergeNode(YangInstanceIdentifier yangInstanceIdentifier, NormalizedNode<?, ?> normalizedNode) {
NormalizedNode<?,?> pruned = pruneNormalizedNode(yangInstanceIdentifier, normalizedNode);
- if(pruned != null) {
+ if (pruned != null) {
delegate.merge(yangInstanceIdentifier, pruned);
}
}
@Override
public void write(YangInstanceIdentifier yangInstanceIdentifier, NormalizedNode<?, ?> normalizedNode) {
try {
- if(YangInstanceIdentifier.EMPTY.equals(yangInstanceIdentifier)){
+ if (YangInstanceIdentifier.EMPTY.equals(yangInstanceIdentifier)) {
pruneAndWriteNode(yangInstanceIdentifier, normalizedNode);
} else {
delegate.write(yangInstanceIdentifier, normalizedNode);
}
- } catch (SchemaValidationFailedException e){
+ } catch (SchemaValidationFailedException e) {
LOG.warn("Node at path : {} was pruned during write due to validation error: {}",
yangInstanceIdentifier, e.getMessage());
private void pruneAndWriteNode(YangInstanceIdentifier yangInstanceIdentifier, NormalizedNode<?, ?> normalizedNode) {
NormalizedNode<?,?> pruned = pruneNormalizedNode(yangInstanceIdentifier, normalizedNode);
- if(pruned != null) {
+ if (pruned != null) {
delegate.write(yangInstanceIdentifier, pruned);
}
}
public void write(PathArgument child, NormalizedNode<?, ?> data) {
YangInstanceIdentifier path = current().node(child);
NormalizedNode<?, ?> prunedNode = pruningModification.pruneNormalizedNode(path, data);
- if(prunedNode != null) {
+ if (prunedNode != null) {
toModification.write(path, prunedNode);
}
}
public void merge(PathArgument child, NormalizedNode<?, ?> data) {
YangInstanceIdentifier path = current().node(child);
NormalizedNode<?, ?> prunedNode = pruningModification.pruneNormalizedNode(path, data);
- if(prunedNode != null) {
+ if (prunedNode != null) {
toModification.merge(path, prunedNode);
}
}
public void delete(PathArgument child) {
try {
toModification.delete(current().node(child));
- } catch(SchemaValidationFailedException e) {
+ } catch (SchemaValidationFailedException e) {
// Ignoring since we would've already logged this in the call to the original modification.
}
}
private static NormalizedNodeDataOutput streamWriter(DataOutput out) throws IOException {
NormalizedNodeDataOutput streamWriter = REUSABLE_WRITER_TL.get();
- if(streamWriter == null) {
+ if (streamWriter == null) {
streamWriter = NormalizedNodeInputOutput.newDataOutput(out);
}
private static NormalizedNodeDataInput streamReader(DataInput in) throws IOException {
NormalizedNodeDataInput streamReader = REUSABLE_READER_TL.get();
- if(streamReader == null) {
+ if (streamReader == null) {
streamReader = new NormalizedNodeInputStreamReader(in);
}
}
}
- public static void serializeNormalizedNode(NormalizedNode<?, ?> node, DataOutput out) {
- try {
- out.writeBoolean(node != null);
- if(node != null) {
- NormalizedNodeDataOutput streamWriter = streamWriter(out);
- streamWriter.writeNormalizedNode(node);
- }
- } catch (IOException e) {
- throw new IllegalArgumentException(String.format("Error serializing NormalizedNode %s",
- node), e);
+ private static NormalizedNode<?, ?> tryDeserializeNormalizedNode(DataInput in) throws IOException {
+ boolean present = in.readBoolean();
+ if (present) {
+ NormalizedNodeDataInput streamReader = streamReader(in);
+ return streamReader.readNormalizedNode();
}
+
+ return null;
}
public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
}
}
- private static NormalizedNode<?, ?> tryDeserializeNormalizedNode(DataInput in) throws IOException {
- boolean present = in.readBoolean();
- if(present) {
- NormalizedNodeDataInput streamReader = streamReader(in);
- return streamReader.readNormalizedNode();
- }
-
- return null;
- }
-
public static NormalizedNode<?, ?> deserializeNormalizedNode(byte [] bytes) {
try {
return tryDeserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
- } catch(InvalidNormalizedNodeStreamException e) {
+ } catch (InvalidNormalizedNodeStreamException e) {
// Probably from legacy protobuf serialization - try that.
try {
NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes);
}
}
+ public static void serializeNormalizedNode(NormalizedNode<?, ?> node, DataOutput out) {
+ try {
+ out.writeBoolean(node != null);
+ if (node != null) {
+ NormalizedNodeDataOutput streamWriter = streamWriter(out);
+ streamWriter.writeNormalizedNode(node);
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Error serializing NormalizedNode %s",
+ node), e);
+ }
+ }
+
public static byte [] serializeNormalizedNode(NormalizedNode<?, ?> node) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
serializeNormalizedNode(node, new DataOutputStream(bos));
private volatile long pollOnCount = 1;
- public TransactionRateLimiter(ActorContext actorContext){
+ public TransactionRateLimiter(ActorContext actorContext) {
this.actorContext = actorContext;
this.commitTimeoutInSeconds = actorContext.getDatastoreContext().getShardTransactionCommitTimeoutInSeconds();
this.dataStoreName = actorContext.getDataStoreName();
- this.txRateLimiter = RateLimiter.create(actorContext.getDatastoreContext().getTransactionCreationInitialRateLimit());
+ this.txRateLimiter = RateLimiter.create(actorContext.getDatastoreContext()
+ .getTransactionCreationInitialRateLimit());
}
- public void acquire(){
+ public void acquire() {
adjustRateLimit();
txRateLimiter.acquire();
}
private void adjustRateLimit() {
final long count = acquireCount.incrementAndGet();
- if(count >= pollOnCount) {
+ if (count >= pollOnCount) {
final Timer commitTimer = actorContext.getOperationTimer(ActorContext.COMMIT);
double newRateLimit = calculateNewRateLimit(commitTimer, commitTimeoutInSeconds);
if (newRateLimit >= 1.0) {
txRateLimiter.setRate(newRateLimit);
- pollOnCount = count + ((long) newRateLimit/2);
+ pollOnCount = count + (long) newRateLimit / 2;
}
}
}
- public double getTxCreationLimit(){
+ public double getTxCreationLimit() {
return txRateLimiter.getRate();
}
- private double getRateLimitFromOtherDataStores(){
+ private double getRateLimitFromOtherDataStores() {
// Since we have no rate data for unused Tx's data store, adjust to the rate from another
// data store that does have rate data.
- for(String name: DatastoreContext.getGlobalDatastoreNames()) {
- if(name.equals(this.dataStoreName)) {
+        for (String name : DatastoreContext.getGlobalDatastoreNames()) {
+ if (name.equals(this.dataStoreName)) {
continue;
}
double newRateLimit = calculateNewRateLimit(actorContext.getOperationTimer(name, ActorContext.COMMIT),
this.commitTimeoutInSeconds);
- if(newRateLimit > 0.0) {
+ if (newRateLimit > 0.0) {
LOG.debug("On unused Tx - data Store {} commit rateLimit adjusted to {}",
this.dataStoreName, newRateLimit);
}
private static double calculateNewRateLimit(Timer commitTimer, long commitTimeoutInSeconds) {
- if(commitTimer == null) {
+ if (commitTimer == null) {
// This can happen in unit tests.
return 0;
}
// Find the time that it takes for transactions to get executed in every 10th percentile
// Compute the rate limit for that percentile and sum it up
- for(int i=1;i<=10;i++){
+ for (int i = 1; i <= 10; i++) {
// Get the amount of time transactions take in the i*10th percentile
double percentileTimeInNanos = timerSnapshot.getValue(i * 0.1D);
- if(percentileTimeInNanos > 0) {
+ if (percentileTimeInNanos > 0) {
// Figure out the rate limit for the i*10th percentile in nanos
- double percentileRateLimit = (commitTimeoutInNanos / percentileTimeInNanos);
+ double percentileRateLimit = commitTimeoutInNanos / percentileTimeInNanos;
// Add the percentileRateLimit to the total rate limit
newRateLimit += percentileRateLimit;
}
// Compute the rate limit per second
- return newRateLimit/(commitTimeoutInSeconds*10);
+ return newRateLimit / (commitTimeoutInSeconds * 10);
}
@VisibleForTesting
}
@VisibleForTesting
- void setPollOnCount(long value){
+ void setPollOnCount(long value) {
pollOnCount = value;
}
@VisibleForTesting
- void setAcquireCount(long value){
+ void setAcquireCount(long value) {
acquireCount.set(value);
}
-
}
import akka.actor.ActorSystem;
import akka.actor.Props;
-import akka.japi.Effect;
import akka.osgi.BundleDelegatingClassLoader;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
actorSystem.actorOf(Props.create(TerminationMonitor.class), TerminationMonitor.ADDRESS);
- actorSystem.actorOf(QuarantinedMonitorActor.props(new Effect() {
-
- @Override
- public void apply() throws Exception {
- // restart the entire karaf container
- LOG.warn("Restarting karaf container");
- System.setProperty("karaf.restart", "true");
- bundleContext.getBundle(0).stop();
- }
+ actorSystem.actorOf(QuarantinedMonitorActor.props(() -> {
+ // restart the entire karaf container
+ LOG.warn("Restarting karaf container");
+ System.setProperty("karaf.restart", "true");
+ bundleContext.getBundle(0).stop();
}), QuarantinedMonitorActor.ADDRESS);
}
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void close() {
LOG.info("Shutting down ActorSystem");
import com.google.common.collect.ForwardingObject;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.ActorSystemProviderListener;
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.ModuleIdentifier;
import org.opendaylight.controller.config.api.osgi.WaitingServiceTracker;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.osgi.framework.BundleContext;
public class ActorSystemProviderModule extends AbstractActorSystemProviderModule {
private BundleContext bundleContext;
- public ActorSystemProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ public ActorSystemProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public ActorSystemProviderModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, ActorSystemProviderModule oldModule, java.lang.AutoCloseable oldInstance) {
+ public ActorSystemProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver,
+ ActorSystemProviderModule oldModule, AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
public class ActorSystemProviderModuleFactory extends AbstractActorSystemProviderModuleFactory {
@Override
- public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
- ActorSystemProviderModule module = (ActorSystemProviderModule)super.createModule(instanceName,dependencyResolver,bundleContext);
+ public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+ BundleContext bundleContext) {
+ ActorSystemProviderModule module = (ActorSystemProviderModule)super.createModule(instanceName,
+                dependencyResolver, bundleContext);
module.setBundleContext(bundleContext);
return module;
}
@Override
public Module createModule(String instanceName, DependencyResolver dependencyResolver,
DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
- ActorSystemProviderModule module = (ActorSystemProviderModule)super.createModule(instanceName, dependencyResolver,
+ ActorSystemProviderModule module = (ActorSystemProviderModule)super.createModule(instanceName,
+ dependencyResolver,
old, bundleContext);
module.setBundleContext(bundleContext);
return module;
package org.opendaylight.controller.config.yang.config.concurrent_data_broker;
import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.ModuleIdentifier;
import org.opendaylight.controller.config.api.osgi.WaitingServiceTracker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.spi.ForwardingDOMDataBroker;
public class DomConcurrentDataBrokerModule extends AbstractDomConcurrentDataBrokerModule {
private BundleContext bundleContext;
- public DomConcurrentDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final DependencyResolver dependencyResolver) {
+ public DomConcurrentDataBrokerModule(final ModuleIdentifier identifier,
+ final DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public DomConcurrentDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final DependencyResolver dependencyResolver, final DomConcurrentDataBrokerModule oldModule, final AutoCloseable oldInstance) {
+ public DomConcurrentDataBrokerModule(final ModuleIdentifier identifier,
+ final DependencyResolver dependencyResolver, final DomConcurrentDataBrokerModule oldModule,
+ final AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
public class DomConcurrentDataBrokerModuleFactory extends AbstractDomConcurrentDataBrokerModuleFactory {
@Override
- public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
+ public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+ BundleContext bundleContext) {
DomConcurrentDataBrokerModule module = (DomConcurrentDataBrokerModule)super.createModule(instanceName,
dependencyResolver, bundleContext);
module.setBundleContext(bundleContext);
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.ModuleIdentifier;
import org.opendaylight.controller.config.api.osgi.WaitingServiceTracker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.osgi.framework.BundleContext;
-public class DistributedConfigDataStoreProviderModule extends
- org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
+public class DistributedConfigDataStoreProviderModule extends AbstractDistributedConfigDataStoreProviderModule {
private BundleContext bundleContext;
public DistributedConfigDataStoreProviderModule(
super(identifier, dependencyResolver);
}
- public DistributedConfigDataStoreProviderModule(
- org.opendaylight.controller.config.api.ModuleIdentifier identifier,
- org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
- org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedConfigDataStoreProviderModule oldModule,
- java.lang.AutoCloseable oldInstance) {
+ public DistributedConfigDataStoreProviderModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver,
+ DistributedConfigDataStoreProviderModule oldModule, AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
private static DatastoreContext newDatastoreContext(ConfigProperties inProps) {
ConfigProperties props = inProps;
- if(props == null) {
+ if (props == null) {
props = new ConfigProperties();
}
return DatastoreContext.newBuilder()
.logicalStoreType(LogicalDatastoreType.CONFIGURATION)
.maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue())
- .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue())
- .maxShardDataChangeListenerQueueSize(props.getMaxShardDataChangeListenerQueueSize().getValue().intValue())
+ .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize()
+ .getValue().intValue())
+ .maxShardDataChangeListenerQueueSize(props.getMaxShardDataChangeListenerQueueSize()
+ .getValue().intValue())
.maxShardDataStoreExecutorQueueSize(props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue())
.shardTransactionIdleTimeoutInMinutes(props.getShardTransactionIdleTimeoutInMinutes().getValue())
.operationTimeoutInSeconds(props.getOperationTimeoutInSeconds().getValue())
- .shardJournalRecoveryLogBatchSize(props.getShardJournalRecoveryLogBatchSize().
- getValue().intValue())
+ .shardJournalRecoveryLogBatchSize(props.getShardJournalRecoveryLogBatchSize()
+ .getValue().intValue())
.shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue())
- .shardSnapshotDataThresholdPercentage(props.getShardSnapshotDataThresholdPercentage().getValue().intValue())
+ .shardSnapshotDataThresholdPercentage(props.getShardSnapshotDataThresholdPercentage()
+ .getValue().intValue())
.shardHeartbeatIntervalInMillis(props.getShardHeartbeatIntervalInMillis().getValue())
.shardInitializationTimeoutInSeconds(props.getShardInitializationTimeoutInSeconds().getValue())
.shardLeaderElectionTimeoutInSeconds(props.getShardLeaderElectionTimeoutInSeconds().getValue())
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
-/*
-* Generated file
-*
-* Generated from: yang module name: distributed-datastore-provider yang module local name: distributed-config-datastore-provider
-* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
-* Generated at: Tue Jun 24 17:14:50 PDT 2014
-*
-* Do not modify this file unless it is present under src/main directory
-*/
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import org.opendaylight.controller.config.api.DependencyResolver;
import org.opendaylight.controller.config.spi.Module;
import org.osgi.framework.BundleContext;
-public class DistributedConfigDataStoreProviderModuleFactory extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModuleFactory {
+public class DistributedConfigDataStoreProviderModuleFactory
+ extends AbstractDistributedConfigDataStoreProviderModuleFactory {
@Override
- public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
- DistributedConfigDataStoreProviderModule module = (DistributedConfigDataStoreProviderModule)super.createModule(instanceName,dependencyResolver,bundleContext);
+ public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+ BundleContext bundleContext) {
+ DistributedConfigDataStoreProviderModule module = (DistributedConfigDataStoreProviderModule)super.createModule(
+                instanceName, dependencyResolver, bundleContext);
module.setBundleContext(bundleContext);
return module;
}
@Override
public Module createModule(String instanceName, DependencyResolver dependencyResolver,
DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
- DistributedConfigDataStoreProviderModule module = (DistributedConfigDataStoreProviderModule)super.createModule(instanceName, dependencyResolver,
- old, bundleContext);
+ DistributedConfigDataStoreProviderModule module = (DistributedConfigDataStoreProviderModule)super.createModule(
+ instanceName, dependencyResolver, old, bundleContext);
module.setBundleContext(bundleContext);
return module;
}
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.controller.config.api.DependencyResolver;
+import org.opendaylight.controller.config.api.ModuleIdentifier;
import org.opendaylight.controller.config.api.osgi.WaitingServiceTracker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.osgi.framework.BundleContext;
-public class DistributedOperationalDataStoreProviderModule extends
- org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
+public class DistributedOperationalDataStoreProviderModule
+ extends AbstractDistributedOperationalDataStoreProviderModule {
private BundleContext bundleContext;
- public DistributedOperationalDataStoreProviderModule(
- org.opendaylight.controller.config.api.ModuleIdentifier identifier,
- org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ public DistributedOperationalDataStoreProviderModule(ModuleIdentifier identifier,
+ DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public DistributedOperationalDataStoreProviderModule(
- org.opendaylight.controller.config.api.ModuleIdentifier identifier,
- org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
- org.opendaylight.controller.config.yang.config.distributed_datastore_provider.DistributedOperationalDataStoreProviderModule oldModule,
- java.lang.AutoCloseable oldInstance) {
+ public DistributedOperationalDataStoreProviderModule(ModuleIdentifier identifier,
+            DependencyResolver dependencyResolver, DistributedOperationalDataStoreProviderModule oldModule,
+ AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
private static DatastoreContext newDatastoreContext(OperationalProperties inProps) {
OperationalProperties props = inProps;
- if(props == null) {
+ if (props == null) {
props = new OperationalProperties();
}
return DatastoreContext.newBuilder()
.logicalStoreType(LogicalDatastoreType.OPERATIONAL)
.maxShardDataChangeExecutorPoolSize(props.getMaxShardDataChangeExecutorPoolSize().getValue().intValue())
- .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize().getValue().intValue())
- .maxShardDataChangeListenerQueueSize(props.getMaxShardDataChangeListenerQueueSize().getValue().intValue())
+ .maxShardDataChangeExecutorQueueSize(props.getMaxShardDataChangeExecutorQueueSize()
+ .getValue().intValue())
+ .maxShardDataChangeListenerQueueSize(props.getMaxShardDataChangeListenerQueueSize()
+ .getValue().intValue())
.maxShardDataStoreExecutorQueueSize(props.getMaxShardDataStoreExecutorQueueSize().getValue().intValue())
.shardTransactionIdleTimeoutInMinutes(props.getShardTransactionIdleTimeoutInMinutes().getValue())
.operationTimeoutInSeconds(props.getOperationTimeoutInSeconds().getValue())
- .shardJournalRecoveryLogBatchSize(props.getShardJournalRecoveryLogBatchSize().
- getValue().intValue())
+ .shardJournalRecoveryLogBatchSize(props.getShardJournalRecoveryLogBatchSize()
+ .getValue().intValue())
.shardSnapshotBatchCount(props.getShardSnapshotBatchCount().getValue().intValue())
- .shardSnapshotDataThresholdPercentage(props.getShardSnapshotDataThresholdPercentage().getValue().intValue())
+ .shardSnapshotDataThresholdPercentage(props.getShardSnapshotDataThresholdPercentage()
+ .getValue().intValue())
.shardHeartbeatIntervalInMillis(props.getShardHeartbeatIntervalInMillis().getValue())
.shardInitializationTimeoutInSeconds(props.getShardInitializationTimeoutInSeconds().getValue())
.shardLeaderElectionTimeoutInSeconds(props.getShardLeaderElectionTimeoutInSeconds().getValue())
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
-/*
-* Generated file
-*
-* Generated from: yang module name: distributed-datastore-provider yang module local name: distributed-operational-datastore-provider
-* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
-* Generated at: Tue Jun 24 17:14:50 PDT 2014
-*
-* Do not modify this file unless it is present under src/main directory
-*/
package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
import org.opendaylight.controller.config.api.DependencyResolver;
import org.opendaylight.controller.config.spi.Module;
import org.osgi.framework.BundleContext;
-public class DistributedOperationalDataStoreProviderModuleFactory extends org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModuleFactory {
+public class DistributedOperationalDataStoreProviderModuleFactory
+ extends AbstractDistributedOperationalDataStoreProviderModuleFactory {
@Override
- public Module createModule(String instanceName, DependencyResolver dependencyResolver, BundleContext bundleContext) {
- DistributedOperationalDataStoreProviderModule module = (DistributedOperationalDataStoreProviderModule)super.createModule(instanceName,dependencyResolver,bundleContext);
+ public Module createModule(String instanceName, DependencyResolver dependencyResolver,
+ BundleContext bundleContext) {
+ DistributedOperationalDataStoreProviderModule module = (DistributedOperationalDataStoreProviderModule)
+                super.createModule(instanceName, dependencyResolver, bundleContext);
module.setBundleContext(bundleContext);
return module;
}
@Override
public Module createModule(String instanceName, DependencyResolver dependencyResolver,
DynamicMBeanWithInstance old, BundleContext bundleContext) throws Exception {
- DistributedOperationalDataStoreProviderModule module = (DistributedOperationalDataStoreProviderModule)super.createModule(instanceName, dependencyResolver,
+ DistributedOperationalDataStoreProviderModule module = (DistributedOperationalDataStoreProviderModule)
+ super.createModule(instanceName, dependencyResolver,
old, bundleContext);
module.setBundleContext(bundleContext);
return module;
}
@Override
- public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
- YangInstanceIdentifier path, L listener, DataChangeScope scope) {
+ public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L>
+ registerChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope) {
return delegate().registerChangeListener(path, listener, scope);
}