followthemoney.schema
from typing import TYPE_CHECKING, Any, cast
from typing import Dict, List, Optional, Set, TypedDict, Union
from banal import ensure_list, ensure_dict, as_bool
from functools import lru_cache

from followthemoney.property import Property, PropertySpec, PropertyToDict, ReverseSpec
from followthemoney.types import registry
from followthemoney.exc import InvalidData, InvalidModel
from followthemoney.util import gettext

if TYPE_CHECKING:
    from followthemoney.model import Model


class EdgeSpec(TypedDict, total=False):
    # Name of the property holding the edge's source entity.
    source: str
    # Name of the property holding the edge's target entity.
    target: str
    # Names of the properties used to caption the edge.
    caption: List[str]
    # Label describing the edge.
    label: str
    # Whether the edge should be presented as directed.
    directed: bool


class TemporalExtentSpec(TypedDict, total=False):
    # Names of the properties marking the start of the entity on a timeline.
    start: List[str]
    # Names of the properties marking the end of the entity on a timeline.
    end: List[str]


class SchemaSpec(TypedDict, total=False):
    label: str
    plural: str
    schemata: List[str]
    extends: List[str]
    properties: Dict[str, PropertySpec]
    featured: List[str]
    required: List[str]
    caption: List[str]
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec
    description: Optional[str]
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: Optional[bool]


class SchemaToDict(TypedDict, total=False):
    label: str
    plural: str
    schemata: List[str]
    extends: List[str]
    properties: Dict[str, PropertyToDict]
    featured: List[str]
    required: List[str]
    caption: List[str]
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec
    description: Optional[str]
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: bool


