X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardWriteTransaction.java;h=1d5b1d8e1b99bf35f5fabe8ab3723892dd7af193;hp=d5dcfde803a16bfcc6ea285473167d9b78e8c5cb;hb=c31a6fcf9fb070d4419ca4c32d8b531fdcb5030d;hpb=3927509ec3ecfa32a51b725d2b7155d425f5b877 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index d5dcfde803..1d5b1d8e1b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -1,6 +1,6 @@ /* - * * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -32,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * @author: syedbahm @@ -41,12 +40,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class ShardWriteTransaction extends ShardTransaction { private final MutableCompositeModification compositeModification = new MutableCompositeModification(); + private int totalBatchedModificationsReceived; + private Exception lastBatchedModificationsException; private final DOMStoreWriteTransaction transaction; public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, ShardStats shardStats, String transactionID, - short clientTxVersion) { - super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion); + ShardStats shardStats, String transactionID, short clientTxVersion) { + super(shardActor, shardStats, transactionID, clientTxVersion); this.transaction = transaction; } @@ -88,9 +88,29 @@ public class ShardWriteTransaction extends ShardTransaction { modification.apply(transaction); } - getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); + totalBatchedModificationsReceived++; + if(batched.isReady()) { + if(lastBatchedModificationsException != null) { + throw lastBatchedModificationsException; + } + + if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) { + throw new IllegalStateException(String.format( + "The total number of batched messages received %d does not match the number sent %d", + totalBatchedModificationsReceived, batched.getTotalMessagesSent())); + } + + readyTransaction(transaction, false); + } else { + getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf()); + } } catch (Exception e) { + lastBatchedModificationsException = e; getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + + if(batched.isReady()) { + getSelf().tell(PoisonPill.getInstance(), getSelf()); + } } }