import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ClientActor extends UntypedActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
private final ActorRef target;
try {
bs = fromObject(state);
} catch (Exception e) {
- LOG.error(e, "Exception in creating snapshot");
+ LOG.error("Exception in creating snapshot", e);
}
getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
}
try {
state.putAll((HashMap) toObject(snapshot));
} catch (Exception e) {
- LOG.error(e, "Exception in applying snapshot");
+ LOG.error("Exception in applying snapshot", e);
}
if(LOG.isDebugEnabled()) {
LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
* </ul>
*/
public abstract class RaftActor extends AbstractUntypedPersistentActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
* The current state determines the current behavior of a RaftActor
} else if (message instanceof SaveSnapshotFailure) {
SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
- LOG.error(saveSnapshotFailure.cause(), "{}: SaveSnapshotFailure received for snapshot Cause:",
- persistenceId());
+ LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+ persistenceId(), saveSnapshotFailure.cause());
context.getReplicatedLog().snapshotRollback();
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import akka.event.LoggingAdapter;
-
import java.util.Map;
+import org.slf4j.Logger;
/**
* The RaftActorContext contains that portion of the RaftActors state that
*
* @return
*/
- LoggingAdapter getLogger();
+ Logger getLogger();
/**
* Get a mapping of peerId's to their addresses
package org.opendaylight.controller.cluster.raft;
+import static com.google.common.base.Preconditions.checkState;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActorContext;
-import akka.event.LoggingAdapter;
import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkState;
+import org.slf4j.Logger;
public class RaftActorContextImpl implements RaftActorContext {
private final Map<String, String> peerAddresses;
- private final LoggingAdapter LOG;
+ private final Logger LOG;
private final ConfigParams configParams;
ElectionTerm termInformation, long commitIndex,
long lastApplied, ReplicatedLog replicatedLog,
Map<String, String> peerAddresses, ConfigParams configParams,
- LoggingAdapter logger) {
+ Logger logger) {
this.actor = actor;
this.context = context;
this.id = id;
return context.system();
}
- @Override public LoggingAdapter getLogger() {
+ @Override public Logger getLogger() {
return this.LOG;
}
followerToSnapshot.getTotalChunks());
}
} catch (IOException e) {
- LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
+ LOG.error("{}: InstallSnapshot failed for Leader.", context.getId(), e);
}
}
import akka.actor.ActorRef;
import akka.actor.Cancellable;
-import akka.event.LoggingAdapter;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
/**
/**
*
*/
- protected final LoggingAdapter LOG;
+ protected final Logger LOG;
/**
*
} else {
//if one index is not present in the log, no point in looping
// around as the rest wont be present either
- LOG.warning(
+ LOG.warn(
"{}: Missing index {} from log. Cannot apply state. Ignoring {} to {}",
context.getId(), i, i, index);
break;
try {
close();
} catch (Exception e) {
- LOG.error(e, "{}: Failed to close behavior : {}", context.getId(), this.state());
+ LOG.error("{}: Failed to close behavior : {}", context.getId(), this.state(), e);
}
return behavior;
snapshotTracker = null;
} catch (Exception e){
- LOG.error(e, "{}: Exception in InstallSnapshot of follower", context.getId());
+ LOG.error("{}: Exception in InstallSnapshot of follower", context.getId(), e);
//send reply with success as false. The chunk will be sent again on failure
sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
installSnapshot.getChunkIndex(), false), actor());
package org.opendaylight.controller.cluster.raft.behaviors;
-import akka.event.LoggingAdapter;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
+import org.slf4j.Logger;
/**
* SnapshotTracker does house keeping for a snapshot that is being installed in chunks on the Follower
*/
public class SnapshotTracker {
- private final LoggingAdapter LOG;
+ private final Logger LOG;
private final int totalChunks;
private ByteString collectedChunks = ByteString.EMPTY;
private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
private boolean sealed = false;
private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
- SnapshotTracker(LoggingAdapter LOG, int totalChunks){
+ SnapshotTracker(Logger LOG, int totalChunks){
this.LOG = LOG;
this.totalChunks = totalChunks;
}
}
public static class InvalidChunkException extends Exception {
+ private static final long serialVersionUID = 1L;
+
InvalidChunkException(String message){
super(message);
}
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import java.io.Serializable;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MockRaftActorContext implements RaftActorContext {
return this.system;
}
- @Override public LoggingAdapter getLogger() {
- return Logging.getLogger(system, this);
+ @Override public Logger getLogger() {
+ return LoggerFactory.getLogger(getClass());
}
@Override public Map<String, String> getPeerAddresses() {
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import akka.event.LoggingAdapter;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SnapshotTrackerTest {
+ Logger logger = LoggerFactory.getLogger(getClass());
+
Map<String, String> data;
ByteString byteString;
ByteString chunk1;
@Test
public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
- SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+ SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
// Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
- SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+ SnapshotTracker tracker2 = new SnapshotTracker(logger, 2);
tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
}
// The first chunk's index must at least be FIRST_CHUNK_INDEX
- SnapshotTracker tracker3 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+ SnapshotTracker tracker3 = new SnapshotTracker(logger, 2);
try {
tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
}
// Out of sequence chunk indexes won't work
- SnapshotTracker tracker4 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+ SnapshotTracker tracker4 = new SnapshotTracker(logger, 2);
tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
// No exceptions will be thrown when invalid chunk is added with the right sequence
// If the lastChunkHashCode is missing
- SnapshotTracker tracker5 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+ SnapshotTracker tracker5 = new SnapshotTracker(logger, 2);
tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
// Look I can add the same chunk again
// An exception will be thrown when an invalid chunk is addedd with the right sequence
// when the lastChunkHashCode is present
- SnapshotTracker tracker6 = new SnapshotTracker(mock(LoggingAdapter.class), 2);
+ SnapshotTracker tracker6 = new SnapshotTracker(logger, 2);
tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
// Trying to get a snapshot before all chunks have been received will throw an exception
- SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+ SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
try {
}
- SnapshotTracker tracker2 = new SnapshotTracker(mock(LoggingAdapter.class), 3);
+ SnapshotTracker tracker2 = new SnapshotTracker(logger, 3);
tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
@Test
public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
- SnapshotTracker tracker1 = new SnapshotTracker(mock(LoggingAdapter.class), 5);
+ SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
ByteString chunks = chunk1.concat(chunk2);
package org.opendaylight.controller.cluster.common.actor;
import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class AbstractUntypedActor extends UntypedActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
public AbstractUntypedActor() {
if(LOG.isDebugEnabled()) {
package org.opendaylight.controller.cluster.common.actor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Procedure;
import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.UntypedPersistentActor;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class AbstractUntypedPersistentActor extends UntypedPersistentActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
public AbstractUntypedPersistentActor() {
if(LOG.isDebugEnabled()) {
try {
procedure.apply(o);
} catch (Exception e) {
- LOG.error(e, "An unexpected error occurred");
+ LOG.error("An unexpected error occurred", e);
}
}
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
// The state of this Shard
private final InMemoryDOMDataStore store;
- private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
-
/// The name of this shard
private final ShardIdentifier name;
}
if (message instanceof RecoveryFailure){
- LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause",
- persistenceId());
+ LOG.error("{}: Recovery failed because of this cause",
+ persistenceId(), ((RecoveryFailure) message).cause());
// Even though recovery failed, we still need to finish our recovery, eg send the
// ActorInitialized message and start the txCommitTimeoutCheckSchedule.
if(cohortEntry != null) {
long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
if(elapsed > transactionCommitTimeout) {
- LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting",
+ LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
doAbortTransaction(cohortEntry.getTransactionID(), null);
new ModificationPayload(cohortEntry.getModification()));
}
} catch (Exception e) {
- LOG.error(e, "{} An exception occurred while preCommitting transaction {}",
- persistenceId(), cohortEntry.getTransactionID());
+ LOG.error("{} An exception occurred while preCommitting transaction {}",
+ persistenceId(), cohortEntry.getTransactionID(), e);
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
} catch (Exception e) {
sender.tell(new akka.actor.Status.Failure(e), getSelf());
- LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID);
+ LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+ transactionID, e);
shardMBean.incrementFailedTransactionsCount();
} finally {
commitCoordinator.currentTransactionComplete(transactionID, true);
@Override
public void onFailure(final Throwable t) {
- LOG.error(t, "{}: An exception happened during abort", persistenceId());
+ LOG.error("{}: An exception happened during abort", persistenceId(), t);
if(sender != null) {
sender.tell(new akka.actor.Status.Failure(t), self);
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "{}: Failed to commit", persistenceId());
+ LOG.error("{}: Failed to commit", persistenceId(), e);
}
}
try {
currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
+ LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
}
} else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
shardMBean.incrementCommittedTransactionCount();
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "{}: Failed to commit", persistenceId());
+ LOG.error("{}: Failed to commit", persistenceId(), e);
}
}
}
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
+ LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
}
}
else if (data instanceof CompositeModificationPayload) {
transaction.write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
- LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId());
+ LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
} finally {
LOG.info("{}: Done applying snapshot", persistenceId());
}
import akka.actor.ActorRef;
import akka.actor.Status;
-import akka.event.LoggingAdapter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.LinkedList;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.slf4j.Logger;
/**
* Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
private final int queueCapacity;
- private final LoggingAdapter log;
+ private final Logger log;
private final String name;
- public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, LoggingAdapter log,
+ public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
String name) {
cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.japi.Function;
import akka.japi.Procedure;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
/**
*/
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ private final Logger LOG = LoggerFactory.getLogger(getClass());
// Stores a mapping between a member name and the address of the member
// Member names look like "member-1", "member-2" etc and are as specified
knownModules = ImmutableSet.copyOf(msg.getModules());
} else if (message instanceof RecoveryFailure) {
RecoveryFailure failure = (RecoveryFailure) message;
- LOG.error(failure.cause(), "Recovery failed");
+ LOG.error("Recovery failed", failure.cause());
} else if (message instanceof RecoveryCompleted) {
LOG.info("Recovery complete : {}", persistenceId());
new Function<Throwable, SupervisorStrategy.Directive>() {
@Override
public SupervisorStrategy.Directive apply(Throwable t) {
- StringBuilder sb = new StringBuilder();
- for(StackTraceElement element : t.getStackTrace()) {
- sb.append("\n\tat ")
- .append(element.toString());
- }
- LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
+ LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
return SupervisorStrategy.resume();
}
}
*/
package org.opendaylight.controller.cluster.datastore;
-import akka.event.LoggingAdapter;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
/**
* Coordinates persistence recovery of journal log entries and snapshots for a shard. Each snapshot
private final SchemaContext schemaContext;
private final String shardName;
private final ExecutorService executor;
- private final LoggingAdapter log;
+ private final Logger log;
private final String name;
- ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, LoggingAdapter log,
+ ShardRecoveryCoordinator(String shardName, SchemaContext schemaContext, Logger log,
String name) {
this.schemaContext = schemaContext;
this.shardName = shardName;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import org.opendaylight.controller.cluster.datastore.messages.Monitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TerminationMonitor extends UntypedActor{
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class);
public TerminationMonitor(){
- LOG.info("Created TerminationMonitor");
+ LOG.debug("Created TerminationMonitor");
}
@Override public void onReceive(Object message) throws Exception {
*/
package org.opendaylight.controller.cluster.datastore.compat;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.japi.Creator;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.japi.Creator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit
*/
public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor {
- private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
+ private static final Logger LOG = LoggerFactory.getLogger(BackwardsCompatibleThreePhaseCommitCohort.class);
private final String transactionId;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import org.opendaylight.controller.cluster.common.actor.Monitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TerminationMonitor extends UntypedActor{
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class);
public TerminationMonitor(){
- LOG.info("Created TerminationMonitor");
+ LOG.debug("Created TerminationMonitor");
}
@Override public void onReceive(Object message) throws Exception {
package org.opendaylight.controller.remote.rpc.registry;
import akka.actor.ActorRef;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Option;
import akka.japi.Pair;
import com.google.common.base.Preconditions;
*/
public class RpcRegistry extends BucketStore<RoutingTable> {
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
public RpcRegistry() {
getLocalBucket().setData(new RoutingTable());
}
import akka.actor.Address;
import akka.actor.Props;
import akka.cluster.ClusterActorRefProvider;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.utils.ConditionalProbe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A store that syncs its data across nodes in the cluster.
private static final Long NO_VERSION = -1L;
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ protected final Logger log = LoggerFactory.getLogger(getClass());
/**
* Bucket owned by the node
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
/**
* Gossiper that syncs bucket store across nodes in the cluster.
public class Gossiper extends AbstractUntypedActorWithMetering {
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ private final Logger log = LoggerFactory.getLogger(getClass());
private Cluster cluster;
@Override
public void postStop(){
- if (cluster != null)
+ if (cluster != null) {
cluster.unsubscribe(getSelf());
- if (gossipTask != null)
+ }
+ if (gossipTask != null) {
gossipTask.cancel();
+ }
}
@Override
protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
- if (message instanceof GossipTick)
+ if (message instanceof GossipTick) {
receiveGossipTick();
-
- //Message from remote gossiper with its bucket versions
- else if (message instanceof GossipStatus)
+ } else if (message instanceof GossipStatus) {
+ // Message from remote gossiper with its bucket versions
receiveGossipStatus((GossipStatus) message);
-
- //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
- //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
- //message with its local versions
- else if (message instanceof GossipEnvelope)
+ } else if (message instanceof GossipEnvelope) {
+ // Message from remote gossiper with buckets. This is usually in response to GossipStatus
+ // message. The contained buckets are newer as determined by the remote gossiper by
+ // comparing the GossipStatus message with its local versions.
receiveGossip((GossipEnvelope) message);
-
- else if (message instanceof ClusterEvent.MemberUp) {
+ } else if (message instanceof ClusterEvent.MemberUp) {
receiveMemberUp(((ClusterEvent.MemberUp) message).member());
} else if (message instanceof ClusterEvent.MemberRemoved) {
} else if ( message instanceof ClusterEvent.UnreachableMember){
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
- } else
+ } else {
unhandled(message);
+ }
}
/**
*/
void receiveMemberUp(Member member) {
- if (selfAddress.equals(member.address()))
+ if (selfAddress.equals(member.address())) {
return; //ignore up notification for self
+ }
- if (!clusterMembers.contains(member.address()))
+ if (!clusterMembers.contains(member.address())) {
clusterMembers.add(member.address());
+ }
if(log.isDebugEnabled()) {
log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
void receiveGossipTick(){
- if (clusterMembers.size() == 0) return; //no members to send gossip status to
+ if (clusterMembers.size() == 0) {
+ return; //no members to send gossip status to
+ }
Address remoteMemberToGossipTo;
- if (clusterMembers.size() == 1)
+ if (clusterMembers.size() == 1) {
remoteMemberToGossipTo = clusterMembers.get(0);
- else {
+ } else {
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
*/
void receiveGossipStatus(GossipStatus status){
//Don't accept messages from non-members
- if (!clusterMembers.contains(status.from()))
+ if (!clusterMembers.contains(status.from())) {
return;
+ }
final ActorRef sender = getSender();
Future<Object> futureReply =
for (Address address : remoteVersions.keySet()){
- if (localVersions.get(address) == null || remoteVersions.get(address) == null)
+ if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
continue; //this condition is taken care of by above diffs
- if (localVersions.get(address) < remoteVersions.get(address))
+ }
+ if (localVersions.get(address) < remoteVersions.get(address)) {
localIsOlder.add(address);
- else if (localVersions.get(address) > remoteVersions.get(address))
+ } else if (localVersions.get(address) > remoteVersions.get(address)) {
localIsNewer.add(address);
+ }
}
- if (!localIsOlder.isEmpty())
+ if (!localIsOlder.isEmpty()) {
sendGossipStatusTo(sender, localVersions );
+ }
- if (!localIsNewer.isEmpty())
+ if (!localIsNewer.isEmpty()) {
sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+ }
}
return null;