package org.opendaylight.openflowplugin.applications.frsync;
-import java.util.EventListener;
import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.yangtools.yang.binding.DataObject;
/**
* Unifying listener for data and event changes on node.
*/
-public interface NodeListener<T extends DataObject> extends EventListener, ClusteredDataTreeChangeListener<T> {
+public interface NodeListener<T extends DataObject> extends ClusteredDataTreeChangeListener<T> {
}
final Optional<ListenableFuture<Boolean>> optFuture = processNodeModification(modification);
if (optFuture.isPresent()) {
final ListenableFuture<Boolean> future = optFuture.get();
- final Boolean ret = future.get(15000, TimeUnit.MILLISECONDS);
+ future.get(15000, TimeUnit.MILLISECONDS);
if (LOG.isTraceEnabled()) {
- LOG.trace("Syncup return [{}] for {} from {} listener", ret, nodeId.getValue(), dsType());
+ LOG.trace("Syncup for {} return from {} listener", nodeId.getValue(), dsType());
}
}
} catch (InterruptedException e) {
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareProvider {
private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class);
+ private static final String FRS_EXECUTOR_PREFIX = "FRS-executor-";
private final DataBroker dataService;
private final ClusterSingletonServiceProvider clusterSingletonService;
final DataBroker dataBroker,
final RpcConsumerRegistry rpcRegistry,
final ClusterSingletonServiceProvider clusterSingletonService) {
- Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null!");
+ Preconditions.checkNotNull(rpcRegistry, "RpcConsumerRegistry can not be null!");
this.dataService = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
this.clusterSingletonService = Preconditions.checkNotNull(clusterSingletonService,
"ClusterSingletonServiceProvider can not be null!");
nodeOperationalDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH);
final ExecutorService executorService= Executors.newCachedThreadPool(new ThreadFactoryBuilder()
- .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
+ .setNameFormat(FRS_EXECUTOR_PREFIX + "%d")
.setDaemon(false)
.setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
.build());
}
public void close() throws InterruptedException {
- if (dataTreeConfigChangeListener != null) {
+ if (Objects.nonNull(dataTreeConfigChangeListener)) {
dataTreeConfigChangeListener.close();
dataTreeConfigChangeListener = null;
}
- if (dataTreeOperationalChangeListener != null) {
+ if (Objects.nonNull(dataTreeOperationalChangeListener)) {
dataTreeOperationalChangeListener.close();
dataTreeOperationalChangeListener = null;
}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collection;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
}
@Override
- public void onDataTreeChanged(final Collection<DataTreeModification<FlowCapableNode>> modifications) {
+ public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<FlowCapableNode>> modifications) {
super.onDataTreeChanged(modifications);
}
}
/**
- * Add only what is missing on device. If node was added to config DS and it is already present
- * in operational DS (connected) diff between current new configuration and actual configuration
- * (seen in operational) should be calculated and sent to device.
+ * If node was added to config DS and it is already present in operational DS (connected) diff between current
+ * new configuration and actual configuration (seen in operational) should be calculated and sent to device.
*/
private ListenableFuture<Boolean> onNodeAdded(final InstanceIdentifier<FlowCapableNode> nodePath,
final FlowCapableNode dataAfter,
import java.util.Collection;
import java.util.Date;
import java.util.List;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
}
@Override
- public void onDataTreeChanged(final Collection<DataTreeModification<Node>> modifications) {
+ public void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<Node>> modifications) {
super.onDataTreeChanged(modifications);
}
/**
- * Update cache, register for device masterhip when device connected and start reconciliation if device
+ * Update cache, register for device mastership when device connected and start reconciliation if device
* is registered and actual modification is consistent.Skip the event otherwise.
* @throws InterruptedException from syncup
*/
public class SyncReactorFutureDecorator implements SyncReactor {
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class);
- public static final String FRM_RPC_CLIENT_PREFIX = "FRS-executor-";
private final SyncReactor delegate;
private final ListeningExecutorService executorService;
- public SyncReactorFutureDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
+ public SyncReactorFutureDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
this.delegate = delegate;
this.executorService = executorService;
}
final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
return executorService.submit(() -> {
try {
- final Boolean futureResult = doSyncupInFuture(flowcapableNodePath, syncupEntry)
- .get(10000, TimeUnit.MILLISECONDS);
- return futureResult;
+ return doSyncupInFuture(flowcapableNodePath, syncupEntry).get(10000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOG.warn("Syncup future timeout occured {}", nodeId.getValue(), e);
return Boolean.FALSE;
import org.opendaylight.openflowplugin.applications.frsync.util.SyncupEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Enriches {@link SyncReactorFutureDecorator} with state compression.
private final Map<InstanceIdentifier<FlowCapableNode>, SyncupEntry> compressionQueue = new HashMap<>();
private final Semaphore compressionGuard = new Semaphore(1, true);
- public SyncReactorFutureZipDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
+ public SyncReactorFutureZipDecorator(final SyncReactor delegate, final ListeningExecutorService executorService) {
super(delegate, executorService);
}
private final SyncReactor delegate;
private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
- public SyncReactorGuardDecorator(SyncReactor delegate,
- SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
+ public SyncReactorGuardDecorator(final SyncReactor delegate,
+ final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
this.delegate = delegate;
this.semaphoreKeeper = semaphoreKeeper;
}
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SynchronizationDiffInput;
private static final Logger LOG = LoggerFactory.getLogger(SyncReactorImpl.class);
private final SyncPlanPushStrategy syncPlanPushStrategy;
- public SyncReactorImpl(SyncPlanPushStrategy syncPlanPushStrategy) {
+ public SyncReactorImpl(final SyncPlanPushStrategy syncPlanPushStrategy) {
this.syncPlanPushStrategy = Preconditions.checkNotNull(syncPlanPushStrategy, "execution strategy is mandatory");
}
public void setUp() {
final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(false)
- .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)
+ .setNameFormat("frsync-test-%d")
.setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
.build());
syncThreadPool = MoreExecutors.listeningDecorator(executorService);