Merge "BUG 2718 : Create a diagnostic utility to track append entries replies"
authorTom Pantelis <tpanteli@brocade.com>
Mon, 23 Feb 2015 23:11:02 +0000 (23:11 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 23 Feb 2015 23:11:02 +0000 (23:11 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java [new file with mode: 0644]

index 21d74a6..9551c80 100644 (file)
@@ -60,10 +60,12 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -125,6 +127,8 @@ public class Shard extends RaftActor {
 
     private final Optional<ActorRef> roleChangeNotifier;
 
+    private final MessageTracker appendEntriesReplyTracker;
+
     /**
      * Coordinates persistence recovery on startup.
      */
@@ -168,6 +172,9 @@ public class Shard extends RaftActor {
 
         // create a notifier actor for each cluster member
         roleChangeNotifier = createRoleChangeNotifier(name.toString());
+
+        appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
+                getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
     }
 
     private static Map<String, String> mapPeerAddresses(
@@ -224,35 +231,50 @@ public class Shard extends RaftActor {
             onRecoveryComplete();
         } else {
             super.onReceiveRecover(message);
+            if(LOG.isTraceEnabled()) {
+                appendEntriesReplyTracker.begin();
+            }
         }
     }
 
     @Override
     public void onReceiveCommand(final Object message) throws Exception {
-        if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
-            handleCreateTransaction(message);
-        } else if(message instanceof ForwardedReadyTransaction) {
-            handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
-        } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
-            handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
-        } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
-            handleCommitTransaction(CommitTransaction.fromSerializable(message));
-        } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
-            handleAbortTransaction(AbortTransaction.fromSerializable(message));
-        } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
-            closeTransactionChain(CloseTransactionChain.fromSerializable(message));
-        } else if (message instanceof RegisterChangeListener) {
-            registerChangeListener((RegisterChangeListener) message);
-        } else if (message instanceof UpdateSchemaContext) {
-            updateSchemaContext((UpdateSchemaContext) message);
-        } else if (message instanceof PeerAddressResolved) {
-            PeerAddressResolved resolved = (PeerAddressResolved) message;
-            setPeerAddress(resolved.getPeerId().toString(),
-                resolved.getPeerAddress());
-        } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
-            handleTransactionCommitTimeoutCheck();
-        } else {
-            super.onReceiveCommand(message);
+
+        MessageTracker.Context context = appendEntriesReplyTracker.received(message);
+
+        if(context.error().isPresent()){
+            LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
+                    context.error());
+        }
+
+        try {
+            if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+                handleCreateTransaction(message);
+            } else if (message instanceof ForwardedReadyTransaction) {
+                handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+            } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+                handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
+            } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+                handleCommitTransaction(CommitTransaction.fromSerializable(message));
+            } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+                handleAbortTransaction(AbortTransaction.fromSerializable(message));
+            } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+                closeTransactionChain(CloseTransactionChain.fromSerializable(message));
+            } else if (message instanceof RegisterChangeListener) {
+                registerChangeListener((RegisterChangeListener) message);
+            } else if (message instanceof UpdateSchemaContext) {
+                updateSchemaContext((UpdateSchemaContext) message);
+            } else if (message instanceof PeerAddressResolved) {
+                PeerAddressResolved resolved = (PeerAddressResolved) message;
+                setPeerAddress(resolved.getPeerId().toString(),
+                        resolved.getPeerAddress());
+            } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+                handleTransactionCommitTimeoutCheck();
+            } else {
+                super.onReceiveCommand(message);
+            }
+        } finally {
+            context.done();
         }
     }
 