class Schema:
    """A type definition for a class of entities that have certain properties.

    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
    parent schemata from which it inherits all of their properties. A schema can also
    have descendant child schemata, which, in turn, add further properties. Schemata
    are usually accessed via the model, which holds all available definitions.
    """

    __slots__ = (
        "model",
        "name",
        "_label",
        "_plural",
        "_description",
        "_hash",
        "abstract",
        "hidden",
        "generated",
        "matchable",
        "featured",
        "required",
        "deprecated",
        "caption",
        "edge",
        "_edge_label",
        "edge_directed",
        "edge_source",
        "edge_target",
        "edge_caption",
        "_temporal_start",
        "_temporal_end",
        "_extends",
        "extends",
        "schemata",
        "names",
        "descendants",
        "properties",
        "_matchable_schemata",
    )

    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
        """Set up the schema from its specification.

        Only the locally declared properties are created here; inherited
        properties and the schema hierarchy are resolved by :meth:`generate`.
        """
        #: Machine-readable name of the schema, used for identification.
        self.name = name
        self.model = model
        self._label = data.get("label", name)
        # Store the untranslated label as the fallback; translation happens
        # lazily in the ``plural`` property, using the locale active at access
        # time rather than the one active while the model was loaded.
        self._plural = data.get("plural", self._label)
        self._description = data.get("description")
        # Must be assigned before the schema is placed into any set below.
        self._hash = hash("<Schema(%r)>" % name)

        #: Do not store or emit entities of this type, it is used only for
        #: inheritance.
        self.abstract = as_bool(data.get("abstract"), False)

        #: This schema is deprecated and should not be used.
        self.deprecated = as_bool(data.get("deprecated", False))

        #: Hide this schema in listings. An abstract schema is never flagged
        #: as hidden.
        self.hidden = as_bool(data.get("hidden"), False)
        self.hidden = self.hidden and not self.abstract

        #: Entities with this type are generated by the system - for example, via
        #: `ingest-file`. The user should not be offered an option to create them
        #: in the interface.
        self.generated = as_bool(data.get("generated"), False)

        #: Try to perform fuzzy matching. Fuzzy similarity search does not
        #: make sense for entities which have a lot of similar names, such
        #: as land plots, assets etc.
        self.matchable = as_bool(data.get("matchable"), True)

        #: Mark a set of properties as important, i.e. they should be shown
        #: first, or in an abridged view of the entity. In Aleph, these properties
        #: are included in tabular entity listings.
        self.featured = ensure_list(data.get("featured", []))

        #: Mark a set of properties as required. This is applied only when
        #: an entity is created by the user - bulk created entities will
        #: slip through even if it is technically invalid.
        self.required = ensure_list(data.get("required", []))

        #: Mark a set of properties to be used for the entity's caption.
        #: They will be checked in order and the first existent value will
        #: be used.
        self.caption = ensure_list(data.get("caption", []))

        # A transform of the entity into an edge for its representation in
        # the context of a property graph representation like Neo4J/Gephi.
        edge = data.get("edge", {})
        self.edge_source = edge.get("source")
        self.edge_target = edge.get("target")

        #: Flag to indicate if this schema should be represented by an edge (rather than
        #: a node) when the data is converted into a property graph.
        self.edge: bool = self.edge_source is not None and self.edge_target is not None
        self.edge_caption = ensure_list(edge.get("caption", []))
        self._edge_label = edge.get("label", self._label)

        #: Flag to indicate if the edge should be presented as directed to the user,
        #: e.g. by showing an arrow at the target end of the edge.
        self.edge_directed = as_bool(edge.get("directed", True))

        #: Specify which properties should be used to represent this schema in a
        #: timeline.
        temporal_extent = data.get("temporalExtent", {})
        self._temporal_start = ensure_list(temporal_extent.get("start", []))
        self._temporal_end = ensure_list(temporal_extent.get("end", []))

        #: Direct parent schemata of this schema.
        self._extends = ensure_list(data.get("extends", []))
        self.extends: Set["Schema"] = set()

        #: All parents of this schema (including indirect parents and the schema
        #: itself).
        self.schemata: Set["Schema"] = {self}

        #: All names of :attr:`~schemata`.
        self.names: Set[str] = {self.name}

        #: Inverse of :attr:`~schemata`, all derived child types of this schema
        #: and their children.
        self.descendants: Set["Schema"] = set()
        # Lazily computed by the ``matchable_schemata`` property.
        self._matchable_schemata: Optional[Set["Schema"]] = None

        #: The full list of properties defined for the entity, including those
        #: inherited from parent schemata.
        self.properties: Dict[str, Property] = {}
        for prop_name, prop_spec in data.get("properties", {}).items():
            self.properties[prop_name] = Property(self, prop_name, prop_spec)

    def generate(self, model: "Model") -> None:
        """While loading the schema, this function will validate and
        load the hierarchy, properties, and flags of the definition.

        Raises:
            InvalidData: if a name listed in ``extends`` is not in the model.
            InvalidModel: if parents disagree on the inherited temporal extent,
                or if a featured/caption/required/edge property is missing, or
                a caption property is of the entity type.
        """
        temporal_start: Optional[List[str]] = None
        temporal_end: Optional[List[str]] = None
        for extends in self._extends:
            parent = model.get(extends)
            if parent is None:
                raise InvalidData("Invalid extends: %r" % extends)
            parent.generate(model)

            # Locally declared properties take precedence over inherited ones.
            for name, prop in parent.properties.items():
                if name not in self.properties:
                    self.properties[name] = prop

            self.extends.add(parent)
            for ancestor in parent.schemata:
                self.schemata.add(ancestor)
                self.names.add(ancestor.name)
                ancestor.descendants.add(self)

            # When this schema does not declare its own temporal extent, all
            # parents that provide one must agree on it.
            if not self._temporal_start and parent.temporal_start:
                if (
                    temporal_start is not None
                    and temporal_start != parent.temporal_start
                ):
                    raise InvalidModel(
                        "Conflicting temporal start properties: %s" % self.name
                    )
                temporal_start = parent.temporal_start

            if not self._temporal_end and parent.temporal_end:
                if temporal_end is not None and temporal_end != parent.temporal_end:
                    raise InvalidModel(
                        "Conflicting temporal end properties: %s" % self.name
                    )
                temporal_end = parent.temporal_end

        # Iterate over a snapshot - presumably generating a property can register
        # further (reverse) properties on this schema via `_add_reverse`.
        for prop in list(self.properties.values()):
            prop.generate(model)

        for featured in self.featured:
            if self.get(featured) is None:
                raise InvalidModel("Missing featured property: %s" % featured)

        for caption in self.caption:
            prop_ = self.get(caption)
            if prop_ is None:
                raise InvalidModel("Missing caption property: %s" % caption)
            if prop_.type == registry.entity:
                raise InvalidModel("Caption cannot be entity: %s" % caption)

        for required in self.required:
            if self.get(required) is None:
                raise InvalidModel("Missing required property: %s" % required)

        if self.edge:
            if self.source_prop is None:
                msg = "Missing edge source: %s" % self.edge_source
                raise InvalidModel(msg)

            if self.target_prop is None:
                msg = "Missing edge target: %s" % self.edge_target
                raise InvalidModel(msg)

    def _add_reverse(
        self, model: "Model", data: ReverseSpec, other: Property
    ) -> Property:
        """Return the property named by the reverse spec ``data``, creating an
        entity-typed stub property pointing back at ``other`` if this schema
        does not have one yet.

        Raises:
            InvalidModel: if the reverse spec does not carry a name.
        """
        name = data.get("name")
        if name is None:
            raise InvalidModel("Unnamed reverse: %s" % other)

        prop = self.get(name)
        if prop is None:
            spec: PropertySpec = {
                "label": data.get("label"),
                "type": registry.entity.name,
                "reverse": {"name": other.name},
                "range": other.schema.name,
                "hidden": data.get("hidden", other.hidden),
            }
            prop = Property(self, name, spec)
            prop.stub = True
            prop.generate(model)
            self.properties[name] = prop
        return prop

    @property
    def label(self) -> str:
        """User-facing name of the schema."""
        return gettext(self._label)

    @property
    def plural(self) -> str:
        """Name of the schema to be used in plural constructions."""
        return gettext(self._plural)

    @property
    def description(self) -> Optional[str]:
        """A longer description of the semantics of the schema."""
        return gettext(self._description)

    @property
    def edge_label(self) -> Optional[str]:
        """Description label for edges derived from entities of this schema."""
        return gettext(self._edge_label)

    @property
    def source_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge source when the schema is
        considered as a relationship."""
        if self.edge_source is None:
            return None
        return self.get(self.edge_source)

    @property
    def target_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge target when the schema is transformed
        into a relationship."""
        if self.edge_target is None:
            return None
        return self.get(self.edge_target)

    @property
    def temporal_start(self) -> List[str]:
        """The names of the entity properties to be used as the start when
        representing the entity in a timeline. Falls back to the first direct
        parent that provides a non-empty value."""
        if not self._temporal_start:
            for parent in self.extends:
                if parent.temporal_start:
                    return parent.temporal_start
        return self._temporal_start

    @property
    def temporal_end(self) -> List[str]:
        """The names of the entity properties to be used as the end when
        representing the entity in a timeline. Falls back to the first direct
        parent that provides a non-empty value."""
        if not self._temporal_end:
            for parent in self.extends:
                if parent.temporal_end:
                    return parent.temporal_end
        return self._temporal_end

    @property
    def temporal_start_props(self) -> List[Property]:
        """The entity properties to be used as the start when representing the entity
        in a timeline. Names that do not resolve to a property are skipped."""
        props = [self.get(prop_name) for prop_name in self.temporal_start]
        return [prop for prop in props if prop is not None]

    @property
    def temporal_end_props(self) -> List[Property]:
        """The entity properties to be used as the end when representing the entity
        in a timeline. Names that do not resolve to a property are skipped."""
        props = [self.get(prop_name) for prop_name in self.temporal_end]
        return [prop for prop in props if prop is not None]

    @property
    def sorted_properties(self) -> List[Property]:
        """All properties of the schema in the order in which they should be shown
        to the user (alphabetically, with captions and featured properties first)."""
        return sorted(
            self.properties.values(),
            # ``False`` sorts before ``True``, so caption/featured members lead.
            key=lambda p: (
                p.name not in self.caption,
                p.name not in self.featured,
                p.label,
            ),
        )

    @property
    def matchable_schemata(self) -> Set["Schema"]:
        """Return the set of schemata to which it makes sense to compare with this
        schema. For example, it makes sense to compare a legal entity with a company,
        but it does not make sense to compare a car and a person."""
        if self._matchable_schemata is None:
            matchable: Set["Schema"] = set()
            if self.matchable:
                # This is used by the cross-referencer to determine what
                # other schemata should be considered for matches. For
                # example, a Company may be compared to a Legal Entity,
                # but it makes no sense to compare it to an Aircraft.
                candidates = self.schemata | self.descendants
                matchable = {schema for schema in candidates if schema.matchable}
            self._matchable_schemata = matchable
        return self._matchable_schemata

    # NOTE(review): `lru_cache` on a method keys on `self` and keeps every
    # instance alive for the lifetime of the cache - presumably fine because
    # schemata live as long as the model; confirm.
    @lru_cache(maxsize=None)
    def can_match(self, other: "Schema") -> bool:
        """Check if a schema can match with another schema."""
        return other in self.matchable_schemata

    @lru_cache(maxsize=None)
    def is_a(self, other: Union[str, "Schema"]) -> bool:
        """Check if the schema or one of its parents is the same as the given
        candidate ``other``."""
        if not isinstance(other, str):
            other = other.name
        return other in self.names

    def get(self, name: str) -> Optional[Property]:
        """Retrieve a property defined for this schema by its name."""
        if name is None:
            return None
        return self.properties.get(name)

    def validate(self, data: Dict[str, Any]) -> Optional[str]:
        """Validate a dictionary against the given schema.

        The values under ``data["properties"]`` are checked against every
        property of the schema; a required property without values is
        reported as an error. ``data`` itself is not modified.

        Returns:
            ``None`` when validation passes.

        Raises:
            InvalidData: if any property fails validation; the per-property
                messages are attached as ``errors={"properties": ...}``.
        """
        errors: Dict[str, Any] = {}
        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
        for name, prop in self.properties.items():
            values = ensure_list(properties.get(name, []))
            error = prop.validate(values)
            if error is None and not values:
                if prop.name in self.required:
                    error = gettext("Required")
            if error is not None:
                errors[name] = error
        if errors:
            msg = gettext("Entity validation failed")
            raise InvalidData(msg, errors={"properties": errors})
        return None

    def to_dict(self) -> SchemaToDict:
        """Return schema metadata, including all properties, in a serializable form.

        Optional keys are only emitted when they are non-empty / true, and only
        properties declared on this schema itself (not inherited) are included.
        """
        data: SchemaToDict = {
            "label": self.label,
            "plural": self.plural,
            "schemata": sorted(self.names),
            "extends": sorted(e.name for e in self.extends),
        }
        if self.edge_source and self.edge_target and self.edge_label:
            data["edge"] = {
                "source": self.edge_source,
                "target": self.edge_target,
                "caption": self.edge_caption,
                "label": self.edge_label,
                "directed": self.edge_directed,
            }
        if self.temporal_start or self.temporal_end:
            data["temporalExtent"] = {
                "start": self.temporal_start,
                "end": self.temporal_end,
            }
        if self.featured:
            data["featured"] = self.featured
        if self.required:
            data["required"] = self.required
        if self.caption:
            data["caption"] = self.caption
        if self.description:
            data["description"] = self.description
        if self.abstract:
            data["abstract"] = True
        if self.hidden:
            data["hidden"] = True
        if self.generated:
            data["generated"] = True
        if self.matchable:
            data["matchable"] = True
        if self.deprecated:
            data["deprecated"] = True
        properties: Dict[str, PropertyToDict] = {}
        for name, prop in self.properties.items():
            if prop.schema == self:
                properties[name] = prop.to_dict()
        data["properties"] = properties
        return data

    def __eq__(self, other: Any) -> bool:
        """Compare two schemata (via hash)."""
        try:
            return self._hash == other._hash  # type: ignore
        except AttributeError:
            return False

    def __lt__(self, other: Any) -> bool:
        """Order schemata by their name."""
        return self.name.__lt__(other.name)

    def __hash__(self) -> int:
        return self._hash

    def __repr__(self) -> str:
        return "<Schema(%r)>" % self.name
