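# File-viewer header captured with this source:
#   author: GaetanoParente, commit 9cbbfac, size 5.16 kB
#   commit message: "rimossi import inutili e blindato utilizzo utente"
#   (removed unused imports and hardened user-facing usage)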
import os
import re
from pathlib import Path
from rdflib import Graph, Literal, RDF, Namespace
from rdflib.namespace import SKOS
from pyshacl import validate
class SemanticValidator:
    """Validate extracted entities/triples with SHACL shapes plus OWL RL inference.

    The constructor loads the ontology files and the SHACL shapes file into
    in-memory rdflib graphs. ``filter_valid_triples`` then converts incoming
    entities/triples to RDF and splits the triples into conformant and
    non-conformant ones.
    """

    # Predicate spellings (compared lower-cased) that are mapped to rdf:type.
    _TYPE_PREDICATES = frozenset({"rdf:type", "a", "type", "rdf_type"})
    # Everything outside this character class is stripped from minted names.
    _UNSAFE_NAME_CHARS = re.compile(r'[^a-zA-Z0-9_]')

    def __init__(self, ontology_dir="./ontology", shapes_file="./ontology/shapes/auto_constraints.ttl"):
        """Load the ontology and (if present) the SHACL shapes.

        Args:
            ontology_dir: Directory expected to contain an ``arco`` folder of
                ``*.owl`` files and ``cidoc-crm/cidoc-crm.owl``. Missing
                paths are skipped silently.
            shapes_file: Turtle file with the SHACL shapes. When it does not
                exist, ``self.shacl_graph`` is set to ``None`` and validation
                is disabled.
        """
        self.shapes_file = shapes_file

        # Prefix -> namespace mapping used to expand "prefix:name" strings.
        self.namespaces = {
            "arco": Namespace("https://w3id.org/arco/ontology/arco/"),
            "core": Namespace("https://w3id.org/arco/ontology/core/"),
            "a-loc": Namespace("https://w3id.org/arco/ontology/location/"),
            "a-cd": Namespace("https://w3id.org/arco/ontology/context-description/"),
            "cis": Namespace("http://dati.beniculturali.it/cis/"),
            "crm": Namespace("http://www.cidoc-crm.org/cidoc-crm/"),
            "ex": Namespace("http://activadigital.it/ontology/")
        }

        print("🛡️ Inizializzazione Semantic Validator (OWL RL)...")

        # Bulk-load the whole ontology in memory; it is handed to the
        # validator as ``ont_graph`` on every validation run.
        self.ont_graph = Graph()
        ontology_root = Path(ontology_dir)

        arco_path = ontology_root / "arco"
        if arco_path.exists():
            for owl_file in arco_path.glob("*.owl"):
                self.ont_graph.parse(str(owl_file), format="xml")

        cidoc_path = ontology_root / "cidoc-crm" / "cidoc-crm.owl"
        if cidoc_path.exists():
            self.ont_graph.parse(str(cidoc_path), format="xml")

        print(f"✅ Ontologia completa caricata nel reasoner ({len(self.ont_graph)} triple).")

        if os.path.exists(self.shapes_file):
            self.shacl_graph = Graph()
            self.shacl_graph.parse(self.shapes_file, format="turtle")
            print("🛡️ SHACL Auto-Constraints caricati.")
        else:
            print("⚠️ File SHACL non trovato. Validazione disabilitata (pericoloso in prod!).")
            self.shacl_graph = None

    def _get_uri(self, text_val):
        """Map a free-text or ``prefix:name`` string to a namespaced term.

        A value containing ``:`` that does not start with ``http`` is split
        on the first colon; if the prefix is a key of ``self.namespaces`` the
        name is looked up in that namespace unchanged. Every other value has
        spaces replaced by underscores and all characters outside
        ``[a-zA-Z0-9_]`` removed, and is placed in the ``ex`` namespace
        (``UnknownEntity`` when nothing is left).
        """
        if ":" in text_val and not text_val.startswith("http"):
            prefix, name = text_val.split(":", 1)
            if prefix in self.namespaces:
                return self.namespaces[prefix][name]

        clean_name = self._UNSAFE_NAME_CHARS.sub('', text_val.replace(" ", "_"))
        return self.namespaces["ex"][clean_name or "UnknownEntity"]

    def _json_to_rdf(self, entities, triples):
        """Build an rdflib graph from extracted entities and triples.

        Args:
            entities: Iterable of dicts (their ``"label"`` value is used) or
                of other objects (their ``str()`` is used). May be empty/None.
            triples: Iterable of objects exposing ``subject``, ``predicate``
                and ``object`` string attributes. May be empty/None.

        Returns:
            A graph where each entity and triple subject carries an Italian
            ``skos:prefLabel``. Triples whose predicate is one of the
            rdf:type spellings become ``rdf:type`` statements; all others
            become a plain statement and the object also gets a label.
        """
        g = Graph()
        for prefix, ns in self.namespaces.items():
            g.bind(prefix, ns)
        g.bind("skos", SKOS)

        for ent in entities or ():
            label = ent["label"] if isinstance(ent, dict) else str(ent)
            g.add((self._get_uri(label), SKOS.prefLabel, Literal(label, lang="it")))

        for t in triples or ():
            subj_uri = self._get_uri(t.subject)
            g.add((subj_uri, SKOS.prefLabel, Literal(t.subject, lang="it")))

            if t.predicate.lower() in self._TYPE_PREDICATES:
                # Type assertions: the object is a class, so it gets no label.
                g.add((subj_uri, RDF.type, self._get_uri(t.object)))
                continue

            pred_uri = self._get_uri(t.predicate)
            obj_uri = self._get_uri(t.object)
            g.add((subj_uri, pred_uri, obj_uri))
            g.add((obj_uri, SKOS.prefLabel, Literal(t.object, lang="it")))

        return g

    def _run_shacl(self, data_graph):
        """Validate ``data_graph`` against the loaded shapes with OWL RL inference.

        Returns whatever ``pyshacl.validate`` returns, which the callers
        unpack as ``(conforms, report_graph, report_text)``.
        """
        return validate(
            data_graph,
            shacl_graph=self.shacl_graph,
            ont_graph=self.ont_graph,
            inference='owlrl'
        )

    @staticmethod
    def _rejection_record(triple, report):
        """Build the record stored for a rejected triple."""
        return {
            "triple": triple.model_dump() if hasattr(triple, 'model_dump') else triple,
            "violation_report": report
        }

    @staticmethod
    def _split_by_conformance(triples, check_one, batch_report):
        """Partition the triples of a non-conformant batch.

        Args:
            triples: The triples of a batch that failed validation as a whole.
            check_one: Callable taking one triple and returning
                ``(conforms, report)`` for that triple in isolation.
            batch_report: Report text of the failed whole-batch validation.

        Returns:
            ``(valid_triples, invalid_records)``.
        """
        valid_triples = []
        invalid_triples = []
        for t in triples:
            t_conforms, t_report = check_one(t)
            if t_conforms:
                valid_triples.append(t)
            else:
                invalid_triples.append(SemanticValidator._rejection_record(t, t_report))

        if not invalid_triples:
            # The batch failed but no triple fails on its own, so the
            # violation only appears when triples are combined. Fail closed:
            # letting a known non-conformant batch through would defeat the
            # blocking validation, so every triple is rejected with the
            # batch-level report.
            invalid_triples = [
                SemanticValidator._rejection_record(t, batch_report) for t in valid_triples
            ]
            valid_triples = []

        return valid_triples, invalid_triples

    def filter_valid_triples(self, entities, triples):
        """Run the blocking validation (SHACL + OWL RL) on a batch.

        Args:
            entities: Entities passed through to ``_json_to_rdf``.
            triples: Triples to validate.

        Returns:
            ``(valid_triples, invalid_triples, status)`` where
            ``valid_triples`` are the triples to persist (Neo4j, per the
            original note), ``invalid_triples`` is a list of
            ``{"triple", "violation_report"}`` dicts to set aside (Mongo, per
            the original note) and ``status`` is ``"No Validation"``,
            ``"All valid"`` or the batch report text.
        """
        # NOTE(review): the truthiness test also skips validation for a
        # shapes graph that loaded but is empty, assuming rdflib graphs are
        # falsy when they contain no triples — TODO confirm this is intended.
        if not self.shacl_graph or not triples:
            return triples, [], "No Validation"

        # 1. Validate the whole batch in a single pass: the fast path.
        batch_graph = self._json_to_rdf(entities, triples)
        conforms, _report_graph, report_text = self._run_shacl(batch_graph)
        if conforms:
            return triples, [], "All valid"

        print("⚠️ Rilevate violazioni SHACL nel blocco. Isolamento colpevoli...")

        # 2. On failure, re-validate each triple alone (together with all
        #    entities) to isolate the non-conformant ones.
        def check_one(t):
            t_conforms, _, t_report = self._run_shacl(self._json_to_rdf(entities, [t]))
            return t_conforms, t_report

        valid_triples, invalid_triples = self._split_by_conformance(
            triples, check_one, report_text
        )
        return valid_triples, invalid_triples, report_text