Source code for sssom.cliques

"""Utilities for identifying and working with cliques/SCCs in mappings graphs."""

import hashlib
import statistics
import uuid
from collections import defaultdict
from typing import TYPE_CHECKING, DefaultDict, Dict, List, Optional, Set

import pandas as pd
from sssom_schema import Mapping, MappingSet

from sssom.constants import (
    DEFAULT_LICENSE,
    OWL_DIFFERENT_FROM,
    OWL_EQUIVALENT_CLASS,
    RDFS_SUBCLASS_OF,
    SKOS_BROAD_MATCH,
    SKOS_CLOSE_MATCH,
    SKOS_EXACT_MATCH,
    SKOS_NARROW_MATCH,
    SSSOM_SUPERCLASS_OF,
    SSSOM_URI_PREFIX,
)

from .parsers import to_mapping_set_document
from .sssom_document import MappingSetDocument
from .util import MappingSetDataFrame

if TYPE_CHECKING:
    import networkx


[docs]def to_digraph(msdf: MappingSetDataFrame) -> "networkx.DiGraph": """Convert to a graph where the nodes are entities' CURIEs and edges are their mappings.""" import networkx as nx doc = to_mapping_set_document(msdf) g = nx.DiGraph() if doc.mapping_set.mappings is not None: for mapping in doc.mapping_set.mappings: if not isinstance(mapping, Mapping): raise TypeError s = mapping.subject_id o = mapping.object_id p = mapping.predicate_id # TODO: this is copypastad from export_ptable pi = None if p == OWL_EQUIVALENT_CLASS: pi = 2 elif p == SKOS_EXACT_MATCH: pi = 2 elif p == SKOS_CLOSE_MATCH: # TODO: consider distributing pi = 2 elif p == RDFS_SUBCLASS_OF: pi = 0 elif p == SKOS_BROAD_MATCH: pi = 0 elif p == SSSOM_SUPERCLASS_OF: pi = 1 elif p == SKOS_NARROW_MATCH: pi = 1 elif p == OWL_DIFFERENT_FROM: pi = 3 if pi == 0: g.add_edge(o, s) elif pi == 1: g.add_edge(s, o) elif pi == 2: g.add_edge(s, o) g.add_edge(o, s) return g
[docs]def split_into_cliques(msdf: MappingSetDataFrame) -> List[MappingSetDocument]: """Split a MappingSetDataFrames documents corresponding to a strongly connected components of the associated graph. :param msdf: MappingSetDataFrame object :raises TypeError: If Mappings is not of type List :raises TypeError: If each mapping is not of type Mapping :raises TypeError: If Mappings is not of type List :return: List of MappingSetDocument objects """ import networkx as nx doc = to_mapping_set_document(msdf) graph = to_digraph(msdf) components_it = nx.algorithms.components.strongly_connected_components(graph) components = sorted(components_it, key=len, reverse=True) curie_to_component = {} for i, component in enumerate(components): for curie in component: curie_to_component[curie] = i documents = [ MappingSetDocument( converter=doc.converter, mapping_set=MappingSet( mapping_set_id=f"{SSSOM_URI_PREFIX}mappings/{uuid.uuid4()}", license=doc.mapping_set.license or DEFAULT_LICENSE, ), ) for _ in components ] if not isinstance(doc.mapping_set.mappings, list): raise TypeError for mapping in doc.mapping_set.mappings: if not isinstance(mapping, Mapping): raise TypeError subject_document = documents[curie_to_component[mapping.subject_id]] if not isinstance(subject_document.mapping_set.mappings, list): raise TypeError subject_document.mapping_set.mappings.append(mapping) return documents
[docs]def group_values(d: Dict[str, str]) -> Dict[str, List[str]]: """Group all keys in the dictionary that share the same value.""" rv: DefaultDict[str, List[str]] = defaultdict(list) for k, v in d.items(): rv[v].append(k) return dict(rv)
[docs]def get_src(src: Optional[str], curie: str): """Get prefix of subject/object in the MappingSetDataFrame. :param src: Source :param curie: CURIE :return: Source """ if src is None: return curie.split(":")[0] else: return src
[docs]def summarize_cliques(doc: MappingSetDataFrame): """Summarize stats on a clique doc.""" cliquedocs = split_into_cliques(doc) items = [] for cdoc in cliquedocs: mappings = cdoc.mapping_set.mappings if mappings is None: continue members: Set[str] = set() members_names: Set[str] = set() confs: List[float] = [] id2src: Dict[str, str] = {} for mapping in mappings: if not isinstance(mapping, Mapping): raise TypeError sub = str(mapping.subject_id) obj = str(mapping.object_id) id2src[sub] = get_src(mapping.subject_source, sub) id2src[obj] = get_src(mapping.object_source, obj) members.add(sub) members.add(obj) members_names.add(str(mapping.subject_label)) members_names.add(str(mapping.object_label)) if mapping.confidence is not None: confs.append(mapping.confidence) src2ids = group_values(id2src) mstr = "|".join(members) md5 = hashlib.md5(mstr.encode("utf-8")).hexdigest() # noqa:S303 item = { "id": md5, "num_mappings": len(mappings), "num_members": len(members), "members": mstr, "members_labels": "|".join(members_names), "max_confidence": max(confs), "min_confidence": min(confs), "avg_confidence": statistics.mean(confs), "sources": "|".join(src2ids.keys()), "num_sources": len(src2ids.keys()), } for s, ids in src2ids.items(): item[s] = "|".join(ids) conflated = False total_conflated = 0 all_conflated = True src_counts = [] for s, ids in src2ids.items(): n = len(ids) item[f"{s}_count"] = n item[f"{s}_conflated"] = n > 1 if n > 1: conflated = True total_conflated += 1 else: all_conflated = False src_counts.append(n) item["is_conflated"] = conflated item["is_all_conflated"] = all_conflated item["total_conflated"] = total_conflated item["proportion_conflated"] = total_conflated / len(src2ids.items()) item["conflation_score"] = (min(src_counts) - 1) * len(src2ids.items()) + ( statistics.harmonic_mean(src_counts) - 1 ) item["members_count"] = sum(src_counts) item["min_count_by_source"] = min(src_counts) item["max_count_by_source"] = max(src_counts) item["avg_count_by_source"] = statistics.mean(src_counts) item["harmonic_mean_count_by_source"] = statistics.harmonic_mean(src_counts) # item['geometric_mean_conflated'] = statistics.geometric_mean(conflateds) py3.8 items.append(item) df = pd.DataFrame(items) return df