followthemoney.schema
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Optional,
    Set,
    TypedDict,
    Union,
    cast,
)
from banal import ensure_list, ensure_dict, as_bool
from functools import lru_cache

from followthemoney.property import Property, PropertySpec, PropertyToDict, ReverseSpec
from followthemoney.types import registry
from followthemoney.exc import InvalidData, InvalidModel
from followthemoney.rdf import URIRef, NS
from followthemoney.util import gettext

if TYPE_CHECKING:
    from followthemoney.model import Model


class EdgeSpec(TypedDict, total=False):
    """Edge section of a schema definition; all keys are optional."""

    # Names of the properties used as the two ends of the edge.
    source: str
    target: str
    # Property names used to caption the edge.
    caption: List[str]
    label: str
    directed: bool


class TemporalExtentSpec(TypedDict, total=False):
    """Timeline section of a schema definition; all keys are optional."""

    # Property names marking the start / end of the entity in a timeline.
    start: List[str]
    end: List[str]


class SchemaSpec(TypedDict, total=False):
    """Raw schema definition consumed by :class:`Schema`; all keys are optional."""

    label: str
    plural: str
    schemata: List[str]
    extends: List[str]
    properties: Dict[str, PropertySpec]
    featured: List[str]
    required: List[str]
    caption: List[str]
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec
    description: Optional[str]
    rdf: Optional[str]
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: Optional[bool]


class SchemaToDict(TypedDict, total=False):
    """Serialised form of a schema as produced by :meth:`Schema.to_dict`."""

    label: str
    plural: str
    schemata: List[str]
    extends: List[str]
    properties: Dict[str, PropertyToDict]
    featured: List[str]
    required: List[str]
    caption: List[str]
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec
    description: Optional[str]
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: bool


