Skip to content

Connections

Connections provides the search-and-connect algorithms that underlie Network expansion methods. It is initialised with an interaction database DataFrame and pre-processes lookup tables for fast neighbour queries.

You rarely need to instantiate Connections directly — it is used internally by Network. The documentation here is aimed at developers who want to extend NeKo with custom connection strategies.

Import

from neko._methods.enrichment_methods import Connections

Quick example

import pandas as pd
from neko._methods.enrichment_methods import Connections

# Interaction table; Connections reads at least the 'source' and 'target' columns.
db = pd.read_csv("my_interactions.csv")   # source, target, effect, ...
conn = Connections(db)

# Find every path of at most 3 edges from EGFR to AKT1
# (an empty list means no such path exists)
paths = conn.find_paths("EGFR", "AKT1", maxlen=3)

Class reference

Connections

Connections(database: DataFrame)

Class that stores utility functions used to enrich a Network object. Each utility function should take as input the nodes dataframe, which is used as the base for each algorithm, and a database from the inputs modules, which is used to extend the initial network.

Source code in neko/_methods/enrichment_methods.py
def __init__(self, database: pd.DataFrame):
    """
    Build the lookup structures used by the search methods.

    Args:
        database: Interaction table. A private copy is stored in
            `self.resources`; the 'source' and 'target' columns are read
            here, and each row is handed to `check_sign`.
    """
    self.resources = database.copy()
    self.target_neighbours_map = self._preprocess_target_neighbours()
    self.source_neighbours_map = self._preprocess_source_neighbours()

    # For every (source, target) pair, remember whether check_sign reports
    # something other than "undefined", once without and once with consensus.
    # If a pair occurs in several rows, the last row wins.
    plain_cache = {}
    consensus_cache = {}
    for _, interaction in self.resources.iterrows():
        edge = (interaction['source'], interaction['target'])
        plain_cache[edge] = check_sign(interaction, consensus=False) != "undefined"
        consensus_cache[edge] = check_sign(interaction, consensus=True) != "undefined"
    self.signed_edges = plain_cache
    self.signed_edges_consensus = consensus_cache

Functions

find_target_neighbours
find_target_neighbours(node: str) -> List[str]

Optimized helper function that finds the neighbors of the target node.

Source code in neko/_methods/enrichment_methods.py
def find_target_neighbours(self, node: str) -> List[str]:
    """
    Return the target-side neighbours cached for `node`.

    The lookup goes through the precomputed `target_neighbours_map`; a new
    list is returned on every call, and unknown nodes give an empty list.
    """
    cached = self.target_neighbours_map.get(node, ())
    return [*cached]
find_source_neighbours
find_source_neighbours(node: str) -> List[str]

Optimized helper function that finds the source neighbours of the given node, i.e. the entries stored for it in source_neighbours_map.

Source code in neko/_methods/enrichment_methods.py
def find_source_neighbours(self, node: str) -> List[str]:
    """
    Return the source-side neighbours cached for `node`.

    The lookup goes through the precomputed `source_neighbours_map`; a new
    list is returned on every call, and unknown nodes give an empty list.
    """
    cached = self.source_neighbours_map.get(node, ())
    return [*cached]
find_all_neighbours
find_all_neighbours(node: str) -> List[str]

Optimized helper function that finds all neighbors (both source and target) of the target node.

Source code in neko/_methods/enrichment_methods.py
def find_all_neighbours(self, node: str) -> List[str]:
    """
    Return every neighbour of `node`, regardless of edge direction.

    Combines the results of `find_target_neighbours` and
    `find_source_neighbours` and removes duplicates. The order of the
    returned list is not defined.
    """
    combined = set(self.find_target_neighbours(node))
    combined.update(self.find_source_neighbours(node))
    return list(combined)
is_signed_edge
is_signed_edge(source, target, consensus=False)

Returns True if the edge from source to target is signed (not undefined), False otherwise. Uses precomputed cache for speed.

Source code in neko/_methods/enrichment_methods.py
def is_signed_edge(self, source, target, consensus=False):
    """
    Tell whether the edge source -> target has a defined sign.

    The answer comes from the caches filled in the constructor:
    `signed_edges_consensus` when `consensus` is truthy, `signed_edges`
    otherwise. Edges missing from the cache are reported as unsigned (False).
    """
    cache = self.signed_edges_consensus if consensus else self.signed_edges
    return cache.get((source, target), False)
