Merge "BUG-2424 Bump mina sshd-core version to 0.14"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 99bc9de6a23f0a7c637ff1f6c168048f257ef857..8e00a1389ca6a057bef39292ffcc0b04958be794 100644 (file)
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -140,16 +139,14 @@ public class Shard extends RaftActor {
      * Coordinates persistence recovery on startup.
      */
     private ShardRecoveryCoordinator recoveryCoordinator;
-    private List<Object> currentLogRecoveryBatch;
 
     private final DOMTransactionFactory transactionFactory;
 
     private final String txnDispatcherPath;
 
-    protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+    protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
-        super(name.toString(), mapPeerAddresses(peerAddresses),
-                Optional.of(datastoreContext.getShardRaftConfig()));
+        super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
 
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
@@ -191,6 +188,8 @@ public class Shard extends RaftActor {
 
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+        recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
     }
 
     private void setTransactionCommitTimeout() {
@@ -198,20 +197,8 @@ public class Shard extends RaftActor {
                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
     }
 
-    private static Map<String, String> mapPeerAddresses(
-        final Map<ShardIdentifier, String> peerAddresses) {
-        Map<String, String> map = new HashMap<>();
-
-        for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
-            .entrySet()) {
-            map.put(entry.getKey().toString(), entry.getValue());
-        }
-
-        return map;
-    }
-
     public static Props props(final ShardIdentifier name,
-        final Map<ShardIdentifier, String> peerAddresses,
+        final Map<String, String> peerAddresses,
         final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         Preconditions.checkNotNull(name, "name should not be null");
         Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
@@ -690,7 +677,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
                 persistenceId(), listenerRegistration.path());
 
-        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+        getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf());
     }
 
     private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
@@ -726,81 +713,27 @@ public class Shard extends RaftActor {
     @Override
     protected
     void startLogRecoveryBatch(final int maxBatchSize) {
-        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: starting log recovery batch with max size {}", persistenceId(), maxBatchSize);
-        }
+        recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
     }
 
     @Override
     protected void appendRecoveredLogEntry(final Payload data) {
-        if(data instanceof ModificationPayload) {
-            try {
-                currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
-            } catch (ClassNotFoundException | IOException e) {
-                LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
-            }
-        } else if (data instanceof CompositeModificationPayload) {
-            currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
-        } else if (data instanceof CompositeModificationByteStringPayload) {
-            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
-        } else {
-            LOG.error("{}: Unknown state received {} during recovery", persistenceId(), data);
-        }
+        recoveryCoordinator.appendRecoveredLogPayload(data);
     }
 
     @Override
     protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
-                    LOG, name.toString());
-        }
-
-        recoveryCoordinator.submit(snapshotBytes, store.newWriteOnlyTransaction());
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: submitted recovery sbapshot", persistenceId());
-        }
+        recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
     }
 
     @Override
     protected void applyCurrentLogRecoveryBatch() {
-        if(recoveryCoordinator == null) {
-            recoveryCoordinator = new ShardRecoveryCoordinator(persistenceId(), schemaContext,
-                    LOG, name.toString());
-        }
-
-        recoveryCoordinator.submit(currentLogRecoveryBatch, store.newWriteOnlyTransaction());
-
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: submitted log recovery batch with size {}", persistenceId(),
-                    currentLogRecoveryBatch.size());
-        }
+        recoveryCoordinator.applyCurrentLogRecoveryBatch();
     }
 
     @Override
     protected void onRecoveryComplete() {
-        if(recoveryCoordinator != null) {
-            Collection<DOMStoreWriteTransaction> txList = recoveryCoordinator.getTransactions();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: recovery complete - committing {} Tx's", persistenceId(), txList.size());
-            }
-
-            for(DOMStoreWriteTransaction tx: txList) {
-                try {
-                    syncCommitTransaction(tx);
-                    shardMBean.incrementCommittedTransactionCount();
-                } catch (InterruptedException | ExecutionException e) {
-                    shardMBean.incrementFailedTransactionsCount();
-                    LOG.error("{}: Failed to commit", persistenceId(), e);
-                }
-            }
-        }
-
         recoveryCoordinator = null;
-        currentLogRecoveryBatch = null;
 
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
@@ -950,11 +883,11 @@ public class Shard extends RaftActor {
         private static final long serialVersionUID = 1L;
 
         final ShardIdentifier name;
-        final Map<ShardIdentifier, String> peerAddresses;
+        final Map<String, String> peerAddresses;
         final DatastoreContext datastoreContext;
         final SchemaContext schemaContext;
 
-        ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+        ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
                 final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
             this.name = name;
             this.peerAddresses = peerAddresses;