followthemoney.schema
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Optional,
    Set,
    TypedDict,
    Union,
    cast,
)
from banal import ensure_list, ensure_dict, as_bool
from functools import lru_cache

from followthemoney.property import Property, PropertySpec, PropertyToDict, ReverseSpec
from followthemoney.types import registry
from followthemoney.exc import InvalidData, InvalidModel
from followthemoney.rdf import URIRef, NS
from followthemoney.util import gettext

if TYPE_CHECKING:
    from followthemoney.model import Model


class EdgeSpec(TypedDict, total=False):
    """The ``edge`` section of a schema definition (all keys optional)."""

    source: str
    target: str
    caption: List[str]
    label: str
    directed: bool


class TemporalExtentSpec(TypedDict, total=False):
    """The ``temporalExtent`` section of a schema definition (all keys optional)."""

    start: List[str]
    end: List[str]


class SchemaSpec(TypedDict, total=False):
    """Shape of the raw definition passed to :class:`Schema` (all keys optional)."""

    label: str
    plural: str
    schemata: List[str]
    extends: List[str]
    properties: Dict[str, PropertySpec]
    featured: List[str]
    required: List[str]
    caption: List[str]
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec
    description: Optional[str]
    rdf: Optional[str]
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: Optional[bool]


class SchemaToDict(TypedDict, total=False):
    """Shape of the dictionary returned by :meth:`Schema.to_dict`."""

    label: str
    plural: str
    schemata: List[str]
    extends: List[str]
    properties: Dict[str, PropertyToDict]
    featured: List[str]
    required: List[str]
    caption: List[str]
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec
    description: Optional[str]
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: bool