class SchemaSpec(TypedDict, total=False):
    # Input specification of a schema, as consumed by ``Schema.__init__``.
    # All keys are optional (``total=False``).

    # User-facing label; Schema falls back to the schema name if missing.
    label: str
    # Plural form of the label; Schema falls back to the label if missing.
    plural: str
    # Schema names; not read by ``Schema.__init__`` in this file.
    schemata: List[str]
    # Names of the direct parent schemata.
    extends: List[str]
    # Property specifications, keyed by property name.
    properties: Dict[str, PropertySpec]
    # Names of the properties marked as featured.
    featured: List[str]
    # Names of the properties marked as required.
    required: List[str]
    # Names of the properties used for the entity caption, in order.
    caption: List[str]
    # Edge transform (source/target/caption/label/directed).
    edge: EdgeSpec
    # Properties delimiting the entity on a timeline.
    temporalExtent: TemporalExtentSpec
    # Longer description of the schema.
    description: Optional[str]
    # Flags; Schema treats a missing ``matchable`` as true and the others
    # as false.
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: Optional[bool]
class SchemaToDict(TypedDict, total=False):
    # Serialized form of a schema, as produced by ``Schema.to_dict``.

    # Always emitted by ``Schema.to_dict``:
    # Translated user-facing label.
    label: str
    # Translated plural label.
    plural: str
    # Sorted names of the schema and all of its ancestors.
    schemata: List[str]
    # Sorted names of the direct parent schemata.
    extends: List[str]
    # Serialized properties declared on the schema itself.
    properties: Dict[str, PropertyToDict]

    # Emitted only when non-empty:
    featured: List[str]
    required: List[str]
    caption: List[str]
    # Emitted only when edge source, target and label are all set.
    edge: EdgeSpec
    # Emitted only when a temporal start or end is defined.
    temporalExtent: TemporalExtentSpec
    description: Optional[str]

    # Flags; each is emitted (as ``True``) only when set.
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: bool
class Schema:
    """A type definition for a class of entities that have certain properties.

    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
    parent schemata from which it inherits all of their properties. A schema can also
    have descendant child schemata, which, in turn, add further properties. Schemata
    are usually accessed via the model, which holds all available definitions.
    """

    __slots__ = (
        "model",
        "name",
        "_label",
        "_plural",
        "_description",
        "_hash",
        "abstract",
        "hidden",
        "generated",
        "matchable",
        "featured",
        "required",
        "deprecated",
        "caption",
        "edge",
        "_edge_label",
        "edge_directed",
        "edge_source",
        "edge_target",
        "edge_caption",
        "_temporal_start",
        "_temporal_end",
        "_extends",
        "extends",
        "schemata",
        "names",
        "descendants",
        "properties",
        "_matchable_schemata",
    )

    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
        """Set up the schema from its specification.

        Only the locally declared properties are created here; inherited
        properties and the schema hierarchy are resolved by :meth:`generate`.
        """
        #: Machine-readable name of the schema, used for identification.
        self.name = name
        self.model = model
        self._label = data.get("label", name)
        # Store the untranslated label as the fallback; translation happens
        # lazily in the ``plural`` property, using the locale active at access
        # time rather than the one active while the model was loaded.
        self._plural = data.get("plural", self._label)
        self._description = data.get("description")
        # Must be assigned before the schema is placed into any set below.
        self._hash = hash("<Schema(%r)>" % name)

        #: Do not store or emit entities of this type, it is used only for
        #: inheritance.
        self.abstract = as_bool(data.get("abstract"), False)

        #: This schema is deprecated and should not be used.
        self.deprecated = as_bool(data.get("deprecated", False))

        #: Hide this schema in listings. An abstract schema is never flagged
        #: as hidden.
        self.hidden = as_bool(data.get("hidden"), False)
        self.hidden = self.hidden and not self.abstract

        #: Entities with this type are generated by the system - for example, via
        #: `ingest-file`. The user should not be offered an option to create them
        #: in the interface.
        self.generated = as_bool(data.get("generated"), False)

        #: Try to perform fuzzy matching. Fuzzy similarity search does not
        #: make sense for entities which have a lot of similar names, such
        #: as land plots, assets etc.
        self.matchable = as_bool(data.get("matchable"), True)

        #: Mark a set of properties as important, i.e. they should be shown
        #: first, or in an abridged view of the entity. In Aleph, these properties
        #: are included in tabular entity listings.
        self.featured = ensure_list(data.get("featured", []))

        #: Mark a set of properties as required. This is applied only when
        #: an entity is created by the user - bulk created entities will
        #: slip through even if it is technically invalid.
        self.required = ensure_list(data.get("required", []))

        #: Mark a set of properties to be used for the entity's caption.
        #: They will be checked in order and the first existent value will
        #: be used.
        self.caption = ensure_list(data.get("caption", []))

        # A transform of the entity into an edge for its representation in
        # the context of a property graph representation like Neo4J/Gephi.
        edge = data.get("edge", {})
        self.edge_source = edge.get("source")
        self.edge_target = edge.get("target")

        #: Flag to indicate if this schema should be represented by an edge (rather than
        #: a node) when the data is converted into a property graph.
        self.edge: bool = self.edge_source is not None and self.edge_target is not None
        self.edge_caption = ensure_list(edge.get("caption", []))
        self._edge_label = edge.get("label", self._label)

        #: Flag to indicate if the edge should be presented as directed to the user,
        #: e.g. by showing an arrow at the target end of the edge.
        self.edge_directed = as_bool(edge.get("directed", True))

        #: Specify which properties should be used to represent this schema in a
        #: timeline.
        temporal_extent = data.get("temporalExtent", {})
        self._temporal_start = ensure_list(temporal_extent.get("start", []))
        self._temporal_end = ensure_list(temporal_extent.get("end", []))

        #: Direct parent schemata of this schema.
        self._extends = ensure_list(data.get("extends", []))
        self.extends: Set["Schema"] = set()

        #: All parents of this schema (including indirect parents and the schema
        #: itself).
        self.schemata: Set["Schema"] = {self}

        #: All names of :attr:`~schemata`.
        self.names: Set[str] = {self.name}

        #: Inverse of :attr:`~schemata`, all derived child types of this schema
        #: and their children.
        self.descendants: Set["Schema"] = set()
        # Lazily computed by the ``matchable_schemata`` property.
        self._matchable_schemata: Optional[Set["Schema"]] = None

        #: The full list of properties defined for the entity, including those
        #: inherited from parent schemata.
        self.properties: Dict[str, Property] = {}
        for prop_name, prop_spec in data.get("properties", {}).items():
            self.properties[prop_name] = Property(self, prop_name, prop_spec)

    def generate(self, model: "Model") -> None:
        """While loading the schema, this function will validate and
        load the hierarchy, properties, and flags of the definition.

        Raises:
            InvalidData: if a name listed in ``extends`` is not in the model.
            InvalidModel: if parents disagree on the inherited temporal extent,
                or if a featured/caption/required/edge property is missing, or
                a caption property is of the entity type.
        """
        temporal_start: Optional[List[str]] = None
        temporal_end: Optional[List[str]] = None
        for extends in self._extends:
            parent = model.get(extends)
            if parent is None:
                raise InvalidData("Invalid extends: %r" % extends)
            parent.generate(model)

            # Locally declared properties take precedence over inherited ones.
            for name, prop in parent.properties.items():
                if name not in self.properties:
                    self.properties[name] = prop

            self.extends.add(parent)
            for ancestor in parent.schemata:
                self.schemata.add(ancestor)
                self.names.add(ancestor.name)
                ancestor.descendants.add(self)

            # When this schema does not declare its own temporal extent, all
            # parents that provide one must agree on it.
            if not self._temporal_start and parent.temporal_start:
                if (
                    temporal_start is not None
                    and temporal_start != parent.temporal_start
                ):
                    raise InvalidModel(
                        "Conflicting temporal start properties: %s" % self.name
                    )
                temporal_start = parent.temporal_start

            if not self._temporal_end and parent.temporal_end:
                if temporal_end is not None and temporal_end != parent.temporal_end:
                    raise InvalidModel(
                        "Conflicting temporal end properties: %s" % self.name
                    )
                temporal_end = parent.temporal_end

        # Iterate over a snapshot - presumably generating a property can register
        # further (reverse) properties on this schema via `_add_reverse`.
        for prop in list(self.properties.values()):
            prop.generate(model)

        for featured in self.featured:
            if self.get(featured) is None:
                raise InvalidModel("Missing featured property: %s" % featured)

        for caption in self.caption:
            prop_ = self.get(caption)
            if prop_ is None:
                raise InvalidModel("Missing caption property: %s" % caption)
            if prop_.type == registry.entity:
                raise InvalidModel("Caption cannot be entity: %s" % caption)

        for required in self.required:
            if self.get(required) is None:
                raise InvalidModel("Missing required property: %s" % required)

        if self.edge:
            if self.source_prop is None:
                msg = "Missing edge source: %s" % self.edge_source
                raise InvalidModel(msg)

            if self.target_prop is None:
                msg = "Missing edge target: %s" % self.edge_target
                raise InvalidModel(msg)

    def _add_reverse(
        self, model: "Model", data: ReverseSpec, other: Property
    ) -> Property:
        """Return the property named by the reverse spec ``data``, creating an
        entity-typed stub property pointing back at ``other`` if this schema
        does not have one yet.

        Raises:
            InvalidModel: if the reverse spec does not carry a name.
        """
        name = data.get("name")
        if name is None:
            raise InvalidModel("Unnamed reverse: %s" % other)

        prop = self.get(name)
        if prop is None:
            spec: PropertySpec = {
                "label": data.get("label"),
                "type": registry.entity.name,
                "reverse": {"name": other.name},
                "range": other.schema.name,
                "hidden": data.get("hidden", other.hidden),
            }
            prop = Property(self, name, spec)
            prop.stub = True
            prop.generate(model)
            self.properties[name] = prop
        return prop

    @property
    def label(self) -> str:
        """User-facing name of the schema."""
        return gettext(self._label)

    @property
    def plural(self) -> str:
        """Name of the schema to be used in plural constructions."""
        return gettext(self._plural)

    @property
    def description(self) -> Optional[str]:
        """A longer description of the semantics of the schema."""
        return gettext(self._description)

    @property
    def edge_label(self) -> Optional[str]:
        """Description label for edges derived from entities of this schema."""
        return gettext(self._edge_label)

    @property
    def source_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge source when the schema is
        considered as a relationship."""
        if self.edge_source is None:
            return None
        return self.get(self.edge_source)

    @property
    def target_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge target when the schema is transformed
        into a relationship."""
        if self.edge_target is None:
            return None
        return self.get(self.edge_target)

    @property
    def temporal_start(self) -> List[str]:
        """The names of the entity properties to be used as the start when
        representing the entity in a timeline. Falls back to the first direct
        parent that provides a non-empty value."""
        if not self._temporal_start:
            for parent in self.extends:
                if parent.temporal_start:
                    return parent.temporal_start
        return self._temporal_start

    @property
    def temporal_end(self) -> List[str]:
        """The names of the entity properties to be used as the end when
        representing the entity in a timeline. Falls back to the first direct
        parent that provides a non-empty value."""
        if not self._temporal_end:
            for parent in self.extends:
                if parent.temporal_end:
                    return parent.temporal_end
        return self._temporal_end

    @property
    def temporal_start_props(self) -> List[Property]:
        """The entity properties to be used as the start when representing the entity
        in a timeline. Names that do not resolve to a property are skipped."""
        props = [self.get(prop_name) for prop_name in self.temporal_start]
        return [prop for prop in props if prop is not None]

    @property
    def temporal_end_props(self) -> List[Property]:
        """The entity properties to be used as the end when representing the entity
        in a timeline. Names that do not resolve to a property are skipped."""
        props = [self.get(prop_name) for prop_name in self.temporal_end]
        return [prop for prop in props if prop is not None]

    @property
    def sorted_properties(self) -> List[Property]:
        """All properties of the schema in the order in which they should be shown
        to the user (alphabetically, with captions and featured properties first)."""
        return sorted(
            self.properties.values(),
            # ``False`` sorts before ``True``, so caption/featured members lead.
            key=lambda p: (
                p.name not in self.caption,
                p.name not in self.featured,
                p.label,
            ),
        )

    @property
    def matchable_schemata(self) -> Set["Schema"]:
        """Return the set of schemata to which it makes sense to compare with this
        schema. For example, it makes sense to compare a legal entity with a company,
        but it does not make sense to compare a car and a person."""
        if self._matchable_schemata is None:
            matchable: Set["Schema"] = set()
            if self.matchable:
                # This is used by the cross-referencer to determine what
                # other schemata should be considered for matches. For
                # example, a Company may be compared to a Legal Entity,
                # but it makes no sense to compare it to an Aircraft.
                candidates = self.schemata | self.descendants
                matchable = {schema for schema in candidates if schema.matchable}
            self._matchable_schemata = matchable
        return self._matchable_schemata

    # NOTE(review): `lru_cache` on a method keys on `self` and keeps every
    # instance alive for the lifetime of the cache - presumably fine because
    # schemata live as long as the model; confirm.
    @lru_cache(maxsize=None)
    def can_match(self, other: "Schema") -> bool:
        """Check if a schema can match with another schema."""
        return other in self.matchable_schemata

    @lru_cache(maxsize=None)
    def is_a(self, other: Union[str, "Schema"]) -> bool:
        """Check if the schema or one of its parents is the same as the given
        candidate ``other``."""
        if not isinstance(other, str):
            other = other.name
        return other in self.names

    def get(self, name: str) -> Optional[Property]:
        """Retrieve a property defined for this schema by its name."""
        if name is None:
            return None
        return self.properties.get(name)

    def validate(self, data: Dict[str, Any]) -> Optional[str]:
        """Validate a dictionary against the given schema.

        The values under ``data["properties"]`` are checked against every
        property of the schema; a required property without values is
        reported as an error. ``data`` itself is not modified.

        Returns:
            ``None`` when validation passes.

        Raises:
            InvalidData: if any property fails validation; the per-property
                messages are attached as ``errors={"properties": ...}``.
        """
        errors: Dict[str, Any] = {}
        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
        for name, prop in self.properties.items():
            values = ensure_list(properties.get(name, []))
            error = prop.validate(values)
            if error is None and not values:
                if prop.name in self.required:
                    error = gettext("Required")
            if error is not None:
                errors[name] = error
        if errors:
            msg = gettext("Entity validation failed")
            raise InvalidData(msg, errors={"properties": errors})
        return None

    def to_dict(self) -> SchemaToDict:
        """Return schema metadata, including all properties, in a serializable form.

        Optional keys are only emitted when they are non-empty / true, and only
        properties declared on this schema itself (not inherited) are included.
        """
        data: SchemaToDict = {
            "label": self.label,
            "plural": self.plural,
            "schemata": sorted(self.names),
            "extends": sorted(e.name for e in self.extends),
        }
        if self.edge_source and self.edge_target and self.edge_label:
            data["edge"] = {
                "source": self.edge_source,
                "target": self.edge_target,
                "caption": self.edge_caption,
                "label": self.edge_label,
                "directed": self.edge_directed,
            }
        if self.temporal_start or self.temporal_end:
            data["temporalExtent"] = {
                "start": self.temporal_start,
                "end": self.temporal_end,
            }
        if self.featured:
            data["featured"] = self.featured
        if self.required:
            data["required"] = self.required
        if self.caption:
            data["caption"] = self.caption
        if self.description:
            data["description"] = self.description
        if self.abstract:
            data["abstract"] = True
        if self.hidden:
            data["hidden"] = True
        if self.generated:
            data["generated"] = True
        if self.matchable:
            data["matchable"] = True
        if self.deprecated:
            data["deprecated"] = True
        properties: Dict[str, PropertyToDict] = {}
        for name, prop in self.properties.items():
            if prop.schema == self:
                properties[name] = prop.to_dict()
        data["properties"] = properties
        return data

    def __eq__(self, other: Any) -> bool:
        """Compare two schemata (via hash)."""
        try:
            return self._hash == other._hash  # type: ignore
        except AttributeError:
            return False

    def __lt__(self, other: Any) -> bool:
        """Order schemata by their name."""
        return self.name.__lt__(other.name)

    def __hash__(self) -> int:
        return self._hash

    def __repr__(self) -> str:
        return "<Schema(%r)>" % self.name
