}
public void onDeviceConnected(final NodeId nodeId) {
+ LOG.debug("FRM service registered for: {}", nodeId.getValue());
final DeviceMastership mastership = new DeviceMastership(nodeId, clusterSingletonService);
deviceMasterships.put(nodeId, mastership);
- LOG.debug("FRM service registered for: {}", nodeId.getValue());
}
public void onDeviceDisconnected(final NodeId nodeId) {
ConcurrentHashMap<NodeId, DeviceMastership> getDeviceMasterships() {
return deviceMasterships;
}
-
}
public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<T>> modifications) {
for (DataTreeModification<T> modification : modifications) {
final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
-
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("DataTreeModification of {} in {} datastore", nodeId.getValue(), dsType());
+ }
try {
final Optional<ListenableFuture<Boolean>> optFuture = processNodeModification(modification);
if (optFuture.isPresent()) {
final ListenableFuture<Boolean> future = optFuture.get();
final Boolean ret = future.get(15000, TimeUnit.MILLISECONDS);
- LOG.debug("syncup return in {} listener for: {} [{}] thread:{}", dsType(), nodeId.getValue(), ret, threadName());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("syncup return [{}] for {} from {} listener", ret, nodeId.getValue(), dsType());
+ }
}
} catch (InterruptedException e) {
- LOG.warn("permit for forwarding rules sync not acquired: {}", nodeId.getValue());
+ LOG.warn("Permit for forwarding rules sync not acquired: {}", nodeId.getValue());
} catch (Exception e) {
- LOG.error("error processing inventory node modification: {}", nodeId.getValue(), e);
+ LOG.error("Error processing inventory node modification: {}, {}", nodeId.getValue(), e);
}
}
}
protected abstract Optional<ListenableFuture<Boolean>> processNodeModification(
- DataTreeModification<T> modification) throws InterruptedException;
+ final DataTreeModification<T> modification) throws InterruptedException;
protected abstract LogicalDatastoreType dsType();
- private static String threadName() {
- final Thread currentThread = Thread.currentThread();
- return currentThread.getName();
- }
}
}
@Override
- public void onDataTreeChanged(Collection<DataTreeModification<FlowCapableNode>> modifications) {
- LOG.trace("Config changes: {}", modifications.size());
+ public void onDataTreeChanged(final Collection<DataTreeModification<FlowCapableNode>> modifications) {
super.onDataTreeChanged(modifications);
}
* @throws InterruptedException from syncup
*/
protected Optional<ListenableFuture<Boolean>> processNodeModification(
- DataTreeModification<FlowCapableNode> modification) throws InterruptedException {
+ final DataTreeModification<FlowCapableNode> modification) throws InterruptedException {
final InstanceIdentifier<FlowCapableNode> nodePath = modification.getRootPath().getRootIdentifier();
final NodeId nodeId = PathUtil.digNodeId(nodePath);
final Optional<FlowCapableNode> operationalNode = operationalDao.loadByNodeId(nodeId);
if (!operationalNode.isPresent()) {
- LOG.info("Skip syncup, {} operational is not present", nodeId.getValue());
+ LOG.debug("Skip syncup, {} operational is not present", nodeId.getValue());
return Optional.absent();
}
private ListenableFuture<Boolean> onNodeAdded(final InstanceIdentifier<FlowCapableNode> nodePath,
final FlowCapableNode dataAfter,
final FlowCapableNode operationalNode) throws InterruptedException {
- NodeId nodeId = PathUtil.digNodeId(nodePath);
- LOG.trace("onNodeAdded {}", nodeId.getValue());
- LOG.debug("Reconciliation {}: {}", dsType(), nodeId.getValue());
+ LOG.debug("Reconciliation {}: {}", dsType(), PathUtil.digNodeId(nodePath).getValue());
final SyncupEntry syncupEntry = new SyncupEntry(dataAfter, dsType(), operationalNode, LogicalDatastoreType.OPERATIONAL);
return reactor.syncup(nodePath, syncupEntry);
}
private ListenableFuture<Boolean> onNodeUpdated(final InstanceIdentifier<FlowCapableNode> nodePath,
final FlowCapableNode dataBefore,
final FlowCapableNode dataAfter) throws InterruptedException {
- NodeId nodeId = PathUtil.digNodeId(nodePath);
- LOG.trace("onNodeUpdated {}", nodeId.getValue());
final SyncupEntry syncupEntry = new SyncupEntry(dataAfter, dsType(), dataBefore, dsType());
return reactor.syncup(nodePath, syncupEntry);
}
*/
private ListenableFuture<Boolean> onNodeDeleted(final InstanceIdentifier<FlowCapableNode> nodePath,
final FlowCapableNode dataBefore) throws InterruptedException {
- NodeId nodeId = PathUtil.digNodeId(nodePath);
- LOG.trace("onNodeDeleted {}", nodeId.getValue());
final SyncupEntry syncupEntry = new SyncupEntry(null, dsType(), dataBefore, dsType());
return reactor.syncup(nodePath, syncupEntry);
}
public LogicalDatastoreType dsType() {
return LogicalDatastoreType.CONFIGURATION;
}
-
}
}
@Override
- public void onDataTreeChanged(Collection<DataTreeModification<Node>> modifications) {
- LOG.trace("Operational changes: {}", modifications.size());
+ public void onDataTreeChanged(final Collection<DataTreeModification<Node>> modifications) {
super.onDataTreeChanged(modifications);
}
* @throws InterruptedException from syncup
*/
protected Optional<ListenableFuture<Boolean>> processNodeModification(
- DataTreeModification<Node> modification) throws InterruptedException {
+ final DataTreeModification<Node> modification) throws InterruptedException {
final NodeId nodeId = ModificationUtil.nodeId(modification);
updateCache(modification);
* Unregister for device mastership.
* @param modification Datastore modification
*/
- private void updateCache(DataTreeModification<Node> modification) {
+ private void updateCache(final DataTreeModification<Node> modification) {
NodeId nodeId = ModificationUtil.nodeId(modification);
if (isDelete(modification) || isDeleteLogical(modification)) {
operationalSnapshot.updateCache(nodeId, Optional.absent());
operationalSnapshot.updateCache(nodeId, Optional.fromNullable(ModificationUtil.flowCapableNodeAfter(modification)));
}
- private Optional<ListenableFuture<Boolean>> skipModification(DataTreeModification<Node> modification) {
- LOG.trace("Skipping operational modification: {}, before {}, after {}",
- ModificationUtil.nodeIdValue(modification),
- modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
- modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
+ private Optional<ListenableFuture<Boolean>> skipModification(final DataTreeModification<Node> modification) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping operational modification: {}, before {}, after {}",
+ ModificationUtil.nodeIdValue(modification),
+ modification.getRootNode().getDataBefore() == null ? "null" : "nonnull",
+ modification.getRootNode().getDataAfter() == null ? "null" : "nonnull");
+ }
return Optional.absent();
}
/**
* ModificationType.DELETE.
*/
- private boolean isDelete(DataTreeModification<Node> modification) {
- if (ModificationType.DELETE == modification.getRootNode().getModificationType()) {
- LOG.trace("Delete {} (physical)", ModificationUtil.nodeIdValue(modification));
- return true;
- }
-
- return false;
+ private boolean isDelete(final DataTreeModification<Node> modification) {
+ return ModificationType.DELETE == modification.getRootNode().getModificationType();
}
/**
* All connectors disappeared from operational store (logical delete).
*/
- private boolean isDeleteLogical(DataTreeModification<Node> modification) {
+ private boolean isDeleteLogical(final DataTreeModification<Node> modification) {
final DataObjectModification<Node> rootNode = modification.getRootNode();
- if (!safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter())) {
- LOG.trace("Delete {} (logical)", ModificationUtil.nodeIdValue(modification));
- return true;
- }
+ return !safeConnectorsEmpty(rootNode.getDataBefore()) && safeConnectorsEmpty(rootNode.getDataAfter());
- return false;
}
- private boolean isAdd(DataTreeModification<Node> modification) {
+ private boolean isAdd(final DataTreeModification<Node> modification) {
final DataObjectModification<Node> rootNode = modification.getRootNode();
- final Node dataAfter = rootNode.getDataAfter();
- final Node dataBefore = rootNode.getDataBefore();
-
- final boolean nodeAppearedInOperational = dataBefore == null && dataAfter != null;
- if (nodeAppearedInOperational) {
- LOG.trace("Add {} (physical)", ModificationUtil.nodeIdValue(modification));
- }
- return nodeAppearedInOperational;
+ return rootNode.getDataBefore() == null && rootNode.getDataAfter() != null;
}
/**
* All connectors appeared in operational store (logical add).
*/
- private boolean isAddLogical(DataTreeModification<Node> modification) {
+ private boolean isAddLogical(final DataTreeModification<Node> modification) {
final DataObjectModification<Node> rootNode = modification.getRootNode();
- if (safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter())) {
- LOG.trace("Add {} (logical)", ModificationUtil.nodeIdValue(modification));
- return true;
- }
-
- return false;
+ return safeConnectorsEmpty(rootNode.getDataBefore()) && !safeConnectorsEmpty(rootNode.getDataAfter());
}
/**
* @return optional syncup future
* @throws InterruptedException from syncup
*/
- private Optional<ListenableFuture<Boolean>> reconciliation(DataTreeModification<Node> modification) throws InterruptedException {
+ private Optional<ListenableFuture<Boolean>> reconciliation(final DataTreeModification<Node> modification)
+ throws InterruptedException {
final NodeId nodeId = ModificationUtil.nodeId(modification);
final Optional<FlowCapableNode> nodeConfiguration = configDao.loadByNodeId(nodeId);
}
}
- private boolean isConsistentForReconcile(DataTreeModification<Node> modification) {
+ private boolean isConsistentForReconcile(final DataTreeModification<Node> modification) {
final NodeId nodeId = PathUtil.digNodeId(modification.getRootPath().getRootIdentifier());
final FlowCapableStatisticsGatheringStatus gatheringStatus = modification.getRootNode().getDataAfter()
.getAugmentation(FlowCapableStatisticsGatheringStatus.class);
return true;
}
} catch (ParseException e) {
- LOG.error("Timestamp parsing error {}", e);
+ LOG.warn("Timestamp parsing error {}", e);
}
LOG.debug("Fresh operational not present: {}", nodeId.getValue());
return false;
}
- private static boolean safeConnectorsEmpty(Node node) {
+ private static boolean safeConnectorsEmpty(final Node node) {
if (node == null) {
return true;
}
public LogicalDatastoreType dsType() {
return LogicalDatastoreType.OPERATIONAL;
}
-
}
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
final SyncupEntry syncupEntry) throws InterruptedException {
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- LOG.trace("Syncup cluster decorator: {}", nodeId.getValue());
-
if (!deviceMastershipManager.isDeviceMastered(nodeId)) {
LOG.debug("Skip syncup since not master for: {}", nodeId.getValue());
return Futures.immediateFuture(Boolean.TRUE);
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
final SyncupEntry syncupEntry) throws InterruptedException {
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- LOG.trace("syncup future decorator: {}", nodeId.getValue());
-
return executorService.submit(() -> {
final String oldThreadName = updateThreadName(nodeId);
-
try {
- final Boolean ret = doSyncupInFuture(flowcapableNodePath, syncupEntry)
- .get(10000, TimeUnit.MILLISECONDS);
- LOG.trace("syncup return in future decorator: {} [{}]", nodeId.getValue(), ret);
- return true;
+ final Boolean ret = doSyncupInFuture(flowcapableNodePath, syncupEntry).get(10000, TimeUnit.MILLISECONDS);
+ return ret;
} catch (TimeoutException e) {
- LOG.error("doSyncupInFuture timeout occured {}", nodeId.getValue(), e);
+ LOG.warn("doSyncupInFuture timeout occured {}", nodeId.getValue(), e);
return false;
} finally {
updateThreadName(oldThreadName);
protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
final SyncupEntry syncupEntry) throws InterruptedException {
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- LOG.trace("doSyncupInFuture future decorator: {}", nodeId.getValue());
-
return delegate.syncup(flowcapableNodePath, syncupEntry);
}
- private String updateThreadName(NodeId nodeId) {
+ private String updateThreadName(final NodeId nodeId) {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
- try {
- if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {
- currentThread.setName(oldName + "@" + nodeId.getValue());
- } else {
- LOG.warn("try to update foreign thread name {} {}", nodeId, oldName);
- }
- } catch (Exception e) {
- LOG.error("failed updating threadName {}", nodeId, e);
+ if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {
+ currentThread.setName(oldName + "@" + nodeId.getValue());
+ } else {
+ LOG.warn("Try to update foreign thread name {} {}", nodeId, oldName);
}
return oldName;
}
- private void updateThreadName(String name) {
+ private void updateThreadName(final String name) {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
- try {
- if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {
- currentThread.setName(name);
- } else {
- LOG.warn("try to update foreign thread name {} {}", oldName, name);
- }
- } catch (Exception e) {
- LOG.error("failed updating threadName {}", name, e);
+ if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {
+ currentThread.setName(name);
+ } else {
+ LOG.warn("Try to update foreign thread name {} {}", oldName, name);
}
}
}
import java.util.concurrent.Semaphore;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
final SyncupEntry syncupEntry) throws InterruptedException {
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- LOG.trace("syncup zip decorator: {}", nodeId.getValue());
-
try {
compressionGuard.acquire();
-
final boolean newFutureNecessary = updateCompressionState(flowcapableNodePath, syncupEntry);
if (newFutureNecessary) {
super.syncup(flowcapableNodePath, syncupEntry);
protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
final SyncupEntry syncupEntry) throws InterruptedException {
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- LOG.trace("doSyncupInFuture zip decorator: {}", nodeId.getValue());
-
final SyncupEntry lastCompressionState = removeLastCompressionState(flowcapableNodePath);
if (lastCompressionState == null) {
return Futures.immediateFuture(true);
return previousEntry == null;
}
- private void updateOptimizedConfigDelta(InstanceIdentifier<FlowCapableNode> flowcapableNodePath, SyncupEntry actual,
- SyncupEntry previous) {
- compressionQueue.put(flowcapableNodePath, new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
- previous.getBefore(), previous.getDsTypeBefore()));
+ private void updateOptimizedConfigDelta(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final SyncupEntry actual,
+ final SyncupEntry previous) {
+ final SyncupEntry updatedEntry = new SyncupEntry(actual.getAfter(), actual.getDsTypeAfter(),
+ previous.getBefore(), previous.getDsTypeBefore());
+ compressionQueue.put(flowcapableNodePath, updatedEntry);
}
- private SyncupEntry removeLastCompressionState(
- final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
+ private SyncupEntry removeLastCompressionState(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath) {
try {
try {
compressionGuard.acquire();
} catch (InterruptedException e) {
return null;
}
-
return compressionQueue.remove(flowcapableNodePath);
} finally {
compressionGuard.release();
}
}
-
}
\ No newline at end of file
public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
final SyncupEntry syncupEntry) throws InterruptedException {
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- LOG.trace("syncup guard decorator: {}", nodeId.getValue());
-
final long stampBeforeGuard = System.nanoTime();
final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);
if (guard == null) {
try {
if (LOG.isDebugEnabled()) {
- LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),
+ LOG.debug("syncup start {} waiting:{} guard:{} thread:{}",
+ nodeId.getValue(),
formatNanos(stampAfterGuard - stampBeforeGuard),
- guard, threadName());
+ guard,
+ threadName());
}
- final ListenableFuture<Boolean> endResult =
- delegate.syncup(flowcapableNodePath, syncupEntry);
-
+ final ListenableFuture<Boolean> endResult = delegate.syncup(flowcapableNodePath, syncupEntry);
Futures.addCallback(endResult, createSyncupCallback(guard, stampBeforeGuard, stampAfterGuard, nodeId));
return endResult;
} catch (InterruptedException e) {
@Override
public void onFailure(final Throwable t) {
final long stampFinished = System.nanoTime();
- LOG.error("syncup failed {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),
+ LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),
formatNanos(stampFinished - stampBeforeGuard), formatNanos(stampFinished - stampAfterGuard),
formatNanos(stampAfterGuard - stampBeforeGuard), guard.availablePermits(), threadName());
releaseGuardForNodeId(guard);
}};
}
- private static String formatNanos(long nanos) {
+ private static String formatNanos(final long nanos) {
return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";
}
try {
guard.acquire();
} catch (InterruptedException e) {
- LOG.error("syncup summon {} failed {}", nodeId.getValue(), e);
+ LOG.warn("syncup summon {} failed {}", nodeId.getValue(), e);
return null;
}
- LOG.trace("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());
+ }
return guard;
}
private static void releaseGuardForNodeId(final Semaphore guard) {
if (guard != null) {
guard.release();
- LOG.trace("syncup release guard:{} thread:{}", guard, threadName());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("syncup release guard:{} thread:{}", guard, threadName());
+ }
}
}
final Thread currentThread = Thread.currentThread();
return currentThread.getName();
}
-
}
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SynchronizationDiffInput;
import org.opendaylight.openflowplugin.applications.frsync.util.CrudCounts;
import org.opendaylight.openflowplugin.applications.frsync.util.FlowCapableNodeLookups;
-import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
FlowCapableNode configTree = syncupEntry.getAfter();
FlowCapableNode operationalTree = syncupEntry.getBefore();
-
- LOG.trace("syncup reactor {} cfg:{} oper:{}",
- nodeId.getValue(),
- configTree == null ? "is null" : "non null",
- operationalTree == null ? "is null" : "non null");
final SyncCrudCounters counters = new SyncCrudCounters();
/**
* - recommended order follows:
* reconciliation strategy - phase 1: - add/update missing objects in following order:
* - table features - groups (reordered) - meters - flows
- *
* reconciliation strategy - phase 2: - remove redundant objects in following order:
* - flows - meters - groups (reordered)
**/
final ListenableFuture<RpcResult<Void>> resultVehicle = syncPlanPushStrategy.executeSyncStrategy(
bootstrapResultFuture, input, counters);
- // log final result
- Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "final result"));
-
return Futures.transform(resultVehicle, new Function<RpcResult<Void>, Boolean>() {
@Override
public Boolean apply(RpcResult<Void> input) {
final CrudCounts flowCrudCounts = counters.getFlowCrudCounts();
final CrudCounts meterCrudCounts = counters.getMeterCrudCounts();
final CrudCounts groupCrudCounts = counters.getGroupCrudCounts();
- LOG.debug("syncup outcome[{}] (added/updated/removed): flow={}/{}/{}, group={}/{}/{}, meter={}/{}/{}, took={} ms",
+ LOG.debug("syncup outcome[{}] (added/updated/removed): flow={}/{}/{}, group={}/{}/{}, " +
+ "meter={}/{}/{}, took={} ms, errors={}",
nodeId.getValue(),
flowCrudCounts.getAdded(), flowCrudCounts.getUpdated(), flowCrudCounts.getRemoved(),
groupCrudCounts.getAdded(), groupCrudCounts.getUpdated(), groupCrudCounts.getRemoved(),
meterCrudCounts.getAdded(), meterCrudCounts.getUpdated(), meterCrudCounts.getRemoved(),
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - counters.getStartNano()));
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - counters.getStartNano()),
+ Arrays.toString(input.getErrors().toArray()));
}
- LOG.trace("syncup errors: {}", input.getErrors());
return input.isSuccessful();
}});
}
return ReconcileUtil.resolveAndDivideGroupDiffs(nodeId, groupConfiguredMap, pendingGroups, false);
}
-
}
final SyncupEntry syncupEntry) throws InterruptedException {
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
- LOG.trace("syncup retry decorator: {}", nodeId.getValue());
-
if (syncupEntry.isOptimizedConfigDelta() && reconciliationRegistry.isRegistered(nodeId)) {
LOG.debug("Config change ignored because {} is in reconcile.", nodeId.getValue());
return Futures.immediateFuture(Boolean.FALSE);
return Futures.transform(syncupResult, new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean result) {
- LOG.trace("syncup return in retry decorator: {} [{}]", nodeId.getValue(), result);
if (result) {
reconciliationRegistry.unregisterIfRegistered(nodeId);
return true;
}
public void onDeviceConnected(final NodeId nodeId) {
+ LOG.debug("FRS service registered for: {}", nodeId.getValue());
final DeviceMastership mastership = new DeviceMastership(nodeId, reconciliationRegistry, clusterSingletonService);
deviceMasterships.put(nodeId, mastership);
- LOG.debug("FRS service registered for: {}", nodeId.getValue());
}
public void onDeviceDisconnected(final NodeId nodeId) {
ConcurrentHashMap<NodeId, DeviceMastership> getDeviceMasterships() {
return deviceMasterships;
}
-
}
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
-import org.opendaylight.openflowplugin.applications.frsync.util.FxChainUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.ItemSyncBox;
import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
import org.opendaylight.openflowplugin.applications.frsync.util.ReconcileUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.input.batch.batch.choice.flat.batch.update.meter._case.FlatBatchUpdateMeterBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.process.flat.batch.output.BatchFailure;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.OriginalBatchedGroupBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.groups.service.rev160315.batch.group.input.update.grouping.UpdatedBatchedGroupBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.OriginalBatchedMeterBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meters.service.rev160316.batch.meter.input.update.grouping.UpdatedBatchedMeterBuilder;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public ListenableFuture<RpcResult<Void>> executeSyncStrategy(ListenableFuture<RpcResult<Void>> resultVehicle,
final SynchronizationDiffInput diffInput,
final SyncCrudCounters counters) {
- final InstanceIdentifier<FlowCapableNode> nodeIdent = diffInput.getNodeIdent();
- final NodeId nodeId = PathUtil.digNodeId(nodeIdent);
-
// prepare default (full) counts
counters.getGroupCrudCounts().setAdded(ReconcileUtil.countTotalPushed(diffInput.getGroupsToAddOrUpdate()));
counters.getGroupCrudCounts().setUpdated(ReconcileUtil.countTotalUpdated(diffInput.getGroupsToAddOrUpdate()));
final Future<RpcResult<ProcessFlatBatchOutput>> rpcResultFuture = flatBatchService.processFlatBatch(flatBatchInput);
- final int failureIndexLimit = batchOrder;
-
if (LOG.isDebugEnabled()) {
Futures.addCallback(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
- createCounterCallback(batchBag, failureIndexLimit, counters));
+ createCounterCallback(batchBag, batchOrder, counters));
}
return Futures.transform(JdkFutureAdapters.listenInPoolThread(rpcResultFuture),
ReconcileUtil.<ProcessFlatBatchOutput>createRpcResultToVoidFunction("flat-batch"));
}
});
-
- Futures.addCallback(resultVehicle, FxChainUtil.logResultCallback(nodeId, "flat-batch"));
return resultVehicle;
}