followthemoney.schema

  1from typing import TYPE_CHECKING, Any, cast
  2from typing import Dict, List, Optional, Set, TypedDict, Union
  3from banal import ensure_list, ensure_dict, as_bool
  4from functools import lru_cache
  5
  6from followthemoney.property import Property, PropertySpec, PropertyToDict, ReverseSpec
  7from followthemoney.types import registry
  8from followthemoney.exc import InvalidData, InvalidModel
  9from followthemoney.util import gettext
 10
 11if TYPE_CHECKING:
 12    from followthemoney.model import Model
 13
 14
 15class EdgeSpec(TypedDict, total=False):
 16    source: str
 17    target: str
 18    caption: List[str]
 19    label: str
 20    directed: bool
 21
 22
 23class TemporalExtentSpec(TypedDict, total=False):
 24    start: List[str]
 25    end: List[str]
 26
 27
 28class SchemaSpec(TypedDict, total=False):
 29    label: str
 30    plural: str
 31    schemata: List[str]
 32    extends: List[str]
 33    properties: Dict[str, PropertySpec]
 34    featured: List[str]
 35    required: List[str]
 36    caption: List[str]
 37    edge: EdgeSpec
 38    temporalExtent: TemporalExtentSpec
 39    description: Optional[str]
 40    abstract: bool
 41    hidden: bool
 42    generated: bool
 43    matchable: bool
 44    deprecated: Optional[bool]
 45
 46
 47class SchemaToDict(TypedDict, total=False):
 48    label: str
 49    plural: str
 50    schemata: List[str]
 51    extends: List[str]
 52    properties: Dict[str, PropertyToDict]
 53    featured: List[str]
 54    required: List[str]
 55    caption: List[str]
 56    edge: EdgeSpec
 57    temporalExtent: TemporalExtentSpec
 58    description: Optional[str]
 59    abstract: bool
 60    hidden: bool
 61    generated: bool
 62    matchable: bool
 63    deprecated: bool
 64
 65
 66class Schema:
 67    """A type definition for a class of entities that have certain properties.
 68
 69    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
 70    parent schemata from which it inherits all of their properties. A schema can also
 71    have descendant child schemata, which, in turn, add further properties. Schemata
 72    are usually accessed via the model, which holds all available definitions.
 73    """
 74
 75    __slots__ = (
 76        "model",
 77        "name",
 78        "_label",
 79        "_plural",
 80        "_description",
 81        "_hash",
 82        "abstract",
 83        "hidden",
 84        "generated",
 85        "matchable",
 86        "featured",
 87        "required",
 88        "deprecated",
 89        "caption",
 90        "edge",
 91        "_edge_label",
 92        "edge_directed",
 93        "edge_source",
 94        "edge_target",
 95        "edge_caption",
 96        "_temporal_start",
 97        "_temporal_end",
 98        "_extends",
 99        "extends",
100        "schemata",
101        "names",
102        "descendants",
103        "properties",
104        "_matchable_schemata",
105    )
106
107    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
108        #: Machine-readable name of the schema, used for identification.
109        self.name = name
110        self.model = model
111        self._label = data.get("label", name)
112        self._plural = data.get("plural", self.label)
113        self._description = data.get("description")
114        self._hash = hash("<Schema(%r)>" % name)
115
116        #: Do not store or emit entities of this type, it is used only for
117        #: inheritance.
118        self.abstract = as_bool(data.get("abstract"), False)
119
120        #: This schema is deprecated and should not be used.
121        self.deprecated = as_bool(data.get("deprecated", False))
122
123        #: Hide this schema in listings.
124        self.hidden = as_bool(data.get("hidden"), False)
125        self.hidden = self.hidden and not self.abstract
126
127        #: Entities with this type are generated by the system - for example, via
128        #: `ingest-file`. The user should not be offered an option to create them
129        #: in the interface.
130        self.generated = as_bool(data.get("generated"), False)
131
132        #: Try to perform fuzzy matching. Fuzzy similarity search does not
133        #: make sense for entities which have a lot of similar names, such
134        #: as land plots, assets etc.
135        self.matchable = as_bool(data.get("matchable"), True)
136
137        #: Mark a set of properties as important, i.e. they should be shown
138        #: first, or in an abridged view of the entity. In Aleph, these properties
139        #: are included in tabular entity listings.
140        self.featured = ensure_list(data.get("featured", []))
141
142        #: Mark a set of properties as required. This is applied only when
143        #: an entity is created by the user - bulk created entities will
144        #: slip through even if it is technically invalid.
145        self.required = ensure_list(data.get("required", []))
146
147        #: Mark a set of properties to be used for the entity's caption.
148        #: They will be checked in order and the first existent value will
149        #: be used.
150        self.caption = ensure_list(data.get("caption", []))
151
152        # A transform of the entity into an edge for its representation in
153        # the context of a property graph representation like Neo4J/Gephi.
154        edge = data.get("edge", {})
155        self.edge_source = edge.get("source")
156        self.edge_target = edge.get("target")
157
158        #: Flag to indicate if this schema should be represented by an edge (rather than
159        #: a node) when the data is converted into a property graph.
160        self.edge: bool = self.edge_source is not None and self.edge_target is not None
161        self.edge_caption = ensure_list(edge.get("caption", []))
162        self._edge_label = edge.get("label", self._label)
163
164        #: Flag to indicate if the edge should be presented as directed to the user,
165        #: e.g. by showing an error at the target end of the edge.
166        self.edge_directed = as_bool(edge.get("directed", True))
167
168        #: Specify which properties should be used to represent this schema in a
169        #: timeline.
170        temporal_extent = data.get("temporalExtent", {})
171        self._temporal_start = ensure_list(temporal_extent.get("start", []))
172        self._temporal_end = ensure_list(temporal_extent.get("end", []))
173
174        #: Direct parent schemata of this schema.
175        self._extends = ensure_list(data.get("extends", []))
176        self.extends: Set["Schema"] = set()
177
178        #: All parents of this schema (including indirect parents and the schema
179        #: itself).
180        self.schemata = set([self])
181
182        #: All names of :attr:`~schemata`.
183        self.names = set([self.name])
184
185        #: Inverse of :attr:`~schemata`, all derived child types of this schema
186        #: and their children.
187        self.descendants: Set["Schema"] = set()
188        self._matchable_schemata: Optional[Set["Schema"]] = None
189
190        #: The full list of properties defined for the entity, including those
191        #: inherited from parent schemata.
192        self.properties: Dict[str, Property] = {}
193        for name, prop in data.get("properties", {}).items():
194            self.properties[name] = Property(self, name, prop)
195
196    def generate(self, model: "Model") -> None:
197        """While loading the schema, this function will validate and
198        load the hierarchy, properties, and flags of the definition."""
199        temporal_start: Optional[List[str]] = None
200        temporal_end: Optional[List[str]] = None
201        for extends in self._extends:
202            parent = model.get(extends)
203            if parent is None:
204                raise InvalidData("Invalid extends: %r" % extends)
205            parent.generate(model)
206
207            for name, prop in parent.properties.items():
208                if name not in self.properties:
209                    self.properties[name] = prop
210
211            self.extends.add(parent)
212            for ancestor in parent.schemata:
213                self.schemata.add(ancestor)
214                self.names.add(ancestor.name)
215                ancestor.descendants.add(self)
216
217            if len(self._temporal_start) == 0 and parent.temporal_start:
218                if (
219                    temporal_start is not None
220                    and temporal_start != parent.temporal_start
221                ):
222                    raise InvalidModel(
223                        "Conflicting temporal start properties: %s" % self.name
224                    )
225                temporal_start = parent.temporal_start
226
227            if len(self._temporal_end) == 0 and parent.temporal_end:
228                if temporal_end is not None and temporal_end != parent.temporal_end:
229                    raise InvalidModel(
230                        "Conflicting temporal start properties: %s" % self.name
231                    )
232                temporal_end = parent.temporal_end
233
234        for prop in list(self.properties.values()):
235            prop.generate(model)
236
237        for featured in self.featured:
238            if self.get(featured) is None:
239                raise InvalidModel("Missing featured property: %s" % featured)
240
241        for caption in self.caption:
242            prop_ = self.get(caption)
243            if prop_ is None:
244                raise InvalidModel("Missing caption property: %s" % caption)
245            if prop_.type == registry.entity:
246                raise InvalidModel("Caption cannot be entity: %s" % caption)
247
248        for required in self.required:
249            if self.get(required) is None:
250                raise InvalidModel("Missing required property: %s" % required)
251
252        if self.edge:
253            if self.source_prop is None:
254                msg = "Missing edge source: %s" % self.edge_source
255                raise InvalidModel(msg)
256
257            if self.target_prop is None:
258                msg = "Missing edge target: %s" % self.edge_target
259                raise InvalidModel(msg)
260
261    def _add_reverse(
262        self, model: "Model", data: ReverseSpec, other: Property
263    ) -> Property:
264        name = data.get("name")
265        if name is None:
266            raise InvalidModel("Unnamed reverse: %s" % other)
267
268        prop = self.get(name)
269        if prop is None:
270            spec: PropertySpec = {
271                "label": data.get("label"),
272                "type": registry.entity.name,
273                "reverse": {"name": other.name},
274                "range": other.schema.name,
275                "hidden": data.get("hidden", other.hidden),
276            }
277            prop = Property(self, name, spec)
278            prop.stub = True
279            prop.generate(model)
280            self.properties[name] = prop
281        return prop
282
283    @property
284    def label(self) -> str:
285        """User-facing name of the schema."""
286        return gettext(self._label)
287
288    @property
289    def plural(self) -> str:
290        """Name of the schema to be used in plural constructions."""
291        return gettext(self._plural)
292
293    @property
294    def description(self) -> Optional[str]:
295        """A longer description of the semantics of the schema."""
296        return gettext(self._description)
297
298    @property
299    def edge_label(self) -> Optional[str]:
300        """Description label for edges derived from entities of this schema."""
301        return gettext(self._edge_label)
302
303    @property
304    def source_prop(self) -> Optional[Property]:
305        """The entity property to be used as an edge source when the schema is
306        considered as a relationship."""
307        if self.edge_source is None:
308            return None
309        return self.get(self.edge_source)
310
311    @property
312    def target_prop(self) -> Optional[Property]:
313        """The entity property to be used as an edge target when the schema is transformed
314        into a relationship."""
315        if self.edge_target is None:
316            return None
317        return self.get(self.edge_target)
318
319    @property
320    def temporal_start(self) -> List[str]:
321        """The entity properties to be used as the start when representing the entity
322        in a timeline."""
323        if not len(self._temporal_start):
324            for parent in self.extends:
325                if len(parent.temporal_start):
326                    return parent.temporal_start
327        return self._temporal_start
328
329    @property
330    def temporal_end(self) -> List[str]:
331        """The entity properties to be used as the end when representing the entity
332        in a timeline."""
333        if not len(self._temporal_end):
334            for parent in self.extends:
335                if len(parent.temporal_end):
336                    return parent.temporal_end
337        return self._temporal_end
338
339    @property
340    def temporal_start_props(self) -> List[Property]:
341        """The entity properties to be used as the start when representing the entity
342        in a timeline."""
343        props = [self.get(prop_name) for prop_name in self.temporal_start]
344        return [prop for prop in props if prop is not None]
345
346    @property
347    def temporal_end_props(self) -> List[Property]:
348        """The entity properties to be used as the end when representing the entity
349        in a timeline."""
350        props = [self.get(prop_name) for prop_name in self.temporal_end]
351        return [prop for prop in props if prop is not None]
352
353    @property
354    def sorted_properties(self) -> List[Property]:
355        """All properties of the schema in the order in which they should be shown
356        to the user (alphabetically, with captions and featured properties first)."""
357        return sorted(
358            self.properties.values(),
359            key=lambda p: (
360                p.name not in self.caption,
361                p.name not in self.featured,
362                p.label,
363            ),
364        )
365
366    @property
367    def matchable_schemata(self) -> Set["Schema"]:
368        """Return the set of schemata to which it makes sense to compare with this
369        schema. For example, it makes sense to compare a legal entity with a company,
370        but it does not make sense to compare a car and a person."""
371        if self._matchable_schemata is None:
372            self._matchable_schemata = set()
373            if self.matchable:
374                # This is used by the cross-referencer to determine what
375                # other schemata should be considered for matches. For
376                # example, a Company may be compared to a Legal Entity,
377                # but it makes no sense to compare it to an Aircraft.
378                candidates = set(self.schemata)
379                candidates.update(self.descendants)
380                for schema in candidates:
381                    if schema.matchable:
382                        self._matchable_schemata.add(schema)
383        return self._matchable_schemata
384
385    @lru_cache(maxsize=None)
386    def can_match(self, other: "Schema") -> bool:
387        """Check if an schema can match with another schema."""
388        return other in self.matchable_schemata
389
390    @lru_cache(maxsize=None)
391    def is_a(self, other: Union[str, "Schema"]) -> bool:
392        """Check if the schema or one of its parents is the same as the given
393        candidate ``other``."""
394        if not isinstance(other, str):
395            other = other.name
396        return other in self.names
397
398    def get(self, name: str) -> Optional[Property]:
399        """Retrieve a property defined for this schema by its name."""
400        if name is None:
401            return None
402        return self.properties.get(name)
403
404    def validate(self, data: Dict[str, Any]) -> Optional[str]:
405        """Validate a dictionary against the given schema.
406        This will also drop keys which are not valid as properties.
407        """
408        errors = {}
409        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
410        for name, prop in self.properties.items():
411            values = ensure_list(properties.get(name, []))
412            error = prop.validate(values)
413            if error is None and not len(values):
414                if prop.name in self.required:
415                    error = gettext("Required")
416            if error is not None:
417                errors[name] = error
418        if len(errors):
419            msg = gettext("Entity validation failed")
420            raise InvalidData(msg, errors={"properties": errors})
421        return None
422
423    def to_dict(self) -> SchemaToDict:
424        """Return schema metadata, including all properties, in a serializable form."""
425        data: SchemaToDict = {
426            "label": self.label,
427            "plural": self.plural,
428            "schemata": list(sorted(self.names)),
429            "extends": list(sorted([e.name for e in self.extends])),
430        }
431        if self.edge_source and self.edge_target and self.edge_label:
432            data["edge"] = {
433                "source": self.edge_source,
434                "target": self.edge_target,
435                "caption": self.edge_caption,
436                "label": self.edge_label,
437                "directed": self.edge_directed,
438            }
439        if len(self.temporal_start) or len(self.temporal_end):
440            data["temporalExtent"] = {
441                "start": self.temporal_start,
442                "end": self.temporal_end,
443            }
444        if len(self.featured):
445            data["featured"] = self.featured
446        if len(self.required):
447            data["required"] = self.required
448        if len(self.caption):
449            data["caption"] = self.caption
450        if self.description:
451            data["description"] = self.description
452        if self.abstract:
453            data["abstract"] = True
454        if self.hidden:
455            data["hidden"] = True
456        if self.generated:
457            data["generated"] = True
458        if self.matchable:
459            data["matchable"] = True
460        if self.deprecated:
461            data["deprecated"] = True
462        properties: Dict[str, PropertyToDict] = {}
463        for name, prop in self.properties.items():
464            if prop.schema == self:
465                properties[name] = prop.to_dict()
466        data["properties"] = properties
467        return data
468
469    def __eq__(self, other: Any) -> bool:
470        """Compare two schemata (via hash)."""
471        try:
472            return self._hash == other._hash  # type: ignore
473        except AttributeError:
474            return False
475
476    def __lt__(self, other: Any) -> bool:
477        return self.name.__lt__(other.name)
478
479    def __hash__(self) -> int:
480        return self._hash
481
482    def __repr__(self) -> str:
483        return "<Schema(%r)>" % self.name
class EdgeSpec(typing.TypedDict):
class EdgeSpec(TypedDict, total=False):
    """Edge section of a schema definition; every key is optional.

    Describes how an entity of the schema is turned into an edge when the
    data is represented as a property graph.
    """

    # Name of the property used as the source end of the edge.
    source: str
    # Name of the property used as the target end of the edge.
    target: str
    # Names of the properties used to caption the edge.
    caption: List[str]
    # Label describing the edge.
    label: str
    # Whether the edge should be presented as directed.
    directed: bool