class Schema:
    """A type definition for a class of entities that have certain properties.

    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
    parent schemata from which it inherits all of their properties. A schema can also
    have descendant child schemata, which, in turn, add further properties. Schemata
    are usually accessed via the model, which holds all available definitions.
    """

    __slots__ = (
        "model",
        "name",
        "_label",
        "_plural",
        "_description",
        "_hash",
        "uri",
        "abstract",
        "hidden",
        "generated",
        "matchable",
        "featured",
        "required",
        "deprecated",
        "caption",
        "edge",
        "_edge_label",
        "edge_directed",
        "edge_source",
        "edge_target",
        "edge_caption",
        "temporal_start",
        "temporal_end",
        "_extends",
        "extends",
        "schemata",
        "names",
        "descendants",
        "properties",
        "_matchable_schemata",
    )

    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
        """Read the parts of the definition that are available from ``data`` alone.

        Parents, inherited properties and descendants are only linked up later,
        by :meth:`generate`.
        """
        #: Machine-readable name of the schema, used for identification.
        self.name = name
        self.model = model
        self._label = data.get("label", name)
        # Fall back to the raw label, not ``self.label``: the ``label`` and
        # ``plural`` properties both pass the stored value through gettext on
        # access, so storing ``self.label`` would run it through gettext twice.
        self._plural = data.get("plural", self._label)
        self._description = data.get("description")
        self._hash = hash("<Schema(%r)>" % name)

        #: RDF identifier for this schema when it is transformed to a triple term.
        self.uri = URIRef(cast(str, data.get("rdf", NS[name])))

        #: Do not store or emit entities of this type, it is used only for
        #: inheritance.
        self.abstract = as_bool(data.get("abstract"), False)

        #: This schema is deprecated and should not be used.
        self.deprecated = as_bool(data.get("deprecated", False))

        #: Hide this schema in listings.
        self.hidden = as_bool(data.get("hidden"), False)
        self.hidden = self.hidden and not self.abstract

        #: Entities with this type are generated by the system - for example, via
        #: `ingest-file`. The user should not be offered an option to create them
        #: in the interface.
        self.generated = as_bool(data.get("generated"), False)

        #: Try to perform fuzzy matching. Fuzzy similarity search does not
        #: make sense for entities which have a lot of similar names, such
        #: as land plots, assets etc.
        self.matchable = as_bool(data.get("matchable"), True)

        #: Mark a set of properties as important, i.e. they should be shown
        #: first, or in an abridged view of the entity. In Aleph, these properties
        #: are included in tabular entity listings.
        self.featured = ensure_list(data.get("featured", []))

        #: Mark a set of properties as required. This is applied only when
        #: an entity is created by the user - bulk created entities will
        #: slip through even if it is technically invalid.
        self.required = ensure_list(data.get("required", []))

        #: Mark a set of properties to be used for the entity's caption.
        #: They will be checked in order and the first existent value will
        #: be used.
        self.caption = ensure_list(data.get("caption", []))

        # A transform of the entity into an edge for its representation in
        # the context of a property graph representation like Neo4J/Gephi.
        edge = data.get("edge", {})
        self.edge_source = edge.get("source")
        self.edge_target = edge.get("target")

        #: Flag to indicate if this schema should be represented by an edge (rather than
        #: a node) when the data is converted into a property graph.
        self.edge: bool = self.edge_source is not None and self.edge_target is not None
        self.edge_caption = ensure_list(edge.get("caption", []))
        self._edge_label = edge.get("label", self._label)

        #: Flag to indicate if the edge should be presented as directed to the user,
        #: e.g. by showing an arrow at the target end of the edge.
        self.edge_directed = as_bool(edge.get("directed", True))

        #: Specify which properties should be used to represent this schema in a
        #: timeline.
        temporal_extent = data.get("temporalExtent", {})
        self.temporal_start = set(temporal_extent.get("start", []))
        self.temporal_end = set(temporal_extent.get("end", []))

        #: Direct parent schemata of this schema.
        self._extends = ensure_list(data.get("extends", []))
        self.extends: Set["Schema"] = set()

        #: All parents of this schema (including indirect parents and the schema
        #: itself).
        self.schemata = set([self])

        #: All names of :attr:`~schemata`.
        self.names = set([self.name])

        #: Inverse of :attr:`~schemata`, all derived child types of this schema
        #: and their children.
        self.descendants: Set["Schema"] = set()
        self._matchable_schemata: Optional[Set["Schema"]] = None

        #: The full list of properties defined for the entity, including those
        #: inherited from parent schemata.
        self.properties: Dict[str, Property] = {}
        for name, prop in data.get("properties", {}).items():
            self.properties[name] = Property(self, name, prop)

    def generate(self, model: "Model") -> None:
        """While loading the schema, this function will validate and
        load the hierarchy, properties, and flags of the definition.

        Raises :class:`InvalidData` if a parent named in ``extends`` is not in
        the model, and :class:`InvalidModel` if a featured, caption, required
        or edge property is not defined on the schema.
        """
        for extends in self._extends:
            parent = model.get(extends)
            if parent is None:
                raise InvalidData("Invalid extends: %r" % extends)
            # Make sure the parent has linked its own ancestors first.
            parent.generate(model)

            # Properties defined on this schema win over inherited ones.
            for name, prop in parent.properties.items():
                if name not in self.properties:
                    self.properties[name] = prop

            self.extends.add(parent)
            for ancestor in parent.schemata:
                self.schemata.add(ancestor)
                self.names.add(ancestor.name)
                ancestor.descendants.add(self)

            self.temporal_start |= parent.temporal_start
            self.temporal_end |= parent.temporal_end

        # Iterate over a copy so the mapping may change while generating.
        for prop in list(self.properties.values()):
            prop.generate(model)

        for featured in self.featured:
            if self.get(featured) is None:
                raise InvalidModel("Missing featured property: %s" % featured)

        for caption in self.caption:
            prop_ = self.get(caption)
            if prop_ is None:
                raise InvalidModel("Missing caption property: %s" % caption)
            if prop_.type == registry.entity:
                raise InvalidModel("Caption cannot be entity: %s" % caption)

        for required in self.required:
            if self.get(required) is None:
                raise InvalidModel("Missing required property: %s" % required)

        if self.edge:
            if self.source_prop is None:
                msg = "Missing edge source: %s" % self.edge_source
                raise InvalidModel(msg)

            if self.target_prop is None:
                msg = "Missing edge target: %s" % self.edge_target
                raise InvalidModel(msg)

    def _add_reverse(
        self, model: "Model", data: ReverseSpec, other: Property
    ) -> Property:
        """Return the property named by ``data["name"]``, creating it if missing.

        A newly created property is an entity-typed stub whose reverse is
        ``other`` and whose range is the schema of ``other``. Raises
        :class:`InvalidModel` if ``data`` carries no name.
        """
        name = data.get("name")
        if name is None:
            raise InvalidModel("Unnamed reverse: %s" % other)

        prop = self.get(name)
        if prop is None:
            spec: PropertySpec = {
                "label": data.get("label"),
                "type": registry.entity.name,
                "reverse": {"name": other.name},
                "range": other.schema.name,
                "hidden": data.get("hidden", other.hidden),
            }
            prop = Property(self, name, spec)
            prop.stub = True
            prop.generate(model)
            self.properties[name] = prop
        return prop

    @property
    def label(self) -> str:
        """User-facing name of the schema."""
        return gettext(self._label)

    @property
    def plural(self) -> str:
        """Name of the schema to be used in plural constructions."""
        return gettext(self._plural)

    @property
    def description(self) -> Optional[str]:
        """A longer description of the semantics of the schema."""
        return gettext(self._description)

    @property
    def edge_label(self) -> Optional[str]:
        """Description label for edges derived from entities of this schema."""
        return gettext(self._edge_label)

    @property
    def source_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge source."""
        return self.get(self.edge_source)

    @property
    def target_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge target."""
        return self.get(self.edge_target)

    @property
    def temporal_start_props(self) -> Set[Property]:
        """The entity properties to be used as the start when representing the entity
        in a timeline."""
        props = [self.get(prop_name) for prop_name in self.temporal_start]
        return set([prop for prop in props if prop is not None])

    @property
    def temporal_end_props(self) -> Set[Property]:
        """The entity properties to be used as the end when representing the entity
        in a timeline."""
        props = [self.get(prop_name) for prop_name in self.temporal_end]
        return set([prop for prop in props if prop is not None])

    @property
    def sorted_properties(self) -> List[Property]:
        """All properties of the schema in the order in which they should be shown
        to the user (alphabetically, with captions and featured properties first)."""
        return sorted(
            self.properties.values(),
            key=lambda p: (
                p.name not in self.caption,
                p.name not in self.featured,
                p.label,
            ),
        )

    @property
    def matchable_schemata(self) -> Set["Schema"]:
        """Return the set of schemata to which it makes sense to compare with this
        schema. For example, it makes sense to compare a legal entity with a company,
        but it does not make sense to compare a car and a person.

        The set is computed on first access and then reused.
        """
        if self._matchable_schemata is None:
            self._matchable_schemata = set()
            if self.matchable:
                # This is used by the cross-referencer to determine what
                # other schemata should be considered for matches. For
                # example, a Company may be compared to a Legal Entity,
                # but it makes no sense to compare it to an Aircraft.
                candidates = set(self.schemata)
                candidates.update(self.descendants)
                for schema in candidates:
                    if schema.matchable:
                        self._matchable_schemata.add(schema)
        return self._matchable_schemata

    def can_match(self, other: "Schema") -> bool:
        """Check if a schema can match with another schema."""
        return other in self.matchable_schemata

    def is_a(self, other: Union[str, "Schema"]) -> bool:
        """Check if the schema or one of its parents is the same as the given
        candidate ``other``."""
        # Deliberately not memoised: ``names`` is filled in by ``generate``, so
        # a cached answer taken before generation finished would be stale, and
        # the set lookup is already constant-time.
        if not isinstance(other, str):
            other = other.name
        return other in self.names

    def get(self, name: Optional[str]) -> Optional[Property]:
        """Retrieve a property defined for this schema by its name."""
        if name is None:
            return None
        return self.properties.get(name)

    def validate(self, data: Any) -> Optional[str]:
        """Validate the ``properties`` of an entity dictionary against this schema.

        Every property of the schema is checked with ``Property.validate``; a
        property listed in :attr:`required` that has no values is reported too.
        This method only reads from ``data``.

        Raises :class:`InvalidData` with a per-property error mapping if any
        check fails; otherwise returns ``None``.
        """
        errors = {}
        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
        for name, prop in self.properties.items():
            values = ensure_list(properties.get(name, []))
            error = prop.validate(values)
            if error is None and not values and prop.name in self.required:
                error = gettext("Required")
            if error is not None:
                errors[name] = error
        if errors:
            msg = gettext("Entity validation failed")
            raise InvalidData(msg, errors={"properties": errors})
        return None

    def to_dict(self) -> SchemaToDict:
        """Return schema metadata, including all properties, in a serializable form."""
        data: SchemaToDict = {
            "label": self.label,
            "plural": self.plural,
            "schemata": list(sorted(self.names)),
            "extends": list(sorted([e.name for e in self.extends])),
        }
        if self.edge_source and self.edge_target and self.edge_label:
            data["edge"] = {
                "source": self.edge_source,
                "target": self.edge_target,
                "caption": self.edge_caption,
                "label": self.edge_label,
                "directed": self.edge_directed,
            }
        # Only temporal properties declared on this schema itself are listed.
        start_props = [
            prop.name for prop in self.temporal_start_props if prop.schema == self
        ]
        end_props = [
            prop.name for prop in self.temporal_end_props if prop.schema == self
        ]
        if start_props or end_props:
            data["temporalExtent"] = {
                "start": sorted(start_props),
                "end": sorted(end_props),
            }
        if len(self.featured):
            data["featured"] = self.featured
        if len(self.required):
            data["required"] = self.required
        if len(self.caption):
            data["caption"] = self.caption
        if self.description:
            data["description"] = self.description
        if self.abstract:
            data["abstract"] = True
        if self.hidden:
            data["hidden"] = True
        if self.generated:
            data["generated"] = True
        if self.matchable:
            data["matchable"] = True
        if self.deprecated:
            data["deprecated"] = True
        # Inherited properties are left to the schema that declares them.
        properties: Dict[str, PropertyToDict] = {}
        for name, prop in self.properties.items():
            if prop.schema == self:
                properties[name] = prop.to_dict()
        data["properties"] = properties
        return data

    def __eq__(self, other: Any) -> bool:
        """Compare two schemata (via hash)."""
        try:
            return self._hash == hash(other)
        except (AttributeError, TypeError):
            # TypeError: ``other`` is unhashable, so it cannot be a schema.
            return False

    def __lt__(self, other: Any) -> bool:
        return self.name.__lt__(other.name)

    def __hash__(self) -> int:
        try:
            return self._hash
        except AttributeError:
            return super().__hash__()

    def __repr__(self) -> str:
        return "<Schema(%r)>" % self.name