A type definition for a class of entities that have certain properties.
Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple parent schemata from which it inherits all of their properties. A schema can also have descendant child schemata, which, in turn, add further properties. Schemata are usually accessed via the model, which holds all available definitions.
def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
    """Initialise the schema from its raw specification.

    Only values found in ``data`` are read here. The parent schemata named
    under ``extends`` are merely recorded; they are looked up and merged in
    by ``generate``.

    Args:
        model: The model this schema belongs to.
        name: Machine-readable name of the schema.
        data: The specification mapping describing this schema.
    """
    #: Machine-readable name of the schema, used for identification.
    self.name = name
    self.model = model
    self._label = data.get("label", name)
    # Fall back to the raw label, not the ``label`` property: the property
    # is already passed through gettext, which would store a translated
    # string here and translate it a second time in ``plural``.
    self._plural = data.get("plural", self._label)
    self._description = data.get("description")
    self._hash = hash("<Schema(%r)>" % name)

    #: Do not store or emit entities of this type, it is used only for
    #: inheritance.
    self.abstract = as_bool(data.get("abstract"), False)

    #: This schema is deprecated and should not be used.
    self.deprecated = as_bool(data.get("deprecated", False))

    #: Hide this schema in listings.
    self.hidden = as_bool(data.get("hidden"), False)
    self.hidden = self.hidden and not self.abstract

    #: Entities with this type are generated by the system - for example, via
    #: `ingest-file`. The user should not be offered an option to create them
    #: in the interface.
    self.generated = as_bool(data.get("generated"), False)

    #: Try to perform fuzzy matching. Fuzzy similarity search does not
    #: make sense for entities which have a lot of similar names, such
    #: as land plots, assets etc.
    self.matchable = as_bool(data.get("matchable"), True)

    #: Mark a set of properties as important, i.e. they should be shown
    #: first, or in an abridged view of the entity. In Aleph, these properties
    #: are included in tabular entity listings.
    self.featured = ensure_list(data.get("featured", []))

    #: Mark a set of properties as required. This is applied only when
    #: an entity is created by the user - bulk created entities will
    #: slip through even if it is technically invalid.
    self.required = ensure_list(data.get("required", []))

    #: Mark a set of properties to be used for the entity's caption.
    #: They will be checked in order and the first existent value will
    #: be used.
    self.caption = ensure_list(data.get("caption", []))

    # A transform of the entity into an edge for its representation in
    # the context of a property graph representation like Neo4J/Gephi.
    edge = data.get("edge", {})
    self.edge_source = edge.get("source")
    self.edge_target = edge.get("target")

    #: Flag to indicate if this schema should be represented by an edge (rather than
    #: a node) when the data is converted into a property graph.
    self.edge: bool = self.edge_source is not None and self.edge_target is not None
    self.edge_caption = ensure_list(edge.get("caption", []))
    self._edge_label = edge.get("label", self._label)

    #: Flag to indicate if the edge should be presented as directed to the user,
    #: e.g. by showing an arrow at the target end of the edge.
    self.edge_directed = as_bool(edge.get("directed", True))

    #: Specify which properties should be used to represent this schema in a
    #: timeline.
    temporal_extent = data.get("temporalExtent", {})
    self._temporal_start = ensure_list(temporal_extent.get("start", []))
    self._temporal_end = ensure_list(temporal_extent.get("end", []))

    #: Direct parent schemata of this schema.
    self._extends = ensure_list(data.get("extends", []))
    self.extends: Set["Schema"] = set()

    #: All parents of this schema (including indirect parents and the schema
    #: itself).
    self.schemata = {self}

    #: All names of :attr:`~schemata`.
    self.names = {self.name}

    #: Inverse of :attr:`~schemata`, all derived child types of this schema
    #: and their children.
    self.descendants: Set["Schema"] = set()
    # Filled in on first access of ``matchable_schemata``.
    self._matchable_schemata: Optional[Set["Schema"]] = None

    #: The full list of properties defined for the entity, including those
    #: inherited from parent schemata.
    self.properties: Dict[str, Property] = {}
    # Entries are added one at a time so each Property is constructed
    # against a schema whose ``properties`` mapping already exists.
    for prop_name, prop_spec in data.get("properties", {}).items():
        self.properties[prop_name] = Property(self, prop_name, prop_spec)
