X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=f3f8b8b193c78a5511c02ec6ede526f632c3f172;hp=e6ddd8fa198497cbf1d86f221ecb2ef82b2df5c8;hb=680f28578ae2d6c650422220a35bb69605bfb7c4;hpb=93bec87a3187c32ed3a2a684523b66db3b46645a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index e6ddd8fa19..f3f8b8b193 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -53,6 +53,7 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; @@ -192,6 +193,7 @@ public class Shard extends RaftActor { .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self()); + createSnapshotTransaction = null; // Send a PoisonPill instead of sending close transaction because we do not really need // a response getSender().tell(PoisonPill.getInstance(), self()); @@ -210,6 +212,9 @@ public class Shard extends RaftActor { createTransaction(CreateTransaction.fromSerializable(message)); } else if (getLeader() != null) { getLeader().forward(message, getContext()); + } else { + getSender().tell(new akka.actor.Status.Failure(new IllegalStateException( + "Could not find leader so transaction cannot be created")), getSelf()); } } else if (message instanceof PeerAddressResolved) { PeerAddressResolved resolved = (PeerAddressResolved) message; @@ -326,6 +331,9 @@ public class Shard extends RaftActor { modification); DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); + + LOG.debug("Created new transaction {}", transaction.getIdentifier().toString()); + modification.apply(transaction); try { syncCommitTransaction(transaction); @@ -339,6 +347,12 @@ public class Shard extends RaftActor { return; } + + if(sender == null){ + LOG.error("Commit failed. Sender cannot be null"); + return; + } + final ListenableFuture future = cohort.commit(); final ActorRef self = getSelf(); @@ -455,7 +469,7 @@ public class Shard extends RaftActor { } } else { - LOG.error("Unknown state received {}", data); + LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), CompositeModificationPayload.class.getClassLoader()); } // Update stats @@ -490,6 +504,8 @@ public class Shard extends RaftActor { // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower + + LOG.info("Applying snapshot"); try { DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); @@ -504,6 +520,8 @@ public class Shard extends RaftActor { syncCommitTransaction(transaction); } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred when applying snapshot"); + } finally { + LOG.info("Done applying snapshot"); } } @@ -513,9 +531,6 @@ public class Shard extends RaftActor { .tell(new EnableNotification(isLeader()), getSelf()); } - if (getLeaderId() != null) { - shardMBean.setLeader(getLeaderId()); - } shardMBean.setRaftState(getRaftState().name()); shardMBean.setCurrentTerm(getCurrentTerm()); @@ -531,6 +546,10 @@ public class Shard extends RaftActor { } } + @Override protected void onLeaderChanged(String oldLeader, String newLeader) { + shardMBean.setLeader(newLeader); + } + @Override public String persistenceId() { return this.name.toString(); }