class Schema:
    """A type definition for a class of entities that have certain properties.

    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
    parent schemata from which it inherits all of their properties. A schema can also
    have descendant child schemata, which, in turn, add further properties. Schemata
    are usually accessed via the model, which holds all available definitions.
    """

    __slots__ = (
        "model",
        "name",
        "_label",
        "_plural",
        "_description",
        "_hash",
        "uri",
        "abstract",
        "hidden",
        "generated",
        "matchable",
        "featured",
        "required",
        "deprecated",
        "caption",
        "edge",
        "_edge_label",
        "edge_directed",
        "edge_source",
        "edge_target",
        "edge_caption",
        "_temporal_start",
        "_temporal_end",
        "_extends",
        "extends",
        "schemata",
        "names",
        "descendants",
        "properties",
        "_matchable_schemata",
    )

    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
        """Read the flags and own properties of the schema from ``data``.

        Inheritance is not resolved here; see :meth:`generate`.
        """
        #: Machine-readable name of the schema, used for identification.
        self.name = name
        self.model = model
        self._label = data.get("label", name)
        # Fall back to the *untranslated* label: the ``plural`` property applies
        # gettext on access, same as ``_edge_label`` below.
        self._plural = data.get("plural", self._label)
        self._description = data.get("description")
        self._hash = hash("<Schema(%r)>" % name)

        #: RDF identifier for this schema when it is transformed to a triple term.
        self.uri = URIRef(cast(str, data.get("rdf", NS[name])))

        #: Do not store or emit entities of this type, it is used only for
        #: inheritance.
        self.abstract = as_bool(data.get("abstract"), False)

        #: This schema is deprecated and should not be used.
        self.deprecated = as_bool(data.get("deprecated", False))

        #: Hide this schema in listings. The flag is dropped for abstract schemata.
        self.hidden = as_bool(data.get("hidden"), False) and not self.abstract

        #: Entities with this type are generated by the system - for example, via
        #: `ingest-file`. The user should not be offered an option to create them
        #: in the interface.
        self.generated = as_bool(data.get("generated"), False)

        #: Try to perform fuzzy matching. Fuzzy similarity search does not
        #: make sense for entities which have a lot of similar names, such
        #: as land plots, assets etc.
        self.matchable = as_bool(data.get("matchable"), True)

        #: Mark a set of properties as important, i.e. they should be shown
        #: first, or in an abridged view of the entity. In Aleph, these properties
        #: are included in tabular entity listings.
        self.featured = ensure_list(data.get("featured", []))

        #: Mark a set of properties as required. This is applied only when
        #: an entity is created by the user - bulk created entities will
        #: slip through even if it is technically invalid.
        self.required = ensure_list(data.get("required", []))

        #: Mark a set of properties to be used for the entity's caption.
        #: They will be checked in order and the first existent value will
        #: be used.
        self.caption = ensure_list(data.get("caption", []))

        # A transform of the entity into an edge for its representation in
        # the context of a property graph representation like Neo4J/Gephi.
        edge = data.get("edge", {})
        self.edge_source = edge.get("source")
        self.edge_target = edge.get("target")

        #: Flag to indicate if this schema should be represented by an edge (rather than
        #: a node) when the data is converted into a property graph.
        self.edge: bool = self.edge_source is not None and self.edge_target is not None
        self.edge_caption = ensure_list(edge.get("caption", []))
        self._edge_label = edge.get("label", self._label)

        #: Flag to indicate if the edge should be presented as directed to the user,
        #: e.g. by showing an arrow at the target end of the edge.
        self.edge_directed = as_bool(edge.get("directed", True))

        #: Specify which properties should be used to represent this schema in a
        #: timeline.
        temporal_extent = data.get("temporalExtent", {})
        self._temporal_start = ensure_list(temporal_extent.get("start", []))
        self._temporal_end = ensure_list(temporal_extent.get("end", []))

        #: Direct parent schemata of this schema.
        self._extends = ensure_list(data.get("extends", []))
        self.extends: Set["Schema"] = set()

        #: All parents of this schema (including indirect parents and the schema
        #: itself).
        self.schemata = {self}

        #: All names of :attr:`~schemata`.
        self.names = {self.name}

        #: Inverse of :attr:`~schemata`, all derived child types of this schema
        #: and their children.
        self.descendants: Set["Schema"] = set()
        # Lazily computed by the ``matchable_schemata`` property.
        self._matchable_schemata: Optional[Set["Schema"]] = None

        #: The full list of properties defined for the entity, including those
        #: inherited from parent schemata.
        self.properties: Dict[str, Property] = {}
        for prop_name, prop_spec in data.get("properties", {}).items():
            self.properties[prop_name] = Property(self, prop_name, prop_spec)

    def generate(self, model: "Model") -> None:
        """While loading the schema, this function will validate and
        load the hierarchy, properties, and flags of the definition.

        Raises ``InvalidData`` if a parent named in ``extends`` is unknown to
        ``model``, and ``InvalidModel`` if parents disagree on the temporal
        extent or if a featured/caption/required/edge property is missing.
        """
        temporal_start: Optional[List[str]] = None
        temporal_end: Optional[List[str]] = None
        for extends in self._extends:
            parent = model.get(extends)
            if parent is None:
                raise InvalidData("Invalid extends: %r" % extends)
            parent.generate(model)

            # Own definitions take precedence over inherited ones.
            for name, prop in parent.properties.items():
                if name not in self.properties:
                    self.properties[name] = prop

            self.extends.add(parent)
            for ancestor in parent.schemata:
                self.schemata.add(ancestor)
                self.names.add(ancestor.name)
                ancestor.descendants.add(self)

            # Without an own temporal extent, all parents that define one
            # must agree on it.
            parent_start = parent.temporal_start
            if not self._temporal_start and parent_start:
                if temporal_start is not None and temporal_start != parent_start:
                    raise InvalidModel(
                        "Conflicting temporal start properties: %s" % self.name
                    )
                temporal_start = parent_start

            parent_end = parent.temporal_end
            if not self._temporal_end and parent_end:
                if temporal_end is not None and temporal_end != parent_end:
                    raise InvalidModel(
                        "Conflicting temporal end properties: %s" % self.name
                    )
                temporal_end = parent_end

        # Iterate over a snapshot: ``self.properties`` can gain entries through
        # ``_add_reverse`` (presumably triggered from ``Property.generate``).
        for prop in list(self.properties.values()):
            prop.generate(model)

        for featured in self.featured:
            if self.get(featured) is None:
                raise InvalidModel("Missing featured property: %s" % featured)

        for caption in self.caption:
            prop_ = self.get(caption)
            if prop_ is None:
                raise InvalidModel("Missing caption property: %s" % caption)
            if prop_.type == registry.entity:
                raise InvalidModel("Caption cannot be entity: %s" % caption)

        for required in self.required:
            if self.get(required) is None:
                raise InvalidModel("Missing required property: %s" % required)

        if self.edge:
            if self.source_prop is None:
                msg = "Missing edge source: %s" % self.edge_source
                raise InvalidModel(msg)

            if self.target_prop is None:
                msg = "Missing edge target: %s" % self.edge_target
                raise InvalidModel(msg)

    def _add_reverse(
        self, model: "Model", data: ReverseSpec, other: Property
    ) -> Property:
        """Return the property named by ``data["name"]``, creating a stub
        entity-typed property pointing back at ``other`` if none exists yet.

        Raises ``InvalidModel`` if ``data`` carries no name.
        """
        name = data.get("name")
        if name is None:
            raise InvalidModel("Unnamed reverse: %s" % other)

        prop = self.get(name)
        if prop is None:
            spec: PropertySpec = {
                "label": data.get("label"),
                "type": registry.entity.name,
                "reverse": {"name": other.name},
                "range": other.schema.name,
                "hidden": data.get("hidden", other.hidden),
            }
            prop = Property(self, name, spec)
            prop.stub = True
            prop.generate(model)
            self.properties[name] = prop
        return prop

    @property
    def label(self) -> str:
        """User-facing name of the schema."""
        return gettext(self._label)

    @property
    def plural(self) -> str:
        """Name of the schema to be used in plural constructions."""
        return gettext(self._plural)

    @property
    def description(self) -> Optional[str]:
        """A longer description of the semantics of the schema."""
        return gettext(self._description)

    @property
    def edge_label(self) -> Optional[str]:
        """Description label for edges derived from entities of this schema."""
        return gettext(self._edge_label)

    @property
    def source_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge source."""
        return self.get(self.edge_source)

    @property
    def target_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge target."""
        return self.get(self.edge_target)

    @property
    def temporal_start(self) -> List[str]:
        """The entity properties to be used as the start when representing the entity
        in a timeline. Falls back to a direct parent's value if none is set here."""
        if not self._temporal_start:
            for parent in self.extends:
                inherited = parent.temporal_start
                if inherited:
                    return inherited
        return self._temporal_start

    @property
    def temporal_end(self) -> List[str]:
        """The entity properties to be used as the end when representing the entity
        in a timeline. Falls back to a direct parent's value if none is set here."""
        if not self._temporal_end:
            for parent in self.extends:
                inherited = parent.temporal_end
                if inherited:
                    return inherited
        return self._temporal_end

    @property
    def temporal_start_props(self) -> List[Property]:
        """The resolved properties named by :attr:`temporal_start`; names that do
        not resolve to a property are skipped."""
        props = [self.get(prop_name) for prop_name in self.temporal_start]
        return [prop for prop in props if prop is not None]

    @property
    def temporal_end_props(self) -> List[Property]:
        """The resolved properties named by :attr:`temporal_end`; names that do
        not resolve to a property are skipped."""
        props = [self.get(prop_name) for prop_name in self.temporal_end]
        return [prop for prop in props if prop is not None]

    @property
    def sorted_properties(self) -> List[Property]:
        """All properties of the schema in the order in which they should be shown
        to the user (alphabetically, with captions and featured properties first)."""
        return sorted(
            self.properties.values(),
            key=lambda p: (
                p.name not in self.caption,
                p.name not in self.featured,
                p.label,
            ),
        )

    @property
    def matchable_schemata(self) -> Set["Schema"]:
        """Return the set of schemata to which it makes sense to compare with this
        schema. For example, it makes sense to compare a legal entity with a company,
        but it does not make sense to compare a car and a person.

        The result is computed on first access and cached on the instance.
        """
        if self._matchable_schemata is None:
            matchable: Set["Schema"] = set()
            if self.matchable:
                # This is used by the cross-referencer to determine what
                # other schemata should be considered for matches. For
                # example, a Company may be compared to a Legal Entity,
                # but it makes no sense to compare it to an Aircraft.
                candidates = self.schemata | self.descendants
                matchable = {schema for schema in candidates if schema.matchable}
            self._matchable_schemata = matchable
        return self._matchable_schemata

    def can_match(self, other: "Schema") -> bool:
        """Check if a schema can match with another schema."""
        return other in self.matchable_schemata

    def is_a(self, other: Union[str, "Schema"]) -> bool:
        """Check if the schema or one of its parents is the same as the given
        candidate ``other``."""
        # Deliberately not memoised: this is a single set lookup, and a
        # method-level cache would keep every instance alive and could serve
        # results computed before ``generate`` populated ``names``.
        if not isinstance(other, str):
            other = other.name
        return other in self.names

    def get(self, name: Optional[str]) -> Optional[Property]:
        """Retrieve a property defined for this schema by its name."""
        if name is None:
            return None
        return self.properties.get(name)

    def validate(self, data: Any) -> Optional[str]:
        """Validate a dictionary against the given schema.

        Every property of the schema is validated against the values found under
        ``data["properties"]``; an empty required property is reported as
        "Required". Returns ``None`` on success and raises ``InvalidData``
        (carrying the per-property errors) otherwise.
        """
        errors = {}
        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
        for name, prop in self.properties.items():
            values = ensure_list(properties.get(name, []))
            error = prop.validate(values)
            if error is None and not values and prop.name in self.required:
                error = gettext("Required")
            if error is not None:
                errors[name] = error
        if errors:
            msg = gettext("Entity validation failed")
            raise InvalidData(msg, errors={"properties": errors})
        return None

    def to_dict(self) -> SchemaToDict:
        """Return schema metadata, including all properties, in a serializable form."""
        data: SchemaToDict = {
            "label": self.label,
            "plural": self.plural,
            "schemata": sorted(self.names),
            "extends": sorted(e.name for e in self.extends),
        }
        if self.edge_source and self.edge_target and self.edge_label:
            data["edge"] = {
                "source": self.edge_source,
                "target": self.edge_target,
                "caption": self.edge_caption,
                "label": self.edge_label,
                "directed": self.edge_directed,
            }
        if self.temporal_start or self.temporal_end:
            data["temporalExtent"] = {
                "start": self.temporal_start,
                "end": self.temporal_end,
            }
        if self.featured:
            data["featured"] = self.featured
        if self.required:
            data["required"] = self.required
        if self.caption:
            data["caption"] = self.caption
        if self.description:
            data["description"] = self.description
        # Boolean flags are only emitted when set.
        if self.abstract:
            data["abstract"] = True
        if self.hidden:
            data["hidden"] = True
        if self.generated:
            data["generated"] = True
        if self.matchable:
            data["matchable"] = True
        if self.deprecated:
            data["deprecated"] = True
        # Only properties declared on this schema; inherited ones are omitted.
        properties: Dict[str, PropertyToDict] = {}
        for name, prop in self.properties.items():
            if prop.schema == self:
                properties[name] = prop.to_dict()
        data["properties"] = properties
        return data

    def __eq__(self, other: Any) -> bool:
        """Compare two schemata (via hash)."""
        try:
            return self._hash == hash(other)
        except (AttributeError, TypeError):
            # TypeError: ``other`` is unhashable (e.g. a list or dict).
            return False

    def __lt__(self, other: Any) -> bool:
        """Order schemata by name."""
        return self.name.__lt__(other.name)

    def __hash__(self) -> int:
        try:
            return self._hash
        except AttributeError:
            # ``_hash`` is not set yet on a partially initialised instance.
            return super().__hash__()

    def __repr__(self) -> str:
        return "<Schema(%r)>" % self.name
