followthemoney.schema

  1from typing import (
  2    TYPE_CHECKING,
  3    Any,
  4    Dict,
  5    List,
  6    Optional,
  7    Set,
  8    TypedDict,
  9    Union,
 10    cast,
 11)
 12from banal import ensure_list, ensure_dict, as_bool
 13from functools import lru_cache
 14
 15from followthemoney.property import Property, PropertySpec, PropertyToDict, ReverseSpec
 16from followthemoney.types import registry
 17from followthemoney.exc import InvalidData, InvalidModel
 18from followthemoney.rdf import URIRef, NS
 19from followthemoney.util import gettext
 20
 21if TYPE_CHECKING:
 22    from followthemoney.model import Model
 23
 24
class EdgeSpec(TypedDict, total=False):
    """The ``edge`` section of a schema definition.

    ``Schema.__init__`` reads this mapping to decide whether and how entities
    of the schema are represented as edges in a property graph. Every key is
    optional (``total=False``).
    """

    # Name of the property used as the edge source.
    source: str
    # Name of the property used as the edge target.
    target: str
    # Property names stored on the schema as ``edge_caption``.
    caption: List[str]
    # Label for derived edges; ``Schema`` falls back to the schema label.
    label: str
    # Whether the edge is presented as directed; ``Schema`` defaults to True.
    directed: bool
 31
 32
class TemporalExtentSpec(TypedDict, total=False):
    """The ``temporalExtent`` section of a schema definition.

    Names the properties used to place an entity on a timeline. Both keys are
    optional (``total=False``); ``Schema`` treats a missing key as an empty
    list and then falls back to the values of its parent schemata.
    """

    # Names of the properties that mark the start of the entity in a timeline.
    start: List[str]
    # Names of the properties that mark the end of the entity in a timeline.
    end: List[str]
 36
 37
class SchemaSpec(TypedDict, total=False):
    """Raw definition of a schema, as passed to ``Schema.__init__``.

    Every key is optional (``total=False``); the defaults mentioned below are
    the ones applied by ``Schema.__init__``.
    """

    # User-facing name; defaults to the machine-readable schema name.
    label: str
    # Name used in plural constructions; defaults to the label.
    plural: str
    # NOTE(review): not read by ``Schema.__init__``; presumably mirrors the
    # ``schemata`` key emitted by ``Schema.to_dict`` - confirm.
    schemata: List[str]
    # Names of the direct parent schemata.
    extends: List[str]
    # Property definitions declared on this schema, keyed by property name.
    properties: Dict[str, PropertySpec]
    # Names of properties to show first or in abridged views.
    featured: List[str]
    # Names of properties that must be present for validation to pass.
    required: List[str]
    # Names of properties used, in order, to derive the entity caption.
    caption: List[str]
    # How to represent entities of this schema as graph edges.
    edge: EdgeSpec
    # Which properties place entities of this schema on a timeline.
    temporalExtent: TemporalExtentSpec
    # Longer description of the semantics of the schema.
    description: Optional[str]
    # RDF identifier; defaults to ``NS[name]``.
    rdf: Optional[str]
    # Schema exists only for inheritance; defaults to False.
    abstract: bool
    # Hide the schema in listings; defaults to False.
    hidden: bool
    # Entities are generated by the system, not users; defaults to False.
    generated: bool
    # Whether fuzzy matching makes sense for this schema; defaults to True.
    matchable: bool
    # Schema is deprecated and should not be used; defaults to False.
    deprecated: Optional[bool]
 56
 57