bfs
bfs(start: str, end: str, maxlen: Optional[int], only_signed: bool = False, consensus: bool = False, force: bool = False) -> List[List[str]]

Returns the shortest path between two nodes using BFS, wrapped in a list: a single-element list containing the path (as a list of nodes), or an empty list if no path is found. The search does not extend paths beyond maxlen edges (if provided). If only_signed is True, only considers signed edges (not undefined). If force is False and maxlen is None, uses a default upper bound of 10.

Source code in neko/_methods/enrichment_methods.py
def bfs(self, start: str, end: str, maxlen: Optional[int], only_signed: bool = False, consensus: bool = False, force: bool = False) -> List[List[str]]:
    """
    Find one shortest directed path from `start` to `end` using BFS.

    Args:
        start: Node the search starts from.
        end: Node the search tries to reach.
        maxlen: Maximum number of edges allowed in the path. If None, a
            default upper bound of 10 is used unless `force` is True.
        only_signed: If True, only traverse edges for which
            `is_signed_edge` returns True (sign not undefined).
        consensus: Forwarded to `is_signed_edge` when `only_signed` is True.
        force: If True and `maxlen` is None, search without a depth bound.

    Returns:
        `[[start]]` when `start == end`, a single-element list holding the
        path (list of node names) when one is found, otherwise `[]`.
    """
    if start == end:
        return [[start]]  # trivial path

    # Set a default upper bound if maxlen is None and not force
    effective_maxlen = maxlen
    if maxlen is None and not force:
        effective_maxlen = 10

    # Nodes are marked when first discovered rather than when dequeued, so
    # each node enters the queue at most once. In a FIFO search the first
    # discovery is also the one that would be expanded first, so the path
    # found is unchanged while redundant queue entries are avoided.
    discovered = {start}
    queue = deque([(start, [start], 0)])  # (node, path_so_far, depth)

    while queue:
        node, path, depth = queue.popleft()
        if effective_maxlen is not None and depth >= effective_maxlen:
            continue  # extending this path would exceed the edge budget
        for neighbor in self.find_target_neighbours(node):
            if neighbor in discovered:
                continue
            if only_signed and not self.is_signed_edge(node, neighbor, consensus):
                continue
            extended = path + [neighbor]
            if neighbor == end:
                # First discovery of `end` is a shortest path; no need to
                # wait until it is dequeued.
                return [extended]
            discovered.add(neighbor)
            queue.append((neighbor, extended, depth + 1))
    return []
find_paths
find_paths(start: Union[str, DataFrame, List[str]], end: Union[str, DataFrame, List[str], None] = None, maxlen: int = 2, minlen: int = 1, loops: bool = False, only_signed: bool = False, consensus: bool = False) -> List[List[str]]

Find all paths or motifs in a network, with optional sign/consensus filtering. Uses an iterative DFS with an explicit stack for better performance and memory efficiency. Args: start: Node(s) to start from (str, list of str, or DataFrame with 'name_of_node'). end: Node(s) to end at (str, list of str, DataFrame, or None for motif search). maxlen: Maximum path length (number of edges). minlen: Minimum path length (number of edges). loops: Allow cycles/loops if True. only_signed: If True, only consider signed edges (not undefined). consensus: If True, use consensus sign filtering. Returns: List of paths (each path is a list of node names).