class SchemaSpec(TypedDict, total=False):
    """Raw definition of a schema, as handed to ``Schema.__init__``.

    The type is declared with ``total=False``, so every key may be omitted.
    """

    # Display strings.
    label: str
    plural: str

    # Hierarchy.
    schemata: List[str]
    extends: List[str]

    # Property definitions, keyed by property name.
    properties: Dict[str, PropertySpec]

    # Lists of property names.
    featured: List[str]
    required: List[str]
    caption: List[str]

    # Graph and timeline representation.
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec

    description: Optional[str]
    rdf: Optional[str]

    # Boolean flags.
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: Optional[bool]
class SchemaToDict(TypedDict, total=False):
    """Serialised schema metadata, the return type of ``Schema.to_dict``.

    The type is declared with ``total=False``, so every key may be omitted.
    """

    # Display strings.
    label: str
    plural: str

    # Hierarchy, as schema names.
    schemata: List[str]
    extends: List[str]

    # Serialised properties, keyed by property name.
    properties: Dict[str, PropertyToDict]

    # Lists of property names.
    featured: List[str]
    required: List[str]
    caption: List[str]

    # Graph and timeline representation.
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec

    description: Optional[str]

    # Boolean flags.
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: bool
class Schema:
    """A type definition for a class of entities that have certain properties.

    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
    parent schemata from which it inherits all of their properties. A schema can also
    have descendant child schemata, which, in turn, add further properties. Schemata
    are usually accessed via the model, which holds all available definitions.
    """

    __slots__ = (
        "model",
        "name",
        "_label",
        "_plural",
        "_description",
        "_hash",
        "uri",
        "abstract",
        "hidden",
        "generated",
        "matchable",
        "featured",
        "required",
        "deprecated",
        "caption",
        "edge",
        "_edge_label",
        "edge_directed",
        "edge_source",
        "edge_target",
        "edge_caption",
        "_temporal_start",
        "_temporal_end",
        "_extends",
        "extends",
        "schemata",
        "names",
        "descendants",
        "properties",
        "_matchable_schemata",
    )

    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
        """Read the flags and own properties of the schema from ``data``.

        Inheritance is not resolved here; see :meth:`generate`.
        """
        #: Machine-readable name of the schema, used for identification.
        self.name = name
        self.model = model
        self._label = data.get("label", name)
        # Fall back to the *untranslated* label: the ``plural`` property applies
        # gettext on access, same as ``_edge_label`` below.
        self._plural = data.get("plural", self._label)
        self._description = data.get("description")
        self._hash = hash("<Schema(%r)>" % name)

        #: RDF identifier for this schema when it is transformed to a triple term.
        self.uri = URIRef(cast(str, data.get("rdf", NS[name])))

        #: Do not store or emit entities of this type, it is used only for
        #: inheritance.
        self.abstract = as_bool(data.get("abstract"), False)

        #: This schema is deprecated and should not be used.
        self.deprecated = as_bool(data.get("deprecated", False))

        #: Hide this schema in listings. The flag is dropped for abstract schemata.
        self.hidden = as_bool(data.get("hidden"), False) and not self.abstract

        #: Entities with this type are generated by the system - for example, via
        #: `ingest-file`. The user should not be offered an option to create them
        #: in the interface.
        self.generated = as_bool(data.get("generated"), False)

        #: Try to perform fuzzy matching. Fuzzy similarity search does not
        #: make sense for entities which have a lot of similar names, such
        #: as land plots, assets etc.
        self.matchable = as_bool(data.get("matchable"), True)

        #: Mark a set of properties as important, i.e. they should be shown
        #: first, or in an abridged view of the entity. In Aleph, these properties
        #: are included in tabular entity listings.
        self.featured = ensure_list(data.get("featured", []))

        #: Mark a set of properties as required. This is applied only when
        #: an entity is created by the user - bulk created entities will
        #: slip through even if it is technically invalid.
        self.required = ensure_list(data.get("required", []))

        #: Mark a set of properties to be used for the entity's caption.
        #: They will be checked in order and the first existent value will
        #: be used.
        self.caption = ensure_list(data.get("caption", []))

        # A transform of the entity into an edge for its representation in
        # the context of a property graph representation like Neo4J/Gephi.
        edge = data.get("edge", {})
        self.edge_source = edge.get("source")
        self.edge_target = edge.get("target")

        #: Flag to indicate if this schema should be represented by an edge (rather than
        #: a node) when the data is converted into a property graph.
        self.edge: bool = self.edge_source is not None and self.edge_target is not None
        self.edge_caption = ensure_list(edge.get("caption", []))
        self._edge_label = edge.get("label", self._label)

        #: Flag to indicate if the edge should be presented as directed to the user,
        #: e.g. by showing an arrow at the target end of the edge.
        self.edge_directed = as_bool(edge.get("directed", True))

        #: Specify which properties should be used to represent this schema in a
        #: timeline.
        temporal_extent = data.get("temporalExtent", {})
        self._temporal_start = ensure_list(temporal_extent.get("start", []))
        self._temporal_end = ensure_list(temporal_extent.get("end", []))

        #: Direct parent schemata of this schema.
        self._extends = ensure_list(data.get("extends", []))
        self.extends: Set["Schema"] = set()

        #: All parents of this schema (including indirect parents and the schema
        #: itself).
        self.schemata = {self}

        #: All names of :attr:`~schemata`.
        self.names = {self.name}

        #: Inverse of :attr:`~schemata`, all derived child types of this schema
        #: and their children.
        self.descendants: Set["Schema"] = set()
        # Lazily computed by the ``matchable_schemata`` property.
        self._matchable_schemata: Optional[Set["Schema"]] = None

        #: The full list of properties defined for the entity, including those
        #: inherited from parent schemata.
        self.properties: Dict[str, Property] = {}
        for prop_name, prop_spec in data.get("properties", {}).items():
            self.properties[prop_name] = Property(self, prop_name, prop_spec)

    def generate(self, model: "Model") -> None:
        """While loading the schema, this function will validate and
        load the hierarchy, properties, and flags of the definition.

        Raises ``InvalidData`` if a parent named in ``extends`` is unknown to
        ``model``, and ``InvalidModel`` if parents disagree on the temporal
        extent or if a featured/caption/required/edge property is missing.
        """
        temporal_start: Optional[List[str]] = None
        temporal_end: Optional[List[str]] = None
        for extends in self._extends:
            parent = model.get(extends)
            if parent is None:
                raise InvalidData("Invalid extends: %r" % extends)
            parent.generate(model)

            # Own definitions take precedence over inherited ones.
            for name, prop in parent.properties.items():
                if name not in self.properties:
                    self.properties[name] = prop

            self.extends.add(parent)
            for ancestor in parent.schemata:
                self.schemata.add(ancestor)
                self.names.add(ancestor.name)
                ancestor.descendants.add(self)

            # Without an own temporal extent, all parents that define one
            # must agree on it.
            parent_start = parent.temporal_start
            if not self._temporal_start and parent_start:
                if temporal_start is not None and temporal_start != parent_start:
                    raise InvalidModel(
                        "Conflicting temporal start properties: %s" % self.name
                    )
                temporal_start = parent_start

            parent_end = parent.temporal_end
            if not self._temporal_end and parent_end:
                if temporal_end is not None and temporal_end != parent_end:
                    raise InvalidModel(
                        "Conflicting temporal end properties: %s" % self.name
                    )
                temporal_end = parent_end

        # Iterate over a snapshot: ``self.properties`` can gain entries through
        # ``_add_reverse`` (presumably triggered from ``Property.generate``).
        for prop in list(self.properties.values()):
            prop.generate(model)

        for featured in self.featured:
            if self.get(featured) is None:
                raise InvalidModel("Missing featured property: %s" % featured)

        for caption in self.caption:
            prop_ = self.get(caption)
            if prop_ is None:
                raise InvalidModel("Missing caption property: %s" % caption)
            if prop_.type == registry.entity:
                raise InvalidModel("Caption cannot be entity: %s" % caption)

        for required in self.required:
            if self.get(required) is None:
                raise InvalidModel("Missing required property: %s" % required)

        if self.edge:
            if self.source_prop is None:
                msg = "Missing edge source: %s" % self.edge_source
                raise InvalidModel(msg)

            if self.target_prop is None:
                msg = "Missing edge target: %s" % self.edge_target
                raise InvalidModel(msg)

    def _add_reverse(
        self, model: "Model", data: ReverseSpec, other: Property
    ) -> Property:
        """Return the property named by ``data["name"]``, creating a stub
        entity-typed property pointing back at ``other`` if none exists yet.

        Raises ``InvalidModel`` if ``data`` carries no name.
        """
        name = data.get("name")
        if name is None:
            raise InvalidModel("Unnamed reverse: %s" % other)

        prop = self.get(name)
        if prop is None:
            spec: PropertySpec = {
                "label": data.get("label"),
                "type": registry.entity.name,
                "reverse": {"name": other.name},
                "range": other.schema.name,
                "hidden": data.get("hidden", other.hidden),
            }
            prop = Property(self, name, spec)
            prop.stub = True
            prop.generate(model)
            self.properties[name] = prop
        return prop

    @property
    def label(self) -> str:
        """User-facing name of the schema."""
        return gettext(self._label)

    @property
    def plural(self) -> str:
        """Name of the schema to be used in plural constructions."""
        return gettext(self._plural)

    @property
    def description(self) -> Optional[str]:
        """A longer description of the semantics of the schema."""
        return gettext(self._description)

    @property
    def edge_label(self) -> Optional[str]:
        """Description label for edges derived from entities of this schema."""
        return gettext(self._edge_label)

    @property
    def source_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge source."""
        return self.get(self.edge_source)

    @property
    def target_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge target."""
        return self.get(self.edge_target)

    @property
    def temporal_start(self) -> List[str]:
        """The entity properties to be used as the start when representing the entity
        in a timeline. Falls back to a direct parent's value if none is set here."""
        if not self._temporal_start:
            for parent in self.extends:
                inherited = parent.temporal_start
                if inherited:
                    return inherited
        return self._temporal_start

    @property
    def temporal_end(self) -> List[str]:
        """The entity properties to be used as the end when representing the entity
        in a timeline. Falls back to a direct parent's value if none is set here."""
        if not self._temporal_end:
            for parent in self.extends:
                inherited = parent.temporal_end
                if inherited:
                    return inherited
        return self._temporal_end

    @property
    def temporal_start_props(self) -> List[Property]:
        """The resolved properties named by :attr:`temporal_start`; names that do
        not resolve to a property are skipped."""
        props = [self.get(prop_name) for prop_name in self.temporal_start]
        return [prop for prop in props if prop is not None]

    @property
    def temporal_end_props(self) -> List[Property]:
        """The resolved properties named by :attr:`temporal_end`; names that do
        not resolve to a property are skipped."""
        props = [self.get(prop_name) for prop_name in self.temporal_end]
        return [prop for prop in props if prop is not None]

    @property
    def sorted_properties(self) -> List[Property]:
        """All properties of the schema in the order in which they should be shown
        to the user (alphabetically, with captions and featured properties first)."""
        return sorted(
            self.properties.values(),
            key=lambda p: (
                p.name not in self.caption,
                p.name not in self.featured,
                p.label,
            ),
        )

    @property
    def matchable_schemata(self) -> Set["Schema"]:
        """Return the set of schemata to which it makes sense to compare with this
        schema. For example, it makes sense to compare a legal entity with a company,
        but it does not make sense to compare a car and a person.

        The result is computed on first access and cached on the instance.
        """
        if self._matchable_schemata is None:
            matchable: Set["Schema"] = set()
            if self.matchable:
                # This is used by the cross-referencer to determine what
                # other schemata should be considered for matches. For
                # example, a Company may be compared to a Legal Entity,
                # but it makes no sense to compare it to an Aircraft.
                candidates = self.schemata | self.descendants
                matchable = {schema for schema in candidates if schema.matchable}
            self._matchable_schemata = matchable
        return self._matchable_schemata

    def can_match(self, other: "Schema") -> bool:
        """Check if a schema can match with another schema."""
        return other in self.matchable_schemata

    def is_a(self, other: Union[str, "Schema"]) -> bool:
        """Check if the schema or one of its parents is the same as the given
        candidate ``other``."""
        # Deliberately not memoised: this is a single set lookup, and a
        # method-level cache would keep every instance alive and could serve
        # results computed before ``generate`` populated ``names``.
        if not isinstance(other, str):
            other = other.name
        return other in self.names

    def get(self, name: Optional[str]) -> Optional[Property]:
        """Retrieve a property defined for this schema by its name."""
        if name is None:
            return None
        return self.properties.get(name)

    def validate(self, data: Any) -> Optional[str]:
        """Validate a dictionary against the given schema.

        Every property of the schema is validated against the values found under
        ``data["properties"]``; an empty required property is reported as
        "Required". Returns ``None`` on success and raises ``InvalidData``
        (carrying the per-property errors) otherwise.
        """
        errors = {}
        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
        for name, prop in self.properties.items():
            values = ensure_list(properties.get(name, []))
            error = prop.validate(values)
            if error is None and not values and prop.name in self.required:
                error = gettext("Required")
            if error is not None:
                errors[name] = error
        if errors:
            msg = gettext("Entity validation failed")
            raise InvalidData(msg, errors={"properties": errors})
        return None

    def to_dict(self) -> SchemaToDict:
        """Return schema metadata, including all properties, in a serializable form."""
        data: SchemaToDict = {
            "label": self.label,
            "plural": self.plural,
            "schemata": sorted(self.names),
            "extends": sorted(e.name for e in self.extends),
        }
        if self.edge_source and self.edge_target and self.edge_label:
            data["edge"] = {
                "source": self.edge_source,
                "target": self.edge_target,
                "caption": self.edge_caption,
                "label": self.edge_label,
                "directed": self.edge_directed,
            }
        if self.temporal_start or self.temporal_end:
            data["temporalExtent"] = {
                "start": self.temporal_start,
                "end": self.temporal_end,
            }
        if self.featured:
            data["featured"] = self.featured
        if self.required:
            data["required"] = self.required
        if self.caption:
            data["caption"] = self.caption
        if self.description:
            data["description"] = self.description
        # Boolean flags are only emitted when set.
        if self.abstract:
            data["abstract"] = True
        if self.hidden:
            data["hidden"] = True
        if self.generated:
            data["generated"] = True
        if self.matchable:
            data["matchable"] = True
        if self.deprecated:
            data["deprecated"] = True
        # Only properties declared on this schema; inherited ones are omitted.
        properties: Dict[str, PropertyToDict] = {}
        for name, prop in self.properties.items():
            if prop.schema == self:
                properties[name] = prop.to_dict()
        data["properties"] = properties
        return data

    def __eq__(self, other: Any) -> bool:
        """Compare two schemata (via hash)."""
        try:
            return self._hash == hash(other)
        except (AttributeError, TypeError):
            # TypeError: ``other`` is unhashable (e.g. a list or dict).
            return False

    def __lt__(self, other: Any) -> bool:
        """Order schemata by name."""
        return self.name.__lt__(other.name)

    def __hash__(self) -> int:
        try:
            return self._hash
        except AttributeError:
            # ``_hash`` is not set yet on a partially initialised instance.
            return super().__hash__()

    def __repr__(self) -> str:
        return "<Schema(%r)>" % self.name