source: str
target: str
caption: List[str]
label: str
directed: bool
class TemporalExtentSpec(typing.TypedDict):
class TemporalExtentSpec(TypedDict, total=False):
    """Temporal-extent section of a schema definition; both keys are optional."""

    # Names of the properties that mark the start of the entity in a timeline.
    start: List[str]
    # Names of the properties that mark the end of the entity in a timeline.
    end: List[str]
start: List[str]
end: List[str]
class SchemaSpec(typing.TypedDict):
class SchemaSpec(TypedDict, total=False):
    """Raw definition of a schema as passed to ``Schema``; every key is optional."""

    # --- Naming and description ---
    label: str
    plural: str
    description: Optional[str]

    # --- Hierarchy ---
    schemata: List[str]
    # Names of the direct parent schemata.
    extends: List[str]

    # --- Properties and the roles some of them play ---
    properties: Dict[str, PropertySpec]
    featured: List[str]
    required: List[str]
    caption: List[str]

    # --- Graph and timeline representation ---
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec

    # --- Flags ---
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: Optional[bool]
label: str
plural: str
schemata: List[str]
extends: List[str]
properties: Dict[str, followthemoney.property.PropertySpec]
featured: List[str]
required: List[str]
caption: List[str]
edge: EdgeSpec
temporalExtent: TemporalExtentSpec
description: Optional[str]
abstract: bool
hidden: bool
generated: bool
matchable: bool
deprecated: Optional[bool]
class SchemaToDict(typing.TypedDict):
class SchemaToDict(TypedDict, total=False):
    """Serialized form of a schema, as returned by ``Schema.to_dict``.

    Every key is optional at the type level.
    """

    # --- Naming and description ---
    label: str
    plural: str
    description: Optional[str]

    # --- Hierarchy ---
    schemata: List[str]
    extends: List[str]

    # --- Properties and the roles some of them play ---
    properties: Dict[str, PropertyToDict]
    featured: List[str]
    required: List[str]
    caption: List[str]

    # --- Graph and timeline representation ---
    edge: EdgeSpec
    temporalExtent: TemporalExtentSpec

    # --- Flags ---
    abstract: bool
    hidden: bool
    generated: bool
    matchable: bool
    deprecated: bool