class SchemaToDict(TypedDict, total=False):
    """Serializable schema metadata, as returned by ``Schema.to_dict``.

    ``label``, ``plural``, ``schemata``, ``extends`` and ``properties`` are
    always emitted; the remaining keys are only present when the respective
    value is set (non-empty list, non-empty description, or a true flag).
    """

    # Translated user-facing name of the schema.
    label: str
    # Translated plural name of the schema.
    plural: str
    # Sorted names of the schema itself and all of its ancestors.
    schemata: List[str]
    # Sorted names of the direct parent schemata.
    extends: List[str]
    # Serialized properties declared on this schema (inherited ones excluded).
    properties: Dict[str, PropertyToDict]
    # Names of featured properties.
    featured: List[str]
    # Names of required properties.
    required: List[str]
    # Names of caption properties.
    caption: List[str]
    # Edge representation; emitted when source, target and label are all set.
    edge: EdgeSpec
    # Timeline properties; emitted when a start or end property is defined.
    temporalExtent: TemporalExtentSpec
    # Translated description of the schema.
    description: Optional[str]
    # Flags below are only emitted (as True) when they are set on the schema.
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: bool
 75
 76
 77class Schema:
 78    """A type definition for a class of entities that have certain properties.
 79
 80    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
 81    parent schemata from which it inherits all of their properties. A schema can also
 82    have descendant child schemata, which, in turn, add further properties. Schemata
 83    are usually accessed via the model, which holds all available definitions.
 84    """
 85
 86    __slots__ = (
 87        "model",
 88        "name",
 89        "_label",
 90        "_plural",
 91        "_description",
 92        "_hash",
 93        "uri",
 94        "abstract",
 95        "hidden",
 96        "generated",
 97        "matchable",
 98        "featured",
 99        "required",
100        "deprecated",
101        "caption",
102        "edge",
103        "_edge_label",
104        "edge_directed",
105        "edge_source",
106        "edge_target",
107        "edge_caption",
108        "_temporal_start",
109        "_temporal_end",
110        "_extends",
111        "extends",
112        "schemata",
113        "names",
114        "descendants",
115        "properties",
116        "_matchable_schemata",
117    )
118
119    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
120        #: Machine-readable name of the schema, used for identification.
121        self.name = name
122        self.model = model
123        self._label = data.get("label", name)
124        self._plural = data.get("plural", self.label)
125        self._description = data.get("description")
126        self._hash = hash("<Schema(%r)>" % name)
127
128        #: RDF identifier for this schema when it is transformed to a triple term.
129        self.uri = URIRef(cast(str, data.get("rdf", NS[name])))
130
131        #: Do not store or emit entities of this type, it is used only for
132        #: inheritance.
133        self.abstract = as_bool(data.get("abstract"), False)
134
135        #: This schema is deprecated and should not be used.
136        self.deprecated = as_bool(data.get("deprecated", False))
137
138        #: Hide this schema in listings.
139        self.hidden = as_bool(data.get("hidden"), False)
140        self.hidden = self.hidden and not self.abstract
141
142        #: Entities with this type are generated by the system - for example, via
143        #: `ingest-file`. The user should not be offered an option to create them
144        #: in the interface.
145        self.generated = as_bool(data.get("generated"), False)
146
147        #: Try to perform fuzzy matching. Fuzzy similarity search does not
148        #: make sense for entities which have a lot of similar names, such
149        #: as land plots, assets etc.
150        self.matchable = as_bool(data.get("matchable"), True)
151
152        #: Mark a set of properties as important, i.e. they should be shown
153        #: first, or in an abridged view of the entity. In Aleph, these properties
154        #: are included in tabular entity listings.
155        self.featured = ensure_list(data.get("featured", []))
156
157        #: Mark a set of properties as required. This is applied only when
158        #: an entity is created by the user - bulk created entities will
159        #: slip through even if it is technically invalid.
160        self.required = ensure_list(data.get("required", []))
161
162        #: Mark a set of properties to be used for the entity's caption.
163        #: They will be checked in order and the first existent value will
164        #: be used.
165        self.caption = ensure_list(data.get("caption", []))
166
167        # A transform of the entity into an edge for its representation in
168        # the context of a property graph representation like Neo4J/Gephi.
169        edge = data.get("edge", {})
170        self.edge_source = edge.get("source")
171        self.edge_target = edge.get("target")
172
173        #: Flag to indicate if this schema should be represented by an edge (rather than
174        #: a node) when the data is converted into a property graph.
175        self.edge: bool = self.edge_source is not None and self.edge_target is not None
176        self.edge_caption = ensure_list(edge.get("caption", []))
177        self._edge_label = edge.get("label", self._label)
178
179        #: Flag to indicate if the edge should be presented as directed to the user,
180        #: e.g. by showing an error at the target end of the edge.
181        self.edge_directed = as_bool(edge.get("directed", True))
182
183        #: Specify which properties should be used to represent this schema in a
184        #: timeline.
185        temporal_extent = data.get("temporalExtent", {})
186        self._temporal_start = ensure_list(temporal_extent.get("start", []))
187        self._temporal_end = ensure_list(temporal_extent.get("end", []))
188
189        #: Direct parent schemata of this schema.
190        self._extends = ensure_list(data.get("extends", []))
191        self.extends: Set["Schema"] = set()
192
193        #: All parents of this schema (including indirect parents and the schema
194        #: itself).
195        self.schemata = set([self])
196
197        #: All names of :attr:`~schemata`.
198        self.names = set([self.name])
199
200        #: Inverse of :attr:`~schemata`, all derived child types of this schema
201        #: and their children.
202        self.descendants: Set["Schema"] = set()
203        self._matchable_schemata: Optional[Set["Schema"]] = None
204
205        #: The full list of properties defined for the entity, including those
206        #: inherited from parent schemata.
207        self.properties: Dict[str, Property] = {}
208        for name, prop in data.get("properties", {}).items():
209            self.properties[name] = Property(self, name, prop)
210
211    def generate(self, model: "Model") -> None:
212        """While loading the schema, this function will validate and
213        load the hierarchy, properties, and flags of the definition."""
214        temporal_start: Optional[List[str]] = None
215        temporal_end: Optional[List[str]] = None
216        for extends in self._extends:
217            parent = model.get(extends)
218            if parent is None:
219                raise InvalidData("Invalid extends: %r" % extends)
220            parent.generate(model)
221
222            for name, prop in parent.properties.items():
223                if name not in self.properties:
224                    self.properties[name] = prop
225
226            self.extends.add(parent)
227            for ancestor in parent.schemata:
228                self.schemata.add(ancestor)
229                self.names.add(ancestor.name)
230                ancestor.descendants.add(self)
231
232            if len(self._temporal_start) == 0 and parent.temporal_start:
233                if (
234                    temporal_start is not None
235                    and temporal_start != parent.temporal_start
236                ):
237                    raise InvalidModel(
238                        "Conflicting temporal start properties: %s" % self.name
239                    )
240                temporal_start = parent.temporal_start
241
242            if len(self._temporal_end) == 0 and parent.temporal_end:
243                if temporal_end is not None and temporal_end != parent.temporal_end:
244                    raise InvalidModel(
245                        "Conflicting temporal start properties: %s" % self.name
246                    )
247                temporal_end = parent.temporal_end
248
249        for prop in list(self.properties.values()):
250            prop.generate(model)
251
252        for featured in self.featured:
253            if self.get(featured) is None:
254                raise InvalidModel("Missing featured property: %s" % featured)
255
256        for caption in self.caption:
257            prop_ = self.get(caption)
258            if prop_ is None:
259                raise InvalidModel("Missing caption property: %s" % caption)
260            if prop_.type == registry.entity:
261                raise InvalidModel("Caption cannot be entity: %s" % caption)
262
263        for required in self.required:
264            if self.get(required) is None:
265                raise InvalidModel("Missing required property: %s" % required)
266
267        if self.edge:
268            if self.source_prop is None:
269                msg = "Missing edge source: %s" % self.edge_source
270                raise InvalidModel(msg)
271
272            if self.target_prop is None:
273                msg = "Missing edge target: %s" % self.edge_target
274                raise InvalidModel(msg)
275
276    def _add_reverse(
277        self, model: "Model", data: ReverseSpec, other: Property
278    ) -> Property:
279        name = data.get("name")
280        if name is None:
281            raise InvalidModel("Unnamed reverse: %s" % other)
282
283        prop = self.get(name)
284        if prop is None:
285            spec: PropertySpec = {
286                "label": data.get("label"),
287                "type": registry.entity.name,
288                "reverse": {"name": other.name},
289                "range": other.schema.name,
290                "hidden": data.get("hidden", other.hidden),
291            }
292            prop = Property(self, name, spec)
293            prop.stub = True
294            prop.generate(model)
295            self.properties[name] = prop
296        return prop
297
298    @property
299    def label(self) -> str:
300        """User-facing name of the schema."""
301        return gettext(self._label)
302
303    @property
304    def plural(self) -> str:
305        """Name of the schema to be used in plural constructions."""
306        return gettext(self._plural)
307
308    @property
309    def description(self) -> Optional[str]:
310        """A longer description of the semantics of the schema."""
311        return gettext(self._description)
312
313    @property
314    def edge_label(self) -> Optional[str]:
315        """Description label for edges derived from entities of this schema."""
316        return gettext(self._edge_label)
317
318    @property
319    def source_prop(self) -> Optional[Property]:
320        """The entity property to be used as an edge source."""
321        return self.get(self.edge_source)
322
323    @property
324    def target_prop(self) -> Optional[Property]:
325        """The entity property to be used as an edge target."""
326        return self.get(self.edge_target)
327
328    @property
329    def temporal_start(self) -> List[str]:
330        """The entity properties to be used as the start when representing the entity
331        in a timeline."""
332        if not len(self._temporal_start):
333            for parent in self.extends:
334                if len(parent.temporal_start):
335                    return parent.temporal_start
336        return self._temporal_start
337
338    @property
339    def temporal_end(self) -> List[str]:
340        """The entity properties to be used as the end when representing the entity
341        in a timeline."""
342        if not len(self._temporal_end):
343            for parent in self.extends:
344                if len(parent.temporal_end):
345                    return parent.temporal_end
346        return self._temporal_end
347
348    @property
349    def temporal_start_props(self) -> List[Property]:
350        """The entity properties to be used as the start when representing the entity
351        in a timeline."""
352        props = [self.get(prop_name) for prop_name in self.temporal_start]
353        return [prop for prop in props if prop is not None]
354
355    @property
356    def temporal_end_props(self) -> List[Property]:
357        """The entity properties to be used as the end when representing the entity
358        in a timeline."""
359        props = [self.get(prop_name) for prop_name in self.temporal_end]
360        return [prop for prop in props if prop is not None]
361
362    @property
363    def sorted_properties(self) -> List[Property]:
364        """All properties of the schema in the order in which they should be shown
365        to the user (alphabetically, with captions and featured properties first)."""
366        return sorted(
367            self.properties.values(),
368            key=lambda p: (
369                p.name not in self.caption,
370                p.name not in self.featured,
371                p.label,
372            ),
373        )
374
375    @property
376    def matchable_schemata(self) -> Set["Schema"]:
377        """Return the set of schemata to which it makes sense to compare with this
378        schema. For example, it makes sense to compare a legal entity with a company,
379        but it does not make sense to compare a car and a person."""
380        if self._matchable_schemata is None:
381            self._matchable_schemata = set()
382            if self.matchable:
383                # This is used by the cross-referencer to determine what
384                # other schemata should be considered for matches. For
385                # example, a Company may be compared to a Legal Entity,
386                # but it makes no sense to compare it to an Aircraft.
387                candidates = set(self.schemata)
388                candidates.update(self.descendants)
389                for schema in candidates:
390                    if schema.matchable:
391                        self._matchable_schemata.add(schema)
392        return self._matchable_schemata
393
394    def can_match(self, other: "Schema") -> bool:
395        """Check if an schema can match with another schema."""
396        return other in self.matchable_schemata
397
398    @lru_cache(maxsize=None)
399    def is_a(self, other: Union[str, "Schema"]) -> bool:
400        """Check if the schema or one of its parents is the same as the given
401        candidate ``other``."""
402        if not isinstance(other, str):
403            other = other.name
404        return other in self.names
405
406    def get(self, name: Optional[str]) -> Optional[Property]:
407        """Retrieve a property defined for this schema by its name."""
408        if name is None:
409            return None
410        return self.properties.get(name)
411
412    def validate(self, data: Any) -> Optional[str]:
413        """Validate a dictionary against the given schema.
414        This will also drop keys which are not valid as properties.
415        """
416        errors = {}
417        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
418        for name, prop in self.properties.items():
419            values = ensure_list(properties.get(name, []))
420            error = prop.validate(values)
421            if error is None and not len(values):
422                if prop.name in self.required:
423                    error = gettext("Required")
424            if error is not None:
425                errors[name] = error
426        if len(errors):
427            msg = gettext("Entity validation failed")
428            raise InvalidData(msg, errors={"properties": errors})
429        return None
430
431    def to_dict(self) -> SchemaToDict:
432        """Return schema metadata, including all properties, in a serializable form."""
433        data: SchemaToDict = {
434            "label": self.label,
435            "plural": self.plural,
436            "schemata": list(sorted(self.names)),
437            "extends": list(sorted([e.name for e in self.extends])),
438        }
439        if self.edge_source and self.edge_target and self.edge_label:
440            data["edge"] = {
441                "source": self.edge_source,
442                "target": self.edge_target,
443                "caption": self.edge_caption,
444                "label": self.edge_label,
445                "directed": self.edge_directed,
446            }
447        if len(self.temporal_start) or len(self.temporal_end):
448            data["temporalExtent"] = {
449                "start": self.temporal_start,
450                "end": self.temporal_end,
451            }
452        if len(self.featured):
453            data["featured"] = self.featured
454        if len(self.required):
455            data["required"] = self.required
456        if len(self.caption):
457            data["caption"] = self.caption
458        if self.description:
459            data["description"] = self.description
460        if self.abstract:
461            data["abstract"] = True
462        if self.hidden:
463            data["hidden"] = True
464        if self.generated:
465            data["generated"] = True
466        if self.matchable:
467            data["matchable"] = True
468        if self.deprecated:
469            data["deprecated"] = True
470        properties: Dict[str, PropertyToDict] = {}
471        for name, prop in self.properties.items():
472            if prop.schema == self:
473                properties[name] = prop.to_dict()
474        data["properties"] = properties
475        return data
476
477    def __eq__(self, other: Any) -> bool:
478        """Compare two schemata (via hash)."""
479        try:
480            return self._hash == hash(other)
481        except AttributeError:
482            return False
483
484    def __lt__(self, other: Any) -> bool:
485        return self.name.__lt__(other.name)
486
487    def __hash__(self) -> int:
488        try:
489            return self._hash
490        except AttributeError:
491            return super().__hash__()
492
493    def __repr__(self) -> str:
494        return "<Schema(%r)>" % self.name
class EdgeSpec(typing.TypedDict):
26class EdgeSpec(TypedDict, total=False):
27    source: str
28    target: str
29    caption: List[str]
30    label: str
31    directed: bool
source: str
target: str
caption: List[str]
label: str
directed: bool
class TemporalExtentSpec(typing.TypedDict):
34class TemporalExtentSpec(TypedDict, total=False):
35    start: List[str]
36    end: List[str]
start: List[str]
end: List[str]
class SchemaSpec(typing.TypedDict):
39class SchemaSpec(TypedDict, total=False):
40    label: str
41    plural: str
42    schemata: List[str]
43    extends: List[str]
44    properties: Dict[str, PropertySpec]
45    featured: List[str]
46    required: List[str]
47    caption: List[str]
48    edge: EdgeSpec
49    temporalExtent: TemporalExtentSpec
50    description: Optional[str]
51    rdf: Optional[str]
52    abstract: bool
53    hidden: bool
54    generated: bool
55    matchable: bool
56    deprecated: Optional[bool]
label: str
plural: str
schemata: List[str]
extends: List[str]
properties: Dict[str, followthemoney.property.PropertySpec]
featured: List[str]
required: List[str]
caption: List[str]
edge: EdgeSpec
temporalExtent: TemporalExtentSpec
description: Optional[str]
rdf: Optional[str]
abstract: bool
hidden: bool
generated: bool
matchable: bool
deprecated: Optional[bool]
class SchemaToDict(typing.TypedDict):
59class SchemaToDict(TypedDict, total=False):
60    label: str
61    plural: str
62    schemata: List[str]
63    extends: List[str]
64    properties: Dict[str, PropertyToDict]
65    featured: List[str]
66    required: List[str]
67    caption: List[str]
68    edge: EdgeSpec
69    temporalExtent: TemporalExtentSpec
70    description: Optional[str]
71    abstract: bool
72    hidden: bool
73    generated: bool
74    matchable: bool
75    deprecated: bool
label: str
plural: str
schemata: List[str]
extends: List[str]
featured: List[str]
required: List[str]
caption: List[str]
edge: EdgeSpec
temporalExtent: TemporalExtentSpec
description: Optional[str]
abstract: bool
hidden: bool
generated: bool
matchable: bool
deprecated: bool
class Schema:
 78class Schema:
 79    """A type definition for a class of entities that have certain properties.
 80
 81    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
 82    parent schemata from which it inherits all of their properties. A schema can also
 83    have descendant child schemata, which, in turn, add further properties. Schemata
 84    are usually accessed via the model, which holds all available definitions.
 85    """
 86
 87    __slots__ = (
 88        "model",
 89        "name",
 90        "_label",
 91        "_plural",
 92        "_description",
 93        "_hash",
 94        "uri",
 95        "abstract",
 96        "hidden",
 97        "generated",
 98        "matchable",
 99        "featured",
100        "required",
101        "deprecated",
102        "caption",
103        "edge",
104        "_edge_label",
105        "edge_directed",
106        "edge_source",
107        "edge_target",
108        "edge_caption",
109        "_temporal_start",
110        "_temporal_end",
111        "_extends",
112        "extends",
113        "schemata",
114        "names",
115        "descendants",
116        "properties",
117        "_matchable_schemata",
118    )
119
120    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
121        #: Machine-readable name of the schema, used for identification.
122        self.name = name
123        self.model = model
124        self._label = data.get("label", name)
125        self._plural = data.get("plural", self.label)
126        self._description = data.get("description")
127        self._hash = hash("<Schema(%r)>" % name)
128
129        #: RDF identifier for this schema when it is transformed to a triple term.
130        self.uri = URIRef(cast(str, data.get("rdf", NS[name])))
131
132        #: Do not store or emit entities of this type, it is used only for
133        #: inheritance.
134        self.abstract = as_bool(data.get("abstract"), False)
135
136        #: This schema is deprecated and should not be used.
137        self.deprecated = as_bool(data.get("deprecated", False))
138
139        #: Hide this schema in listings.
140        self.hidden = as_bool(data.get("hidden"), False)
141        self.hidden = self.hidden and not self.abstract
142
143        #: Entities with this type are generated by the system - for example, via
144        #: `ingest-file`. The user should not be offered an option to create them
145        #: in the interface.
146        self.generated = as_bool(data.get("generated"), False)
147
148        #: Try to perform fuzzy matching. Fuzzy similarity search does not
149        #: make sense for entities which have a lot of similar names, such
150        #: as land plots, assets etc.
151        self.matchable = as_bool(data.get("matchable"), True)
152
153        #: Mark a set of properties as important, i.e. they should be shown
154        #: first, or in an abridged view of the entity. In Aleph, these properties
155        #: are included in tabular entity listings.
156        self.featured = ensure_list(data.get("featured", []))
157
158        #: Mark a set of properties as required. This is applied only when
159        #: an entity is created by the user - bulk created entities will
160        #: slip through even if it is technically invalid.
161        self.required = ensure_list(data.get("required", []))
162
163        #: Mark a set of properties to be used for the entity's caption.
164        #: They will be checked in order and the first existent value will
165        #: be used.
166        self.caption = ensure_list(data.get("caption", []))
167
168        # A transform of the entity into an edge for its representation in
169        # the context of a property graph representation like Neo4J/Gephi.
170        edge = data.get("edge", {})
171        self.edge_source = edge.get("source")
172        self.edge_target = edge.get("target")
173
174        #: Flag to indicate if this schema should be represented by an edge (rather than
175        #: a node) when the data is converted into a property graph.
176        self.edge: bool = self.edge_source is not None and self.edge_target is not None
177        self.edge_caption = ensure_list(edge.get("caption", []))
178        self._edge_label = edge.get("label", self._label)
179
180        #: Flag to indicate if the edge should be presented as directed to the user,
181        #: e.g. by showing an error at the target end of the edge.
182        self.edge_directed = as_bool(edge.get("directed", True))
183
184        #: Specify which properties should be used to represent this schema in a
185        #: timeline.
186        temporal_extent = data.get("temporalExtent", {})
187        self._temporal_start = ensure_list(temporal_extent.get("start", []))
188        self._temporal_end = ensure_list(temporal_extent.get("end", []))
189
190        #: Direct parent schemata of this schema.
191        self._extends = ensure_list(data.get("extends", []))
192        self.extends: Set["Schema"] = set()
193
194        #: All parents of this schema (including indirect parents and the schema
195        #: itself).
196        self.schemata = set([self])
197
198        #: All names of :attr:`~schemata`.
199        self.names = set([self.name])
200
201        #: Inverse of :attr:`~schemata`, all derived child types of this schema
202        #: and their children.
203        self.descendants: Set["Schema"] = set()
204        self._matchable_schemata: Optional[Set["Schema"]] = None
205
206        #: The full list of properties defined for the entity, including those
207        #: inherited from parent schemata.
208        self.properties: Dict[str, Property] = {}
209        for name, prop in data.get("properties", {}).items():
210            self.properties[name] = Property(self, name, prop)
211
212    def generate(self, model: "Model") -> None:
213        """While loading the schema, this function will validate and
214        load the hierarchy, properties, and flags of the definition."""
215        temporal_start: Optional[List[str]] = None
216        temporal_end: Optional[List[str]] = None
217        for extends in self._extends:
218            parent = model.get(extends)
219            if parent is None:
220                raise InvalidData("Invalid extends: %r" % extends)
221            parent.generate(model)
222
223            for name, prop in parent.properties.items():
224                if name not in self.properties:
225                    self.properties[name] = prop
226
227            self.extends.add(parent)
228            for ancestor in parent.schemata:
229                self.schemata.add(ancestor)
230                self.names.add(ancestor.name)
231                ancestor.descendants.add(self)
232
233            if len(self._temporal_start) == 0 and parent.temporal_start:
234                if (
235                    temporal_start is not None
236                    and temporal_start != parent.temporal_start
237                ):
238                    raise InvalidModel(
239                        "Conflicting temporal start properties: %s" % self.name
240                    )
241                temporal_start = parent.temporal_start
242
243            if len(self._temporal_end) == 0 and parent.temporal_end:
244                if temporal_end is not None and temporal_end != parent.temporal_end:
245                    raise InvalidModel(
246                        "Conflicting temporal start properties: %s" % self.name
247                    )
248                temporal_end = parent.temporal_end
249
250        for prop in list(self.properties.values()):
251            prop.generate(model)
252
253        for featured in self.featured:
254            if self.get(featured) is None:
255                raise InvalidModel("Missing featured property: %s" % featured)
256
257        for caption in self.caption:
258            prop_ = self.get(caption)
259            if prop_ is None:
260                raise InvalidModel("Missing caption property: %s" % caption)
261            if prop_.type == registry.entity:
262                raise InvalidModel("Caption cannot be entity: %s" % caption)
263
264        for required in self.required:
265            if self.get(required) is None:
266                raise InvalidModel("Missing required property: %s" % required)
267
268        if self.edge:
269            if self.source_prop is None:
270                msg = "Missing edge source: %s" % self.edge_source
271                raise InvalidModel(msg)
272
273            if self.target_prop is None:
274                msg = "Missing edge target: %s" % self.edge_target
275                raise InvalidModel(msg)
276
277    def _add_reverse(
278        self, model: "Model", data: ReverseSpec, other: Property
279    ) -> Property:
280        name = data.get("name")
281        if name is None:
282            raise InvalidModel("Unnamed reverse: %s" % other)
283
284        prop = self.get(name)
285        if prop is None:
286            spec: PropertySpec = {
287                "label": data.get("label"),
288                "type": registry.entity.name,
289                "reverse": {"name": other.name},
290                "range": other.schema.name,
291                "hidden": data.get("hidden", other.hidden),
292            }
293            prop = Property(self, name, spec)
294            prop.stub = True
295            prop.generate(model)
296            self.properties[name] = prop
297        return prop
298
299    @property
300    def label(self) -> str:
301        """User-facing name of the schema."""
302        return gettext(self._label)
303
304    @property
305    def plural(self) -> str:
306        """Name of the schema to be used in plural constructions."""
307        return gettext(self._plural)
308
309    @property
310    def description(self) -> Optional[str]:
311        """A longer description of the semantics of the schema."""
312        return gettext(self._description)
313
314    @property
315    def edge_label(self) -> Optional[str]:
316        """Description label for edges derived from entities of this schema."""
317        return gettext(self._edge_label)
318
319    @property
320    def source_prop(self) -> Optional[Property]:
321        """The entity property to be used as an edge source."""
322        return self.get(self.edge_source)
323
324    @property
325    def target_prop(self) -> Optional[Property]:
326        """The entity property to be used as an edge target."""
327        return self.get(self.edge_target)
328
329    @property
330    def temporal_start(self) -> List[str]:
331        """The entity properties to be used as the start when representing the entity
332        in a timeline."""
333        if not len(self._temporal_start):
334            for parent in self.extends:
335                if len(parent.temporal_start):
336                    return parent.temporal_start
337        return self._temporal_start
338
339    @property
340    def temporal_end(self) -> List[str]:
341        """The entity properties to be used as the end when representing the entity
342        in a timeline."""
343        if not len(self._temporal_end):
344            for parent in self.extends:
345                if len(parent.temporal_end):
346                    return parent.temporal_end
347        return self._temporal_end
348
349    @property
350    def temporal_start_props(self) -> List[Property]:
351        """The entity properties to be used as the start when representing the entity
352        in a timeline."""
353        props = [self.get(prop_name) for prop_name in self.temporal_start]
354        return [prop for prop in props if prop is not None]
355
356    @property
357    def temporal_end_props(self) -> List[Property]:
358        """The entity properties to be used as the end when representing the entity
359        in a timeline."""
360        props = [self.get(prop_name) for prop_name in self.temporal_end]
361        return [prop for prop in props if prop is not None]
362
363    @property
364    def sorted_properties(self) -> List[Property]:
365        """All properties of the schema in the order in which they should be shown
366        to the user (alphabetically, with captions and featured properties first)."""
367        return sorted(
368            self.properties.values(),
369            key=lambda p: (
370                p.name not in self.caption,
371                p.name not in self.featured,
372                p.label,
373            ),
374        )
375
376    @property
377    def matchable_schemata(self) -> Set["Schema"]:
378        """Return the set of schemata to which it makes sense to compare with this
379        schema. For example, it makes sense to compare a legal entity with a company,
380        but it does not make sense to compare a car and a person."""
381        if self._matchable_schemata is None:
382            self._matchable_schemata = set()
383            if self.matchable:
384                # This is used by the cross-referencer to determine what
385                # other schemata should be considered for matches. For
386                # example, a Company may be compared to a Legal Entity,
387                # but it makes no sense to compare it to an Aircraft.
388                candidates = set(self.schemata)
389                candidates.update(self.descendants)
390                for schema in candidates:
391                    if schema.matchable:
392                        self._matchable_schemata.add(schema)
393        return self._matchable_schemata
394
395    def can_match(self, other: "Schema") -> bool:
396        """Check if an schema can match with another schema."""
397        return other in self.matchable_schemata
398
399    @lru_cache(maxsize=None)
400    def is_a(self, other: Union[str, "Schema"]) -> bool:
401        """Check if the schema or one of its parents is the same as the given
402        candidate ``other``."""
403        if not isinstance(other, str):
404            other = other.name
405        return other in self.names
406
407    def get(self, name: Optional[str]) -> Optional[Property]:
408        """Retrieve a property defined for this schema by its name."""
409        if name is None:
410            return None
411        return self.properties.get(name)
412
413    def validate(self, data: Any) -> Optional[str]:
414        """Validate a dictionary against the given schema.
415        This will also drop keys which are not valid as properties.
416        """
417        errors = {}
418        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
419        for name, prop in self.properties.items():
420            values = ensure_list(properties.get(name, []))
421            error = prop.validate(values)
422            if error is None and not len(values):
423                if prop.name in self.required:
424                    error = gettext("Required")
425            if error is not None:
426                errors[name] = error
427        if len(errors):
428            msg = gettext("Entity validation failed")
429            raise InvalidData(msg, errors={"properties": errors})
430        return None
431
432    def to_dict(self) -> SchemaToDict:
433        """Return schema metadata, including all properties, in a serializable form."""
434        data: SchemaToDict = {
435            "label": self.label,
436            "plural": self.plural,
437            "schemata": list(sorted(self.names)),
438            "extends": list(sorted([e.name for e in self.extends])),
439        }
440        if self.edge_source and self.edge_target and self.edge_label:
441            data["edge"] = {
442                "source": self.edge_source,
443                "target": self.edge_target,
444                "caption": self.edge_caption,
445                "label": self.edge_label,
446                "directed": self.edge_directed,
447            }
448        if len(self.temporal_start) or len(self.temporal_end):
449            data["temporalExtent"] = {
450                "start": self.temporal_start,
451                "end": self.temporal_end,
452            }
453        if len(self.featured):
454            data["featured"] = self.featured
455        if len(self.required):
456            data["required"] = self.required
457        if len(self.caption):
458            data["caption"] = self.caption
459        if self.description:
460            data["description"] = self.description
461        if self.abstract:
462            data["abstract"] = True
463        if self.hidden:
464            data["hidden"] = True
465        if self.generated:
466            data["generated"] = True
467        if self.matchable:
468            data["matchable"] = True
469        if self.deprecated:
470            data["deprecated"] = True
471        properties: Dict[str, PropertyToDict] = {}
472        for name, prop in self.properties.items():
473            if prop.schema == self:
474                properties[name] = prop.to_dict()
475        data["properties"] = properties
476        return data
477
478    def __eq__(self, other: Any) -> bool:
479        """Compare two schemata (via hash)."""
480        try:
481            return self._hash == hash(other)
482        except AttributeError:
483            return False
484
485    def __lt__(self, other: Any) -> bool:
486        return self.name.__lt__(other.name)
487
488    def __hash__(self) -> int:
489        try:
490            return self._hash
491        except AttributeError:
492            return super().__hash__()
493
494    def __repr__(self) -> str:
495        return "<Schema(%r)>" % self.name