def generate(self, model: "Model") -> None:
    """Resolve the parent hierarchy and validate the schema definition.

    For every name in ``extends`` the parent is fetched from ``model`` and
    generated. Its properties are then copied in (without overriding
    properties already present here) and its ancestors are registered on
    this schema. Afterwards all properties are generated and the
    ``featured``, ``caption``, ``required`` and edge settings are checked
    against the available properties.

    Args:
        model: The model used to look up parent schemata by name.

    Raises:
        InvalidData: If a name listed in ``extends`` is unknown to ``model``.
        InvalidModel: If parents disagree about the temporal start or end
            properties, if a featured, caption or required property does
            not exist, if a caption property is of entity type, or if an
            edge source/target property is missing.
    """
    # Temporal extent inherited so far; used only to detect parents that
    # disagree with each other.
    temporal_start: Optional[List[str]] = None
    temporal_end: Optional[List[str]] = None
    for extends in self._extends:
        parent = model.get(extends)
        if parent is None:
            raise InvalidData("Invalid extends: %r" % extends)
        parent.generate(model)

        # Locally defined properties take precedence over inherited ones.
        for name, prop in parent.properties.items():
            if name not in self.properties:
                self.properties[name] = prop

        self.extends.add(parent)
        for ancestor in parent.schemata:
            self.schemata.add(ancestor)
            self.names.add(ancestor.name)
            ancestor.descendants.add(self)

        if len(self._temporal_start) == 0 and parent.temporal_start:
            if (
                temporal_start is not None
                and temporal_start != parent.temporal_start
            ):
                raise InvalidModel(
                    "Conflicting temporal start properties: %s" % self.name
                )
            temporal_start = parent.temporal_start

        if len(self._temporal_end) == 0 and parent.temporal_end:
            if temporal_end is not None and temporal_end != parent.temporal_end:
                # Report the *end* conflict as such; the message used to
                # be a copy of the "start" message above.
                raise InvalidModel(
                    "Conflicting temporal end properties: %s" % self.name
                )
            temporal_end = parent.temporal_end

    # Iterate over a snapshot so that the mapping may be modified while
    # the individual properties are generated.
    for prop in list(self.properties.values()):
        prop.generate(model)

    for featured in self.featured:
        if self.get(featured) is None:
            raise InvalidModel("Missing featured property: %s" % featured)

    for caption in self.caption:
        prop_ = self.get(caption)
        if prop_ is None:
            raise InvalidModel("Missing caption property: %s" % caption)
        if prop_.type == registry.entity:
            raise InvalidModel("Caption cannot be entity: %s" % caption)

    for required in self.required:
        if self.get(required) is None:
            raise InvalidModel("Missing required property: %s" % required)

    if self.edge:
        if self.source_prop is None:
            msg = "Missing edge source: %s" % self.edge_source
            raise InvalidModel(msg)

        if self.target_prop is None:
            msg = "Missing edge target: %s" % self.edge_target
            raise InvalidModel(msg)