class SchemaSpec(TypedDict, total=False):
    """Shape of the raw definition dictionary for a schema.

    ``total=False`` makes every key optional.
    """

    label: str
    plural: str
    schemata: List[str]
    extends: List[str]
    properties: Dict[str, PropertySpec]
    featured: List[str]
    required: List[str]
    caption: List[str]
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec
    description: Optional[str]
    rdf: Optional[str]
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: Optional[bool]
class SchemaToDict(TypedDict, total=False):
    """Shape of a schema in its serializable dictionary form.

    ``total=False`` makes every key optional.
    """

    label: str
    plural: str
    schemata: List[str]
    extends: List[str]
    properties: Dict[str, PropertyToDict]
    featured: List[str]
    required: List[str]
    caption: List[str]
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec
    description: Optional[str]
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: bool
class Schema:
    """A type definition for a class of entities that have certain properties.

    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
    parent schemata from which it inherits all of their properties. A schema can also
    have descendant child schemata, which, in turn, add further properties. Schemata
    are usually accessed via the model, which holds all available definitions.
    """

    __slots__ = (
        "model",
        "name",
        "_label",
        "_plural",
        "_description",
        "_hash",
        "uri",
        "abstract",
        "hidden",
        "generated",
        "matchable",
        "featured",
        "required",
        "deprecated",
        "caption",
        "edge",
        "_edge_label",
        "edge_directed",
        "edge_source",
        "edge_target",
        "edge_caption",
        "temporal_start",
        "temporal_end",
        "_extends",
        "extends",
        "schemata",
        "names",
        "descendants",
        "properties",
        "_matchable_schemata",
    )

    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
        """Read the parts of the definition that are available from ``data`` alone.

        Parents, inherited properties and descendants are only linked up later,
        by :meth:`generate`.
        """
        #: Machine-readable name of the schema, used for identification.
        self.name = name
        self.model = model
        self._label = data.get("label", name)
        # Fall back to the raw label, not ``self.label``: the ``label`` and
        # ``plural`` properties both pass the stored value through gettext on
        # access, so storing ``self.label`` would run it through gettext twice.
        self._plural = data.get("plural", self._label)
        self._description = data.get("description")
        self._hash = hash("<Schema(%r)>" % name)

        #: RDF identifier for this schema when it is transformed to a triple term.
        self.uri = URIRef(cast(str, data.get("rdf", NS[name])))

        #: Do not store or emit entities of this type, it is used only for
        #: inheritance.
        self.abstract = as_bool(data.get("abstract"), False)

        #: This schema is deprecated and should not be used.
        self.deprecated = as_bool(data.get("deprecated", False))

        #: Hide this schema in listings.
        self.hidden = as_bool(data.get("hidden"), False)
        self.hidden = self.hidden and not self.abstract

        #: Entities with this type are generated by the system - for example, via
        #: `ingest-file`. The user should not be offered an option to create them
        #: in the interface.
        self.generated = as_bool(data.get("generated"), False)

        #: Try to perform fuzzy matching. Fuzzy similarity search does not
        #: make sense for entities which have a lot of similar names, such
        #: as land plots, assets etc.
        self.matchable = as_bool(data.get("matchable"), True)

        #: Mark a set of properties as important, i.e. they should be shown
        #: first, or in an abridged view of the entity. In Aleph, these properties
        #: are included in tabular entity listings.
        self.featured = ensure_list(data.get("featured", []))

        #: Mark a set of properties as required. This is applied only when
        #: an entity is created by the user - bulk created entities will
        #: slip through even if it is technically invalid.
        self.required = ensure_list(data.get("required", []))

        #: Mark a set of properties to be used for the entity's caption.
        #: They will be checked in order and the first existent value will
        #: be used.
        self.caption = ensure_list(data.get("caption", []))

        # A transform of the entity into an edge for its representation in
        # the context of a property graph representation like Neo4J/Gephi.
        edge = data.get("edge", {})
        self.edge_source = edge.get("source")
        self.edge_target = edge.get("target")

        #: Flag to indicate if this schema should be represented by an edge (rather than
        #: a node) when the data is converted into a property graph.
        self.edge: bool = self.edge_source is not None and self.edge_target is not None
        self.edge_caption = ensure_list(edge.get("caption", []))
        self._edge_label = edge.get("label", self._label)

        #: Flag to indicate if the edge should be presented as directed to the user,
        #: e.g. by showing an arrow at the target end of the edge.
        self.edge_directed = as_bool(edge.get("directed", True))

        #: Specify which properties should be used to represent this schema in a
        #: timeline.
        temporal_extent = data.get("temporalExtent", {})
        self.temporal_start = set(temporal_extent.get("start", []))
        self.temporal_end = set(temporal_extent.get("end", []))

        #: Direct parent schemata of this schema.
        self._extends = ensure_list(data.get("extends", []))
        self.extends: Set["Schema"] = set()

        #: All parents of this schema (including indirect parents and the schema
        #: itself).
        self.schemata = set([self])

        #: All names of :attr:`~schemata`.
        self.names = set([self.name])

        #: Inverse of :attr:`~schemata`, all derived child types of this schema
        #: and their children.
        self.descendants: Set["Schema"] = set()
        self._matchable_schemata: Optional[Set["Schema"]] = None

        #: The full list of properties defined for the entity, including those
        #: inherited from parent schemata.
        self.properties: Dict[str, Property] = {}
        for name, prop in data.get("properties", {}).items():
            self.properties[name] = Property(self, name, prop)

    def generate(self, model: "Model") -> None:
        """While loading the schema, this function will validate and
        load the hierarchy, properties, and flags of the definition.

        Raises :class:`InvalidData` if a parent named in ``extends`` is not in
        the model, and :class:`InvalidModel` if a featured, caption, required
        or edge property is not defined on the schema.
        """
        for extends in self._extends:
            parent = model.get(extends)
            if parent is None:
                raise InvalidData("Invalid extends: %r" % extends)
            # Make sure the parent has linked its own ancestors first.
            parent.generate(model)

            # Properties defined on this schema win over inherited ones.
            for name, prop in parent.properties.items():
                if name not in self.properties:
                    self.properties[name] = prop

            self.extends.add(parent)
            for ancestor in parent.schemata:
                self.schemata.add(ancestor)
                self.names.add(ancestor.name)
                ancestor.descendants.add(self)

            self.temporal_start |= parent.temporal_start
            self.temporal_end |= parent.temporal_end

        # Iterate over a copy so the mapping may change while generating.
        for prop in list(self.properties.values()):
            prop.generate(model)

        for featured in self.featured:
            if self.get(featured) is None:
                raise InvalidModel("Missing featured property: %s" % featured)

        for caption in self.caption:
            prop_ = self.get(caption)
            if prop_ is None:
                raise InvalidModel("Missing caption property: %s" % caption)
            if prop_.type == registry.entity:
                raise InvalidModel("Caption cannot be entity: %s" % caption)

        for required in self.required:
            if self.get(required) is None:
                raise InvalidModel("Missing required property: %s" % required)

        if self.edge:
            if self.source_prop is None:
                msg = "Missing edge source: %s" % self.edge_source
                raise InvalidModel(msg)

            if self.target_prop is None:
                msg = "Missing edge target: %s" % self.edge_target
                raise InvalidModel(msg)

    def _add_reverse(
        self, model: "Model", data: ReverseSpec, other: Property
    ) -> Property:
        """Return the property named by ``data["name"]``, creating it if missing.

        A newly created property is an entity-typed stub whose reverse is
        ``other`` and whose range is the schema of ``other``. Raises
        :class:`InvalidModel` if ``data`` carries no name.
        """
        name = data.get("name")
        if name is None:
            raise InvalidModel("Unnamed reverse: %s" % other)

        prop = self.get(name)
        if prop is None:
            spec: PropertySpec = {
                "label": data.get("label"),
                "type": registry.entity.name,
                "reverse": {"name": other.name},
                "range": other.schema.name,
                "hidden": data.get("hidden", other.hidden),
            }
            prop = Property(self, name, spec)
            prop.stub = True
            prop.generate(model)
            self.properties[name] = prop
        return prop

    @property
    def label(self) -> str:
        """User-facing name of the schema."""
        return gettext(self._label)

    @property
    def plural(self) -> str:
        """Name of the schema to be used in plural constructions."""
        return gettext(self._plural)

    @property
    def description(self) -> Optional[str]:
        """A longer description of the semantics of the schema."""
        return gettext(self._description)

    @property
    def edge_label(self) -> Optional[str]:
        """Description label for edges derived from entities of this schema."""
        return gettext(self._edge_label)

    @property
    def source_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge source."""
        return self.get(self.edge_source)

    @property
    def target_prop(self) -> Optional[Property]:
        """The entity property to be used as an edge target."""
        return self.get(self.edge_target)

    @property
    def temporal_start_props(self) -> Set[Property]:
        """The entity properties to be used as the start when representing the entity
        in a timeline."""
        props = [self.get(prop_name) for prop_name in self.temporal_start]
        return set([prop for prop in props if prop is not None])

    @property
    def temporal_end_props(self) -> Set[Property]:
        """The entity properties to be used as the end when representing the entity
        in a timeline."""
        props = [self.get(prop_name) for prop_name in self.temporal_end]
        return set([prop for prop in props if prop is not None])

    @property
    def sorted_properties(self) -> List[Property]:
        """All properties of the schema in the order in which they should be shown
        to the user (alphabetically, with captions and featured properties first)."""
        return sorted(
            self.properties.values(),
            key=lambda p: (
                p.name not in self.caption,
                p.name not in self.featured,
                p.label,
            ),
        )

    @property
    def matchable_schemata(self) -> Set["Schema"]:
        """Return the set of schemata to which it makes sense to compare with this
        schema. For example, it makes sense to compare a legal entity with a company,
        but it does not make sense to compare a car and a person.

        The set is computed on first access and then reused.
        """
        if self._matchable_schemata is None:
            self._matchable_schemata = set()
            if self.matchable:
                # This is used by the cross-referencer to determine what
                # other schemata should be considered for matches. For
                # example, a Company may be compared to a Legal Entity,
                # but it makes no sense to compare it to an Aircraft.
                candidates = set(self.schemata)
                candidates.update(self.descendants)
                for schema in candidates:
                    if schema.matchable:
                        self._matchable_schemata.add(schema)
        return self._matchable_schemata

    def can_match(self, other: "Schema") -> bool:
        """Check if a schema can match with another schema."""
        return other in self.matchable_schemata

    def is_a(self, other: Union[str, "Schema"]) -> bool:
        """Check if the schema or one of its parents is the same as the given
        candidate ``other``."""
        # Deliberately not memoised: ``names`` is filled in by ``generate``, so
        # a cached answer taken before generation finished would be stale, and
        # the set lookup is already constant-time.
        if not isinstance(other, str):
            other = other.name
        return other in self.names

    def get(self, name: Optional[str]) -> Optional[Property]:
        """Retrieve a property defined for this schema by its name."""
        if name is None:
            return None
        return self.properties.get(name)

    def validate(self, data: Any) -> Optional[str]:
        """Validate the ``properties`` of an entity dictionary against this schema.

        Every property of the schema is checked with ``Property.validate``; a
        property listed in :attr:`required` that has no values is reported too.
        This method only reads from ``data``.

        Raises :class:`InvalidData` with a per-property error mapping if any
        check fails; otherwise returns ``None``.
        """
        errors = {}
        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
        for name, prop in self.properties.items():
            values = ensure_list(properties.get(name, []))
            error = prop.validate(values)
            if error is None and not values and prop.name in self.required:
                error = gettext("Required")
            if error is not None:
                errors[name] = error
        if errors:
            msg = gettext("Entity validation failed")
            raise InvalidData(msg, errors={"properties": errors})
        return None

    def to_dict(self) -> SchemaToDict:
        """Return schema metadata, including all properties, in a serializable form."""
        data: SchemaToDict = {
            "label": self.label,
            "plural": self.plural,
            "schemata": list(sorted(self.names)),
            "extends": list(sorted([e.name for e in self.extends])),
        }
        if self.edge_source and self.edge_target and self.edge_label:
            data["edge"] = {
                "source": self.edge_source,
                "target": self.edge_target,
                "caption": self.edge_caption,
                "label": self.edge_label,
                "directed": self.edge_directed,
            }
        # Only temporal properties declared on this schema itself are listed.
        start_props = [
            prop.name for prop in self.temporal_start_props if prop.schema == self
        ]
        end_props = [
            prop.name for prop in self.temporal_end_props if prop.schema == self
        ]
        if start_props or end_props:
            data["temporalExtent"] = {
                "start": sorted(start_props),
                "end": sorted(end_props),
            }
        if len(self.featured):
            data["featured"] = self.featured
        if len(self.required):
            data["required"] = self.required
        if len(self.caption):
            data["caption"] = self.caption
        if self.description:
            data["description"] = self.description
        if self.abstract:
            data["abstract"] = True
        if self.hidden:
            data["hidden"] = True
        if self.generated:
            data["generated"] = True
        if self.matchable:
            data["matchable"] = True
        if self.deprecated:
            data["deprecated"] = True
        # Inherited properties are left to the schema that declares them.
        properties: Dict[str, PropertyToDict] = {}
        for name, prop in self.properties.items():
            if prop.schema == self:
                properties[name] = prop.to_dict()
        data["properties"] = properties
        return data

    def __eq__(self, other: Any) -> bool:
        """Compare two schemata (via hash)."""
        try:
            return self._hash == hash(other)
        except (AttributeError, TypeError):
            # TypeError: ``other`` is unhashable, so it cannot be a schema.
            return False

    def __lt__(self, other: Any) -> bool:
        return self.name.__lt__(other.name)

    def __hash__(self) -> int:
        try:
            return self._hash
        except AttributeError:
            return super().__hash__()

    def __repr__(self) -> str:
        return "<Schema(%r)>" % self.name
A type definition for a class of entities that have certain properties.
Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple parent schemata from which it inherits all of their properties. A schema can also have descendant child schemata, which, in turn, add further properties. Schemata are usually accessed via the model, which holds all available definitions.
def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
    """Read the parts of the definition that are available from ``data`` alone.

    Collections that describe the hierarchy (``extends``, ``schemata``,
    ``names``, ``descendants``) start out holding at most this schema itself.
    """
    #: Machine-readable name of the schema, used for identification.
    self.name = name
    self.model = model
    self._label = data.get("label", name)
    # Fall back to the raw label rather than the ``label`` attribute, so the
    # default plural is stored in the same unprocessed form as ``_label``.
    self._plural = data.get("plural", self._label)
    self._description = data.get("description")
    self._hash = hash("<Schema(%r)>" % name)

    #: RDF identifier for this schema when it is transformed to a triple term.
    self.uri = URIRef(cast(str, data.get("rdf", NS[name])))

    #: Do not store or emit entities of this type, it is used only for
    #: inheritance.
    self.abstract = as_bool(data.get("abstract"), False)

    #: This schema is deprecated and should not be used.
    self.deprecated = as_bool(data.get("deprecated", False))

    #: Hide this schema in listings.
    self.hidden = as_bool(data.get("hidden"), False)
    self.hidden = self.hidden and not self.abstract

    #: Entities with this type are generated by the system - for example, via
    #: `ingest-file`. The user should not be offered an option to create them
    #: in the interface.
    self.generated = as_bool(data.get("generated"), False)

    #: Try to perform fuzzy matching. Fuzzy similarity search does not
    #: make sense for entities which have a lot of similar names, such
    #: as land plots, assets etc.
    self.matchable = as_bool(data.get("matchable"), True)

    #: Mark a set of properties as important, i.e. they should be shown
    #: first, or in an abridged view of the entity. In Aleph, these properties
    #: are included in tabular entity listings.
    self.featured = ensure_list(data.get("featured", []))

    #: Mark a set of properties as required. This is applied only when
    #: an entity is created by the user - bulk created entities will
    #: slip through even if it is technically invalid.
    self.required = ensure_list(data.get("required", []))

    #: Mark a set of properties to be used for the entity's caption.
    #: They will be checked in order and the first existent value will
    #: be used.
    self.caption = ensure_list(data.get("caption", []))

    # A transform of the entity into an edge for its representation in
    # the context of a property graph representation like Neo4J/Gephi.
    edge = data.get("edge", {})
    self.edge_source = edge.get("source")
    self.edge_target = edge.get("target")

    #: Flag to indicate if this schema should be represented by an edge (rather than
    #: a node) when the data is converted into a property graph.
    self.edge: bool = self.edge_source is not None and self.edge_target is not None
    self.edge_caption = ensure_list(edge.get("caption", []))
    self._edge_label = edge.get("label", self._label)

    #: Flag to indicate if the edge should be presented as directed to the user,
    #: e.g. by showing an arrow at the target end of the edge.
    self.edge_directed = as_bool(edge.get("directed", True))

    #: Specify which properties should be used to represent this schema in a
    #: timeline.
    temporal_extent = data.get("temporalExtent", {})
    self.temporal_start = set(temporal_extent.get("start", []))
    self.temporal_end = set(temporal_extent.get("end", []))

    #: Direct parent schemata of this schema.
    self._extends = ensure_list(data.get("extends", []))
    self.extends: Set["Schema"] = set()

    #: All parents of this schema (including indirect parents and the schema
    #: itself).
    self.schemata = set([self])

    #: All names of :attr:`~schemata`.
    self.names = set([self.name])

    #: Inverse of :attr:`~schemata`, all derived child types of this schema
    #: and their children.
    self.descendants: Set["Schema"] = set()
    self._matchable_schemata: Optional[Set["Schema"]] = None

    #: The full list of properties defined for the entity, including those
    #: inherited from parent schemata.
    self.properties: Dict[str, Property] = {}
    for name, prop in data.get("properties", {}).items():
        self.properties[name] = Property(self, name, prop)
def generate(self, model: "Model") -> None:
    """Resolve this schema against ``model`` and check its definition.

    Each schema named in ``extends`` is looked up in ``model``, generated and
    merged in: its properties (unless one of the same name already exists
    here), its ancestors and its temporal extent property names. After that
    all properties are generated, and the ``featured``, ``caption``,
    ``required`` and edge settings are checked against the known properties.

    Raises ``InvalidData`` for an unknown parent and ``InvalidModel`` for a
    setting that refers to a missing (or, for captions, entity-typed)
    property.
    """
    for parent_name in self._extends:
        parent = model.get(parent_name)
        if parent is None:
            raise InvalidData("Invalid extends: %r" % parent_name)
        parent.generate(model)

        # A property already present on this schema wins over the parent's.
        for prop_name, inherited in parent.properties.items():
            self.properties.setdefault(prop_name, inherited)

        self.extends.add(parent)
        for ancestor in parent.schemata:
            self.schemata.add(ancestor)
            self.names.add(ancestor.name)
            ancestor.descendants.add(self)

        self.temporal_start.update(parent.temporal_start)
        self.temporal_end.update(parent.temporal_end)

    # Walk a snapshot of the property objects (presumably because generating
    # a property can register further properties -- confirm in Property).
    for prop in tuple(self.properties.values()):
        prop.generate(model)

    for prop_name in self.featured:
        if self.get(prop_name) is None:
            raise InvalidModel("Missing featured property: %s" % prop_name)

    for prop_name in self.caption:
        caption_prop = self.get(prop_name)
        if caption_prop is None:
            raise InvalidModel("Missing caption property: %s" % prop_name)
        if caption_prop.type == registry.entity:
            raise InvalidModel("Caption cannot be entity: %s" % prop_name)

    for prop_name in self.required:
        if self.get(prop_name) is None:
            raise InvalidModel("Missing required property: %s" % prop_name)

    if not self.edge:
        return

    # Edge schemata must be able to resolve both of their end points.
    if self.source_prop is None:
        raise InvalidModel("Missing edge source: %s" % self.edge_source)
    if self.target_prop is None:
        raise InvalidModel("Missing edge target: %s" % self.edge_target)
While loading the schema, this function will validate and load the hierarchy, properties, and flags of the definition.
@property
def label(self) -> str:
    """Human-readable name of the schema, passed through ``gettext``."""
    raw_label = self._label
    return gettext(raw_label)
User-facing name of the schema.
@property
def plural(self) -> str:
    """Plural form of the schema name, passed through ``gettext``."""
    raw_plural = self._plural
    return gettext(raw_plural)
Name of the schema to be used in plural constructions.
@property
def description(self) -> Optional[str]:
    """Longer explanation of what the schema means, passed through ``gettext``.

    The stored description may be ``None`` when the specification gave none.
    """
    raw_description = self._description
    return gettext(raw_description)
A longer description of the semantics of the schema.
@property
def edge_label(self) -> Optional[str]:
    """Label for edges built from entities of this schema, passed through
    ``gettext``."""
    raw_edge_label = self._edge_label
    return gettext(raw_edge_label)
Description label for edges derived from entities of this schema.
@property
def source_prop(self) -> Optional[Property]:
    """Property named by ``edge_source``, or ``None`` if it does not resolve."""
    source_name = self.edge_source
    return self.get(source_name)
The entity property to be used as an edge source.
@property
def target_prop(self) -> Optional[Property]:
    """Property named by ``edge_target``, or ``None`` if it does not resolve."""
    target_name = self.edge_target
    return self.get(target_name)
The entity property to be used as an edge target.
@property
def temporal_start_props(self) -> Set[Property]:
    """Properties that mark the start of the entity on a timeline.

    Names in ``temporal_start`` that do not resolve to a property of this
    schema are left out.
    """
    resolved = map(self.get, self.temporal_start)
    return {prop for prop in resolved if prop is not None}
The entity properties to be used as the start when representing the entity in a timeline.
@property
def temporal_end_props(self) -> Set[Property]:
    """Properties that mark the end of the entity on a timeline.

    Names in ``temporal_end`` that do not resolve to a property of this
    schema are left out.
    """
    resolved = map(self.get, self.temporal_end)
    return {prop for prop in resolved if prop is not None}
The entity properties to be used as the end when representing the entity in a timeline.
@property
def sorted_properties(self) -> List[Property]:
    """Properties in display order.

    Caption properties come first, then featured ones, then the rest; within
    each group the properties are ordered by their label.
    """
    ordered = list(self.properties.values())
    # False sorts before True, so membership is tested with ``not in``.
    ordered.sort(
        key=lambda prop: (
            prop.name not in self.caption,
            prop.name not in self.featured,
            prop.label,
        )
    )
    return ordered
All properties of the schema in the order in which they should be shown to the user (alphabetically, with captions and featured properties first).
@property
def matchable_schemata(self) -> Set["Schema"]:
    """Schemata that entities of this schema may sensibly be compared with.

    The result is computed on first access and stored on the instance. A
    schema that is not matchable yields an empty set; otherwise the result
    holds every matchable schema among ``schemata`` and ``descendants``.
    """
    cached = self._matchable_schemata
    if cached is not None:
        return cached

    found: Set["Schema"] = set()
    if self.matchable:
        # This is used by the cross-referencer to determine what other
        # schemata should be considered for matches. For example, a Company
        # may be compared to a Legal Entity, but it makes no sense to
        # compare it to an Aircraft.
        related = set(self.schemata).union(self.descendants)
        found.update(schema for schema in related if schema.matchable)
    self._matchable_schemata = found
    return found
Return the set of schemata with which it makes sense to compare this schema. For example, it makes sense to compare a legal entity with a company, but it does not make sense to compare a car and a person.
def can_match(self, other: "Schema") -> bool:
    """Tell whether ``other`` is one of this schema's ``matchable_schemata``."""
    candidates = self.matchable_schemata
    return other in candidates