A type definition for a class of entities that have certain properties.

Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple parent schemata from which it inherits all of their properties. A schema can also have descendant child schemata, which, in turn, add further properties. Schemata are usually accessed via the model, which holds all available definitions.

Schema( model: followthemoney.model.Model, name: str, data: SchemaSpec)
120    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
121        #: Machine-readable name of the schema, used for identification.
122        self.name = name
123        self.model = model
124        self._label = data.get("label", name)
125        self._plural = data.get("plural", self.label)
126        self._description = data.get("description")
127        self._hash = hash("<Schema(%r)>" % name)
128
129        #: RDF identifier for this schema when it is transformed to a triple term.
130        self.uri = URIRef(cast(str, data.get("rdf", NS[name])))
131
132        #: Do not store or emit entities of this type, it is used only for
133        #: inheritance.
134        self.abstract = as_bool(data.get("abstract"), False)
135
136        #: This schema is deprecated and should not be used.
137        self.deprecated = as_bool(data.get("deprecated", False))
138
139        #: Hide this schema in listings.
140        self.hidden = as_bool(data.get("hidden"), False)
141        self.hidden = self.hidden and not self.abstract
142
143        #: Entities with this type are generated by the system - for example, via
144        #: `ingest-file`. The user should not be offered an option to create them
145        #: in the interface.
146        self.generated = as_bool(data.get("generated"), False)
147
148        #: Try to perform fuzzy matching. Fuzzy similarity search does not
149        #: make sense for entities which have a lot of similar names, such
150        #: as land plots, assets etc.
151        self.matchable = as_bool(data.get("matchable"), True)
152
153        #: Mark a set of properties as important, i.e. they should be shown
154        #: first, or in an abridged view of the entity. In Aleph, these properties
155        #: are included in tabular entity listings.
156        self.featured = ensure_list(data.get("featured", []))
157
158        #: Mark a set of properties as required. This is applied only when
159        #: an entity is created by the user - bulk created entities will
160        #: slip through even if it is technically invalid.
161        self.required = ensure_list(data.get("required", []))
162
163        #: Mark a set of properties to be used for the entity's caption.
164        #: They will be checked in order and the first existent value will
165        #: be used.
166        self.caption = ensure_list(data.get("caption", []))
167
168        # A transform of the entity into an edge for its representation in
169        # the context of a property graph representation like Neo4J/Gephi.
170        edge = data.get("edge", {})
171        self.edge_source = edge.get("source")
172        self.edge_target = edge.get("target")
173
174        #: Flag to indicate if this schema should be represented by an edge (rather than
175        #: a node) when the data is converted into a property graph.
176        self.edge: bool = self.edge_source is not None and self.edge_target is not None
177        self.edge_caption = ensure_list(edge.get("caption", []))
178        self._edge_label = edge.get("label", self._label)
179
180        #: Flag to indicate if the edge should be presented as directed to the user,
181        #: e.g. by showing an error at the target end of the edge.
182        self.edge_directed = as_bool(edge.get("directed", True))
183
184        #: Specify which properties should be used to represent this schema in a
185        #: timeline.
186        temporal_extent = data.get("temporalExtent", {})
187        self._temporal_start = ensure_list(temporal_extent.get("start", []))
188        self._temporal_end = ensure_list(temporal_extent.get("end", []))
189
190        #: Direct parent schemata of this schema.
191        self._extends = ensure_list(data.get("extends", []))
192        self.extends: Set["Schema"] = set()
193
194        #: All parents of this schema (including indirect parents and the schema
195        #: itself).
196        self.schemata = set([self])
197
198        #: All names of :attr:`~schemata`.
199        self.names = set([self.name])
200
201        #: Inverse of :attr:`~schemata`, all derived child types of this schema
202        #: and their children.
203        self.descendants: Set["Schema"] = set()
204        self._matchable_schemata: Optional[Set["Schema"]] = None
205
206        #: The full list of properties defined for the entity, including those
207        #: inherited from parent schemata.
208        self.properties: Dict[str, Property] = {}
209        for name, prop in data.get("properties", {}).items():
210            self.properties[name] = Property(self, name, prop)
name
model
uri
abstract
deprecated
hidden
generated
matchable
featured
required
caption
edge_source
edge_target
edge: bool
edge_caption
edge_directed
extends: Set[Schema]
schemata
names
descendants: Set[Schema]
properties: Dict[str, followthemoney.property.Property]
def generate(self, model: followthemoney.model.Model) -> None:
212    def generate(self, model: "Model") -> None:
213        """While loading the schema, this function will validate and
214        load the hierarchy, properties, and flags of the definition."""
215        temporal_start: Optional[List[str]] = None
216        temporal_end: Optional[List[str]] = None
217        for extends in self._extends:
218            parent = model.get(extends)
219            if parent is None:
220                raise InvalidData("Invalid extends: %r" % extends)
221            parent.generate(model)
222
223            for name, prop in parent.properties.items():
224                if name not in self.properties:
225                    self.properties[name] = prop
226
227            self.extends.add(parent)
228            for ancestor in parent.schemata:
229                self.schemata.add(ancestor)
230                self.names.add(ancestor.name)
231                ancestor.descendants.add(self)
232
233            if len(self._temporal_start) == 0 and parent.temporal_start:
234                if (
235                    temporal_start is not None
236                    and temporal_start != parent.temporal_start
237                ):
238                    raise InvalidModel(
239                        "Conflicting temporal start properties: %s" % self.name
240                    )
241                temporal_start = parent.temporal_start
242
243            if len(self._temporal_end) == 0 and parent.temporal_end:
244                if temporal_end is not None and temporal_end != parent.temporal_end:
245                    raise InvalidModel(
246                        "Conflicting temporal start properties: %s" % self.name
247                    )
248                temporal_end = parent.temporal_end
249
250        for prop in list(self.properties.values()):
251            prop.generate(model)
252
253        for featured in self.featured:
254            if self.get(featured) is None:
255                raise InvalidModel("Missing featured property: %s" % featured)
256
257        for caption in self.caption:
258            prop_ = self.get(caption)
259            if prop_ is None:
260                raise InvalidModel("Missing caption property: %s" % caption)
261            if prop_.type == registry.entity:
262                raise InvalidModel("Caption cannot be entity: %s" % caption)
263
264        for required in self.required:
265            if self.get(required) is None:
266                raise InvalidModel("Missing required property: %s" % required)
267
268        if self.edge:
269            if self.source_prop is None:
270                msg = "Missing edge source: %s" % self.edge_source
271                raise InvalidModel(msg)
272
273            if self.target_prop is None:
274                msg = "Missing edge target: %s" % self.edge_target
275                raise InvalidModel(msg)

