# Source code for sssom.sparql_util

"""Utilities for querying mappings with SPARQL."""

import logging
from dataclasses import dataclass
from textwrap import dedent
from typing import Dict, List, Optional

import pandas as pd
from curies import Converter
from rdflib import URIRef
from rdflib.namespace import RDFS, SKOS
from SPARQLWrapper import JSON, SPARQLWrapper

from .util import MappingSetDataFrame, safe_compress

__all__ = [
    "EndpointConfig",
    "query_mappings",
]


@dataclass
class EndpointConfig:
    """A container for a SPARQL endpoint's configuration."""

    # URL of the SPARQL endpoint; ``query_mappings`` hands it to ``SPARQLWrapper``.
    url: str
    # Named graph to restrict the query to. ``query_mappings`` treats ``None``
    # as "any graph" (it substitutes the variable ``?g``) and also accepts a
    # plain string, which it wraps in ``URIRef`` before serialising.
    graph: Optional[URIRef]
    # Used by ``query_mappings`` to expand the predicate CURIEs below and to
    # compress the values returned by the endpoint.
    converter: Converter
    # Not read by ``query_mappings``.
    # NOTE(review): presumably a predicate remapping table -- confirm with callers.
    predmap: Dict[str, str]
    # Predicates to match, each expanded with ``converter.expand_strict``.
    # ``None`` makes ``query_mappings`` fall back to skos:exactMatch and
    # skos:closeMatch.
    predicates: Optional[List[str]]
    # Value for the query's ``LIMIT`` clause; ``None`` omits the clause.
    limit: Optional[int]
    # When true, ``query_mappings`` additionally selects ``?object_label`` via
    # an OPTIONAL rdfs:label pattern.
    include_object_labels: bool = False
def query_mappings(config: EndpointConfig) -> MappingSetDataFrame:
    """Query a SPARQL endpoint to obtain a set of mappings.

    The query selects, inside the configured graph, every triple whose
    predicate is one of the configured predicates and whose subject has an
    ``rdfs:label``. The graph is bound to ``?mapping_provider``.

    :param config: Endpoint URL, graph, predicates, limit and converter.
    :return: A mapping set data frame built from the result bindings. IRI
        values are passed through ``safe_compress``; literal values (such as
        labels) are kept verbatim. An empty result set yields a frame that
        still carries the selected columns.
    """
    # ``None`` means "any graph": use a variable so that ``GRAPH ?g`` matches
    # every named graph and ``BIND(?g as ?mapping_provider)`` reports which one.
    if config.graph is None:
        g = "?g"
    elif isinstance(config.graph, str):
        g = URIRef(config.graph).n3()
    else:
        g = config.graph.n3()

    if config.predicates is None:
        predicates = [SKOS.exactMatch, SKOS.closeMatch]
    else:
        predicates = [
            URIRef(config.converter.expand_strict(predicate)) for predicate in config.predicates
        ]
    predstr = " ".join(URIRef(predicate).n3() for predicate in predicates)

    limitstr = f"LIMIT {config.limit}" if config.limit is not None else ""

    cols = [
        "subject_id",
        "subject_label",
        "predicate_id",
        "object_id",
        "mapping_provider",
    ]
    if config.include_object_labels:
        # Keep ``mapping_provider`` as the last selected column.
        cols.insert(-1, "object_label")
    colstr = " ".join(f"?{c}" for c in cols)
    olq = (
        "OPTIONAL { ?object_id rdfs:label ?object_label }"
        if config.include_object_labels
        else ""
    )

    # ``str(RDFS)`` is the namespace IRI; going through ``URIRef`` avoids
    # depending on a ``.uri`` attribute of the namespace object.
    # NOTE(review): ``RDFS.uri`` appears to be unavailable on newer rdflib
    # namespace classes -- confirm against the pinned rdflib version.
    rdfs_ns = URIRef(str(RDFS)).n3()

    sparql = dedent(
        f"""\
        PREFIX rdfs: {rdfs_ns}
        SELECT {colstr}
        WHERE {{
            GRAPH {g} {{
                VALUES ?predicate_id {{ {predstr} }} .
                ?subject_id ?predicate_id ?object_id .
                ?subject_id rdfs:label ?subject_label
            }} .
            {olq}
            BIND({g} as ?mapping_provider)
        }} {limitstr}
        """
    )
    logging.info(sparql)

    sparql_wrapper = SPARQLWrapper(config.url, returnFormat=JSON)
    sparql_wrapper.setQuery(sparql)
    results = sparql_wrapper.query().convert()

    # In the SPARQL JSON results format each binding carries a "type".
    # Literals (here: the rdfs:label values) are text rather than IRIs, so
    # they are kept as-is instead of being handed to the CURIE compressor.
    # "typed-literal" is a legacy spelling still emitted by some endpoints.
    literal_types = ("literal", "typed-literal")
    rows = []
    for binding in results["results"]["bindings"]:
        row = {}
        for key, term in binding.items():
            value = term["value"]
            if term.get("type") not in literal_types:
                value = safe_compress(value, config.converter)
            row[key] = value
        rows.append(row)

    # With no bindings, ``pd.DataFrame([])`` would have no columns at all;
    # keep the selected column names so downstream column access still works.
    df = pd.DataFrame(rows) if rows else pd.DataFrame(columns=cols)
    return MappingSetDataFrame.with_converter(df=df, converter=config.converter)