import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
private final Optional<ActorRef> roleChangeNotifier;
+ private final MessageTracker appendEntriesReplyTracker;
+
/**
* Coordinates persistence recovery on startup.
*/
// create a notifier actor for each cluster member
roleChangeNotifier = createRoleChangeNotifier(name.toString());
+
+ appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
+ getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
}
private static Map<String, String> mapPeerAddresses(
onRecoveryComplete();
} else {
super.onReceiveRecover(message);
+ if(LOG.isTraceEnabled()) {
+ appendEntriesReplyTracker.begin();
+ }
}
}
@Override
public void onReceiveCommand(final Object message) throws Exception {
- if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
- handleCreateTransaction(message);
- } else if(message instanceof ForwardedReadyTransaction) {
- handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
- } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
- handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
- } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
- handleCommitTransaction(CommitTransaction.fromSerializable(message));
- } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
- handleAbortTransaction(AbortTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
- closeTransactionChain(CloseTransactionChain.fromSerializable(message));
- } else if (message instanceof RegisterChangeListener) {
- registerChangeListener((RegisterChangeListener) message);
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
- } else if (message instanceof PeerAddressResolved) {
- PeerAddressResolved resolved = (PeerAddressResolved) message;
- setPeerAddress(resolved.getPeerId().toString(),
- resolved.getPeerAddress());
- } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
- handleTransactionCommitTimeoutCheck();
- } else {
- super.onReceiveCommand(message);
+
+ MessageTracker.Context context = appendEntriesReplyTracker.received(message);
+
+ if(context.error().isPresent()){
+ LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
+ context.error());
+ }
+
+ try {
+ if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ handleCreateTransaction(message);
+ } else if (message instanceof ForwardedReadyTransaction) {
+ handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+ } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ handleCommitTransaction(CommitTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ handleAbortTransaction(AbortTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+ closeTransactionChain(CloseTransactionChain.fromSerializable(message));
+ } else if (message instanceof RegisterChangeListener) {
+ registerChangeListener((RegisterChangeListener) message);
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof PeerAddressResolved) {
+ PeerAddressResolved resolved = (PeerAddressResolved) message;
+ setPeerAddress(resolved.getPeerId().toString(),
+ resolved.getPeerAddress());
+ } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ handleTransactionCommitTimeoutCheck();
+ } else {
+ super.onReceiveCommand(message);
+ }
+ } finally {
+ context.done();
}
}
return getContext().actorOf(
ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
- schemaContext,datastoreContext, shardMBean,
+ schemaContext, datastoreContext, shardMBean,
transactionId.getRemoteTransactionId(), clientVersion),
transactionId.toString());
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * MessageTracker is a diagnostic utility class to be used for figuring out why a certain message which was
+ * expected to arrive in a given time interval does not arrive. It attempts to keep track of all the messages that
+ * received between the arrival of two instances of the same message and the amount of time it took to process each
+ * of those messages.
+ * <br/>
+ * Usage of the API is as follows,
+ * <pre>
+ *
+ * // Track the Foo class, Here we expect to see a message of type Foo come in every 10 millis
+ * MessageTracker tracker = new MessageTracker(Foo.class, 10);
+ *
+ * // Begin the tracking process. If this is not called then calling received and done on the resultant Context
+ * // will do nothing
+ * tracker.begin();
+ *
+ * .....
+ *
+ * MessageTracker.Context context = tracker.received(message);
+ *
+ * if(context.error().isPresent()){
+ * LOG.error("{}", context.error().get());
+ * }
+ *
+ * // Some custom processing
+ * process(message);
+ *
+ * context.done();
+ *
+ * </pre>
+ */
+public class MessageTracker {
+
+ private static final Context NO_OP_CONTEXT = new NoOpContext();
+
+ private final Class expectedMessageClass;
+
+ private final long expectedArrivalInterval;
+
+ private final List<MessageProcessingTime> messagesSinceLastExpectedMessage = new LinkedList<>();
+
+ private Stopwatch expectedMessageWatch;
+
+ private boolean enabled = false;
+
+ private Object lastExpectedMessage;
+
+ private Object currentMessage;
+
+ private final CurrentMessageContext currentMessageContext = new CurrentMessageContext();
+
+ /**
+ *
+ * @param expectedMessageClass The class of the message to track
+ * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
+ * message
+ */
+ public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){
+ this.expectedMessageClass = expectedMessageClass;
+ this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
+ }
+
+ public void begin(){
+ if(enabled) {
+ return;
+ }
+ enabled = true;
+ expectedMessageWatch = Stopwatch.createStarted();
+ }
+
+ public Context received(Object message){
+ if(!enabled) {
+ return NO_OP_CONTEXT;
+ }
+ this.currentMessage = message;
+ if(expectedMessageClass.isInstance(message)){
+ long actualElapsedTime = expectedMessageWatch.elapsed(TimeUnit.MILLISECONDS);
+ if(actualElapsedTime > expectedArrivalInterval){
+ return new ErrorContext(message, Optional.of(new FailedExpectation(lastExpectedMessage, message,
+ ImmutableList.copyOf(messagesSinceLastExpectedMessage), expectedArrivalInterval,
+ actualElapsedTime)));
+ }
+ this.lastExpectedMessage = message;
+ this.messagesSinceLastExpectedMessage.clear();
+ }
+
+ currentMessageContext.reset();
+ return currentMessageContext;
+ }
+
+ private void processed(Object message, long messageElapseTimeInNanos){
+ if(!enabled) {
+ return;
+ }
+ if(!expectedMessageClass.isInstance(message)){
+ this.messagesSinceLastExpectedMessage.add(new MessageProcessingTime(message.getClass(), messageElapseTimeInNanos));
+ }
+ }
+
+ public List<MessageProcessingTime> getMessagesSinceLastExpectedMessage(){
+ return ImmutableList.copyOf(this.messagesSinceLastExpectedMessage);
+ }
+
+ public static class MessageProcessingTime {
+ private final Class messageClass;
+ private final long elapsedTimeInNanos;
+
+ MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){
+ this.messageClass = messageClass;
+ this.elapsedTimeInNanos = elapsedTimeInNanos;
+ }
+
+ @Override
+ public String toString() {
+ return "MessageProcessingTime{" +
+ "messageClass=" + messageClass.getSimpleName() +
+ ", elapsedTimeInMillis=" + TimeUnit.NANOSECONDS.toMillis(elapsedTimeInNanos) +
+ '}';
+ }
+
+ public Class getMessageClass() {
+ return messageClass;
+ }
+
+ public long getElapsedTimeInNanos() {
+ return elapsedTimeInNanos;
+ }
+ }
+
+ public interface Error {
+ Object getLastExpectedMessage();
+ Object getCurrentExpectedMessage();
+ List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage();
+ }
+
+ private class FailedExpectation implements Error {
+
+ private final Object lastExpectedMessage;
+ private final Object currentExpectedMessage;
+ private final List<MessageProcessingTime> messagesSinceLastExpectedMessage;
+ private final long expectedTimeInMillis;
+ private final long actualTimeInMillis;
+
+ public FailedExpectation(Object lastExpectedMessage, Object message, List<MessageProcessingTime> messagesSinceLastExpectedMessage, long expectedTimeInMillis, long actualTimeInMillis) {
+ this.lastExpectedMessage = lastExpectedMessage;
+ this.currentExpectedMessage = message;
+ this.messagesSinceLastExpectedMessage = messagesSinceLastExpectedMessage;
+ this.expectedTimeInMillis = expectedTimeInMillis;
+ this.actualTimeInMillis = actualTimeInMillis;
+ }
+
+ public Object getLastExpectedMessage() {
+ return lastExpectedMessage;
+ }
+
+ public Object getCurrentExpectedMessage() {
+ return currentExpectedMessage;
+ }
+
+ public List<MessageProcessingTime> getMessageProcessingTimesSinceLastExpectedMessage() {
+ return messagesSinceLastExpectedMessage;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("\n> Last Expected Message = " + lastExpectedMessage);
+ builder.append("\n> Current Expected Message = " + currentExpectedMessage);
+ builder.append("\n> Expected time in between messages = " + expectedTimeInMillis);
+ builder.append("\n> Actual time in between messages = " + actualTimeInMillis);
+ for (MessageProcessingTime time : messagesSinceLastExpectedMessage) {
+ builder.append("\n\t> ").append(time.toString());
+ }
+ return builder.toString();
+ }
+
+ }
+
+ public interface Context {
+ Context done();
+ Optional<? extends Error> error();
+ }
+
+ private static class NoOpContext implements Context {
+
+ @Override
+ public Context done() {
+ return this;
+ }
+
+ @Override
+ public Optional<Error> error() {
+ return Optional.absent();
+ }
+ }
+
+ private class CurrentMessageContext implements Context {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ boolean done = true;
+
+ public void reset(){
+ Preconditions.checkState(done);
+ done = false;
+ stopwatch.reset().start();
+ }
+
+ @Override
+ public Context done() {
+ processed(currentMessage, stopwatch.elapsed(TimeUnit.NANOSECONDS));
+ done = true;
+ return this;
+ }
+
+ @Override
+ public Optional<? extends Error> error() {
+ return Optional.absent();
+ }
+ }
+
+ private class ErrorContext implements Context {
+ Object message;
+ private final Optional<? extends Error> error;
+ Stopwatch stopwatch;
+
+ ErrorContext(Object message, Optional<? extends Error> error){
+ this.message = message;
+ this.error = error;
+ this.stopwatch = Stopwatch.createStarted();
+ }
+
+ @Override
+ public Context done(){
+ processed(message, this.stopwatch.elapsed(TimeUnit.NANOSECONDS));
+ this.stopwatch.stop();
+ return this;
+ }
+
+ @Override
+ public Optional<? extends Error> error() {
+ return error;
+ }
+ }
+}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageTrackerTest {
+
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private class Foo {}
+
+ @Test
+ public void testNoTracking(){
+ MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+
+ MessageTracker.Context context1 = messageTracker.received(new Foo());
+ context1.done();
+
+ Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+ MessageTracker.Context context2 = messageTracker.received(new Foo());
+ context2.done();
+
+ }
+
+ @Test
+ public void testFailedExpectationOnTracking(){
+ MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+ messageTracker.begin();
+
+ MessageTracker.Context context1 = messageTracker.received(new Foo());
+ context1.done();
+
+ Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+ MessageTracker.Context context2 = messageTracker.received(new Foo());
+ Assert.assertEquals(true, context2.error().isPresent());
+ Assert.assertEquals(0, context2.error().get().getMessageProcessingTimesSinceLastExpectedMessage().size());
+
+ }
+
+ @Test
+ public void testFailedExpectationOnTrackingWithMessagesInBetween(){
+ MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+ messageTracker.begin();
+
+ MessageTracker.Context context1 = messageTracker.received(new Foo());
+ context1.done();
+
+ messageTracker.received("A").done();
+ messageTracker.received(Long.valueOf(10)).done();
+ MessageTracker.Context c = messageTracker.received(Integer.valueOf(100));
+
+ Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+ c.done();
+
+ MessageTracker.Context context2 = messageTracker.received(new Foo());
+
+ Assert.assertEquals(true, context2.error().isPresent());
+
+ MessageTracker.Error error = context2.error().get();
+
+ List<MessageTracker.MessageProcessingTime> messageProcessingTimes =
+ error.getMessageProcessingTimesSinceLastExpectedMessage();
+
+ Assert.assertEquals(3, messageProcessingTimes.size());
+
+ Assert.assertEquals(String.class, messageProcessingTimes.get(0).getMessageClass());
+ Assert.assertEquals(Long.class, messageProcessingTimes.get(1).getMessageClass());
+ Assert.assertEquals(Integer.class, messageProcessingTimes.get(2).getMessageClass());
+ Assert.assertTrue(messageProcessingTimes.get(2).getElapsedTimeInNanos() > TimeUnit.MILLISECONDS.toNanos(10));
+ Assert.assertEquals(Foo.class, error.getLastExpectedMessage().getClass());
+ Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass());
+
+ LOG.error("An error occurred : {}" , error);
+
+ }
+
+
+ @Test
+ public void testMetExpectationOnTracking(){
+ MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+ messageTracker.begin();
+
+ MessageTracker.Context context1 = messageTracker.received(new Foo());
+ context1.done();
+
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
+
+ MessageTracker.Context context2 = messageTracker.received(new Foo());
+ Assert.assertEquals(false, context2.error().isPresent());
+
+ }
+
+ @Test
+ public void testIllegalStateExceptionWhenDoneIsNotCalledWhileTracking(){
+ MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+ messageTracker.begin();
+
+ messageTracker.received(new Foo());
+
+ try {
+ messageTracker.received(new Foo());
+ fail("Expected an IllegalStateException");
+ } catch (IllegalStateException e){
+
+ }
+ }
+
+ @Test
+ public void testNoIllegalStateExceptionWhenDoneIsNotCalledWhileNotTracking(){
+ MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+
+ messageTracker.received(new Foo());
+ messageTracker.received(new Foo());
+ }
+
+ @Test
+ public void testDelayInFirstExpectedMessageArrival(){
+
+ MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+ messageTracker.begin();
+
+ Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+ MessageTracker.Context context = messageTracker.received(new Foo());
+
+ Assert.assertEquals(true, context.error().isPresent());
+
+ MessageTracker.Error error = context.error().get();
+
+ Assert.assertEquals(null, error.getLastExpectedMessage());
+ Assert.assertEquals(Foo.class, error.getCurrentExpectedMessage().getClass());
+
+ String errorString = error.toString();
+ Assert.assertTrue(errorString.contains("Last Expected Message = null"));
+
+ LOG.error("An error occurred : {}", error);
+ }
+
+ @Test
+ public void testCallingBeginDoesNotResetWatch(){
+ MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+ messageTracker.begin();
+
+ Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+
+ messageTracker.begin();
+
+ MessageTracker.Context context = messageTracker.received(new Foo());
+
+ Assert.assertEquals(true, context.error().isPresent());
+
+ }
+
+ @Test
+ public void testMessagesSinceLastExpectedMessage(){
+
+ MessageTracker messageTracker = new MessageTracker(Foo.class, 10);
+ messageTracker.begin();
+
+ MessageTracker.Context context1 = messageTracker.received(Integer.valueOf(45)).done();
+
+ Assert.assertEquals(false, context1.error().isPresent());
+
+ MessageTracker.Context context2 = messageTracker.received(Long.valueOf(45)).done();
+
+ Assert.assertEquals(false, context2.error().isPresent());
+
+ List<MessageTracker.MessageProcessingTime> processingTimeList =
+ messageTracker.getMessagesSinceLastExpectedMessage();
+
+ Assert.assertEquals(2, processingTimeList.size());
+
+ assertEquals(Integer.class, processingTimeList.get(0).getMessageClass());
+ assertEquals(Long.class, processingTimeList.get(1).getMessageClass());
+
+ }
+
+}
\ No newline at end of file