While loading the schema, this function will validate and load the hierarchy, properties, and flags of the definition.

label: str
299    @property
300    def label(self) -> str:
301        """User-facing name of the schema."""
302        return gettext(self._label)

User-facing name of the schema.

plural: str
304    @property
305    def plural(self) -> str:
306        """Name of the schema to be used in plural constructions."""
307        return gettext(self._plural)

Name of the schema to be used in plural constructions.

description: Optional[str]
309    @property
310    def description(self) -> Optional[str]:
311        """A longer description of the semantics of the schema."""
312        return gettext(self._description)

A longer description of the semantics of the schema.

edge_label: Optional[str]
314    @property
315    def edge_label(self) -> Optional[str]:
316        """Description label for edges derived from entities of this schema."""
317        return gettext(self._edge_label)

Description label for edges derived from entities of this schema.

source_prop: Optional[followthemoney.property.Property]
319    @property
320    def source_prop(self) -> Optional[Property]:
321        """The entity property to be used as an edge source."""
322        return self.get(self.edge_source)

The entity property to be used as an edge source.

target_prop: Optional[followthemoney.property.Property]
324    @property
325    def target_prop(self) -> Optional[Property]:
326        """The entity property to be used as an edge target."""
327        return self.get(self.edge_target)

The entity property to be used as an edge target.

