import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
private static final TransactionIdentifier TX_ID = new TransactionIdentifier(new LocalHistoryIdentifier(
ClientIdentifier.create(FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("test")), 0),
0), 0);
- private static final DeleteModification DELETE = new DeleteModification(DataStoreVersions.CURRENT_VERSION);
private OperationLimiter limiter;
private RemoteTransactionContext txContext;
- private ActorContext actorContext;
+ private ActorUtils actorUtils;
private TestKit kit;
@Before
public void before() {
kit = new TestKit(getSystem());
- actorContext = Mockito.spy(new ActorContext(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
+ actorUtils = Mockito.spy(new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
mock(Configuration.class)));
limiter = new OperationLimiter(TX_ID, 4, 0);
- txContext = new RemoteTransactionContext(TX_ID, actorContext.actorSelection(kit.getRef().path()), actorContext,
+ txContext = new RemoteTransactionContext(TX_ID, actorUtils.actorSelection(kit.getRef().path()), actorUtils,
DataStoreVersions.CURRENT_VERSION, limiter);
txContext.operationHandOffComplete();
}
*/
@Test
public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
- txContext.executeModification(DELETE, null);
- txContext.executeModification(DELETE, null);
+ txContext.executeDelete(null, null);
+ txContext.executeDelete(null, null);
assertEquals(2, limiter.availablePermits());
- Future<Object> future = txContext.sendBatchedModifications();
+ final Future<Object> sendFuture = txContext.sendBatchedModifications();
assertEquals(2, limiter.availablePermits());
BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
assertEquals(2, msg.getModifications().size());
assertEquals(1, msg.getTotalMessagesSent());
sendReply(new Failure(new NullPointerException()));
- assertFuture(future, new OnComplete<Object>() {
+ assertFuture(sendFuture, new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final Object success) {
assertTrue(failure instanceof NullPointerException);
assertEquals(4, limiter.availablePermits());
// The transaction has failed, no throttling should occur
- txContext.executeModification(DELETE, null);
+ txContext.executeDelete(null, null);
assertEquals(4, limiter.availablePermits());
// Executing a read should result in immediate failure
}
});
- future = txContext.directCommit(null);
+ final Future<Object> commitFuture = txContext.directCommit(null);
msg = kit.expectMsgClass(BatchedModifications.class);
// Modification should have been thrown away by the dropped transmit induced by executeRead()
assertTrue(msg.isReady());
assertEquals(2, msg.getTotalMessagesSent());
sendReply(new Failure(new IllegalStateException()));
- assertFuture(future, new OnComplete<Object>() {
+ assertFuture(commitFuture, new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final Object success) {
assertTrue(failure instanceof IllegalStateException);
}
});
- kit.expectNoMsg();
+ kit.expectNoMessage();
}
/**
*/
@Test
public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
- txContext.executeModification(DELETE, null);
- txContext.executeModification(DELETE, null);
- txContext.executeModification(DELETE, null);
- txContext.executeModification(DELETE, null);
+ txContext.executeDelete(null, null);
+ txContext.executeDelete(null, null);
+ txContext.executeDelete(null, null);
+ txContext.executeDelete(null, null);
assertEquals(0, limiter.availablePermits());
- txContext.executeModification(DELETE, null);
+ txContext.executeDelete(null, null);
// Last acquire should have failed ...
assertEquals(0, limiter.availablePermits());
- Future<Object> future = txContext.sendBatchedModifications();
+ final Future<Object> future = txContext.sendBatchedModifications();
assertEquals(0, limiter.availablePermits());
BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
assertEquals(1, msg.getTotalMessagesSent());
sendReply(new Failure(new NullPointerException()));
- assertFuture(future, new OnComplete<Object>() {
+ assertFuture(future, new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final Object success) {
assertTrue(failure instanceof NullPointerException);
}
});
- kit.expectNoMsg();
+ kit.expectNoMessage();
}
private void sendReply(final Object message) {