label: str
plural: str
schemata: List[str]
extends: List[str]
featured: List[str]
required: List[str]
caption: List[str]
edge: EdgeSpec
temporalExtent: TemporalExtentSpec
description: Optional[str]
abstract: bool
hidden: bool
generated: bool
matchable: bool
deprecated: bool
class Schema:
 67class Schema:
 68    """A type definition for a class of entities that have certain properties.
 69
 70    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
 71    parent schemata from which it inherits all of their properties. A schema can also
 72    have descendant child schemata, which, in turn, add further properties. Schemata
 73    are usually accessed via the model, which holds all available definitions.
 74    """
 75
 76    __slots__ = (
 77        "model",
 78        "name",
 79        "_label",
 80        "_plural",
 81        "_description",
 82        "_hash",
 83        "abstract",
 84        "hidden",
 85        "generated",
 86        "matchable",
 87        "featured",
 88        "required",
 89        "deprecated",
 90        "caption",
 91        "edge",
 92        "_edge_label",
 93        "edge_directed",
 94        "edge_source",
 95        "edge_target",
 96        "edge_caption",
 97        "_temporal_start",
 98        "_temporal_end",
 99        "_extends",
100        "extends",
101        "schemata",
102        "names",
103        "descendants",
104        "properties",
105        "_matchable_schemata",
106    )
107
108    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
109        #: Machine-readable name of the schema, used for identification.
110        self.name = name
111        self.model = model
112        self._label = data.get("label", name)
113        self._plural = data.get("plural", self.label)
114        self._description = data.get("description")
115        self._hash = hash("<Schema(%r)>" % name)
116
117        #: Do not store or emit entities of this type, it is used only for
118        #: inheritance.
119        self.abstract = as_bool(data.get("abstract"), False)
120
121        #: This schema is deprecated and should not be used.
122        self.deprecated = as_bool(data.get("deprecated", False))
123
124        #: Hide this schema in listings.
125        self.hidden = as_bool(data.get("hidden"), False)
126        self.hidden = self.hidden and not self.abstract
127
128        #: Entities with this type are generated by the system - for example, via
129        #: `ingest-file`. The user should not be offered an option to create them
130        #: in the interface.
131        self.generated = as_bool(data.get("generated"), False)
132
133        #: Try to perform fuzzy matching. Fuzzy similarity search does not
134        #: make sense for entities which have a lot of similar names, such
135        #: as land plots, assets etc.
136        self.matchable = as_bool(data.get("matchable"), True)
137
138        #: Mark a set of properties as important, i.e. they should be shown
139        #: first, or in an abridged view of the entity. In Aleph, these properties
140        #: are included in tabular entity listings.
141        self.featured = ensure_list(data.get("featured", []))
142
143        #: Mark a set of properties as required. This is applied only when
144        #: an entity is created by the user - bulk created entities will
145        #: slip through even if it is technically invalid.
146        self.required = ensure_list(data.get("required", []))
147
148        #: Mark a set of properties to be used for the entity's caption.
149        #: They will be checked in order and the first existent value will
150        #: be used.
151        self.caption = ensure_list(data.get("caption", []))
152
153        # A transform of the entity into an edge for its representation in
154        # the context of a property graph representation like Neo4J/Gephi.
155        edge = data.get("edge", {})
156        self.edge_source = edge.get("source")
157        self.edge_target = edge.get("target")
158
159        #: Flag to indicate if this schema should be represented by an edge (rather than
160        #: a node) when the data is converted into a property graph.
161        self.edge: bool = self.edge_source is not None and self.edge_target is not None
162        self.edge_caption = ensure_list(edge.get("caption", []))
163        self._edge_label = edge.get("label", self._label)
164
165        #: Flag to indicate if the edge should be presented as directed to the user,
166        #: e.g. by showing an error at the target end of the edge.
167        self.edge_directed = as_bool(edge.get("directed", True))
168
169        #: Specify which properties should be used to represent this schema in a
170        #: timeline.
171        temporal_extent = data.get("temporalExtent", {})
172        self._temporal_start = ensure_list(temporal_extent.get("start", []))
173        self._temporal_end = ensure_list(temporal_extent.get("end", []))
174
175        #: Direct parent schemata of this schema.
176        self._extends = ensure_list(data.get("extends", []))
177        self.extends: Set["Schema"] = set()
178
179        #: All parents of this schema (including indirect parents and the schema
180        #: itself).
181        self.schemata = set([self])
182
183        #: All names of :attr:`~schemata`.
184        self.names = set([self.name])
185
186        #: Inverse of :attr:`~schemata`, all derived child types of this schema
187        #: and their children.
188        self.descendants: Set["Schema"] = set()
189        self._matchable_schemata: Optional[Set["Schema"]] = None
190
191        #: The full list of properties defined for the entity, including those
192        #: inherited from parent schemata.
193        self.properties: Dict[str, Property] = {}
194        for name, prop in data.get("properties", {}).items():
195            self.properties[name] = Property(self, name, prop)
196
197    def generate(self, model: "Model") -> None:
198        """While loading the schema, this function will validate and
199        load the hierarchy, properties, and flags of the definition."""
200        temporal_start: Optional[List[str]] = None
201        temporal_end: Optional[List[str]] = None
202        for extends in self._extends:
203            parent = model.get(extends)
204            if parent is None:
205                raise InvalidData("Invalid extends: %r" % extends)
206            parent.generate(model)
207
208            for name, prop in parent.properties.items():
209                if name not in self.properties:
210                    self.properties[name] = prop
211
212            self.extends.add(parent)
213            for ancestor in parent.schemata:
214                self.schemata.add(ancestor)
215                self.names.add(ancestor.name)
216                ancestor.descendants.add(self)
217
218            if len(self._temporal_start) == 0 and parent.temporal_start:
219                if (
220                    temporal_start is not None
221                    and temporal_start != parent.temporal_start
222                ):
223                    raise InvalidModel(
224                        "Conflicting temporal start properties: %s" % self.name
225                    )
226                temporal_start = parent.temporal_start
227
228            if len(self._temporal_end) == 0 and parent.temporal_end:
229                if temporal_end is not None and temporal_end != parent.temporal_end:
230                    raise InvalidModel(
231                        "Conflicting temporal start properties: %s" % self.name
232                    )
233                temporal_end = parent.temporal_end
234
235        for prop in list(self.properties.values()):
236            prop.generate(model)
237
238        for featured in self.featured:
239            if self.get(featured) is None:
240                raise InvalidModel("Missing featured property: %s" % featured)
241
242        for caption in self.caption:
243            prop_ = self.get(caption)
244            if prop_ is None:
245                raise InvalidModel("Missing caption property: %s" % caption)
246            if prop_.type == registry.entity:
247                raise InvalidModel("Caption cannot be entity: %s" % caption)
248
249        for required in self.required:
250            if self.get(required) is None:
251                raise InvalidModel("Missing required property: %s" % required)
252
253        if self.edge:
254            if self.source_prop is None:
255                msg = "Missing edge source: %s" % self.edge_source
256                raise InvalidModel(msg)
257
258            if self.target_prop is None:
259                msg = "Missing edge target: %s" % self.edge_target
260                raise InvalidModel(msg)
261
262    def _add_reverse(
263        self, model: "Model", data: ReverseSpec, other: Property
264    ) -> Property:
265        name = data.get("name")
266        if name is None:
267            raise InvalidModel("Unnamed reverse: %s" % other)
268
269        prop = self.get(name)
270        if prop is None:
271            spec: PropertySpec = {
272                "label": data.get("label"),
273                "type": registry.entity.name,
274                "reverse": {"name": other.name},
275                "range": other.schema.name,
276                "hidden": data.get("hidden", other.hidden),
277            }
278            prop = Property(self, name, spec)
279            prop.stub = True
280            prop.generate(model)
281            self.properties[name] = prop
282        return prop
283
284    @property
285    def label(self) -> str:
286        """User-facing name of the schema."""
287        return gettext(self._label)
288
289    @property
290    def plural(self) -> str:
291        """Name of the schema to be used in plural constructions."""
292        return gettext(self._plural)
293
294    @property
295    def description(self) -> Optional[str]:
296        """A longer description of the semantics of the schema."""
297        return gettext(self._description)
298
299    @property
300    def edge_label(self) -> Optional[str]:
301        """Description label for edges derived from entities of this schema."""
302        return gettext(self._edge_label)
303
304    @property
305    def source_prop(self) -> Optional[Property]:
306        """The entity property to be used as an edge source when the schema is
307        considered as a relationship."""
308        if self.edge_source is None:
309            return None
310        return self.get(self.edge_source)
311
312    @property
313    def target_prop(self) -> Optional[Property]:
314        """The entity property to be used as an edge target when the schema is transformed
315        into a relationship."""
316        if self.edge_target is None:
317            return None
318        return self.get(self.edge_target)
319
320    @property
321    def temporal_start(self) -> List[str]:
322        """The entity properties to be used as the start when representing the entity
323        in a timeline."""
324        if not len(self._temporal_start):
325            for parent in self.extends:
326                if len(parent.temporal_start):
327                    return parent.temporal_start
328        return self._temporal_start
329
330    @property
331    def temporal_end(self) -> List[str]:
332        """The entity properties to be used as the end when representing the entity
333        in a timeline."""
334        if not len(self._temporal_end):
335            for parent in self.extends:
336                if len(parent.temporal_end):
337                    return parent.temporal_end
338        return self._temporal_end
339
340    @property
341    def temporal_start_props(self) -> List[Property]:
342        """The entity properties to be used as the start when representing the entity
343        in a timeline."""
344        props = [self.get(prop_name) for prop_name in self.temporal_start]
345        return [prop for prop in props if prop is not None]
346
347    @property
348    def temporal_end_props(self) -> List[Property]:
349        """The entity properties to be used as the end when representing the entity
350        in a timeline."""
351        props = [self.get(prop_name) for prop_name in self.temporal_end]
352        return [prop for prop in props if prop is not None]
353
354    @property
355    def sorted_properties(self) -> List[Property]:
356        """All properties of the schema in the order in which they should be shown
357        to the user (alphabetically, with captions and featured properties first)."""
358        return sorted(
359            self.properties.values(),
360            key=lambda p: (
361                p.name not in self.caption,
362                p.name not in self.featured,
363                p.label,
364            ),
365        )
366
367    @property
368    def matchable_schemata(self) -> Set["Schema"]:
369        """Return the set of schemata to which it makes sense to compare with this
370        schema. For example, it makes sense to compare a legal entity with a company,
371        but it does not make sense to compare a car and a person."""
372        if self._matchable_schemata is None:
373            self._matchable_schemata = set()
374            if self.matchable:
375                # This is used by the cross-referencer to determine what
376                # other schemata should be considered for matches. For
377                # example, a Company may be compared to a Legal Entity,
378                # but it makes no sense to compare it to an Aircraft.
379                candidates = set(self.schemata)
380                candidates.update(self.descendants)
381                for schema in candidates:
382                    if schema.matchable:
383                        self._matchable_schemata.add(schema)
384        return self._matchable_schemata
385
386    @lru_cache(maxsize=None)
387    def can_match(self, other: "Schema") -> bool:
388        """Check if an schema can match with another schema."""
389        return other in self.matchable_schemata
390
391    @lru_cache(maxsize=None)
392    def is_a(self, other: Union[str, "Schema"]) -> bool:
393        """Check if the schema or one of its parents is the same as the given
394        candidate ``other``."""
395        if not isinstance(other, str):
396            other = other.name
397        return other in self.names
398
399    def get(self, name: str) -> Optional[Property]:
400        """Retrieve a property defined for this schema by its name."""
401        if name is None:
402            return None
403        return self.properties.get(name)
404
405    def validate(self, data: Dict[str, Any]) -> Optional[str]:
406        """Validate a dictionary against the given schema.
407        This will also drop keys which are not valid as properties.
408        """
409        errors = {}
410        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
411        for name, prop in self.properties.items():
412            values = ensure_list(properties.get(name, []))
413            error = prop.validate(values)
414            if error is None and not len(values):
415                if prop.name in self.required:
416                    error = gettext("Required")
417            if error is not None:
418                errors[name] = error
419        if len(errors):
420            msg = gettext("Entity validation failed")
421            raise InvalidData(msg, errors={"properties": errors})
422        return None
423
424    def to_dict(self) -> SchemaToDict:
425        """Return schema metadata, including all properties, in a serializable form."""
426        data: SchemaToDict = {
427            "label": self.label,
428            "plural": self.plural,
429            "schemata": list(sorted(self.names)),
430            "extends": list(sorted([e.name for e in self.extends])),
431        }
432        if self.edge_source and self.edge_target and self.edge_label:
433            data["edge"] = {
434                "source": self.edge_source,
435                "target": self.edge_target,
436                "caption": self.edge_caption,
437                "label": self.edge_label,
438                "directed": self.edge_directed,
439            }
440        if len(self.temporal_start) or len(self.temporal_end):
441            data["temporalExtent"] = {
442                "start": self.temporal_start,
443                "end": self.temporal_end,
444            }
445        if len(self.featured):
446            data["featured"] = self.featured
447        if len(self.required):
448            data["required"] = self.required
449        if len(self.caption):
450            data["caption"] = self.caption
451        if self.description:
452            data["description"] = self.description
453        if self.abstract:
454            data["abstract"] = True
455        if self.hidden:
456            data["hidden"] = True
457        if self.generated:
458            data["generated"] = True
459        if self.matchable:
460            data["matchable"] = True
461        if self.deprecated:
462            data["deprecated"] = True
463        properties: Dict[str, PropertyToDict] = {}
464        for name, prop in self.properties.items():
465            if prop.schema == self:
466                properties[name] = prop.to_dict()
467        data["properties"] = properties
468        return data
469
470    def __eq__(self, other: Any) -> bool:
471        """Compare two schemata (via hash)."""
472        try:
473            return self._hash == other._hash  # type: ignore
474        except AttributeError:
475            return False
476
477    def __lt__(self, other: Any) -> bool:
478        return self.name.__lt__(other.name)
479
480    def __hash__(self) -> int:
481        return self._hash
482
483    def __repr__(self) -> str:
484        return "<Schema(%r)>" % self.name