temporal_start: List[str]
329    @property
330    def temporal_start(self) -> List[str]:
331        """The entity properties to be used as the start when representing the entity
332        in a timeline."""
333        if not len(self._temporal_start):
334            for parent in self.extends:
335                if len(parent.temporal_start):
336                    return parent.temporal_start
337        return self._temporal_start

The entity properties to be used as the start when representing the entity in a timeline.

temporal_end: List[str]
339    @property
340    def temporal_end(self) -> List[str]:
341        """The entity properties to be used as the end when representing the entity
342        in a timeline."""
343        if not len(self._temporal_end):
344            for parent in self.extends:
345                if len(parent.temporal_end):
346                    return parent.temporal_end
347        return self._temporal_end

The entity properties to be used as the end when representing the entity in a timeline.

temporal_start_props: List[followthemoney.property.Property]
349    @property
350    def temporal_start_props(self) -> List[Property]:
351        """The entity properties to be used as the start when representing the entity
352        in a timeline."""
353        props = [self.get(prop_name) for prop_name in self.temporal_start]
354        return [prop for prop in props if prop is not None]

The entity properties to be used as the start when representing the entity in a timeline.

temporal_end_props: List[followthemoney.property.Property]
356    @property
357    def temporal_end_props(self) -> List[Property]:
358        """The entity properties to be used as the end when representing the entity
359        in a timeline."""
360        props = [self.get(prop_name) for prop_name in self.temporal_end]
361        return [prop for prop in props if prop is not None]