A type definition for a class of entities that have certain properties.
Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple parent schemata from which it inherits all of their properties. A schema can also have descendant child schemata, which, in turn, add further properties. Schemata are usually accessed via the model, which holds all available definitions.
def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
    """Create the schema called ``name`` from its specification ``data``.

    Parent schemata are only recorded by name here (``_extends``); the
    hierarchy and inherited properties are filled in later by ``generate``.
    """
    #: Machine-readable name of the schema, used for identification.
    self.name = name
    self.model = model
    self._label = data.get("label", name)
    # Default to the *untranslated* label. The ``plural`` property applies
    # gettext on access, so storing the already translated ``self.label``
    # here would freeze the load-time locale and hand gettext a translated
    # string as message id.
    self._plural = data.get("plural", self._label)
    self._description = data.get("description")
    self._hash = hash("<Schema(%r)>" % name)

    #: RDF identifier for this schema when it is transformed to a triple term.
    self.uri = URIRef(cast(str, data.get("rdf", NS[name])))

    #: Do not store or emit entities of this type, it is used only for
    #: inheritance.
    self.abstract = as_bool(data.get("abstract"), False)

    #: This schema is deprecated and should not be used.
    self.deprecated = as_bool(data.get("deprecated", False))

    #: Hide this schema in listings.
    self.hidden = as_bool(data.get("hidden"), False)
    # An abstract schema is never flagged as hidden.
    self.hidden = self.hidden and not self.abstract

    #: Entities with this type are generated by the system - for example, via
    #: `ingest-file`. The user should not be offered an option to create them
    #: in the interface.
    self.generated = as_bool(data.get("generated"), False)

    #: Try to perform fuzzy matching. Fuzzy similarity search does not
    #: make sense for entities which have a lot of similar names, such
    #: as land plots, assets etc.
    self.matchable = as_bool(data.get("matchable"), True)

    #: Mark a set of properties as important, i.e. they should be shown
    #: first, or in an abridged view of the entity. In Aleph, these properties
    #: are included in tabular entity listings.
    self.featured = ensure_list(data.get("featured", []))

    #: Mark a set of properties as required. This is applied only when
    #: an entity is created by the user - bulk created entities will
    #: slip through even if it is technically invalid.
    self.required = ensure_list(data.get("required", []))

    #: Mark a set of properties to be used for the entity's caption.
    #: They will be checked in order and the first existent value will
    #: be used.
    self.caption = ensure_list(data.get("caption", []))

    # A transform of the entity into an edge for its representation in
    # the context of a property graph representation like Neo4J/Gephi.
    edge = data.get("edge", {})
    self.edge_source = edge.get("source")
    self.edge_target = edge.get("target")

    #: Flag to indicate if this schema should be represented by an edge (rather than
    #: a node) when the data is converted into a property graph.
    self.edge: bool = self.edge_source is not None and self.edge_target is not None
    self.edge_caption = ensure_list(edge.get("caption", []))
    self._edge_label = edge.get("label", self._label)

    #: Flag to indicate if the edge should be presented as directed to the user,
    #: e.g. by showing an arrow at the target end of the edge.
    self.edge_directed = as_bool(edge.get("directed", True))

    #: Specify which properties should be used to represent this schema in a
    #: timeline.
    temporal_extent = data.get("temporalExtent", {})
    self._temporal_start = ensure_list(temporal_extent.get("start", []))
    self._temporal_end = ensure_list(temporal_extent.get("end", []))

    #: Direct parent schemata of this schema.
    self._extends = ensure_list(data.get("extends", []))
    self.extends: Set["Schema"] = set()

    #: All parents of this schema (including indirect parents and the schema
    #: itself).
    self.schemata = {self}

    #: All names of :attr:`~schemata`.
    self.names = {self.name}

    #: Inverse of :attr:`~schemata`, all derived child types of this schema
    #: and their children.
    self.descendants: Set["Schema"] = set()
    # Filled lazily by the ``matchable_schemata`` property.
    self._matchable_schemata: Optional[Set["Schema"]] = None

    #: The full list of properties defined for the entity, including those
    #: inherited from parent schemata.
    self.properties: Dict[str, Property] = {}
    for prop_name, prop_spec in data.get("properties", {}).items():
        self.properties[prop_name] = Property(self, prop_name, prop_spec)