A type definition for a class of entities that have certain properties.

Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple parent schemata from which it inherits all of their properties. A schema can also have descendant child schemata, which, in turn, add further properties. Schemata are usually accessed via the model, which holds all available definitions.

Schema( model: followthemoney.model.Model, name: str, data: SchemaSpec)
108    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
109        #: Machine-readable name of the schema, used for identification.
110        self.name = name
111        self.model = model
112        self._label = data.get("label", name)
113        self._plural = data.get("plural", self.label)
114        self._description = data.get("description")
115        self._hash = hash("<Schema(%r)>" % name)
116
117        #: Do not store or emit entities of this type, it is used only for
118        #: inheritance.
119        self.abstract = as_bool(data.get("abstract"), False)
120
121        #: This schema is deprecated and should not be used.
122        self.deprecated = as_bool(data.get("deprecated", False))
123
124        #: Hide this schema in listings.
125        self.hidden = as_bool(data.get("hidden"), False)
126        self.hidden = self.hidden and not self.abstract
127
128        #: Entities with this type are generated by the system - for example, via
129        #: `ingest-file`. The user should not be offered an option to create them
130        #: in the interface.
131        self.generated = as_bool(data.get("generated"), False)
132
133        #: Try to perform fuzzy matching. Fuzzy similarity search does not
134        #: make sense for entities which have a lot of similar names, such
135        #: as land plots, assets etc.
136        self.matchable = as_bool(data.get("matchable"), True)
137
138        #: Mark a set of properties as important, i.e. they should be shown
139        #: first, or in an abridged view of the entity. In Aleph, these properties
140        #: are included in tabular entity listings.
141        self.featured = ensure_list(data.get("featured", []))
142
143        #: Mark a set of properties as required. This is applied only when
144        #: an entity is created by the user - bulk created entities will
145        #: slip through even if it is technically invalid.
146        self.required = ensure_list(data.get("required", []))
147
148        #: Mark a set of properties to be used for the entity's caption.
149        #: They will be checked in order and the first existent value will
150        #: be used.
151        self.caption = ensure_list(data.get("caption", []))
152
153        # A transform of the entity into an edge for its representation in
154        # the context of a property graph representation like Neo4J/Gephi.
155        edge = data.get("edge", {})
156        self.edge_source = edge.get("source")
157        self.edge_target = edge.get("target")
158
159        #: Flag to indicate if this schema should be represented by an edge (rather than
160        #: a node) when the data is converted into a property graph.
161        self.edge: bool = self.edge_source is not None and self.edge_target is not None
162        self.edge_caption = ensure_list(edge.get("caption", []))
163        self._edge_label = edge.get("label", self._label)
164
165        #: Flag to indicate if the edge should be presented as directed to the user,
166        #: e.g. by showing an error at the target end of the edge.
167        self.edge_directed = as_bool(edge.get("directed", True))
168
169        #: Specify which properties should be used to represent this schema in a
170        #: timeline.
171        temporal_extent = data.get("temporalExtent", {})
172        self._temporal_start = ensure_list(temporal_extent.get("start", []))
173        self._temporal_end = ensure_list(temporal_extent.get("end", []))
174
175        #: Direct parent schemata of this schema.
176        self._extends = ensure_list(data.get("extends", []))
177        self.extends: Set["Schema"] = set()
178
179        #: All parents of this schema (including indirect parents and the schema
180        #: itself).
181        self.schemata = set([self])
182
183        #: All names of :attr:`~schemata`.
184        self.names = set([self.name])
185
186        #: Inverse of :attr:`~schemata`, all derived child types of this schema
187        #: and their children.
188        self.descendants: Set["Schema"] = set()
189        self._matchable_schemata: Optional[Set["Schema"]] = None
190
191        #: The full list of properties defined for the entity, including those
192        #: inherited from parent schemata.
193        self.properties: Dict[str, Property] = {}
194        for name, prop in data.get("properties", {}).items():
195            self.properties[name] = Property(self, name, prop)
name
model
abstract
deprecated
hidden
generated
matchable
featured
required
caption
edge_source
edge_target
edge: bool
edge_caption
edge_directed
extends: Set[Schema]
schemata
names
descendants: Set[Schema]
properties: Dict[str, followthemoney.property.Property]
def generate(self, model: followthemoney.model.Model) -> None:
197    def generate(self, model: "Model") -> None:
198        """While loading the schema, this function will validate and
199        load the hierarchy, properties, and flags of the definition."""
200        temporal_start: Optional[List[str]] = None
201        temporal_end: Optional[List[str]] = None
202        for extends in self._extends:
203            parent = model.get(extends)
204            if parent is None:
205                raise InvalidData("Invalid extends: %r" % extends)
206            parent.generate(model)
207
208            for name, prop in parent.properties.items():
209                if name not in self.properties:
210                    self.properties[name] = prop
211
212            self.extends.add(parent)
213            for ancestor in parent.schemata:
214                self.schemata.add(ancestor)
215                self.names.add(ancestor.name)
216                ancestor.descendants.add(self)
217
218            if len(self._temporal_start) == 0 and parent.temporal_start:
219                if (
220                    temporal_start is not None
221                    and temporal_start != parent.temporal_start
222                ):
223                    raise InvalidModel(
224                        "Conflicting temporal start properties: %s" % self.name
225                    )
226                temporal_start = parent.temporal_start
227
228            if len(self._temporal_end) == 0 and parent.temporal_end:
229                if temporal_end is not None and temporal_end != parent.temporal_end:
230                    raise InvalidModel(
231                        "Conflicting temporal start properties: %s" % self.name
232                    )
233                temporal_end = parent.temporal_end
234
235        for prop in list(self.properties.values()):
236            prop.generate(model)
237
238        for featured in self.featured:
239            if self.get(featured) is None:
240                raise InvalidModel("Missing featured property: %s" % featured)
241
242        for caption in self.caption:
243            prop_ = self.get(caption)
244            if prop_ is None:
245                raise InvalidModel("Missing caption property: %s" % caption)
246            if prop_.type == registry.entity:
247                raise InvalidModel("Caption cannot be entity: %s" % caption)
248
249        for required in self.required:
250            if self.get(required) is None:
251                raise InvalidModel("Missing required property: %s" % required)
252
253        if self.edge:
254            if self.source_prop is None:
255                msg = "Missing edge source: %s" % self.edge_source
256                raise InvalidModel(msg)
257
258            if self.target_prop is None:
259                msg = "Missing edge target: %s" % self.edge_target
260                raise InvalidModel(msg)