The entity properties to be used as the end when representing the entity in a timeline.

sorted_properties: List[followthemoney.property.Property]
363    @property
364    def sorted_properties(self) -> List[Property]:
365        """All properties of the schema in the order in which they should be shown
366        to the user (alphabetically, with captions and featured properties first)."""
367        return sorted(
368            self.properties.values(),
369            key=lambda p: (
370                p.name not in self.caption,
371                p.name not in self.featured,
372                p.label,
373            ),
374        )

All properties of the schema in the order in which they should be shown to the user (alphabetically, with captions and featured properties first).

matchable_schemata: Set[Schema]
376    @property
377    def matchable_schemata(self) -> Set["Schema"]:
378        """Return the set of schemata to which it makes sense to compare with this
379        schema. For example, it makes sense to compare a legal entity with a company,
380        but it does not make sense to compare a car and a person."""
381        if self._matchable_schemata is None:
382            self._matchable_schemata = set()
383            if self.matchable:
384                # This is used by the cross-referencer to determine what
385                # other schemata should be considered for matches. For
386                # example, a Company may be compared to a Legal Entity,
387                # but it makes no sense to compare it to an Aircraft.
388                candidates = set(self.schemata)
389                candidates.update(self.descendants)
390                for schema in candidates:
391                    if schema.matchable:
392                        self._matchable_schemata.add(schema)
393        return self._matchable_schemata