While loading the schema, this function will validate and load the hierarchy, properties, and flags of the definition.
@property
def label(self) -> str:
    """Human-readable name of the schema, passed through ``gettext``."""
    raw_label = self._label
    return gettext(raw_label)
User-facing name of the schema.
@property
def plural(self) -> str:
    """Plural form of the schema name, passed through ``gettext``."""
    raw_plural = self._plural
    return gettext(raw_plural)
Name of the schema to be used in plural constructions.
@property
def description(self) -> Optional[str]:
    """Longer explanation of what the schema means, passed through ``gettext``."""
    raw_description = self._description
    return gettext(raw_description)
A longer description of the semantics of the schema.
@property
def edge_label(self) -> Optional[str]:
    """Label for edges built from entities of this schema, passed through
    ``gettext``."""
    raw_edge_label = self._edge_label
    return gettext(raw_edge_label)
Description label for edges derived from entities of this schema.
@property
def source_prop(self) -> Optional[Property]:
    """Property acting as the edge source when this schema is treated as a
    relationship, or ``None`` if no source is configured or it is unknown."""
    source_name = self.edge_source
    return None if source_name is None else self.get(source_name)
The entity property to be used as an edge source when the schema is considered as a relationship.
@property
def target_prop(self) -> Optional[Property]:
    """Property acting as the edge target when this schema is turned into a
    relationship, or ``None`` if no target is configured or it is unknown."""
    target_name = self.edge_target
    return None if target_name is None else self.get(target_name)
