This series of patches uses lambdas instead of anonymous classes for
functional interfaces when possible. Lambdas are replaced with method
references when appropriate.
Change-Id: Ib397cbe0e0179f2f47ef10f13301b604dc6db88b
Signed-off-by: Stephen Kitt <skitt@redhat.com>
16 files changed:
import scala.concurrent.duration.Duration;
public abstract class AbstractDataStoreClientActor extends AbstractClientActor {
import scala.concurrent.duration.Duration;
public abstract class AbstractDataStoreClientActor extends AbstractClientActor {
- private static final Function1<ActorRef, ?> GET_CLIENT_FACTORY = ExplicitAsk.toScala(t -> new GetClientRequest(t));
+ private static final Function1<ActorRef, ?> GET_CLIENT_FACTORY = ExplicitAsk.toScala(GetClientRequest::new);
private final ActorContext actorContext;
private final ActorContext actorContext;
@Override
final void doAbort() {
@Override
final void doAbort() {
- sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> {
- LOG.debug("Transaction {} abort completed with {}", identifier, response);
- });
+ sendAbort(new AbortLocalTransactionRequest(identifier, localActor()),
+ response -> LOG.debug("Transaction {} abort completed with {}", identifier, response));
void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
- tree.closeTransactionChain(getIdentifier(), () -> {
- envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
- });
+ tree.closeTransactionChain(getIdentifier(),
+ () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
}
void purge(final long sequence, final RequestEnvelope envelope, final long now) {
LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
}
void purge(final long sequence, final RequestEnvelope envelope, final long now) {
LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
- tree.purgeTransactionChain(getIdentifier(), () -> {
- envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
- });
+ tree.purgeTransactionChain(getIdentifier(),
+ () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
}
private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
}
private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
final Iterable<Object> results;
try {
final Iterable<Object> results;
try {
- results = Await.result(Futures.sequence(Lists.transform(futures, e -> e.getValue()),
+ results = Await.result(Futures.sequence(Lists.transform(futures, Entry::getValue),
ExecutionContexts.global()), timeout.duration());
} catch (TimeoutException e) {
successfulFromPrevious = null;
ExecutionContexts.global()), timeout.duration());
} catch (TimeoutException e) {
successfulFromPrevious = null;
getActorContext().getActorSystem().scheduler().scheduleOnce(
FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
getActorContext().getActorSystem().scheduler().scheduleOnce(
FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
- () -> tryFindPrimaryShard(), getActorContext().getClientDispatcher());
+ this::tryFindPrimaryShard, getActorContext().getClientDispatcher());
try {
Collection<DOMEntityOwnershipListener> listeners = entityTypeListenerMap.get(entity.getType());
if (!listeners.isEmpty()) {
try {
Collection<DOMEntityOwnershipListener> listeners = entityTypeListenerMap.get(entity.getType());
if (!listeners.isEmpty()) {
- notifyListeners(entity, wasOwner, isOwner, hasOwner, listeners.stream().map(
- listener -> listenerActorMap.get(listener)).collect(Collectors.toList()));
+ notifyListeners(entity, wasOwner, isOwner, hasOwner,
+ listeners.stream().map(listenerActorMap::get).collect(Collectors.toList()));
}
} finally {
listenerLock.readLock().unlock();
}
} finally {
listenerLock.readLock().unlock();
final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()]));
final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()]));
- combinedFuture.thenRun(() -> {
- sender.tell(new Status.Success(null), noSender());
- }).exceptionally(throwable -> {
- sender.tell(new Status.Failure(throwable), self());
- return null;
- });
+ combinedFuture
+ .thenRun(() -> sender.tell(new Success(null), noSender()))
+ .exceptionally(throwable -> {
+ sender.tell(new Status.Failure(throwable), self());
+ return null;
+ });
}
private void onNotifyProducerCreated(final NotifyProducerCreated message) {
}
private void onNotifyProducerCreated(final NotifyProducerCreated message) {
Answer<ListenableFuture<Boolean>> asyncCanCommit = invocation -> {
final SettableFuture<Boolean> future = SettableFuture.create();
if (doAsync) {
Answer<ListenableFuture<Boolean>> asyncCanCommit = invocation -> {
final SettableFuture<Boolean> future = SettableFuture.create();
if (doAsync) {
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue,
- 10, TimeUnit.SECONDS);
- future.set(true);
- }
- }.start();
+ new Thread(() -> {
+ Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue,
+ 10, TimeUnit.SECONDS);
+ future.set(true);
+ }).start();
} else {
future.set(true);
}
} else {
future.set(true);
}
return null;
}).when(mock).validate(any(DataTreeModification.class));
return null;
}).when(mock).validate(any(DataTreeModification.class));
- doAnswer(invocation -> {
- return actual.prepare(invocation.getArgumentAt(0, DataTreeModification.class));
- }).when(mock).prepare(any(DataTreeModification.class));
+ doAnswer(invocation -> actual.prepare(invocation.getArgumentAt(0, DataTreeModification.class))).when(
+ mock).prepare(any(DataTreeModification.class));
doAnswer(invocation -> {
actual.commit(invocation.getArgumentAt(0, DataTreeCandidate.class));
doAnswer(invocation -> {
actual.commit(invocation.getArgumentAt(0, DataTreeCandidate.class));
return null;
}).when(mock).setSchemaContext(any(SchemaContext.class));
return null;
}).when(mock).setSchemaContext(any(SchemaContext.class));
- doAnswer(invocation -> {
- return actual.takeSnapshot();
- }).when(mock).takeSnapshot();
+ doAnswer(invocation -> actual.takeSnapshot()).when(mock).takeSnapshot();
- doAnswer(invocation -> {
- return actual.getRootPath();
- }).when(mock).getRootPath();
+ doAnswer(invocation -> actual.getRootPath()).when(mock).getRootPath();
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread() {
- @Override
- public void run() {
- proxy.init(path, scope);
- }
-
- }.start();
+ new Thread(() -> proxy.init(path, scope)).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread() {
- @Override
- public void run() {
- proxy.init(path, scope);
- }
-
- }.start();
+ new Thread(() -> proxy.init(path, scope)).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread() {
- @Override
- public void run() {
- proxy.init(path, scope);
- }
-
- }.start();
+ new Thread(() -> proxy.init(path, scope)).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
final DataChangeScope scope = AsyncDataBroker.DataChangeScope.ONE;
- new Thread() {
- @Override
- public void run() {
- proxy.init(path, scope);
- }
-
- }.start();
+ new Thread(() -> proxy.init(path, scope)).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
actorContext, mockListener, path);
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
actorContext, mockListener, path);
- new Thread() {
- @Override
- public void run() {
- proxy.init("shard-1");
- }
-
- }.start();
+ new Thread(() -> proxy.init("shard-1")).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path);
final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener, path);
- new Thread() {
- @Override
- public void run() {
- proxy.init("shard-1");
- }
-
- }.start();
+ new Thread(() -> proxy.init("shard-1")).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
actorContext, mockListener, path);
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
actorContext, mockListener, path);
- new Thread() {
- @Override
- public void run() {
- proxy.init("shard-1");
- }
-
- }.start();
+ new Thread(() -> proxy.init("shard-1")).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
actorContext, mockListener, path);
final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> proxy = new DataTreeChangeListenerProxy<>(
actorContext, mockListener, path);
- new Thread() {
- @Override
- public void run() {
- proxy.init("shard-1");
- }
-
- }.start();
+ new Thread(() -> proxy.init("shard-1")).start();
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
FiniteDuration timeout = duration("5 seconds");
FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
final AtomicReference<Exception> caughtEx = new AtomicReference<>();
final CountDownLatch txReady = new CountDownLatch(1);
final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
final AtomicReference<Exception> caughtEx = new AtomicReference<>();
final CountDownLatch txReady = new CountDownLatch(1);
- final Thread txThread = new Thread() {
- @Override
- public void run() {
- try {
- writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ final Thread txThread = new Thread(() -> {
+ try {
+ writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- writeTx.merge(TestModel.OUTER_LIST_PATH,
- ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+ writeTx.merge(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
- writeTx.write(listEntryPath,
- ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+ writeTx.write(listEntryPath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
- writeTx.delete(listEntryPath);
+ writeTx.delete(listEntryPath);
- txCohort.set(writeTx.ready());
- } catch (Exception e) {
- caughtEx.set(e);
- } finally {
- txReady.countDown();
- }
+ txCohort.set(writeTx.ready());
+ } catch (Exception e) {
+ caughtEx.set(e);
+ } finally {
+ txReady.countDown();
txReadFuture = new AtomicReference<>();
final AtomicReference<Exception> caughtEx = new AtomicReference<>();
final CountDownLatch txReadsDone = new CountDownLatch(1);
txReadFuture = new AtomicReference<>();
final AtomicReference<Exception> caughtEx = new AtomicReference<>();
final CountDownLatch txReadsDone = new CountDownLatch(1);
- final Thread txThread = new Thread() {
- @Override
- public void run() {
- try {
- readWriteTx.write(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ final Thread txThread = new Thread(() -> {
+ try {
+ readWriteTx.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
+ txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
- txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
- } catch (Exception e) {
- caughtEx.set(e);
- } finally {
- txReadsDone.countDown();
- }
+ txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
+ } catch (Exception e) {
+ caughtEx.set(e);
+ } finally {
+ txReadsDone.countDown();
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort failedCanCommit(final ShardDataTreeCohort mock) {
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort failedCanCommit(final ShardDataTreeCohort mock) {
- doAnswer(invocation -> {
- return invokeFailure(invocation);
- }).when(mock).canCommit(any(FutureCallback.class));
+ doAnswer(ShardDataTreeMocking::invokeFailure).when(mock).canCommit(any(FutureCallback.class));
return mock;
}
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort failedPreCommit(final ShardDataTreeCohort mock) {
return mock;
}
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort failedPreCommit(final ShardDataTreeCohort mock) {
- doAnswer(invocation -> {
- return invokeFailure(invocation);
- }).when(mock).preCommit(any(FutureCallback.class));
+ doAnswer(ShardDataTreeMocking::invokeFailure).when(mock).preCommit(any(FutureCallback.class));
return mock;
}
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort failedCommit(final ShardDataTreeCohort mock) {
return mock;
}
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort failedCommit(final ShardDataTreeCohort mock) {
- doAnswer(invocation -> {
- return invokeFailure(invocation);
- }).when(mock).commit(any(FutureCallback.class));
+ doAnswer(ShardDataTreeMocking::invokeFailure).when(mock).commit(any(FutureCallback.class));
return mock;
}
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort successfulCanCommit(final ShardDataTreeCohort mock) {
return mock;
}
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort successfulCanCommit(final ShardDataTreeCohort mock) {
- doAnswer(invocation -> {
- return invokeSuccess(invocation, null);
- }).when(mock).canCommit(any(FutureCallback.class));
+ doAnswer(invocation -> invokeSuccess(invocation, null)).when(mock).canCommit(any(FutureCallback.class));
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort successfulPreCommit(final ShardDataTreeCohort mock,
final DataTreeCandidate candidate) {
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort successfulPreCommit(final ShardDataTreeCohort mock,
final DataTreeCandidate candidate) {
- doAnswer(invocation -> {
- return invokeSuccess(invocation, candidate);
- }).when(mock).preCommit(any(FutureCallback.class));
+ doAnswer(invocation -> invokeSuccess(invocation, candidate)).when(mock).preCommit(any(FutureCallback.class));
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort successfulCommit(final ShardDataTreeCohort mock, final UnsignedLong index) {
@SuppressWarnings("unchecked")
public static ShardDataTreeCohort successfulCommit(final ShardDataTreeCohort mock, final UnsignedLong index) {
- doAnswer(invocation -> {
- return invokeSuccess(invocation, index);
- }).when(mock).commit(any(FutureCallback.class));
+ doAnswer(invocation -> invokeSuccess(invocation, index)).when(mock).commit(any(FutureCallback.class));
// original ElectionTimeout message to proceed with the election.
firstElectionTimeout = false;
final ActorRef self = getSelf();
// original ElectionTimeout message to proceed with the election.
firstElectionTimeout = false;
final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
+ new Thread(() -> {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }).start();
onFirstElectionTimeout.countDown();
} else {
onFirstElectionTimeout.countDown();
} else {
if (message instanceof ElectionTimeout && firstElectionTimeout) {
firstElectionTimeout = false;
final ActorRef self = getSelf();
if (message instanceof ElectionTimeout && firstElectionTimeout) {
firstElectionTimeout = false;
final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
+ new Thread(() -> {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }).start();
onFirstElectionTimeout.countDown();
} else {
onFirstElectionTimeout.countDown();
} else {
final AtomicReference<Exception> caughtEx = new AtomicReference<>();
final CountDownLatch write2Complete = new CountDownLatch(1);
final AtomicReference<Exception> caughtEx = new AtomicReference<>();
final CountDownLatch write2Complete = new CountDownLatch(1);
- new Thread() {
- @Override
- public void run() {
- try {
- writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
- } catch (Exception e) {
- caughtEx.set(e);
- } finally {
- write2Complete.countDown();
- }
+ new Thread(() -> {
+ try {
+ writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
+ } catch (Exception e) {
+ caughtEx.set(e);
+ } finally {
+ write2Complete.countDown();
assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
final AtomicReference<Exception> caughtEx = new AtomicReference<>();
final CountDownLatch write2Complete = new CountDownLatch(1);
final AtomicReference<Exception> caughtEx = new AtomicReference<>();
final CountDownLatch write2Complete = new CountDownLatch(1);
- new Thread() {
- @Override
- public void run() {
- try {
- writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
- } catch (Exception e) {
- caughtEx.set(e);
- } finally {
- write2Complete.countDown();
- }
+ new Thread(() -> {
+ try {
+ writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
+ } catch (Exception e) {
+ caughtEx.set(e);
+ } finally {
+ write2Complete.countDown();
assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
assertEquals("Tx 2 write should've completed", true, write2Complete.await(5, TimeUnit.SECONDS));
assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
assertEquals("getType", shardMrgIDSuffix, datastoreSnapshot.getType());
assertNull("Expected null ShardManagerSnapshot", datastoreSnapshot.getShardManagerSnapshot());
- Function<ShardSnapshot, String> shardNameTransformer = s -> s.getName();
+ Function<ShardSnapshot, String> shardNameTransformer = ShardSnapshot::getName;
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));
assertEquals("Shard names", Sets.newHashSet("shard1", "shard2"), Sets.newHashSet(
Lists.transform(datastoreSnapshot.getShardSnapshots(), shardNameTransformer)));