While loading the schema, this function will validate and load the hierarchy, properties, and flags of the definition.

label: str
284    @property
285    def label(self) -> str:
286        """User-facing name of the schema."""
287        return gettext(self._label)

User-facing name of the schema.

plural: str
289    @property
290    def plural(self) -> str:
291        """Name of the schema to be used in plural constructions."""
292        return gettext(self._plural)

Name of the schema to be used in plural constructions.

description: Optional[str]
294    @property
295    def description(self) -> Optional[str]:
296        """A longer description of the semantics of the schema."""
297        return gettext(self._description)

A longer description of the semantics of the schema.

edge_label: Optional[str]
299    @property
300    def edge_label(self) -> Optional[str]:
301        """Description label for edges derived from entities of this schema."""
302        return gettext(self._edge_label)

Description label for edges derived from entities of this schema.

source_prop: Optional[followthemoney.property.Property]
304    @property
305    def source_prop(self) -> Optional[Property]:
306        """The entity property to be used as an edge source when the schema is
307        considered as a relationship."""
308        if self.edge_source is None:
309            return None
310        return self.get(self.edge_source)

The entity property to be used as an edge source when the schema is considered as a relationship.

target_prop: Optional[followthemoney.property.Property]
312    @property
313    def target_prop(self) -> Optional[Property]:
314        """The entity property to be used as an edge target when the schema is transformed
315        into a relationship."""
316        if self.edge_target is None:
317            return None
318        return self.get(self.edge_target)

