From: Moiz Raja Date: Sat, 21 Feb 2015 01:59:40 +0000 (-0800) Subject: BUG 2718 : Create a diagnostic utility to track append entries replies X-Git-Tag: release/lithium~496^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=18126649bba1cda7b589a4f23b96211562f9e008;ds=sidebyside BUG 2718 : Create a diagnostic utility to track append entries replies This patch introduces a utility to keep track of all messages that were processed between the time a certain message is received. It is used for tracking why AppendEntriesReply do not come in a timely manner. This is important to know because if a leader will switch to IsolatedLeader if these messages do not arrive within the ElectionTimeout interval causing disruption in service. Change-Id: I13bab798aad8a225d22d2ebd3775cbf0bf1d1592 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 21d74a6e1a..9551c800ca 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -60,10 +60,12 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; +import org.opendaylight.controller.cluster.datastore.utils.MessageTracker; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -125,6 +127,8 @@ public class Shard extends RaftActor { private final Optional roleChangeNotifier; + private final MessageTracker appendEntriesReplyTracker; + /** * Coordinates persistence recovery on startup. */ @@ -168,6 +172,9 @@ public class Shard extends RaftActor { // create a notifier actor for each cluster member roleChangeNotifier = createRoleChangeNotifier(name.toString()); + + appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, + getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); } private static Map mapPeerAddresses( @@ -224,35 +231,50 @@ public class Shard extends RaftActor { onRecoveryComplete(); } else { super.onReceiveRecover(message); + if(LOG.isTraceEnabled()) { + appendEntriesReplyTracker.begin(); + } } } @Override public void onReceiveCommand(final Object message) throws Exception { - if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { - handleCreateTransaction(message); - } else if(message instanceof ForwardedReadyTransaction) { - handleForwardedReadyTransaction((ForwardedReadyTransaction)message); - } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { - handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message)); - } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) { - handleCommitTransaction(CommitTransaction.fromSerializable(message)); - } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) { - handleAbortTransaction(AbortTransaction.fromSerializable(message)); - } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){ - closeTransactionChain(CloseTransactionChain.fromSerializable(message)); - } else if (message instanceof RegisterChangeListener) { - registerChangeListener((RegisterChangeListener) message); - } else if (message instanceof UpdateSchemaContext) { - updateSchemaContext((UpdateSchemaContext) message); - } else if (message instanceof PeerAddressResolved) { - PeerAddressResolved resolved = (PeerAddressResolved) message; - setPeerAddress(resolved.getPeerId().toString(), - resolved.getPeerAddress()); - } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) { - handleTransactionCommitTimeoutCheck(); - } else { - super.onReceiveCommand(message); + + MessageTracker.Context context = appendEntriesReplyTracker.received(message); + + if(context.error().isPresent()){ + LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(), + context.error()); + } + + try { + if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) { + handleCreateTransaction(message); + } else if (message instanceof ForwardedReadyTransaction) { + handleForwardedReadyTransaction((ForwardedReadyTransaction) message); + } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { + handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message)); + } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) { + handleCommitTransaction(CommitTransaction.fromSerializable(message)); + } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) { + handleAbortTransaction(AbortTransaction.fromSerializable(message)); + } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) { + closeTransactionChain(CloseTransactionChain.fromSerializable(message)); + } else if (message instanceof RegisterChangeListener) { + registerChangeListener((RegisterChangeListener) message); + } else if (message instanceof UpdateSchemaContext) { + updateSchemaContext((UpdateSchemaContext) message); + } else if (message instanceof PeerAddressResolved) { + PeerAddressResolved resolved = (PeerAddressResolved) message; + setPeerAddress(resolved.getPeerId().toString(), + resolved.getPeerAddress()); + } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) { + handleTransactionCommitTimeoutCheck(); + } else { + super.onReceiveCommand(message); + } + } finally { + context.done(); } } @@ -495,7 +517,7 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(), - schemaContext,datastoreContext, shardMBean, + schemaContext, datastoreContext, shardMBean, transactionId.getRemoteTransactionId(), clientVersion), transactionId.toString()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java new file mode 100644 index 0000000000..2757d2f5f6 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was + * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that + * received between the arrival of two instances of the same message and the amount of time it took to process each + * of those messages. + *
+ * Usage of the API is as follows, + *
+ *
+ *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
+ *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
+ *
+ *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
+ *     // will do nothing
+ *     tracker.begin();
+ *
+ *     .....
+ *
+ *     MessageTracker.Context context = tracker.received(message);
+ *
+ *     if(context.error().isPresent()){
+ *         LOG.error("{}", context.error().get());
+ *     }
+ *
+ *     // Some custom processing
+ *     process(message);
+ *
+ *     context.done();
+ *
+ * 
+ */ +public class MessageTracker { + + private static final Context NO_OP_CONTEXT = new NoOpContext(); + + private final Class expectedMessageClass; + + private final long expectedArrivalInterval; + + private final List messagesSinceLastExpectedMessage = new LinkedList<>(); + + private Stopwatch expectedMessageWatch; + + private boolean enabled = false; + + private Object lastExpectedMessage; + + private Object currentMessage; + + private final CurrentMessageContext currentMessageContext = new CurrentMessageContext(); + + /** + * + * @param expectedMessageClass The class of the message to track + * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected + * message + */ + public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){ + this.expectedMessageClass = expectedMessageClass; + this.expectedArrivalInterval = expectedArrivalIntervalInMillis; + } + + public void begin(){ + if(enabled) { + return; + } + enabled = true; + expectedMessageWatch = Stopwatch.createStarted(); + } + + public Context received(Object message){ + if(!enabled) { + return NO_OP_CONTEXT; + } + this.currentMessage = message; + if(expectedMessageClass.isInstance(message)){ + long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS); + if(actualElapsedTime > expectedArrivalInterval){ + return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message, + ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval, + actualElapsedTime))); + } + this.lastExpectedMessage = message; + this.messagesSinceLastExpectedMessage.clear(); + } + + currentMessageContext.reset(); + return currentMessageContext; + } + + private void processed(Object message, long messageElapseTimeInNanos){ + if(!enabled) { + return; + } + if(!expectedMessageClass.isInstance(message)){ + this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos)); + } + } + + public List getMessagesSinceLastExpectedMessage(){ + return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage); + } + + public static class MessageProcessingTime { + private final Class messageClass; + private final long elapsedTimeInNanos; + + MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){ + this.messageClass = messageClass; + this.elapsedTimeInNanos = elapsedTimeInNanos; + } + + @Override + public String toString() { + return "MessageProcessingTime{" + + "messageClass=" + messageClass.getSimpleName() + + ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) + + '}'; + } + + public Class getMessageClass() { + return messageClass; + } + + public long getElapsedTimeInNanos() { + return elapsedTimeInNanos; + } + } + + public interface Error { + Object getLastExpectedMessage(); + Object getCurrentExpectedMessage(); + List getMessageProcessingTimesSinceLastExpectedMessage(); + } + + private class FailedExpectation implements Error { + + private final Object lastExpectedMessage; + private final Object currentExpectedMessage; + private final List messagesSinceLastExpectedMessage; + private final long expectedTimeInMillis; + private final long actualTimeInMillis; + + public FailedExpectation(Object lastExpectedMessage, Object message, List messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) { + this.lastExpectedMessage = lastExpectedMessage; + this.currentExpectedMessage = message; + this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage; + this.expectedTimeInMillis = expectedTimeInMillis; + this.actualTimeInMillis = actualTimeInMillis; + } + + public Object getLastExpectedMessage() { + return lastExpectedMessage; + } + + public Object getCurrentExpectedMessage() { + return currentExpectedMessage; + } + + public List getMessageProcessingTimesSinceLastExpectedMessage() { + return messagesSinceLastExpectedMessage; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("\n> Last Expected Message = " + lastExpectedMessage); + builder.append("\n> Current Expected Message = " + currentExpectedMessage); + builder.append("\n> Expected time in between messages = " + expectedTimeInMillis); + builder.append("\n> Actual time in between messages = " + actualTimeInMillis); + for (MessageProcessingTime time : messagesSinceLastExpectedMessage) { + builder.append("\n\t> ").append(time.toString()); + } + return builder.toString(); + } + + } + + public interface Context { + Context done(); + Optional error(); + } + + private static class NoOpContext implements Context { + + @Override + public Context done() { + return this; + } + + @Override + public Optional error() { + return Optional.absent(); + } + } + + private class CurrentMessageContext implements Context { + Stopwatch stopwatch = Stopwatch.createStarted(); + boolean done = true; + + public void reset(){ + Preconditions.checkState(done); + done = false; + stopwatch.reset().start(); + } + + @Override + public Context done() { + processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS)); + done = true; + return this; + } + + @Override + public Optional error() { + return Optional.absent(); + } + } + + private class ErrorContext implements Context { + Object message; + private final Optional error; + Stopwatch stopwatch; + + ErrorContext(Object message, Optional error){ + this.message = message; + this.error = error; + this.stopwatch = Stopwatch.createStarted(); + } + + @Override + public Context done(){ + processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS)); + this.stopwatch.stop(); + return this; + } + + @Override + public Optional error() { + return error; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java new file mode 100644 index 0000000000..a125b49a5a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java @@ -0,0 +1,188 @@ +package org.opendaylight.controller.cluster.datastore.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageTrackerTest { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + private class Foo {} + + @Test + public void testNoTracking(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + + MessageTracker.Context context1 = messageTracker.received(new Foo()); + context1.done(); + + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + + MessageTracker.Context context2 = messageTracker.received(new Foo()); + context2.done(); + + } + + @Test + public void testFailedExpectationOnTracking(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + MessageTracker.Context context1 = messageTracker.received(new Foo()); + context1.done(); + + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + + MessageTracker.Context context2 = messageTracker.received(new Foo()); + Assert.assertEquals(true, context2.error().isPresent()); + Assert.assertEquals(0, context2.error().get().getMessageProcessingTimesSinceLastExpectedMessage().size()); + + } + + @Test + public void testFailedExpectationOnTrackingWithMessagesInBetween(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + MessageTracker.Context context1 = messageTracker.received(new Foo()); + context1.done(); + + messageTracker.received("A").done(); + messageTracker.received(Long.valueOf(10)).done(); + MessageTracker.Context c = messageTracker.received(Integer.valueOf(100)); + + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + + c.done(); + + MessageTracker.Context context2 = messageTracker.received(new Foo()); + + Assert.assertEquals(true, context2.error().isPresent()); + + MessageTracker.Error error = context2.error().get(); + + List messageProcessingTimes = + error.getMessageProcessingTimesSinceLastExpectedMessage(); + + Assert.assertEquals(3, messageProcessingTimes.size()); + + Assert.assertEquals(String.class, messageProcessingTimes.get(0).getMessageClass()); + Assert.assertEquals(Long.class, messageProcessingTimes.get(1).getMessageClass()); + Assert.assertEquals(Integer.class, messageProcessingTimes.get(2).getMessageClass()); + Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > TimeUnit.MILLISECONDS.toNanos(10)); + Assert.assertEquals(Foo.class, error.getLastExpectedMessage().getClass()); + Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass()); + + LOG.error("An error occurred : {}" , error); + + } + + + @Test + public void testMetExpectationOnTracking(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + MessageTracker.Context context1 = messageTracker.received(new Foo()); + context1.done(); + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); + + MessageTracker.Context context2 = messageTracker.received(new Foo()); + Assert.assertEquals(false, context2.error().isPresent()); + + } + + @Test + public void testIllegalStateExceptionWhenDoneIsNotCalledWhileTracking(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + messageTracker.received(new Foo()); + + try { + messageTracker.received(new Foo()); + fail("Expected an IllegalStateException"); + } catch (IllegalStateException e){ + + } + } + + @Test + public void testNoIllegalStateExceptionWhenDoneIsNotCalledWhileNotTracking(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + + messageTracker.received(new Foo()); + messageTracker.received(new Foo()); + } + + @Test + public void testDelayInFirstExpectedMessageArrival(){ + + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + + MessageTracker.Context context = messageTracker.received(new Foo()); + + Assert.assertEquals(true, context.error().isPresent()); + + MessageTracker.Error error = context.error().get(); + + Assert.assertEquals(null, error.getLastExpectedMessage()); + Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass()); + + String errorString = error.toString(); + Assert.assertTrue(errorString.contains("Last Expected Message = null")); + + LOG.error("An error occurred : {}", error); + } + + @Test + public void testCallingBeginDoesNotResetWatch(){ + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + + messageTracker.begin(); + + MessageTracker.Context context = messageTracker.received(new Foo()); + + Assert.assertEquals(true, context.error().isPresent()); + + } + + @Test + public void testMessagesSinceLastExpectedMessage(){ + + MessageTracker messageTracker = new MessageTracker(Foo.class, 10); + messageTracker.begin(); + + MessageTracker.Context context1 = messageTracker.received(Integer.valueOf(45)).done(); + + Assert.assertEquals(false, context1.error().isPresent()); + + MessageTracker.Context context2 = messageTracker.received(Long.valueOf(45)).done(); + + Assert.assertEquals(false, context2.error().isPresent()); + + List processingTimeList = + messageTracker.getMessagesSinceLastExpectedMessage(); + + Assert.assertEquals(2, processingTimeList.size()); + + assertEquals(Integer.class, processingTimeList.get(0).getMessageClass()); + assertEquals(Long.class, processingTimeList.get(1).getMessageClass()); + + } + +} \ No newline at end of file