The entity property to be used as an edge target when the schema is transformed into a relationship.
@property
def temporal_start(self) -> List[str]:
    """Names of the properties marking the start of the entity in a timeline.

    The schema's own setting wins; if it is empty, the first non-empty
    setting found among the direct parents is returned instead.
    """
    own = self._temporal_start
    if own:
        return own
    for parent in self.extends:
        inherited = parent.temporal_start
        if inherited:
            return inherited
    return own
The entity properties to be used as the start when representing the entity in a timeline.
@property
def temporal_end(self) -> List[str]:
    """Names of the properties marking the end of the entity in a timeline.

    The schema's own setting wins; if it is empty, the first non-empty
    setting found among the direct parents is returned instead.
    """
    own = self._temporal_end
    if own:
        return own
    for parent in self.extends:
        inherited = parent.temporal_end
        if inherited:
            return inherited
    return own
The entity properties to be used as the end when representing the entity in a timeline.
@property
def temporal_start_props(self) -> List[Property]:
    """Property objects for the names in ``temporal_start``; names that do
    not resolve to a property on this schema are skipped."""
    resolved: List[Property] = []
    for prop_name in self.temporal_start:
        prop = self.get(prop_name)
        if prop is not None:
            resolved.append(prop)
    return resolved
The entity properties to be used as the start when representing the entity in a timeline.
@property
def temporal_end_props(self) -> List[Property]:
    """Property objects for the names in ``temporal_end``; names that do
    not resolve to a property on this schema are skipped."""
    resolved: List[Property] = []
    for prop_name in self.temporal_end:
        prop = self.get(prop_name)
        if prop is not None:
            resolved.append(prop)
    return resolved