Check if a schema can match with another schema.
def is_a(self, other: Union[str, "Schema"]) -> bool:
    """Check if the schema or one of its parents is the same as the given
    candidate ``other``.

    ``other`` may be a schema name or a schema object, in which case its
    ``name`` is used. The answer is read from ``names`` on every call and is
    deliberately not memoized: the lookup is a single set membership test,
    and a cache keyed on ``self`` would both keep schema objects alive and
    retain answers computed before ``names`` was fully populated.
    """
    name = other if isinstance(other, str) else other.name
    return name in self.names
Check if the schema or one of its parents is the same as the given candidate `other`.
def get(self, name: Optional[str]) -> Optional[Property]:
    """Look up a property of this schema by ``name``.

    Returns ``None`` when ``name`` is ``None`` or no such property exists.
    """
    return None if name is None else self.properties.get(name)
Retrieve a property defined for this schema by its name.
def validate(self, data: Any) -> Optional[str]:
    """Validate the ``properties`` of an entity dictionary against this schema.

    Every property of the schema validates the values supplied for it under
    ``data["properties"]``. A property listed in ``required`` must also have
    at least one value.

    Returns ``None`` when validation passes. Raises ``InvalidData`` carrying
    the per-property messages under ``errors["properties"]`` otherwise.
    """
    errors = {}
    properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
    for name, prop in self.properties.items():
        values = ensure_list(properties.get(name, []))
        error = prop.validate(values)
        # A missing value only becomes an error for required properties.
        if error is None and not values and prop.name in self.required:
            error = gettext("Required")
        if error is not None:
            errors[name] = error
    if errors:
        msg = gettext("Entity validation failed")
        raise InvalidData(msg, errors={"properties": errors})
    return None
