/*
- *
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
public class ShardWriteTransaction extends ShardTransaction {
private final MutableCompositeModification compositeModification = new MutableCompositeModification();
+ private int totalBatchedModificationsReceived;
+ private Exception lastBatchedModificationsException;
private final DOMStoreWriteTransaction transaction;
public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
modification.apply(transaction);
}
+ totalBatchedModificationsReceived++;
if(batched.isReady()) {
+ if(lastBatchedModificationsException != null) {
+ throw lastBatchedModificationsException;
+ }
+
+ if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
+ throw new IllegalStateException(String.format(
+ "The total number of batched messages received %d does not match the number sent %d",
+ totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
+ }
+
readyTransaction(transaction, false);
} else {
getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
}
} catch (Exception e) {
+ lastBatchedModificationsException = e;
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+ if(batched.isReady()) {
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
}
}
/*
* Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
private final OperationCompleter operationCompleter;
private BatchedModifications batchedModifications;
+ private int totalBatchedModificationsSent;
protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
}
batchedModifications.setReady(ready);
+ batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
sent = executeOperationAsync(batchedModifications);
if(ready) {
private static final long serialVersionUID = 1L;
private boolean ready;
+ private int totalMessagesSent;
private String transactionID;
private String transactionChainID;
this.ready = ready;
}
+ public int getTotalMessagesSent() {
+ return totalMessagesSent;
+ }
+
+ public void setTotalMessagesSent(int totalMessagesSent) {
+ this.totalMessagesSent = totalMessagesSent;
+ }
+
public String getTransactionID() {
return transactionID;
}
transactionID = in.readUTF();
transactionChainID = in.readUTF();
ready = in.readBoolean();
+ totalMessagesSent = in.readInt();
}
@Override
out.writeUTF(transactionID);
out.writeUTF(transactionChainID);
out.writeBoolean(ready);
+ out.writeInt(totalMessagesSent);
}
@Override
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready)
- .append(", modifications size=").append(getModifications().size()).append("]");
+ builder.append("BatchedModifications [transactionID=").append(transactionID).append(", transactionChainID=")
+ .append(transactionChainID).append(", ready=").append(ready).append(", totalMessagesSent=")
+ .append(totalMessagesSent).append(", modifications size=").append(getModifications().size())
+ .append("]");
return builder.toString();
}
}
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
import akka.actor.ActorRef;
import akka.actor.Props;
+import akka.actor.Status.Failure;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
- batched.setReady(true);
batched.addModification(new WriteModification(writePath, writeData));
transaction.tell(batched, getRef());
+ BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
+ assertEquals("getNumBatched", 1, reply.getNumBatched());
+
+ batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched.setReady(true);
+ batched.setTotalMessagesSent(2);
+ transaction.tell(batched, getRef());
expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
}};
}
+ @Test(expected=TestException.class)
+ public void testOnReceiveBatchedModificationsFailure() throws Throwable {
+ new JavaTestKit(getSystem()) {{
+
+ DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
+ final ActorRef transaction = newTransactionActor(mockWriteTx,
+ "testOnReceiveBatchedModificationsFailure");
+
+ JavaTestKit watcher = new JavaTestKit(getSystem());
+ watcher.watch(transaction);
+
+ YangInstanceIdentifier path = TestModel.TEST_PATH;
+ ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doThrow(new TestException()).when(mockWriteTx).write(path, node);
+
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched.addModification(new WriteModification(path, node));
+
+ transaction.tell(batched, getRef());
+ expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+ batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched.setReady(true);
+ batched.setTotalMessagesSent(2);
+
+ transaction.tell(batched, getRef());
+ Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+ watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+ if(failure != null) {
+ throw failure.cause();
+ }
+ }};
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
+ new JavaTestKit(getSystem()) {{
+
+ final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+ "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
+
+ JavaTestKit watcher = new JavaTestKit(getSystem());
+ watcher.watch(transaction);
+
+ BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+ batched.setReady(true);
+ batched.setTotalMessagesSent(2);
+
+ transaction.tell(batched, getRef());
+
+ Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+ watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+ if(failure != null) {
+ throw failure.cause();
+ }
+ }};
+ }
+
@Test
public void testOnReceivePreLithiumReadyTransaction() throws Exception {
new JavaTestKit(getSystem()) {{
expectMsgClass(duration("3 seconds"), Terminated.class);
}};
}
+
+ public static class TestException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+ }
}
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
- verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
- isA(BatchedModifications.class));
+ List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+ assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+ verifyBatchedModifications(batchedModifications.get(0), true,
+ new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+ assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
}
@Test
verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
new DeleteModification(deletePath2));
+
+ assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
}
@Test
batched.addModification(new MergeModification(mergePath, mergeData));
batched.addModification(new DeleteModification(deletePath));
batched.setReady(true);
+ batched.setTotalMessagesSent(5);
BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
(Serializable) batched.toSerializable());
assertEquals("getTransactionID", "tx1", clone.getTransactionID());
assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID());
assertEquals("isReady", true, clone.isReady());
+ assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent());
assertEquals("getModifications size", 3, clone.getModifications().size());