def generate(self, model: "Model") -> None:
    """While loading the schema, this function will validate and
    load the hierarchy, properties, and flags of the definition.

    Every parent named in ``_extends`` is looked up in ``model`` and generated
    first; its properties are copied unless this schema defines a property of
    the same name, and its ancestors are added to ``schemata`` / ``names``.

    Raises:
        InvalidData: a name in ``extends`` is not known to ``model``.
        InvalidModel: parents disagree on inherited temporal start/end
            properties; a featured, caption or required property does not
            exist; a caption property is of entity type; or the edge source
            or target property is missing.
    """
    # Temporal extents inherited so far, used only to detect parents that
    # disagree with one another.
    temporal_start: Optional[List[str]] = None
    temporal_end: Optional[List[str]] = None
    for extends in self._extends:
        parent = model.get(extends)
        if parent is None:
            raise InvalidData("Invalid extends: %r" % extends)
        parent.generate(model)

        # Own definitions take precedence over inherited ones.
        for name, prop in parent.properties.items():
            if name not in self.properties:
                self.properties[name] = prop

        self.extends.add(parent)
        for ancestor in parent.schemata:
            self.schemata.add(ancestor)
            self.names.add(ancestor.name)
            ancestor.descendants.add(self)

        # Conflicts only matter when this schema does not define its own
        # temporal extent and therefore has to inherit one.
        if len(self._temporal_start) == 0 and parent.temporal_start:
            if (
                temporal_start is not None
                and temporal_start != parent.temporal_start
            ):
                raise InvalidModel(
                    "Conflicting temporal start properties: %s" % self.name
                )
            temporal_start = parent.temporal_start

        if len(self._temporal_end) == 0 and parent.temporal_end:
            if temporal_end is not None and temporal_end != parent.temporal_end:
                raise InvalidModel(
                    "Conflicting temporal end properties: %s" % self.name
                )
            temporal_end = parent.temporal_end

    # Iterate over a copy of the values while the properties are generated.
    for prop in list(self.properties.values()):
        prop.generate(model)

    for featured in self.featured:
        if self.get(featured) is None:
            raise InvalidModel("Missing featured property: %s" % featured)

    for caption in self.caption:
        prop_ = self.get(caption)
        if prop_ is None:
            raise InvalidModel("Missing caption property: %s" % caption)
        if prop_.type == registry.entity:
            raise InvalidModel("Caption cannot be entity: %s" % caption)

    for required in self.required:
        if self.get(required) is None:
            raise InvalidModel("Missing required property: %s" % required)

    if self.edge:
        if self.source_prop is None:
            msg = "Missing edge source: %s" % self.edge_source
            raise InvalidModel(msg)

        if self.target_prop is None:
            msg = "Missing edge target: %s" % self.edge_target
            raise InvalidModel(msg)