The entity properties to be used as the end when representing the entity in a timeline.
@property
def sorted_properties(self) -> List[Property]:
    """All properties in display order: caption properties first, then
    featured ones, and within each group alphabetically by label."""
    ordered = list(self.properties.values())
    # ``False`` sorts before ``True``, so membership in ``caption`` and
    # ``featured`` moves a property towards the front.
    ordered.sort(
        key=lambda prop: (
            prop.name not in self.caption,
            prop.name not in self.featured,
            prop.label,
        )
    )
    return ordered
All properties of the schema in the order in which they should be shown to the user (alphabetically, with captions and featured properties first).
@property
def matchable_schemata(self) -> Set["Schema"]:
    """Schemata that entities of this schema can sensibly be compared with.

    The result is computed on first access and stored on the instance. It
    is empty when this schema is not matchable; otherwise it holds every
    matchable schema among this schema's ancestors (itself included) and
    its descendants.
    """
    cached = self._matchable_schemata
    if cached is not None:
        return cached

    matches: Set["Schema"] = set()
    if self.matchable:
        # Used by the cross-referencer: a Company may be compared to a
        # Legal Entity, but not to an Aircraft.
        related = set(self.schemata).union(self.descendants)
        matches = {schema for schema in related if schema.matchable}
    self._matchable_schemata = matches
    return matches
Return the set of schemata with which it makes sense to compare this schema. For example, it makes sense to compare a legal entity with a company, but it does not make sense to compare a car and a person.
@lru_cache(maxsize=None)
def can_match(self, other: "Schema") -> bool:
    """Check if a schema can match with another schema."""
    candidates = self.matchable_schemata
    return other in candidates
Check if a schema can match with another schema.
@lru_cache(maxsize=None)
def is_a(self, other: Union[str, "Schema"]) -> bool:
    """Tell whether this schema, or one of its parents, is the candidate
    ``other``, which may be given as a schema or as a schema name."""
    candidate = other if isinstance(other, str) else other.name
    return candidate in self.names
Check if the schema or one of its parents is the same as the given candidate ``other``.
def get(self, name: str) -> Optional[Property]:
    """Look up a property of this schema by name.

    Returns ``None`` when ``name`` is ``None`` or no such property exists.
    """
    return None if name is None else self.properties.get(name)
Retrieve a property defined for this schema by its name.
def validate(self, data: Dict[str, Any]) -> Optional[str]:
    """Check the ``properties`` of an entity dictionary against this schema.

    Each property of the schema validates the values supplied under its
    name. A property listed in ``required`` that validates cleanly but has
    no values is reported as "Required". ``data`` itself is not modified.

    Returns:
        ``None`` when no property reported an error.

    Raises:
        InvalidData: If any property reported an error; the per-property
            messages are passed as ``errors={"properties": ...}``.
    """
    submitted = cast(Dict[str, Any], ensure_dict(data.get("properties")))
    failures = {}
    for name, prop in self.properties.items():
        values = ensure_list(submitted.get(name, []))
        error = prop.validate(values)
        if error is None and not values and prop.name in self.required:
            error = gettext("Required")
        if error is None:
            continue
        failures[name] = error
    if failures:
        msg = gettext("Entity validation failed")
        raise InvalidData(msg, errors={"properties": failures})
    return None
Validate a dictionary against the given schema. This will also drop keys which are not valid as properties.
def to_dict(self) -> SchemaToDict:
    """Build a serializable description of the schema.

    ``label``, ``plural``, ``schemata``, ``extends`` and ``properties`` are
    always present; every other key is only added when its value is set or
    truthy. ``properties`` contains only the properties whose ``schema`` is
    this schema, i.e. inherited ones are left out.
    """
    parent_names = [parent.name for parent in self.extends]
    data: SchemaToDict = {
        "label": self.label,
        "plural": self.plural,
        "schemata": sorted(self.names),
        "extends": sorted(parent_names),
    }

    # The edge block needs source, target and label to all be set.
    edge_label = self.edge_label if self.edge_source and self.edge_target else None
    if edge_label:
        data["edge"] = {
            "source": self.edge_source,
            "target": self.edge_target,
            "caption": self.edge_caption,
            "label": edge_label,
            "directed": self.edge_directed,
        }

    start, end = self.temporal_start, self.temporal_end
    if start or end:
        data["temporalExtent"] = {"start": start, "end": end}

    if self.featured:
        data["featured"] = self.featured
    if self.required:
        data["required"] = self.required
    if self.caption:
        data["caption"] = self.caption

    description = self.description
    if description:
        data["description"] = description

    # Boolean flags are emitted only when true.
    if self.abstract:
        data["abstract"] = True
    if self.hidden:
        data["hidden"] = True
    if self.generated:
        data["generated"] = True
    if self.matchable:
        data["matchable"] = True
    if self.deprecated:
        data["deprecated"] = True

    data["properties"] = {
        name: prop.to_dict()
        for name, prop in self.properties.items()
        if prop.schema == self
    }
    return data
Return schema metadata, including all properties, in a serializable form.