class StatementEntity(EntityProxy):
"""An entity object that can link to a set of datasets that it is sourced from."""
__slots__ = (
"schema",
"id",
"_caption",
"extra_referents",
"dataset",
"last_change",
"_statements",
)
def __init__(self, dataset: Dataset, data: Dict[str, Any], cleaned: bool = True):
data = dict(data or {})
schema = Model.instance().get(data.pop("schema", None))
if schema is None:
raise InvalidData(gettext("No schema for entity."))
self.schema = schema
self._caption: Optional[str] = None
"""A pre-computed label for this entity."""
self.extra_referents: Set[str] = set(data.pop("referents", []))
"""The IDs of all entities which are included in this canonical entity."""
self.last_change: Optional[str] = data.get("last_change", None)
"""The last time this entity was changed."""
self.dataset = dataset
"""The default dataset for new statements."""
self.id: Optional[str] = data.pop("id", None)
self._statements: Dict[str, Set[Statement]] = {}
properties = data.pop("properties", None)
if isinstance(properties, Mapping):
for key, value in properties.items():
self.add(key, value, cleaned=cleaned, quiet=True)
for stmt_data in data.pop("statements", []):
stmt = Statement.from_dict(stmt_data)
if self.id is not None:
stmt.canonical_id = self.id
self.add_statement(stmt)
@property
def _properties(self) -> Dict[str, List[str]]: # type: ignore
return {p: [s.value for s in v] for p, v in self._statements.items()}
def _iter_stmt(self) -> Generator[Statement, None, None]:
for stmts in self._statements.values():
for stmt in stmts:
if stmt.entity_id is None and self.id is not None:
stmt.entity_id = self.id
stmt.id = stmt.generate_key()
if stmt.id is None:
stmt.id = stmt.generate_key()
yield stmt
@property
def statements(self) -> Generator[Statement, None, None]:
"""Return all statements for this entity, with extra ID statement."""
ids: List[str] = []
last_seen: Set[str] = set()
first_seen: Set[str] = set()
for stmt in self._iter_stmt():
yield stmt
if stmt.id is not None:
ids.append(stmt.id)
if stmt.last_seen is not None:
last_seen.add(stmt.last_seen)
if stmt.first_seen is not None:
first_seen.add(stmt.first_seen)
if self.id is not None:
digest = sha1(self.schema.name.encode("utf-8"))
for id in sorted(ids):
digest.update(id.encode("utf-8"))
checksum = digest.hexdigest()
# This is to make the last_change value stable across
# serialisation:
first = self.last_change or min(first_seen, default=None)
yield Statement(
canonical_id=self.id,
entity_id=self.id,
prop=BASE_ID,
schema=self.schema.name,
value=checksum,
dataset=self.dataset.name,
first_seen=first,
last_seen=max(last_seen, default=None),
)
@property
def first_seen(self) -> Optional[str]:
seen = (s.first_seen for s in self._iter_stmt() if s.first_seen is not None)
return min(seen, default=None)
@property
def last_seen(self) -> Optional[str]:
seen = (s.last_seen for s in self._iter_stmt() if s.last_seen is not None)
return max(seen, default=None)
@property
def datasets(self) -> Set[str]:
datasets: Set[str] = set()
for stmt in self._iter_stmt():
datasets.add(stmt.dataset)
return datasets
@property
def referents(self) -> Set[str]:
referents: Set[str] = set(self.extra_referents)
for stmt in self._iter_stmt():
if stmt.entity_id is not None and stmt.entity_id != self.id:
referents.add(stmt.entity_id)
return referents
@property
def key_prefix(self) -> Optional[str]:
return self.dataset.name
@key_prefix.setter
def key_prefix(self, dataset: Optional[str]) -> None:
raise NotImplementedError()
def add_statement(self, stmt: Statement) -> None:
schema = self.schema
if schema.name != stmt.schema and not schema.is_a(stmt.schema):
try:
self.schema = schema.model.common_schema(schema, stmt.schema)
except InvalidData as exc:
raise InvalidData(f"{self.id}: {exc}") from exc
if stmt.prop == BASE_ID:
if stmt.first_seen is not None:
# The last_change attribute describes the latest checksum change
# of any emitted component of the entity, which is stored in the BASE
# field.
if self.last_change is None:
self.last_change = stmt.first_seen
else:
self.last_change = max(self.last_change, stmt.first_seen)
else:
self._caption = None
if stmt.prop not in self._statements:
self._statements[stmt.prop] = set()
self._statements[stmt.prop].add(stmt)
def get(self, prop: P, quiet: bool = False) -> List[str]:
prop_name = self._prop_name(prop, quiet=quiet)
if prop_name is None or prop_name not in self._statements:
return []
return list({s.value for s in self._statements[prop_name]})
def get_statements(self, prop: P, quiet: bool = False) -> List[Statement]:
prop_name = self._prop_name(prop, quiet=quiet)
if prop_name is None or prop_name not in self._statements:
return []
return list(self._statements[prop_name])
def set(
self,
prop: P,
values: Values,
cleaned: bool = False,
quiet: bool = False,
fuzzy: bool = False,
format: Optional[str] = None,
lang: Optional[str] = None,
original_value: Optional[str] = None,
origin: Optional[str] = None,
) -> None:
prop_name = self._prop_name(prop, quiet=quiet)
if prop_name is None:
return
self._statements.pop(prop_name, None)
return self.add(
prop,
values,
cleaned=cleaned,
quiet=quiet,
fuzzy=fuzzy,
format=format,
lang=lang,
original_value=original_value,
origin=origin,
)
def add(
self,
prop: P,
values: Values,
cleaned: bool = False,
quiet: bool = False,
fuzzy: bool = False,
format: Optional[str] = None,
lang: Optional[str] = None,
original_value: Optional[str] = None,
origin: Optional[str] = None,
) -> None:
prop_name = self._prop_name(prop, quiet=quiet)
if prop_name is None:
return None
prop = self.schema.properties[prop_name]
for value in string_list(values, sanitize=not cleaned):
self.unsafe_add(
prop,
value,
cleaned=cleaned,
fuzzy=fuzzy,
format=format,
quiet=quiet,
lang=lang,
original_value=original_value,
origin=origin,
)
return None
def unsafe_add(
self,
prop: Property,
value: Optional[str],
cleaned: bool = False,
fuzzy: bool = False,
format: Optional[str] = None,
quiet: bool = False,
schema: Optional[str] = None,
dataset: Optional[str] = None,
seen: Optional[str] = None,
lang: Optional[str] = None,
original_value: Optional[str] = None,
origin: Optional[str] = None,
) -> Optional[str]:
"""Add a statement to the entity, possibly the value."""
if value is None or len(value) == 0:
return None
# Don't allow setting the reverse properties:
if prop.stub:
if quiet:
return None
msg = gettext("Stub property (%s): %s")
raise InvalidData(msg % (self.schema, prop))
if lang is not None:
lang = registry.language.clean_text(lang)
clean: Optional[str] = value
if not cleaned:
clean = prop.type.clean_text(value, proxy=self, fuzzy=fuzzy, format=format)
if clean is None:
return None
if original_value is None and clean != value:
original_value = value
if self.id is None:
raise InvalidData("Cannot add statement to entity without ID!")
stmt = Statement(
entity_id=self.id,
prop=prop.name,
schema=schema or self.schema.name,
value=clean,
dataset=dataset or self.dataset.name,
lang=lang,
original_value=original_value,
first_seen=seen,
origin=origin,
)
self.add_statement(stmt)
return clean
def pop(self, prop: P, quiet: bool = True) -> List[str]:
prop_name = self._prop_name(prop, quiet=quiet)
if prop_name is None or prop_name not in self._statements:
return []
if prop_name in self.schema.caption:
self._caption = None
return list({s.value for s in self._statements.pop(prop_name, [])})
def remove(self, prop: P, value: str, quiet: bool = True) -> None:
prop_name = self._prop_name(prop, quiet=quiet)
if prop_name is not None and prop_name in self._statements:
stmts = {s for s in self._statements[prop_name] if s.value != value}
self._statements[prop_name] = stmts
if prop_name in self.schema.caption:
self._caption = None
def itervalues(self) -> Generator[Tuple[Property, str], None, None]:
for name, statements in self._statements.items():
prop = self.schema.properties[name]
for value in set((s.value for s in statements)):
yield (prop, value)
def get_type_values(
self, type_: PropertyType, matchable: bool = False
) -> List[str]:
combined: Set[str] = set()
for stmt in self.get_type_statements(type_, matchable=matchable):
combined.add(stmt.value)
return list(combined)
def get_type_statements(
self, type_: PropertyType, matchable: bool = False
) -> List[Statement]:
combined = []
for prop_name, statements in self._statements.items():
prop = self.schema.properties[prop_name]
if matchable and not prop.matchable:
continue
if prop.type == type_:
for statement in statements:
combined.append(statement)
return combined
@property
def properties(self) -> Dict[str, List[str]]:
return {p: list({s.value for s in vs}) for p, vs in self._statements.items()}
@property
def caption(self) -> str:
"""The user-facing label to be used for this entity. This checks a list
of properties defined by the schema (caption) and returns the first
available value. If no caption is available, return the schema label.
This implementation prefers statements where the language property is that
of the preferred system language."""
if self._caption is None:
for prop_ in self.schema.caption:
stmts = self._statements.get(prop_)
if stmts is None:
continue
prop = self.schema.properties[prop_]
if prop.type == registry.name and len(stmts) > 1:
values = [LangStr(s.value, lang=s.lang) for s in stmts]
name = pick_lang_name(values)
if name is not None:
self._caption = name
break
else:
for stmt in sorted(stmts):
self._caption = stmt.value
break
if self._caption is None:
self._caption = self.schema.label
return self._caption
def iterprops(self) -> List[Property]:
return [self.schema.properties[p] for p in self._statements.keys()]
def clone(self: SE) -> SE:
data = {"schema": self.schema.name, "id": self.id}
cloned = type(self)(self.dataset, data)
for stmt in self._iter_stmt():
cloned.add_statement(stmt)
return cloned
def merge(self: SE, other: EntityProxy) -> SE:
try:
self.schema = self.schema.model.common_schema(self.schema, other.schema)
except InvalidData as e:
msg = "Cannot merge entities with id %s: %s"
raise InvalidData(msg % (self.id, e))
if not isinstance(other, StatementEntity):
for prop, value in other.itervalues():
self.unsafe_add(prop, value, cleaned=True, quiet=True)
return self
for stmt in other._iter_stmt():
if self.id is not None:
stmt.canonical_id = self.id
self.add_statement(stmt)
self.extra_referents.update(other.extra_referents)
return self
def to_context_dict(self) -> Dict[str, Any]:
"""Return a dictionary representation of the entity for context."""
data: Dict[str, Any] = {
"id": self.id,
"caption": self.caption,
"schema": self.schema.name,
}
referents: Set[Optional[str]] = set(self.extra_referents)
datasets = set(self.datasets)
origins: Set[str] = set()
first_seen = None
last_seen = None
for stmts in self._statements.values():
for stmt in stmts:
if stmt.first_seen is not None:
if first_seen is None or stmt.first_seen < first_seen:
first_seen = stmt.first_seen
if stmt.last_seen is not None:
if last_seen is None or stmt.last_seen > last_seen:
last_seen = stmt.last_seen
if stmt.entity_id is not None and stmt.entity_id != self.id:
referents.add(stmt.entity_id)
datasets.add(stmt.dataset)
if stmt.origin is not None:
origins.add(stmt.origin)
data["referents"] = list(referents)
data["datasets"] = list(datasets)
if origins:
data["origin"] = list(origins)
if first_seen is not None:
data["first_seen"] = first_seen
if last_seen is not None:
data["last_seen"] = last_seen
if self.last_change is not None:
data["last_change"] = self.last_change
return data
def to_dict(self) -> Dict[str, Any]:
data = self.to_context_dict()
data["properties"] = self.properties
return data
def to_statement_dict(self) -> Dict[str, Any]:
"""Return a dictionary representation of the entity's statements."""
data = self.to_context_dict()
data["statements"] = [stmt.to_dict() for stmt in self.statements]
return data
def __len__(self) -> int:
return len(list(self._iter_stmt())) + 1
@classmethod
def from_dict(
cls: Type[SE],
data: Dict[str, Any],
cleaned: bool = True,
default_dataset: Optional[Dataset] = None,
) -> SE:
# Exists only for backwards compatibility.
dataset = default_dataset or DefaultDataset
return cls(dataset, data, cleaned=cleaned)
@classmethod
def from_data(
cls: Type[SE],
dataset: Dataset,
data: Dict[str, Any],
cleaned: bool = True,
) -> SE:
return cls(dataset, data, cleaned=cleaned)
@classmethod
def from_statements(
cls: Type[SE],
dataset: Dataset,
statements: Iterable[Statement],
) -> SE:
obj: Optional[SE] = None
for stmt in statements:
if obj is None:
data = {"schema": stmt.schema, "id": stmt.canonical_id}
obj = cls(dataset, data)
obj.add_statement(stmt)
if obj is None:
raise ValueError("No statements given!")
return obj