// Flow-id prefix marking synthetic ("alien") flows: flows reported by the device
// that could not be matched to a Config-DS flow (see the startsWith() checks below).
private static final String ALIEN_SYSTEM_FLOW_ID = "#UF$TABLE*";
// Number of statistics-collection rounds an unreported alien flow survives
// before it is deleted from the Operational DS (lowered from the previous hard-coded 5).
+ private static final Integer REMOVE_AFTER_MISSING_COLLECTION = 1;
+
// NOTE(review): usage not visible in this excerpt — presumably counts device flows
// that could not be accounted to a known FlowId; confirm against the full file.
private final AtomicInteger unaccountedFlowsCounter = new AtomicInteger(0);
public StatListenCommitFlow (final StatisticsManager manager, final DataBroker db,
Optional<FlowCapableNode> fNode = Optional.absent();
try {
fNode = tx.read(LogicalDatastoreType.OPERATIONAL, fNodeIdent).checkedGet();
- }
- catch (final ReadFailedException e) {
+ } catch (final ReadFailedException e) {
LOG.debug("Read Operational/DS for FlowCapableNode fail! {}", fNodeIdent, e);
return;
}
}
statsFlowCommitAll(flowStats, nodeIdent, tx);
+ /* cleaning all not cached hash collisions */
// Decrement the "life" counter of each alien flow registered for delayed delete;
// when the counter reaches zero, drop the flow from the map and from Operational DS.
+ final Map<InstanceIdentifier<Flow>, Integer> listAliens = mapNodesForDelete.get(nodeIdent);
+ if (listAliens != null) {
+ for (final Entry<InstanceIdentifier<Flow>, Integer> nodeForDelete : listAliens.entrySet()) {
+ final Integer lifeIndex = nodeForDelete.getValue();
+ if (nodeForDelete.getValue() > 0) {
+ nodeForDelete.setValue(Integer.valueOf(lifeIndex.intValue() - 1));
+ } else {
+ final InstanceIdentifier<Flow> flowNodeIdent = nodeForDelete.getKey();
// NOTE(review): mapNodesForDelete.get(nodeIdent) is the SAME map instance as
// listAliens; calling remove() on it while iterating listAliens.entrySet()
// throws ConcurrentModificationException on the next iteration if the inner
// map is a plain HashMap. Fix: iterate via entrySet().iterator() and call
// it.remove() instead. Confirm the inner map's concrete type before relying on this.
+ mapNodesForDelete.get(nodeIdent).remove(flowNodeIdent);
+ tx.delete(LogicalDatastoreType.OPERATIONAL, flowNodeIdent);
+ }
+ }
+ }
/* Notification for continue collecting statistics */
notifyToCollectNextStatistics(nodeIdent);
}
/**
* build pseudoUnique hashCode for flow in table
* for future easy identification
+ *
+ * FIXME: we expect same version for YANG models for all clusters and that has to be fix
+ * FIXME: CREATE BETTER KEY - for flow (MATCH is the problem)
*/
- static String buildHashCode(final FlowAndStatisticsMapList deviceFlow) {
- final FlowBuilder builder = new FlowBuilder();
- builder.setMatch(deviceFlow.getMatch());
- builder.setCookie(deviceFlow.getCookie());
- builder.setPriority(deviceFlow.getPriority());
- final Flow flowForHashCode = builder.build();
- return String.valueOf(flowForHashCode.hashCode());
+ static String buildFlowIdOperKey(final FlowAndStatisticsMapList deviceFlow) {
+ return new StringBuffer().append(deviceFlow.getMatch())
+ .append(deviceFlow.getPriority()).append(deviceFlow.getCookie().getValue()).toString();
}
private class NodeUpdateState {
}
private class TableFlowUpdateState {
+
private boolean tableEnsured = false;
final KeyedInstanceIdentifier<Table, TableKey> tableRef;
final TableKey tableKey;
// Rebuild the in-memory hash -> FlowId cache from the persisted FlowHashIdMap entries.
for (final FlowHashIdMap flowHashId : flowHashMap) {
try {
flowIdByHash.put(flowHashId.getKey(), flowHashId.getFlowId());
// NOTE(review): catching bare Exception is very broad, and 'e' is not passed to
// LOG.warn, so the stack trace is lost. A plain Map.put() does not throw on
// duplicate keys — confirm what type flowIdByHash is and what put() can actually throw.
- } catch (Exception e) {
+ } catch (final Exception e) {
LOG.warn("flow hashing hit a duplicate for {} -> {}", flowHashId.getKey(), flowHashId.getFlowId());
}
}
}
// Seeds the table-update state with the flows currently present in the
// Configuration DS. The direct transactional CONFIG read (which immediately
// overwrote the first result) was replaced by a single readLatestConfiguration()
// call — NOTE(review): the caching/snapshot semantics of that helper are not
// visible in this excerpt; confirm it reflects the latest configuration.
private void initConfigFlows(final ReadWriteTransaction trans) {
- Optional<Table> table = readLatestConfiguration(tableRef);
- try {
- table = trans.read(LogicalDatastoreType.CONFIGURATION, tableRef).checkedGet();
- } catch (final ReadFailedException e) {
- table = Optional.absent();
- }
+ final Optional<Table> table = readLatestConfiguration(tableRef);
// localList stays null when the table is absent from the Configuration DS.
List<Flow> localList = null;
if(table.isPresent()) {
localList = table.get().getFlow();
// Records one device-reported flow into the Operational DS: resolves its FlowKey
// via the hash cache (or Config-DS search), writes the flow, and gives synthetic
// "alien" flows a limited lifetime. (Lines between the search and the put are
// elided in this excerpt.)
void reportFlow(final FlowAndStatisticsMapList flowStat, final ReadWriteTransaction trans) {
ensureTable(trans);
- final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildHashCode(flowStat));
+ final FlowHashIdMapKey hashingKey = new FlowHashIdMapKey(buildFlowIdOperKey(flowStat));
FlowKey flowKey = getFlowKeyAndRemoveHash(hashingKey);
if (flowKey == null) {
// Hash cache miss: try to match the flow against the Configuration DS.
flowKey = searchInConfiguration(flowStat, trans);
trans.put(LogicalDatastoreType.OPERATIONAL, flowIdent, flowBuilder.build());
/* check life for Alien flows */
// Alien flows are scheduled for deletion after REMOVE_AFTER_MISSING_COLLECTION
// missed collection rounds (was a hard-coded 5).
if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
- removeData(flowIdent, Integer.valueOf(5));
+ removeData(flowIdent, REMOVE_AFTER_MISSING_COLLECTION);
}
}
/* Build and deploy new FlowHashId map */
// Persists the hash -> FlowId mapping so later statistics for the same flow can
// be matched without searching the Configuration DS again.
private void updateHashCache(final ReadWriteTransaction trans, final FlowKey flowKey, final FlowHashIdMapKey hashingKey) {
- // TODO Auto-generated method stub
final FlowHashIdMapBuilder flHashIdMap = new FlowHashIdMapBuilder();
flHashIdMap.setFlowId(flowKey.getId());
flHashIdMap.setKey(hashingKey);
// NOTE(review): the builder is populated but no write through 'trans' is visible
// in this excerpt — confirm the put/merge of flHashIdMap happens in elided lines,
// otherwise 'trans' is unused and the cache is never deployed.
}
// Removes flows that were NOT reported in the latest statistics collection.
// Alien flows get a grace period driven by their life counter in nodeDeleteMap;
// everything else is deleted from Operational DS together with its hash mapping.
void removeUnreportedFlows(final ReadWriteTransaction tx) {
+ final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
+ final Map<InstanceIdentifier<Flow>, Integer> nodeDeleteMap = mapNodesForDelete.get(nodeIdent);
final Map<FlowHashIdMapKey, FlowId> listForRemove = getRemovalList();
- final Optional<Table> configTable = readLatestConfiguration(tableRef);
- List<Flow> configFlows = Collections.emptyList();
- if (configTable.isPresent() && configTable.get().getFlow() != null) {
- configFlows = new ArrayList<>(configTable.get().getFlow());
- }
for (final Entry<FlowHashIdMapKey, FlowId> entryForRemove : listForRemove.entrySet()) {
final FlowKey flowKey = new FlowKey(entryForRemove.getValue());
final InstanceIdentifier<Flow> flowRef = tableRef.child(Flow.class, flowKey);
- final InstanceIdentifier<FlowStatisticsData> flowStatIdent = flowRef.augmentation(FlowStatisticsData.class);
- if (flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
- final InstanceIdentifier<Node> nodeIdent = tableRef.firstIdentifierOf(Node.class);
- final Integer lifeIndex = mapNodesForDelete.get(nodeIdent).remove(flowRef);
+ if (nodeDeleteMap != null && flowKey.getId().getValue().startsWith(ALIEN_SYSTEM_FLOW_ID)) {
// NOTE(review): nodeDeleteMap.get(flowRef) may return null for an alien flow
// that was never registered for delayed delete — unboxing in 'lifeIndex > 0'
// would then throw NullPointerException. Add a null guard, or confirm every
// alien flow is guaranteed an entry before reaching this point.
+ final Integer lifeIndex = nodeDeleteMap.get(flowRef);
if (lifeIndex > 0) {
// NOTE(review): 'break' aborts removal of ALL remaining entries in
// listForRemove, not just this one — verify 'continue' is not intended.
break;
+ } else {
+ nodeDeleteMap.remove(flowRef);
}
}
- final Optional<FlowStatisticsData> flowStatNodeCheck;
- try {
- flowStatNodeCheck = tx.read(LogicalDatastoreType.OPERATIONAL, flowStatIdent).checkedGet();
- }
- catch (final ReadFailedException e) {
- LOG.debug("Read FlowStatistics {} in Operational/DS fail! Statisticscan not beupdated.", flowStatIdent, e);
- break;
- }
- if (flowStatNodeCheck.isPresent()) {
- /* Node isn't new and it has not been removed yet */
- final InstanceIdentifier<FlowHashIdMap> flHashIdent = tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
- tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
- tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
- }
// Unconditionally delete both the flow and its hash-map entry; the previous
// existence pre-check (an extra Operational read per flow) was dropped —
// delete of a non-existent path is tolerated by the datastore.
+ final InstanceIdentifier<FlowHashIdMap> flHashIdent =
+ tableRef.augmentation(FlowHashIdMapping.class).child(FlowHashIdMap.class, entryForRemove.getKey());
+ tx.delete(LogicalDatastoreType.OPERATIONAL, flowRef);
+ tx.delete(LogicalDatastoreType.OPERATIONAL, flHashIdent);
}
}
}
}
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.statistics.manager.StatListeningCommiter;
import org.opendaylight.controller.md.statistics.manager.StatNodeRegistration;
import org.opendaylight.controller.md.statistics.manager.StatNotifyCommiter;
private final static Logger LOG = LoggerFactory.getLogger(StatisticsManagerImpl.class);
// Bounded capacity of the datastore-operation queue; doubled from 500 —
// NOTE(review): rationale not stated here, presumably to reduce producer blocking.
- private static final int QUEUE_DEPTH = 500;
+ private static final int QUEUE_DEPTH = 1000;
// One operation per submitted transaction.
private static final int MAX_BATCH = 1;
private final BlockingQueue<StatDataStoreOperation> dataStoreOperQueue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
LOG.trace("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
- try {
tx.submit().checkedGet();
- } catch (final TransactionCommitFailedException e) {
- LOG.warn("Stat DataStoreOperation unexpected State!", e);
- txChain.close();
- txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
- cleanDataStoreOperQueue();
- }
- }
- catch (final IllegalStateException e) {
- LOG.warn("Stat DataStoreOperation unexpected State!", e);
- }
- catch (final InterruptedException e) {
// Interruption means shutdown: stop the processing loop.
// NOTE(review): the handler does not call Thread.currentThread().interrupt(),
// so the thread's interrupt status is lost for any code further up the stack.
+ } catch (final InterruptedException e) {
LOG.warn("Stat Manager DS Operation thread interupted!", e);
finishing = true;
- }
- catch (final Exception e) {
- LOG.warn("Stat DataStore Operation executor fail!", e);
// Any other failure (commit failure included — the dedicated handlers above
// were collapsed into this one): recover by recreating the transaction chain
// and draining the queue so blocked producers wake up.
+ } catch (final Exception e) {
+ LOG.warn("Unhandled exception during processing statistics. Restarting transaction chain.", e);
+ txChain.close();
+ txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
+ cleanDataStoreOperQueue();
}
}
// Drain all events, making sure any blocked threads are unblocked
cleanDataStoreOperQueue();
}
// Drains every queued operation so producers blocked on the bounded queue wake up.
// Now 'synchronized': serializes concurrent drains (called from the processing
// loop's recovery path and at loop exit).
- private void cleanDataStoreOperQueue() {
+ private synchronized void cleanDataStoreOperQueue() {
// Drain all events, making sure any blocked threads are unblocked
while (! dataStoreOperQueue.isEmpty()) {
dataStoreOperQueue.poll();
// Transaction-chain failure callback: now log-only. The chain close/recreate and
// queue drain that used to live here were removed —
// NOTE(review): confirm every chain failure also surfaces as an exception in the
// operation-processing loop (which performs the restart); otherwise a failed
// chain would never be replaced.
public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
final Throwable cause) {
LOG.warn("Failed to export Flow Capable Statistics, Transaction {} failed.",transaction.getIdentifier(),cause);
- txChain.close();
- txChain = dataBroker.createTransactionChain(StatisticsManagerImpl.this);
- cleanDataStoreOperQueue();
}
@Override
@Override
public void disconnectedNodeUnregistration(final InstanceIdentifier<Node> nodeIdent) {
+ flowListeningCommiter.cleanForDisconnect(nodeIdent);
for (final StatPermCollector collector : statCollectors) {
if (collector.disconnectedNodeUnregistration(nodeIdent)) {
if ( ! collector.hasActiveNodes()) {