// original ElectionTimeout message to proceed with the election.
firstElectionTimeout = false;
final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
+ new Thread(() -> {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }).start();
onFirstElectionTimeout.countDown();
} else {
if (message instanceof ElectionTimeout && firstElectionTimeout) {
firstElectionTimeout = false;
final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
+ new Thread(() -> {
+ Uninterruptibles.awaitUninterruptibly(
+ onChangeListenerRegistered, 5, TimeUnit.SECONDS);
+ self.tell(message, self);
+ }).start();
onFirstElectionTimeout.countDown();
} else {
final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
- if (caughtEx.get() != null) {
- Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
- Throwables.propagate(caughtEx.get());
+ final Throwable t = caughtEx.get();
+ if (t != null) {
+ Throwables.propagateIfPossible(t, Exception.class);
+ throw new RuntimeException(t);
}
assertEquals("Commits complete", true, done);
verifyOuterListEntry(shard, 1);
- verifyLastApplied(shard, 2);
+ verifyLastApplied(shard, 5);
}
};
}
final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
if (failure != null) {
- Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
- Throwables.propagate(failure.cause());
+ Throwables.propagateIfPossible(failure.cause(), Exception.class);
+ throw new RuntimeException(failure.cause());
}
}
};
inOrder.verify(dataTree).prepare(any(DataTreeModification.class));
inOrder.verify(dataTree).commit(any(DataTreeCandidate.class));
+ // Purge request is scheduled as asynchronous, wait for two heartbeats to let it propagate into
+ // the journal
+ Thread.sleep(HEARTBEAT_MILLIS * 2);
+
shard.tell(Shard.GET_SHARD_MBEAN_MESSAGE, getRef());
final ShardStats shardStats = expectMsgClass(duration, ShardStats.class);
// Commit index should advance as we do not have an empty
// modification
- assertEquals(0, shardStats.getCommitIndex());
+ assertEquals(1, shardStats.getCommitIndex());
}
};
}
.shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(true).build();
final Props persistentProps = Shard.builder().id(shardID).datastoreContext(persistentContext)
- .schemaContext(SCHEMA_CONTEXT).props();
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
final DatastoreContext nonPersistentContext = DatastoreContext.newBuilder()
.shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).persistent(false).build();
final Props nonPersistentProps = Shard.builder().id(shardID).datastoreContext(nonPersistentContext)
- .schemaContext(SCHEMA_CONTEXT).props();
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props();
new ShardTestKit(getSystem()) {
{
waitUntilLeader(shard);
- final TestActorRef<MessageCollectorActor> listener =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+ final ActorRef listener = getSystem().actorOf(MessageCollectorActor.props());
shard.tell(new RegisterRoleChangeListener(), listener);
.datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
.peerAddresses(Collections.singletonMap(leaderShardID.toString(),
"akka://test/user/" + leaderShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
final TestActorRef<Shard> leaderShard = actorFactory
.createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
.peerAddresses(Collections.singletonMap(followerShardID.toString(),
"akka://test/user/" + followerShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
.datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build())
.peerAddresses(Collections.singletonMap(leaderShardID.toString(),
"akka://test/user/" + leaderShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
final TestActorRef<Shard> leaderShard = actorFactory
.createTestActor(Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext())
.peerAddresses(Collections.singletonMap(followerShardID.toString(),
"akka://test/user/" + followerShardID.toString()))
- .schemaContext(SCHEMA_CONTEXT).props()
+ .schemaContextProvider(() -> SCHEMA_CONTEXT).props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
leaderShard.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
@Test
public void testServerRemoved() throws Exception {
- final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props());
+ final TestActorRef<MessageCollectorActor> parent = actorFactory.createTestActor(MessageCollectorActor.props()
+ .withDispatcher(Dispatchers.DefaultDispatcherId()));
final ActorRef shard = parent.underlyingActor().context().actorOf(
newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),