-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.dao;\r
-\r
-import com.google.common.base.Optional;\r
-import javax.annotation.Nonnull;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-\r
-/**\r
- * Implementation of data access object for {@link FlowCapableNode}.\r
- * Contains pair of snapshot and odl DAOs.\r
- */\r
-public class FlowCapableNodeCachedDao implements FlowCapableNodeDao {\r
-\r
- private final FlowCapableNodeDao snapshotDao;\r
- private final FlowCapableNodeDao odlDao;\r
-\r
- public FlowCapableNodeCachedDao(FlowCapableNodeDao snapshotDao, FlowCapableNodeDao odlDao) {\r
- this.snapshotDao = snapshotDao;\r
- this.odlDao = odlDao;\r
- }\r
-\r
- public Optional<FlowCapableNode> loadByNodeId(@Nonnull NodeId nodeId) {\r
- final Optional<FlowCapableNode> node = snapshotDao.loadByNodeId(nodeId);\r
-\r
- if (node.isPresent()) {\r
- return node;\r
- }\r
-\r
- return odlDao.loadByNodeId(nodeId);\r
- }\r
-\r
-}\r
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.dao;
+
+import com.google.common.base.Optional;
+import javax.annotation.Nonnull;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+/**
+ * Implementation of data access object for {@link FlowCapableNode}.
+ * Contains pair of snapshot and odl DAOs.
+ */
+public class FlowCapableNodeCachedDao implements FlowCapableNodeDao {
+
+ private final FlowCapableNodeDao snapshotDao;
+ private final FlowCapableNodeDao odlDao;
+
+ public FlowCapableNodeCachedDao(FlowCapableNodeDao snapshotDao, FlowCapableNodeDao odlDao) {
+ this.snapshotDao = snapshotDao;
+ this.odlDao = odlDao;
+ }
+
+ public Optional<FlowCapableNode> loadByNodeId(@Nonnull NodeId nodeId) {
+ final Optional<FlowCapableNode> node = snapshotDao.loadByNodeId(nodeId);
+
+ if (node.isPresent()) {
+ return node;
+ }
+
+ return odlDao.loadByNodeId(nodeId);
+ }
+
+}
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.dao;\r
-\r
-import com.google.common.base.Optional;\r
-import javax.annotation.Nonnull;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-\r
-/**\r
- * Data access object for {@link FlowCapableNode}.\r
- */\r
-public interface FlowCapableNodeDao {\r
- Optional<FlowCapableNode> loadByNodeId(@Nonnull NodeId nodeId);\r
-}\r
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.dao;
+
+import com.google.common.base.Optional;
+import javax.annotation.Nonnull;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+/**
+ * Data access object for {@link FlowCapableNode}.
+ */
+public interface FlowCapableNodeDao {
+ Optional<FlowCapableNode> loadByNodeId(@Nonnull NodeId nodeId);
+}
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.dao;\r
-\r
-import com.google.common.base.Optional;\r
-import java.util.concurrent.TimeUnit;\r
-import java.util.concurrent.TimeoutException;\r
-import javax.annotation.Nonnull;\r
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;\r
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;\r
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;\r
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;\r
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-/**\r
- * Implementation of data access object for ODL {@link FlowCapableNode}.\r
- */\r
-public class FlowCapableNodeOdlDao implements FlowCapableNodeDao {\r
-\r
- private static final Logger LOG = LoggerFactory.getLogger(FlowCapableNodeOdlDao.class);\r
-\r
- private static final InstanceIdentifier<Nodes> NODES_IID = InstanceIdentifier.create(Nodes.class);\r
- private final DataBroker dataBroker;\r
- private final LogicalDatastoreType logicalDatastoreType;\r
-\r
- public FlowCapableNodeOdlDao(DataBroker dataBroker, LogicalDatastoreType logicalDatastoreType) {\r
- this.dataBroker = dataBroker;\r
- this.logicalDatastoreType = logicalDatastoreType;\r
- }\r
-\r
- public Optional<FlowCapableNode> loadByNodeId(@Nonnull NodeId nodeId) {\r
- try (final ReadOnlyTransaction roTx = dataBroker.newReadOnlyTransaction()) {\r
- final InstanceIdentifier<FlowCapableNode> path =\r
- NODES_IID.child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);\r
- return roTx.read(logicalDatastoreType, path).checkedGet(5000, TimeUnit.MILLISECONDS);\r
- } catch (ReadFailedException | TimeoutException e) {\r
- LOG.error("error reading {}", nodeId, e);\r
- }\r
-\r
- return Optional.absent();\r
- }\r
-\r
-}\r
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.dao;
+
+import com.google.common.base.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of data access object for ODL {@link FlowCapableNode}.
+ */
+public class FlowCapableNodeOdlDao implements FlowCapableNodeDao {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlowCapableNodeOdlDao.class);
+
+ private static final InstanceIdentifier<Nodes> NODES_IID = InstanceIdentifier.create(Nodes.class);
+ private final DataBroker dataBroker;
+ private final LogicalDatastoreType logicalDatastoreType;
+
+ public FlowCapableNodeOdlDao(DataBroker dataBroker, LogicalDatastoreType logicalDatastoreType) {
+ this.dataBroker = dataBroker;
+ this.logicalDatastoreType = logicalDatastoreType;
+ }
+
+ public Optional<FlowCapableNode> loadByNodeId(@Nonnull NodeId nodeId) {
+ try (final ReadOnlyTransaction roTx = dataBroker.newReadOnlyTransaction()) {
+ final InstanceIdentifier<FlowCapableNode> path =
+ NODES_IID.child(Node.class, new NodeKey(nodeId)).augmentation(FlowCapableNode.class);
+ return roTx.read(logicalDatastoreType, path).checkedGet(5000, TimeUnit.MILLISECONDS);
+ } catch (ReadFailedException | TimeoutException e) {
+ LOG.error("error reading {}", nodeId, e);
+ }
+
+ return Optional.absent();
+ }
+
+}
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.dao;\r
-\r
-import com.google.common.base.Optional;\r
-import java.util.concurrent.ConcurrentHashMap;\r
-import javax.annotation.Nonnull;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-\r
-/**\r
- * Adding cache to data access object of {@link FlowCapableNode}.\r
- */\r
-public class FlowCapableNodeSnapshotDao implements FlowCapableNodeDao {\r
-\r
- private final ConcurrentHashMap<String, FlowCapableNode> cache = new ConcurrentHashMap<>();\r
-\r
- public void updateCache(@Nonnull NodeId nodeId, Optional<FlowCapableNode> dataAfter) {\r
- if (dataAfter.isPresent()) {\r
- cache.put(nodeId.getValue(), dataAfter.get());\r
- } else {\r
- cache.remove(nodeId.getValue());\r
- }\r
- }\r
-\r
- public Optional<FlowCapableNode> loadByNodeId(@Nonnull NodeId nodeId) {\r
- final FlowCapableNode node = cache.get(nodeId.getValue());\r
- return Optional.fromNullable(node);\r
- }\r
-\r
-}\r
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.dao;
+
+import com.google.common.base.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nonnull;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+
+/**
+ * Adding cache to data access object of {@link FlowCapableNode}.
+ */
+public class FlowCapableNodeSnapshotDao implements FlowCapableNodeDao {
+
+ private final ConcurrentHashMap<String, FlowCapableNode> cache = new ConcurrentHashMap<>();
+
+ public void updateCache(@Nonnull NodeId nodeId, Optional<FlowCapableNode> dataAfter) {
+ if (dataAfter.isPresent()) {
+ cache.put(nodeId.getValue(), dataAfter.get());
+ } else {
+ cache.remove(nodeId.getValue());
+ }
+ }
+
+ public Optional<FlowCapableNode> loadByNodeId(@Nonnull NodeId nodeId) {
+ final FlowCapableNode node = cache.get(nodeId.getValue());
+ return Optional.fromNullable(node);
+ }
+
+}
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.impl;\r
-\r
-import com.google.common.annotations.VisibleForTesting;\r
-import com.google.common.util.concurrent.ListeningExecutorService;\r
-import com.google.common.util.concurrent.MoreExecutors;\r
-import java.util.concurrent.ExecutorService;\r
-import java.util.concurrent.Executors;\r
-import java.util.concurrent.ThreadFactory;\r
-\r
-/**\r
- * Static Factory for creating ExecutorServices (because there is no dependency injection but\r
- * static getInstance).\r
- */\r
-public final class FrmExecutors {\r
- public static PceExecutorsFactory instance() {\r
- return DEFAULT_EXECUTORS;\r
- }\r
-\r
- public interface PceExecutorsFactory {\r
-\r
- ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory);\r
- }\r
-\r
- /**\r
- * This will be rewritten in JUnits using SynchronousExecutorService.\r
- */\r
- @VisibleForTesting // should not be private and final\r
- static PceExecutorsFactory DEFAULT_EXECUTORS = new PceExecutorsFactory() {\r
-\r
- public ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory) {\r
- final ExecutorService executorService = Executors.newFixedThreadPool(nThreads, factory);\r
- return MoreExecutors.listeningDecorator(executorService);\r
- }\r
- };\r
-}\r
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Static Factory for creating ExecutorServices (because there is no dependency injection but
+ * static getInstance).
+ */
+public final class FrmExecutors {
+ public static PceExecutorsFactory instance() {
+ return DEFAULT_EXECUTORS;
+ }
+
+ public interface PceExecutorsFactory {
+
+ ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory);
+ }
+
+ /**
+ * This will be rewritten in JUnits using SynchronousExecutorService.
+ */
+ @VisibleForTesting // should not be private and final
+ static PceExecutorsFactory DEFAULT_EXECUTORS = new PceExecutorsFactory() {
+
+ public ListeningExecutorService newFixedThreadPool(int nThreads, ThreadFactory factory) {
+ final ExecutorService executorService = Executors.newFixedThreadPool(nThreads, factory);
+ return MoreExecutors.listeningDecorator(executorService);
+ }
+ };
+}
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.impl;\r
-\r
-import com.google.common.util.concurrent.ListenableFuture;\r
-import com.google.common.util.concurrent.ListeningExecutorService;\r
-import java.util.concurrent.Callable;\r
-import java.util.concurrent.TimeUnit;\r
-import java.util.concurrent.TimeoutException;\r
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;\r
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-/**\r
- * Decorator for running delegate syncup in Future.\r
- */\r
-public class SyncReactorFutureDecorator implements SyncReactor {\r
-\r
- private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class);\r
- public static final String FRM_RPC_CLIENT_PREFIX = "FRM-RPC-client-";\r
- private final SyncReactor delegate;\r
- private final ListeningExecutorService executorService;\r
-\r
- public SyncReactorFutureDecorator(SyncReactor delegate, ListeningExecutorService executorService) {\r
- this.delegate = delegate;\r
- this.executorService = executorService;\r
- }\r
-\r
- public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
- final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
- final LogicalDatastoreType dsType) throws InterruptedException {\r
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
- LOG.trace("syncup future {}", nodeId.getValue());\r
-\r
- final ListenableFuture<Boolean> syncup = executorService.submit(new Callable<Boolean>() {\r
- public Boolean call() throws Exception {\r
- final String oldThreadName = updateThreadName(nodeId);\r
-\r
- try {\r
- final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree, dsType)\r
- .get(10000, TimeUnit.MILLISECONDS);\r
- LOG.trace("ret {} {}", nodeId.getValue(), ret);\r
- return true;\r
- } catch (TimeoutException e) {\r
- LOG.error("doSyncupInFuture timeout occured {}", nodeId.getValue(), e);\r
- return false;\r
- } finally {\r
- updateThreadName(oldThreadName);\r
- }\r
- }\r
- });\r
-\r
- return syncup;\r
- }\r
-\r
- protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
- final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
- final LogicalDatastoreType dsType) throws InterruptedException {\r
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
- LOG.trace("doSyncupInFuture future {}", nodeId.getValue());\r
-\r
- return delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);\r
- }\r
-\r
- private String updateThreadName(NodeId nodeId) {\r
- final Thread currentThread = Thread.currentThread();\r
- final String oldName = currentThread.getName();\r
- try {\r
- if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {\r
- currentThread.setName(oldName + "@" + nodeId.getValue());\r
- } else {\r
- LOG.warn("try to update foreign thread name {} {}", nodeId, oldName);\r
- }\r
- } catch (Exception e) {\r
- LOG.error("failed updating threadName {}", nodeId, e);\r
- }\r
- return oldName;\r
- }\r
-\r
- private String updateThreadName(String name) {\r
- final Thread currentThread = Thread.currentThread();\r
- final String oldName = currentThread.getName();\r
- try {\r
- if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {\r
- currentThread.setName(name);\r
- } else {\r
- LOG.warn("try to update foreign thread name {} {}", oldName, name);\r
- }\r
- } catch (Exception e) {\r
- LOG.error("failed updating threadName {}", name, e);\r
- }\r
- return oldName;\r
- }\r
-}\r
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorator for running delegate syncup in Future.
+ */
+public class SyncReactorFutureDecorator implements SyncReactor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncReactorFutureDecorator.class);
+ public static final String FRM_RPC_CLIENT_PREFIX = "FRM-RPC-client-";
+ private final SyncReactor delegate;
+ private final ListeningExecutorService executorService;
+
+ public SyncReactorFutureDecorator(SyncReactor delegate, ListeningExecutorService executorService) {
+ this.delegate = delegate;
+ this.executorService = executorService;
+ }
+
+ public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) throws InterruptedException {
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+ LOG.trace("syncup future {}", nodeId.getValue());
+
+ final ListenableFuture<Boolean> syncup = executorService.submit(new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ final String oldThreadName = updateThreadName(nodeId);
+
+ try {
+ final Boolean ret = doSyncupInFuture(flowcapableNodePath, configTree, operationalTree, dsType)
+ .get(10000, TimeUnit.MILLISECONDS);
+ LOG.trace("ret {} {}", nodeId.getValue(), ret);
+ return true;
+ } catch (TimeoutException e) {
+ LOG.error("doSyncupInFuture timeout occured {}", nodeId.getValue(), e);
+ return false;
+ } finally {
+ updateThreadName(oldThreadName);
+ }
+ }
+ });
+
+ return syncup;
+ }
+
+ protected ListenableFuture<Boolean> doSyncupInFuture(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) throws InterruptedException {
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+ LOG.trace("doSyncupInFuture future {}", nodeId.getValue());
+
+ return delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);
+ }
+
+ private String updateThreadName(NodeId nodeId) {
+ final Thread currentThread = Thread.currentThread();
+ final String oldName = currentThread.getName();
+ try {
+ if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {
+ currentThread.setName(oldName + "@" + nodeId.getValue());
+ } else {
+ LOG.warn("try to update foreign thread name {} {}", nodeId, oldName);
+ }
+ } catch (Exception e) {
+ LOG.error("failed updating threadName {}", nodeId, e);
+ }
+ return oldName;
+ }
+
+ private String updateThreadName(String name) {
+ final Thread currentThread = Thread.currentThread();
+ final String oldName = currentThread.getName();
+ try {
+ if (oldName.startsWith(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX)) {
+ currentThread.setName(name);
+ } else {
+ LOG.warn("try to update foreign thread name {} {}", oldName, name);
+ }
+ } catch (Exception e) {
+ LOG.error("failed updating threadName {}", name, e);
+ }
+ return oldName;
+ }
+}
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.impl;\r
-\r
-import com.google.common.base.Preconditions;\r
-import com.google.common.util.concurrent.FutureCallback;\r
-import com.google.common.util.concurrent.Futures;\r
-import com.google.common.util.concurrent.ListenableFuture;\r
-import java.util.concurrent.Semaphore;\r
-import java.util.concurrent.TimeUnit;\r
-import javax.annotation.Nullable;\r
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;\r
-import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;\r
-import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;\r
-import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;\r
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-/**\r
- * Decorator for NodeId level syncup locking.\r
- */\r
-public class SyncReactorGuardDecorator implements SyncReactor {\r
-\r
- private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);\r
-\r
- private final SyncReactor delegate;\r
- private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;\r
-\r
- public SyncReactorGuardDecorator(SyncReactor delegate,\r
- SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {\r
- this.delegate = delegate;\r
- this.semaphoreKeeper = semaphoreKeeper;\r
- }\r
-\r
- public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,\r
- final FlowCapableNode configTree, final FlowCapableNode operationalTree,\r
- final LogicalDatastoreType dsType) throws InterruptedException {\r
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
- LOG.trace("syncup guard {}", nodeId.getValue());\r
-\r
- final long stampBeforeGuard = System.nanoTime();\r
- final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException\r
-\r
- try {\r
- final long stampAfterGuard = System.nanoTime();\r
- if (LOG.isDebugEnabled()) {\r
- LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),\r
- formatNanos(stampAfterGuard - stampBeforeGuard),\r
- guard, threadName());\r
- }\r
-\r
- final ListenableFuture<Boolean> endResult =\r
- delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);//TODO handle InteruptedException\r
-\r
- Futures.addCallback(endResult, new FutureCallback<Boolean>() {\r
- @Override\r
- public void onSuccess(@Nullable final Boolean result) {\r
- if (LOG.isDebugEnabled()) {\r
- final long stampFinished = System.nanoTime();\r
- LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),\r
- formatNanos(stampFinished - stampBeforeGuard),\r
- formatNanos(stampFinished - stampAfterGuard),\r
- formatNanos(stampAfterGuard - stampBeforeGuard),\r
- guard.availablePermits(), threadName());\r
- }\r
-\r
- releaseGuardForNodeId(guard);\r
- }\r
-\r
- @Override\r
- public void onFailure(final Throwable t) {\r
- if (LOG.isDebugEnabled()) {\r
- final long stampFinished = System.nanoTime();\r
- LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),\r
- formatNanos(stampFinished - stampBeforeGuard),\r
- formatNanos(stampFinished - stampAfterGuard),\r
- formatNanos(stampAfterGuard - stampBeforeGuard),\r
- guard.availablePermits(), threadName());\r
- }\r
-\r
- releaseGuardForNodeId(guard);\r
- }\r
- });\r
- return endResult;\r
- } catch (InterruptedException e) {\r
- releaseGuardForNodeId(guard);\r
- throw e;\r
- }\r
- }\r
-\r
- private String formatNanos(long nanos) {\r
- return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";\r
- }\r
-\r
- /**\r
- * Get guard and lock for node.\r
- * @param flowcapableNodePath II of node for which guard should be acquired\r
- * @return semaphore guard\r
- */\r
- private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)\r
- throws InterruptedException {\r
- final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),\r
- "no guard for " + flowcapableNodePath);\r
-\r
- if (LOG.isDebugEnabled()) {\r
- final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);\r
- try {\r
- LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());\r
- } catch (Exception e) {\r
- LOG.error("error logging guard after summon before aquiring {}", nodeId);\r
- }\r
- }\r
-\r
- guard.acquire();\r
- return guard;\r
- }\r
-\r
- /**\r
- * Unlock and release guard.\r
- * @param guard semaphore guard which should be unlocked\r
- */\r
- private void releaseGuardForNodeId(final Semaphore guard) {\r
- if (guard == null) {\r
- return;\r
- }\r
- guard.release();\r
- }\r
-\r
- private static String threadName() {\r
- final Thread currentThread = Thread.currentThread();\r
- return currentThread.getName();\r
- }\r
-\r
-}\r
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowplugin.applications.frsync.SemaphoreKeeper;
+import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
+import org.opendaylight.openflowplugin.applications.frsync.util.PathUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorator for NodeId level syncup locking.
+ */
+public class SyncReactorGuardDecorator implements SyncReactor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SyncReactorGuardDecorator.class);
+
+ private final SyncReactor delegate;
+ private final SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper;
+
+ public SyncReactorGuardDecorator(SyncReactor delegate,
+ SemaphoreKeeper<InstanceIdentifier<FlowCapableNode>> semaphoreKeeper) {
+ this.delegate = delegate;
+ this.semaphoreKeeper = semaphoreKeeper;
+ }
+
+ public ListenableFuture<Boolean> syncup(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath,
+ final FlowCapableNode configTree, final FlowCapableNode operationalTree,
+ final LogicalDatastoreType dsType) throws InterruptedException {
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+ LOG.trace("syncup guard {}", nodeId.getValue());
+
+ final long stampBeforeGuard = System.nanoTime();
+ final Semaphore guard = summonGuardAndAcquire(flowcapableNodePath);//TODO handle InteruptedException
+
+ try {
+ final long stampAfterGuard = System.nanoTime();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("syncup start {} waiting:{} guard:{} thread:{}", nodeId.getValue(),
+ formatNanos(stampAfterGuard - stampBeforeGuard),
+ guard, threadName());
+ }
+
+ final ListenableFuture<Boolean> endResult =
+ delegate.syncup(flowcapableNodePath, configTree, operationalTree, dsType);//TODO handle InteruptedException
+
+ Futures.addCallback(endResult, new FutureCallback<Boolean>() {
+ @Override
+ public void onSuccess(@Nullable final Boolean result) {
+ if (LOG.isDebugEnabled()) {
+ final long stampFinished = System.nanoTime();
+ LOG.debug("syncup finished {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),
+ formatNanos(stampFinished - stampBeforeGuard),
+ formatNanos(stampFinished - stampAfterGuard),
+ formatNanos(stampAfterGuard - stampBeforeGuard),
+ guard.availablePermits(), threadName());
+ }
+
+ releaseGuardForNodeId(guard);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ if (LOG.isDebugEnabled()) {
+ final long stampFinished = System.nanoTime();
+ LOG.warn("syncup failed {} took:{} rpc:{} wait:{} guard:{} permits thread:{}", nodeId.getValue(),
+ formatNanos(stampFinished - stampBeforeGuard),
+ formatNanos(stampFinished - stampAfterGuard),
+ formatNanos(stampAfterGuard - stampBeforeGuard),
+ guard.availablePermits(), threadName());
+ }
+
+ releaseGuardForNodeId(guard);
+ }
+ });
+ return endResult;
+ } catch(InterruptedException e) {
+ releaseGuardForNodeId(guard);
+ throw e;
+ }
+ }
+
+ private String formatNanos(long nanos) {
+ return "'" + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms'";
+ }
+
+ /**
+ * Get guard and lock for node.
+ * @param flowcapableNodePath II of node for which guard should be acquired
+ * @return semaphore guard
+ */
+ private Semaphore summonGuardAndAcquire(final InstanceIdentifier<FlowCapableNode> flowcapableNodePath)
+ throws InterruptedException {
+ final Semaphore guard = Preconditions.checkNotNull(semaphoreKeeper.summonGuard(flowcapableNodePath),
+ "no guard for " + flowcapableNodePath);
+
+ if (LOG.isDebugEnabled()) {
+ final NodeId nodeId = PathUtil.digNodeId(flowcapableNodePath);
+ try {
+ LOG.debug("syncup summon {} guard:{} thread:{}", nodeId.getValue(), guard, threadName());
+ } catch (Exception e) {
+ LOG.error("error logging guard after summon before aquiring {}", nodeId);
+ }
+ }
+
+ guard.acquire();
+ return guard;
+ }
+
+ /**
+ * Unlock and release guard.
+ * @param guard semaphore guard which should be unlocked
+ */
+ private void releaseGuardForNodeId(final Semaphore guard) {
+ if (guard == null) {
+ return;
+ }
+ guard.release();
+ }
+
+ private static String threadName() {
+ final Thread currentThread = Thread.currentThread();
+ return currentThread.getName();
+ }
+
+}
-/**\r
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-\r
-package org.opendaylight.openflowplugin.applications.frsync.markandsweep;\r
-\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;\r
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;\r
-\r
-/**\r
- * Identifier of {@link Flow} on device. Switch does not know about flow-id but,\r
- * it uses combination of these unique fields: table-id, priority, match.\r
- */\r
-public class SwitchFlowId {\r
-\r
- private final Short tableId;\r
-\r
- private final Integer priority;\r
-\r
- private final Match match;\r
-\r
- public SwitchFlowId(Flow flow) {\r
- this.tableId = flow.getTableId();\r
- this.priority = flow.getPriority();\r
- this.match = flow.getMatch();\r
- }\r
-\r
- @Override\r
- public int hashCode() {\r
- final int prime = 31;\r
- int result = 1;\r
- result = prime * result + ((match == null) ? 0 : match.hashCode());\r
- result = prime * result + ((priority == null) ? 0 : priority.hashCode());\r
- result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());\r
- return result;\r
- }\r
-\r
- @Override\r
- public boolean equals(Object obj) {\r
- if (this == obj)\r
- return true;\r
- if (obj == null)\r
- return false;\r
- if (getClass() != obj.getClass())\r
- return false;\r
- SwitchFlowId other = (SwitchFlowId) obj;\r
- if (match == null) {\r
- if (other.match != null)\r
- return false;\r
- } else if (!match.equals(other.match))\r
- return false;\r
- if (priority == null) {\r
- if (other.priority != null)\r
- return false;\r
- } else if (!priority.equals(other.priority))\r
- return false;\r
- if (tableId == null) {\r
- if (other.tableId != null)\r
- return false;\r
- } else if (!tableId.equals(other.tableId))\r
- return false;\r
- return true;\r
- }\r
-}\r
+/**
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.applications.frsync.markandsweep;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.flow.Match;
+
+/**
+ * Identifier of {@link Flow} on device. Switch does not know about flow-id but,
+ * it uses combination of these unique fields: table-id, priority, match.
+ */
+public class SwitchFlowId {
+
+ private final Short tableId;
+
+ private final Integer priority;
+
+ private final Match match;
+
+ public SwitchFlowId(Flow flow) {
+ this.tableId = flow.getTableId();
+ this.priority = flow.getPriority();
+ this.match = flow.getMatch();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((match == null) ? 0 : match.hashCode());
+ result = prime * result + ((priority == null) ? 0 : priority.hashCode());
+ result = prime * result + ((tableId == null) ? 0 : tableId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ SwitchFlowId other = (SwitchFlowId) obj;
+ if (match == null) {
+ if (other.match != null)
+ return false;
+ } else if (!match.equals(other.match))
+ return false;
+ if (priority == null) {
+ if (other.priority != null)
+ return false;
+ } else if (!priority.equals(other.priority))
+ return false;
+ if (tableId == null) {
+ if (other.tableId != null)
+ return false;
+ } else if (!tableId.equals(other.tableId))
+ return false;
+ return true;
+ }
+}