Source code in neko/_methods/enrichment_methods.py
def find_paths(self,
               start: Union[str, pd.DataFrame, List[str]],
               end: Union[str, pd.DataFrame, List[str], None] = None,
               maxlen: int = 2,
               minlen: int = 1,
               loops: bool = False,
               only_signed: bool = False,
               consensus: bool = False) -> List[List[str]]:
    """
    Find all paths or motifs in a network, with optional sign/consensus filtering.
    Uses an iterative DFS with an explicit stack.

    Args:
        start: Node(s) to start from (str, list of str, or DataFrame with 'name_of_node').
        end: Node(s) to end at (str, list of str, DataFrame with 'name_of_node',
            or None for motif search). An empty string or empty list is
            treated like None.
        maxlen: Maximum path length (number of edges).
        minlen: Minimum path length (number of edges); values below 1 are raised to 1.
        loops: Allow cycles/loops if True.
        only_signed: If True, only consider signed edges (not undefined).
        consensus: If True, use consensus sign filtering.

    Returns:
        List of paths (each path is a list of node names). In motif search
        without loops, only paths of exactly `maxlen` edges are returned.

    Raises:
        ValueError: If `start` or `end` is not a str, a list of str or a DataFrame.
    """

    def convert_to_string_list(nodes, arg_name):
        """Normalise a str / list of str / DataFrame argument to a list of names."""
        if isinstance(nodes, str):
            return [nodes]
        if isinstance(nodes, pd.DataFrame):
            return nodes['name_of_node'].tolist()
        if isinstance(nodes, list) and all(isinstance(item, str) for item in nodes):
            return nodes
        raise ValueError(f"Invalid type for '{arg_name}' variable")

    def path_generator(start_nodes, end_nodes, maxlen, minlen, loops, only_signed, consensus):
        """Yield every matching path for each (start, end) pair."""
        for s in start_nodes:
            for e in end_nodes:
                stack = [(s, [s])]
                while stack:
                    current, path = stack.pop()
                    n_nodes = len(path)
                    # Prune if path too long
                    if n_nodes > maxlen + 1:
                        continue
                    reached_end = e is not None and current == e
                    # Check for valid path
                    if n_nodes >= minlen + 1 and (
                        reached_end or
                        (e is None and not loops and n_nodes == maxlen + 1) or
                        (loops and path[0] == path[-1] and n_nodes > 1)
                    ):
                        yield path
                    # Without loops a path that already contains `e` can never
                    # end at `e` again, so extending it cannot yield anything.
                    if reached_end and not loops:
                        continue
                    # Continue DFS
                    if n_nodes <= maxlen:
                        next_steps = self.find_target_neighbours(current)
                        if only_signed:
                            next_steps = [n for n in next_steps if self.is_signed_edge(current, n, consensus)]
                        if not loops:
                            # Drop nodes already on the path; dict.fromkeys
                            # de-duplicates while keeping neighbour order, so
                            # the result order does not depend on set hashing.
                            on_path = set(path)
                            next_steps = [n for n in dict.fromkeys(next_steps) if n not in on_path]
                        for neighbor in next_steps:
                            stack.append((neighbor, path + [neighbor]))

    start_nodes = convert_to_string_list(start, 'start')
    # `if end` cannot be used here: the truth value of a DataFrame is
    # ambiguous and raises. None, "" and [] all select motif search.
    if end is None or (not isinstance(end, pd.DataFrame) and not end):
        end_nodes = [None]
    else:
        end_nodes = convert_to_string_list(end, 'end')
    minlen = max(1, minlen)
    # Collect all paths in a list for backward compatibility
    return list(path_generator(start_nodes, end_nodes, maxlen, minlen, loops, only_signed, consensus))
find_upstream_cascades
find_upstream_cascades(target_genes: List[str], max_depth: int = 1, selected_rank: int = 1) -> List[Tuple[str, str]]

Find cascades of interactions in the network. Parameters: - target_genes: List of target genes to start the cascade. - max_depth: Maximum depth of the cascade. - selected_rank: Number of top regulators to select for each iteration. Returns: - interactions: List of interactions in the cascade.

Source code in neko/_methods/enrichment_methods.py
def find_upstream_cascades(self,
                           target_genes: List[str],
                           max_depth: int = 1,
                           selected_rank: int = 1) -> List[Tuple[str, str]]:
    """
    Collect regulator -> target interactions layer by layer, moving upstream.

    At each layer, `find_minimal_covering_regulators` selects regulators for
    the current targets; the regulators then become the targets of the next
    layer, until `max_depth` layers have been processed.

    Parameters:
    - target_genes: List of target genes to start the cascade.
    - max_depth: Maximum depth of the cascade (values below 1 give an empty result).
    - selected_rank: Forwarded to `find_minimal_covering_regulators`
      (number of top regulators to select for each iteration).
    Returns:
    - interactions: List of (regulator, target) tuples, first layer first.

    NOTE(review): the original author marked the interaction filter below as
    "not working" and remarked that this method "returns nothing". The cause
    is not visible from this method alone -- confirm what
    `find_minimal_covering_regulators` returns and how
    `target_neighbours_map` is keyed.
    """
    cascade = []
    layer_targets = target_genes
    depth = 1
    while depth <= max_depth:
        regulators = find_minimal_covering_regulators(self.resources, layer_targets, selected_rank)

        # Keep only the regulator -> target edges that land on this layer's targets.
        for regulator in regulators:
            for candidate in self.target_neighbours_map.get(regulator, []):
                if candidate in layer_targets:
                    cascade.append((regulator, candidate))

        if depth < max_depth:
            layer_targets = list(regulators)
        depth += 1

    return cascade