The entity property to be used as an edge target when the schema is transformed into a relationship.

temporal_start: List[str]
320    @property
321    def temporal_start(self) -> List[str]:
322        """The entity properties to be used as the start when representing the entity
323        in a timeline."""
324        if not len(self._temporal_start):
325            for parent in self.extends:
326                if len(parent.temporal_start):
327                    return parent.temporal_start
328        return self._temporal_start

The entity properties to be used as the start when representing the entity in a timeline.

temporal_end: List[str]
330    @property
331    def temporal_end(self) -> List[str]:
332        """The entity properties to be used as the end when representing the entity
333        in a timeline."""
334        if not len(self._temporal_end):
335            for parent in self.extends:
336                if len(parent.temporal_end):
337                    return parent.temporal_end
338        return self._temporal_end

The entity properties to be used as the end when representing the entity in a timeline.

temporal_start_props: List[followthemoney.property.Property]
340    @property
341    def temporal_start_props(self) -> List[Property]:
342        """The entity properties to be used as the start when representing the entity
343        in a timeline."""
344        props = [self.get(prop_name) for prop_name in self.temporal_start]
345        return [prop for prop in props if prop is not None]

The entity properties to be used as the start when representing the entity in a timeline.

temporal_end_props: List[followthemoney.property.Property]
347    @property
348    def temporal_end_props(self) -> List[Property]:
349        """The entity properties to be used as the end when representing the entity
350        in a timeline."""
351        props = [self.get(prop_name) for prop_name in self.temporal_end]
352        return [prop for prop in props if prop is not None]