Return the set of schemata to which it makes sense to compare with this schema. For example, it makes sense to compare a legal entity with a company, but it does not make sense to compare a car and a person.

def can_match(self, other: Schema) -> bool:
395    def can_match(self, other: "Schema") -> bool:
396        """Check if an schema can match with another schema."""
397        return other in self.matchable_schemata

Check if a schema can match with another schema.

@lru_cache(maxsize=None)
def is_a(self, other: Union[str, Schema]) -> bool:
399    @lru_cache(maxsize=None)
400    def is_a(self, other: Union[str, "Schema"]) -> bool:
401        """Check if the schema or one of its parents is the same as the given
402        candidate ``other``."""
403        if not isinstance(other, str):
404            other = other.name
405        return other in self.names

Check if the schema or one of its parents is the same as the given candidate other.

def get(self, name: Optional[str]) -> Optional[followthemoney.property.Property]:
407    def get(self, name: Optional[str]) -> Optional[Property]:
408        """Retrieve a property defined for this schema by its name."""
409        if name is None:
410            return None
411        return self.properties.get(name)

Retrieve a property defined for this schema by its name.

def validate(self, data: Any) -> Optional[str]:
413    def validate(self, data: Any) -> Optional[str]:
414        """Validate a dictionary against the given schema.
415        This will also drop keys which are not valid as properties.
416        """
417        errors = {}
418        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
419        for name, prop in self.properties.items():
420            values = ensure_list(properties.get(name, []))
421            error = prop.validate(values)
422            if error is None and not len(values):
423                if prop.name in self.required:
424                    error = gettext("Required")
425            if error is not None:
426                errors[name] = error
427        if len(errors):
428            msg = gettext("Entity validation failed")
429            raise InvalidData(msg, errors={"properties": errors})
430        return None