Validate a dictionary against the given schema. Raises InvalidData, listing the offending properties, if any property value is invalid or a required property has no value; returns None otherwise.
def to_dict(self) -> SchemaToDict:
    """Serialize the schema's metadata together with its properties.

    ``label``, ``plural``, ``schemata``, ``extends`` and ``properties`` are
    always present. Every other key is written only when it carries a value,
    and boolean flags only when they are true. Properties (and temporal
    extent entries) whose ``schema`` is not this schema are left out.
    """
    data: SchemaToDict = {
        "label": self.label,
        "plural": self.plural,
        "schemata": sorted(self.names),
        "extends": sorted(parent.name for parent in self.extends),
    }

    if self.edge_source and self.edge_target and self.edge_label:
        data["edge"] = {
            "source": self.edge_source,
            "target": self.edge_target,
            "caption": self.edge_caption,
            "label": self.edge_label,
            "directed": self.edge_directed,
        }

    own_start = sorted(
        prop.name for prop in self.temporal_start_props if prop.schema == self
    )
    own_end = sorted(
        prop.name for prop in self.temporal_end_props if prop.schema == self
    )
    if own_start or own_end:
        data["temporalExtent"] = {"start": own_start, "end": own_end}

    if self.featured:
        data["featured"] = self.featured
    if self.required:
        data["required"] = self.required
    if self.caption:
        data["caption"] = self.caption
    if self.description:
        data["description"] = self.description

    if self.abstract:
        data["abstract"] = True
    if self.hidden:
        data["hidden"] = True
    if self.generated:
        data["generated"] = True
    if self.matchable:
        data["matchable"] = True
    if self.deprecated:
        data["deprecated"] = True

    data["properties"] = {
        name: prop.to_dict()
        for name, prop in self.properties.items()
        if prop.schema == self
    }
    return data
Return schema metadata, including all properties, in a serializable form.