The entity properties to be used as the end when representing the entity in a timeline.

sorted_properties: List[followthemoney.property.Property]
354    @property
355    def sorted_properties(self) -> List[Property]:
356        """All properties of the schema in the order in which they should be shown
357        to the user (alphabetically, with captions and featured properties first)."""
358        return sorted(
359            self.properties.values(),
360            key=lambda p: (
361                p.name not in self.caption,
362                p.name not in self.featured,
363                p.label,
364            ),
365        )

All properties of the schema in the order in which they should be shown to the user (alphabetically, with captions and featured properties first).

matchable_schemata: Set[Schema]
367    @property
368    def matchable_schemata(self) -> Set["Schema"]:
369        """Return the set of schemata to which it makes sense to compare with this
370        schema. For example, it makes sense to compare a legal entity with a company,
371        but it does not make sense to compare a car and a person."""
372        if self._matchable_schemata is None:
373            self._matchable_schemata = set()
374            if self.matchable:
375                # This is used by the cross-referencer to determine what
376                # other schemata should be considered for matches. For
377                # example, a Company may be compared to a Legal Entity,
378                # but it makes no sense to compare it to an Aircraft.
379                candidates = set(self.schemata)
380                candidates.update(self.descendants)
381                for schema in candidates:
382                    if schema.matchable:
383                        self._matchable_schemata.add(schema)
384        return self._matchable_schemata