Validate a dictionary against the given schema. Keys which are not valid as properties are ignored; if any property fails validation, or a required property has no values, InvalidData is raised.

def to_dict(self) -> SchemaToDict:
432    def to_dict(self) -> SchemaToDict:
433        """Return schema metadata, including all properties, in a serializable form."""
434        data: SchemaToDict = {
435            "label": self.label,
436            "plural": self.plural,
437            "schemata": list(sorted(self.names)),
438            "extends": list(sorted([e.name for e in self.extends])),
439        }
440        if self.edge_source and self.edge_target and self.edge_label:
441            data["edge"] = {
442                "source": self.edge_source,
443                "target": self.edge_target,
444                "caption": self.edge_caption,
445                "label": self.edge_label,
446                "directed": self.edge_directed,
447            }
448        if len(self.temporal_start) or len(self.temporal_end):
449            data["temporalExtent"] = {
450                "start": self.temporal_start,
451                "end": self.temporal_end,
452            }
453        if len(self.featured):
454            data["featured"] = self.featured
455        if len(self.required):
456            data["required"] = self.required
457        if len(self.caption):
458            data["caption"] = self.caption
459        if self.description:
460            data["description"] = self.description
461        if self.abstract:
462            data["abstract"] = True
463        if self.hidden:
464            data["hidden"] = True
465        if self.generated:
466            data["generated"] = True
467        if self.matchable:
468            data["matchable"] = True
469        if self.deprecated:
470            data["deprecated"] = True
471        properties: Dict[str, PropertyToDict] = {}
472        for name, prop in self.properties.items():
473            if prop.schema == self:
474                properties[name] = prop.to_dict()
475        data["properties"] = properties
476        return data

Return schema metadata, including all properties, in a serializable form.