While loading the schema, this function will validate and load the hierarchy, properties, and flags of the definition.
@property
def label(self) -> str:
    """Name of the schema for display, passed through gettext."""
    raw_label = self._label
    return gettext(raw_label)
User-facing name of the schema.
@property
def plural(self) -> str:
    """Plural form of the schema name, passed through gettext."""
    raw_plural = self._plural
    return gettext(raw_plural)
Name of the schema to be used in plural constructions.
@property
def description(self) -> Optional[str]:
    """Longer explanation of what the schema means, passed through gettext."""
    raw_description = self._description
    return gettext(raw_description)
A longer description of the semantics of the schema.
@property
def edge_label(self) -> Optional[str]:
    """Label describing edges built from entities of this schema, passed
    through gettext."""
    raw_edge_label = self._edge_label
    return gettext(raw_edge_label)
Description label for edges derived from entities of this schema.
@property
def source_prop(self) -> Optional[Property]:
    """Property named by ``edge_source``, looked up on this schema."""
    source_name = self.edge_source
    return self.get(source_name)
The entity property to be used as an edge source.
@property
def target_prop(self) -> Optional[Property]:
    """Property named by ``edge_target``, looked up on this schema."""
    target_name = self.edge_target
    return self.get(target_name)
The entity property to be used as an edge target.
@property
def temporal_start(self) -> List[str]:
    """Names of the properties marking the start of the entity in a timeline.

    When the schema declares none itself, the first non-empty value found
    among its direct parents is returned instead.
    """
    own = self._temporal_start
    if own:
        return own
    for ancestor in self.extends:
        inherited = ancestor.temporal_start
        if inherited:
            return inherited
    return own