Return the set of schemata to which it makes sense to compare with this schema. For example, it makes sense to compare a legal entity with a company, but it does not make sense to compare a car and a person.

@lru_cache(maxsize=None)
def can_match(self, other: Schema) -> bool:
386    @lru_cache(maxsize=None)
387    def can_match(self, other: "Schema") -> bool:
388        """Check if an schema can match with another schema."""
389        return other in self.matchable_schemata

Check if a schema can match with another schema.

@lru_cache(maxsize=None)
def is_a(self, other: Union[str, Schema]) -> bool:
391    @lru_cache(maxsize=None)
392    def is_a(self, other: Union[str, "Schema"]) -> bool:
393        """Check if the schema or one of its parents is the same as the given
394        candidate ``other``."""
395        if not isinstance(other, str):
396            other = other.name
397        return other in self.names

Check if the schema or one of its parents is the same as the given candidate other.

def get(self, name: str) -> Optional[followthemoney.property.Property]:
399    def get(self, name: str) -> Optional[Property]:
400        """Retrieve a property defined for this schema by its name."""
401        if name is None:
402            return None
403        return self.properties.get(name)

Retrieve a property defined for this schema by its name.

def validate(self, data: Dict[str, Any]) -> Optional[str]:
405    def validate(self, data: Dict[str, Any]) -> Optional[str]:
406        """Validate a dictionary against the given schema.
407        This will also drop keys which are not valid as properties.
408        """
409        errors = {}
410        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
411        for name, prop in self.properties.items():
412            values = ensure_list(properties.get(name, []))
413            error = prop.validate(values)
414            if error is None and not len(values):
415                if prop.name in self.required:
416                    error = gettext("Required")
417            if error is not None:
418                errors[name] = error
419        if len(errors):
420            msg = gettext("Entity validation failed")
421            raise InvalidData(msg, errors={"properties": errors})
422        return None

Validate a dictionary against the given schema. Keys which are not properties of the schema are ignored; an InvalidData error is raised if any property fails validation or a required property has no values.

def to_dict(self) -> SchemaToDict:
424    def to_dict(self) -> SchemaToDict:
425        """Return schema metadata, including all properties, in a serializable form."""
426        data: SchemaToDict = {
427            "label": self.label,
428            "plural": self.plural,
429            "schemata": list(sorted(self.names)),
430            "extends": list(sorted([e.name for e in self.extends])),
431        }
432        if self.edge_source and self.edge_target and self.edge_label:
433            data["edge"] = {
434                "source": self.edge_source,
435                "target": self.edge_target,
436                "caption": self.edge_caption,
437                "label": self.edge_label,
438                "directed": self.edge_directed,
439            }
440        if len(self.temporal_start) or len(self.temporal_end):
441            data["temporalExtent"] = {
442                "start": self.temporal_start,
443                "end": self.temporal_end,
444            }
445        if len(self.featured):
446            data["featured"] = self.featured
447        if len(self.required):
448            data["required"] = self.required
449        if len(self.caption):
450            data["caption"] = self.caption
451        if self.description:
452            data["description"] = self.description
453        if self.abstract:
454            data["abstract"] = True
455        if self.hidden:
456            data["hidden"] = True
457        if self.generated:
458            data["generated"] = True
459        if self.matchable:
460            data["matchable"] = True
461        if self.deprecated:
462            data["deprecated"] = True
463        properties: Dict[str, PropertyToDict] = {}
464        for name, prop in self.properties.items():
465            if prop.schema == self:
466                properties[name] = prop.to_dict()
467        data["properties"] = properties
468        return data

Return schema metadata, including all properties, in a serializable form.