import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
}
- public static Props props(final String name, final Map<String, String> peerAddresses) {
+ public static Props props(final String name,
+ final Map<String, String> peerAddresses) {
return Props.create(new Creator<Shard>() {
@Override
}
- @Override public void onReceiveCommand(Object message){
- LOG.debug("Received message {} from {}", message.getClass().toString(), getSender());
+ @Override public void onReceiveCommand(Object message) {
+ LOG.debug("Received message {} from {}", message.getClass().toString(),
+ getSender());
- if (message.getClass().equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
- if(isLeader()) {
+ if (message.getClass()
+ .equals(CreateTransactionChain.SERIALIZABLE_CLASS)) {
+ if (isLeader()) {
createTransactionChain();
- } else if(getLeader() != null){
+ } else if (getLeader() != null) {
getLeader().forward(message, getContext());
}
} else if (message instanceof RegisterChangeListener) {
updateSchemaContext((UpdateSchemaContext) message);
} else if (message instanceof ForwardedCommitTransaction) {
handleForwardedCommit((ForwardedCommitTransaction) message);
- } else if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
- if(isLeader()) {
+ } else if (message.getClass()
+ .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ if (isLeader()) {
createTransaction(CreateTransaction.fromSerializable(message));
- } else if(getLeader() != null){
+ } else if (getLeader() != null) {
getLeader().forward(message, getContext());
}
- } else if (message instanceof PeerAddressResolved){
+ } else if (message instanceof PeerAddressResolved) {
PeerAddressResolved resolved = (PeerAddressResolved) message;
setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
} else {
- super.onReceiveCommand(message);
+ super.onReceiveCommand(message);
}
}
- private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){
- if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( store.newReadOnlyTransaction(), getSelf(), schemaContext), transactionId);
-
- }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( store.newReadWriteTransaction(), getSelf(), schemaContext), transactionId);
-
-
- }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){
- return getContext().actorOf(
- ShardTransaction.props( store.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId);
- }else{
- throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ;
- }
- }
+ private ActorRef createTypedTransactionActor(
+ CreateTransaction createTransaction, String transactionId) {
+ if (createTransaction.getTransactionType()
+ == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+ shardMBean.incrementReadOnlyTransactionCount();
+ return getContext().actorOf(
+ ShardTransaction
+ .props(store.newReadOnlyTransaction(), getSelf(),
+ schemaContext), transactionId);
+
+ } else if (createTransaction.getTransactionType()
+ == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
+ shardMBean.incrementReadWriteTransactionCount();
+ return getContext().actorOf(
+ ShardTransaction
+ .props(store.newReadWriteTransaction(), getSelf(),
+ schemaContext), transactionId);
+
+
+ } else if (createTransaction.getTransactionType()
+ == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+ shardMBean.incrementWriteOnlyTransactionCount();
+ return getContext().actorOf(
+ ShardTransaction
+ .props(store.newWriteOnlyTransaction(), getSelf(),
+ schemaContext), transactionId);
+ } else {
+ throw new IllegalArgumentException(
+ "CreateTransaction message has unidentified transaction type="
+ + createTransaction.getTransactionType());
+ }
+ }
private void createTransaction(CreateTransaction createTransaction) {
String transactionId = "shard-" + createTransaction.getTransactionId();
- LOG.info("Creating transaction : {} " , transactionId);
- ActorRef transactionActor = createTypedTransactionActor(createTransaction,transactionId);
+ LOG.info("Creating transaction : {} ", transactionId);
+ ActorRef transactionActor =
+ createTypedTransactionActor(createTransaction, transactionId);
getSender()
- .tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId()).toSerializable(),
- getSelf());
+ .tell(new CreateTransactionReply(
+ Serialization.serializedActorPath(transactionActor),
+ createTransaction.getTransactionId()).toSerializable(),
+ getSelf()
+ );
}
private void commit(final ActorRef sender, Object serialized) {
- Modification modification = MutableCompositeModification.fromSerializable(serialized, schemaContext);
+ Modification modification =
+ MutableCompositeModification.fromSerializable(
+ serialized, schemaContext);
DOMStoreThreePhaseCommitCohort cohort =
modificationToCohort.remove(serialized);
if (cohort == null) {
try {
future.get();
- if(sender != null) {
+ if (sender != null) {
sender
.tell(new CommitTransactionReply().toSerializable(),
self);
}
private void handleForwardedCommit(ForwardedCommitTransaction message) {
- Object serializedModification = message.getModification().toSerializable();
+ Object serializedModification =
+ message.getModification().toSerializable();
modificationToCohort
- .put(serializedModification , message.getCohort());
+ .put(serializedModification, message.getCohort());
- if(persistent) {
- this.persistData(getSender(), "identifier", new CompositeModificationPayload(serializedModification));
+ if (persistent) {
+ this.persistData(getSender(), "identifier",
+ new CompositeModificationPayload(serializedModification));
} else {
this.commit(getSender(), serializedModification);
}
private void registerChangeListener(
RegisterChangeListener registerChangeListener) {
- LOG.debug("registerDataChangeListener for " + registerChangeListener.getPath());
+ LOG.debug("registerDataChangeListener for " + registerChangeListener
+ .getPath());
ActorSelection dataChangeListenerPath = getContext()
// Notify the listener if notifications should be enabled or not
// If this shard is the leader then it will enable notifications else
// it will not
- dataChangeListenerPath.tell(new EnableNotification(isLeader()), getSelf());
+ dataChangeListenerPath
+ .tell(new EnableNotification(isLeader()), getSelf());
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
dataChangeListeners.add(dataChangeListenerPath);
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
- listener = new DataChangeListenerProxy(schemaContext,dataChangeListenerPath);
+ listener =
+ new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
registration =
getContext().actorOf(
DataChangeListenerRegistration.props(registration));
- LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = " + listenerRegistration.path().toString());
+ LOG.debug(
+ "registerDataChangeListener sending reply, listenerRegistrationPath = "
+ + listenerRegistration.path().toString());
getSender()
.tell(new RegisterChangeListenerReply(listenerRegistration.path()),
ShardTransactionChain.props(chain, schemaContext));
getSender()
.tell(new CreateTransactionChainReply(transactionChain.path())
- .toSerializable(),
- getSelf());
+ .toSerializable(),
+ getSelf()
+ );
}
@Override protected void applyState(ActorRef clientActor, String identifier,
Object data) {
- if(data instanceof CompositeModificationPayload){
+ if (data instanceof CompositeModificationPayload) {
Object modification =
((CompositeModificationPayload) data).getModification();
- if(modification != null){
+ if (modification != null) {
commit(clientActor, modification);
} else {
LOG.error("modification is null - this is very unexpected");
LOG.error("Unknown state received {}", data);
}
+ ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+
+ if(lastLogEntry != null){
+ shardMBean.setLastLogIndex(lastLogEntry.getIndex());
+ shardMBean.setLastLogTerm(lastLogEntry.getTerm());
+ }
+
+ shardMBean.setCommitIndex(getCommitIndex());
+ shardMBean.setLastApplied(getLastApplied());
+
}
@Override protected Object createSnapshot() {
}
@Override protected void onStateChanged() {
- for(ActorSelection dataChangeListener : dataChangeListeners){
- dataChangeListener.tell(new EnableNotification(isLeader()), getSelf());
+ for (ActorSelection dataChangeListener : dataChangeListeners) {
+ dataChangeListener
+ .tell(new EnableNotification(isLeader()), getSelf());
}
- if(getLeaderId() != null){
+ if (getLeaderId() != null) {
shardMBean.setLeader(getLeaderId());
}
shardMBean.setRaftState(getRaftState().name());
+ shardMBean.setCurrentTerm(getCurrentTerm());
}
@Override public String persistenceId() {