The entity properties to be used as the start when representing the entity in a timeline.
@property
def temporal_end(self) -> List[str]:
    """Names of the properties marking the end of the entity in a timeline.

    When the schema declares none itself, the first non-empty value found
    among its direct parents is returned instead.
    """
    own = self._temporal_end
    if own:
        return own
    for ancestor in self.extends:
        inherited = ancestor.temporal_end
        if inherited:
            return inherited
    return own
The entity properties to be used as the end when representing the entity in a timeline.
@property
def temporal_start_props(self) -> List[Property]:
    """Property objects for the names in ``temporal_start``; names that do
    not resolve to a property on this schema are left out."""
    resolved = []
    for prop_name in self.temporal_start:
        prop = self.get(prop_name)
        if prop is not None:
            resolved.append(prop)
    return resolved
The entity properties to be used as the start when representing the entity in a timeline.
@property
def temporal_end_props(self) -> List[Property]:
    """Property objects for the names in ``temporal_end``; names that do
    not resolve to a property on this schema are left out."""
    resolved = []
    for prop_name in self.temporal_end:
        prop = self.get(prop_name)
        if prop is not None:
            resolved.append(prop)
    return resolved
The entity properties to be used as the end when representing the entity in a timeline.
@property
def sorted_properties(self) -> List[Property]:
    """Properties of the schema in display order: caption properties first,
    then featured ones, then the rest, each group ordered by label."""
    caption_names = self.caption
    featured_names = self.featured

    def display_rank(prop):
        # ``False`` sorts before ``True``, so membership is negated.
        return (
            prop.name not in caption_names,
            prop.name not in featured_names,
            prop.label,
        )

    ordered = list(self.properties.values())
    ordered.sort(key=display_rank)
    return ordered