@@ -495,7 +517,7 @@ public class Shard extends RaftActor {
 
             return getContext().actorOf(
                 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
-                        schemaContext,datastoreContext, shardMBean,
+                        schemaContext, datastoreContext, shardMBean,
                         transactionId.getRemoteTransactionId(), clientVersion),
                         transactionId.toString());
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java
new file mode 100644 (file)
index 0000000..2757d2f
--- /dev/null
@@ -0,0 +1,261 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
+ * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
+ * received between the arrival of two instances of the same message and the amount of time it took to process each
+ * of those messages.
+ * <br/>
+ * Usage of the API is as follows,
+ * <pre>
+ *
+ *      // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
+ *     MessageTracker tracker = new MessageTracker(Foo.class, 10);
+ *
+ *     // Begin the tracking process. If this is not called then calling received and done on the resultant Context
+ *     // will do nothing
+ *     tracker.begin();
+ *
+ *     .....
+ *
+ *     MessageTracker.Context context = tracker.received(message);
+ *
+ *     if(context.error().isPresent()){
+ *         LOG.error("{}", context.error().get());
+ *     }
+ *
+ *     // Some custom processing
+ *     process(message);
+ *
+ *     context.done();
+ *
+ * </pre>
+ */
+public class MessageTracker {
+
+    private static final Context NO_OP_CONTEXT = new NoOpContext();
+
+    private final Class expectedMessageClass;
+
+    private final long expectedArrivalInterval;
+
+    private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
+
+    private Stopwatch expectedMessageWatch;
+
+    private boolean enabled = false;
+
+    private Object lastExpectedMessage;
+
+    private Object currentMessage;
+
+    private final CurrentMessageContext currentMessageContext = new CurrentMessageContext();
+
+    /**
+     *
+     * @param expectedMessageClass The class of the message to track
+     * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
+     *                                        message
+     */
+    public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){
+        this.expectedMessageClass = expectedMessageClass;
+        this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
+    }
+
+    public void begin(){
+        if(enabled) {
+            return;
+        }
+        enabled = true;
+        expectedMessageWatch = Stopwatch.createStarted();
+    }
+
+    public Context received(Object message){
+        if(!enabled) {
+            return NO_OP_CONTEXT;
+        }
+        this.currentMessage = message;
+        if(expectedMessageClass.isInstance(message)){
+            long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS);
+            if(actualElapsedTime > expectedArrivalInterval){
+                return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message,
+                        ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval,
+                        actualElapsedTime)));
+            }
+            this.lastExpectedMessage = message;
+            this.messagesSinceLastExpectedMessage.clear();
+        }
+
+        currentMessageContext.reset();
+        return currentMessageContext;
+    }
+
+    private void processed(Object message, long messageElapseTimeInNanos){
+        if(!enabled) {
+            return;
+        }
+        if(!expectedMessageClass.isInstance(message)){
+            this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos));
+        }
+    }
+
+    public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage(){
+        return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage);
+    }
+
+    public static class MessageProcessingTime {
+        private final Class messageClass;
+        private final long elapsedTimeInNanos;
+
+        MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){
+            this.messageClass = messageClass;
+            this.elapsedTimeInNanos = elapsedTimeInNanos;
+        }
+
+        @Override
+        public String toString() {
+            return "MessageProcessingTime{" +
+                    "messageClass=" + messageClass.getSimpleName() +
+                    ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) +
+                    '}';
+        }
+
+        public Class getMessageClass() {
+            return messageClass;
+        }
+
+        public long getElapsedTimeInNanos() {
+            return elapsedTimeInNanos;
+        }
+    }
+
+    public interface Error {
+        Object getLastExpectedMessage();
+        Object getCurrentExpectedMessage();
+        List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+    }
+
+    private class FailedExpectation implements Error {
+
+        private final Object lastExpectedMessage;
+        private final Object currentExpectedMessage;
+        private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
+        private final long expectedTimeInMillis;
+        private final long actualTimeInMillis;
+
+        public FailedExpectation(Object lastExpectedMessage, Object message, List<MessageProcessingTime> messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) {
+            this.lastExpectedMessage = lastExpectedMessage;
+            this.currentExpectedMessage = message;
+            this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage;
+            this.expectedTimeInMillis = expectedTimeInMillis;
+            this.actualTimeInMillis = actualTimeInMillis;
+        }
+
+        public Object getLastExpectedMessage() {
+            return lastExpectedMessage;
+        }
+
+        public Object getCurrentExpectedMessage() {
+            return currentExpectedMessage;
+        }
+
+        public List<MessageProcessingTime>  getMessageProcessingTimesSinceLastExpectedMessage() {
+            return messagesSinceLastExpectedMessage;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("\n> Last Expected Message = " + lastExpectedMessage);
+            builder.append("\n> Current Expected Message = " + currentExpectedMessage);
+            builder.append("\n> Expected time in between messages = " + expectedTimeInMillis);
+            builder.append("\n> Actual time in between messages = " + actualTimeInMillis);
+            for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
+                builder.append("\n\t> ").append(time.toString());
+            }
+            return builder.toString();
+        }
+
+    }
+
+    public interface Context {
+        Context done();
+        Optional<? extends Error> error();
+    }
+
+    private static class NoOpContext implements Context {
+
+        @Override
+        public Context done() {
+            return this;
+        }
+
+        @Override
+        public Optional<Error> error() {
+            return Optional.absent();
+        }
+    }
+
+    private class CurrentMessageContext implements Context {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        boolean done = true;
+
+        public void reset(){
+            Preconditions.checkState(done);
+            done = false;
+            stopwatch.reset().start();
+        }
+
+        @Override
+        public Context done() {
+            processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS));
+            done = true;
+            return this;
+        }
+
+        @Override
+        public Optional<? extends Error> error() {
+            return Optional.absent();
+        }
+    }
+
+    private class ErrorContext implements Context {
+        Object message;
+        private final Optional<? extends Error> error;
+        Stopwatch stopwatch;
+
+        ErrorContext(Object message, Optional<? extends Error> error){
+            this.message = message;
+            this.error = error;
+            this.stopwatch = Stopwatch.createStarted();
+        }
+
+        @Override
+        public Context done(){
+            processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS));
+            this.stopwatch.stop();
+            return this;
+        }
+
+        @Override
+        public Optional<? extends Error> error() {
+            return error;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageTrackerTest.java
new file mode 100644 (file)
index 0000000..a125b49
--- /dev/null
@@ -0,0 +1,188 @@
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageTrackerTest {
+
+    private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+    private class Foo {}
+
+    @Test
+    public void testNoTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+        context2.done();
+
+    }
+
+    @Test
+    public void testFailedExpectationOnTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+        Assert.assertEquals(true, context2.error().isPresent());
+        Assert.assertEquals(0, context2.error().get().getMessageProcessingTimesSinceLastExpectedMessage().size());
+
+    }
+
+    @Test
+    public void testFailedExpectationOnTrackingWithMessagesInBetween(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        messageTracker.received("A").done();
+        messageTracker.received(Long.valueOf(10)).done();
+        MessageTracker.Context c = messageTracker.received(Integer.valueOf(100));
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        c.done();
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+
+        Assert.assertEquals(true, context2.error().isPresent());
+
+        MessageTracker.Error error = context2.error().get();
+
+        List<MessageTracker.MessageProcessingTime> messageProcessingTimes =
+                error.getMessageProcessingTimesSinceLastExpectedMessage();
+
+        Assert.assertEquals(3, messageProcessingTimes.size());
+
+        Assert.assertEquals(String.class, messageProcessingTimes.get(0).getMessageClass());
+        Assert.assertEquals(Long.class, messageProcessingTimes.get(1).getMessageClass());
+        Assert.assertEquals(Integer.class, messageProcessingTimes.get(2).getMessageClass());
+        Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > TimeUnit.MILLISECONDS.toNanos(10));
+        Assert.assertEquals(Foo.class, error.getLastExpectedMessage().getClass());
+        Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass());
+
+        LOG.error("An error occurred : {}" , error);
+
+    }
+
+
+    @Test
+    public void testMetExpectationOnTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(new Foo());
+        context1.done();
+
+        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context2 = messageTracker.received(new Foo());
+        Assert.assertEquals(false, context2.error().isPresent());
+
+    }
+
+    @Test
+    public void testIllegalStateExceptionWhenDoneIsNotCalledWhileTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        messageTracker.received(new Foo());
+
+        try {
+            messageTracker.received(new Foo());
+            fail("Expected an IllegalStateException");
+        } catch (IllegalStateException e){
+
+        }
+    }
+
+    @Test
+    public void testNoIllegalStateExceptionWhenDoneIsNotCalledWhileNotTracking(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+
+        messageTracker.received(new Foo());
+        messageTracker.received(new Foo());
+    }
+
+    @Test
+    public void testDelayInFirstExpectedMessageArrival(){
+
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        MessageTracker.Context context = messageTracker.received(new Foo());
+
+        Assert.assertEquals(true, context.error().isPresent());
+
+        MessageTracker.Error error = context.error().get();
+
+        Assert.assertEquals(null, error.getLastExpectedMessage());
+        Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass());
+
+        String errorString = error.toString();
+        Assert.assertTrue(errorString.contains("Last Expected Message = null"));
+
+        LOG.error("An error occurred : {}", error);
+    }
+
+    @Test
+    public void testCallingBeginDoesNotResetWatch(){
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+        messageTracker.begin();
+
+        MessageTracker.Context context = messageTracker.received(new Foo());
+
+        Assert.assertEquals(true, context.error().isPresent());
+
+    }
+
+    @Test
+    public void testMessagesSinceLastExpectedMessage(){
+
+        MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+        messageTracker.begin();
+
+        MessageTracker.Context context1 = messageTracker.received(Integer.valueOf(45)).done();
+
+        Assert.assertEquals(false, context1.error().isPresent());
+
+        MessageTracker.Context context2 = messageTracker.received(Long.valueOf(45)).done();
+
+        Assert.assertEquals(false, context2.error().isPresent());
+
+        List<MessageTracker.MessageProcessingTime> processingTimeList =
+                messageTracker.getMessagesSinceLastExpectedMessage();
+
+        Assert.assertEquals(2, processingTimeList.size());
+
+        assertEquals(Integer.class, processingTimeList.get(0).getMessageClass());
+        assertEquals(Long.class, processingTimeList.get(1).getMessageClass());
+
+    }
+
+}
\ No newline at end of file

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.