All properties of the schema in the order in which they should be shown to the user (alphabetically, with captions and featured properties first).
@property
def matchable_schemata(self) -> Set["Schema"]:
    """Set of schemata that it makes sense to compare with this schema.

    The result is computed on first access and stored on the instance. It is
    empty when this schema is not matchable; otherwise it holds every
    matchable schema among ``schemata`` and ``descendants``.
    """
    cached = self._matchable_schemata
    if cached is not None:
        return cached

    matches: Set["Schema"] = set()
    if self.matchable:
        # Used by the cross-referencer: a Company may be compared to a
        # Legal Entity, but not to an Aircraft.
        related = set(self.schemata).union(self.descendants)
        matches.update(schema for schema in related if schema.matchable)
    self._matchable_schemata = matches
    return matches
Return the set of schemata to which it makes sense to compare with this schema. For example, it makes sense to compare a legal entity with a company, but it does not make sense to compare a car and a person.
def can_match(self, other: "Schema") -> bool:
    """Check if a schema can match with another schema, i.e. whether
    ``other`` is contained in ``matchable_schemata``."""
    candidates = self.matchable_schemata
    return other in candidates
Check if a schema can match with another schema.
# NOTE(review): the cache is keyed on ``self`` as well as ``other`` and so
# holds a reference to every schema it was called on - confirm this is intended.
@lru_cache(maxsize=None)
def is_a(self, other: Union[str, "Schema"]) -> bool:
    """Tell whether ``other`` (a schema or a schema name) is listed in the
    ``names`` of this schema."""
    candidate = other if isinstance(other, str) else other.name
    return candidate in self.names
Check if the schema or one of its parents is the same as the given candidate `other`.
def get(self, name: Optional[str]) -> Optional[Property]:
    """Look up a property of this schema by name.

    Returns ``None`` when ``name`` is ``None`` or no such property exists.
    """
    return None if name is None else self.properties.get(name)
Retrieve a property defined for this schema by its name.
def validate(self, data: Any) -> Optional[str]:
    """Validate the ``properties`` of a dictionary against this schema.

    Each property of the schema validates the values supplied for it; a
    property listed in ``required`` that has no values is reported as
    "Required". Returns ``None`` when nothing was reported.

    Raises:
        InvalidData: at least one property failed; the error messages are
            passed as ``errors={"properties": {...}}``, keyed by property name.
    """
    supplied = cast(Dict[str, Any], ensure_dict(data.get("properties")))
    failures = {}
    for name, prop in self.properties.items():
        values = ensure_list(supplied.get(name, []))
        problem = prop.validate(values)
        # Only check for a missing required value when the values passed.
        if problem is None and not values and prop.name in self.required:
            problem = gettext("Required")
        if problem is not None:
            failures[name] = problem
    if failures:
        msg = gettext("Entity validation failed")
        raise InvalidData(msg, errors={"properties": failures})
    return None
Validate a dictionary against the given schema. This will also drop keys which are not valid as properties.
def to_dict(self) -> SchemaToDict:
    """Build a serializable description of the schema.

    Optional sections and flags are only included when set; ``properties``
    holds just the properties whose ``schema`` is this schema.
    """
    data: SchemaToDict = {
        "label": self.label,
        "plural": self.plural,
        "schemata": sorted(self.names),
        "extends": sorted(parent.name for parent in self.extends),
    }

    edge_label = self.edge_label
    if self.edge_source and self.edge_target and edge_label:
        data["edge"] = {
            "source": self.edge_source,
            "target": self.edge_target,
            "caption": self.edge_caption,
            "label": edge_label,
            "directed": self.edge_directed,
        }

    start, end = self.temporal_start, self.temporal_end
    if start or end:
        data["temporalExtent"] = {"start": start, "end": end}

    if self.featured:
        data["featured"] = self.featured
    if self.required:
        data["required"] = self.required
    if self.caption:
        data["caption"] = self.caption

    description = self.description
    if description:
        data["description"] = description

    # Boolean flags are emitted only when they are set.
    if self.abstract:
        data["abstract"] = True
    if self.hidden:
        data["hidden"] = True
    if self.generated:
        data["generated"] = True
    if self.matchable:
        data["matchable"] = True
    if self.deprecated:
        data["deprecated"] = True

    data["properties"] = {
        name: prop.to_dict()
        for name, prop in self.properties.items()
        if prop.schema == self
    }
    return data
Return schema metadata, including all properties, in a serializable form.