followthemoney.schema

  1from typing import (
  2    TYPE_CHECKING,
  3    Any,
  4    Dict,
  5    List,
  6    Optional,
  7    Set,
  8    TypedDict,
  9    Union,
 10    cast,
 11)
 12from banal import ensure_list, ensure_dict, as_bool
 13from functools import lru_cache
 14
 15from followthemoney.property import Property, PropertySpec, PropertyToDict, ReverseSpec
 16from followthemoney.types import registry
 17from followthemoney.exc import InvalidData, InvalidModel
 18from followthemoney.rdf import URIRef, NS
 19from followthemoney.util import gettext
 20
 21if TYPE_CHECKING:
 22    from followthemoney.model import Model
 23
 24
 25class EdgeSpec(TypedDict, total=False):
 26    source: str
 27    target: str
 28    caption: List[str]
 29    label: str
 30    directed: bool
 31
 32
 33class TemporalExtentSpec(TypedDict, total=False):
 34    start: List[str]
 35    end: List[str]
 36
 37
 38class SchemaSpec(TypedDict, total=False):
 39    label: str
 40    plural: str
 41    schemata: List[str]
 42    extends: List[str]
 43    properties: Dict[str, PropertySpec]
 44    featured: List[str]
 45    required: List[str]
 46    caption: List[str]
 47    edge: EdgeSpec
 48    temporalExtent: TemporalExtentSpec
 49    description: Optional[str]
 50    rdf: Optional[str]
 51    abstract: bool
 52    hidden: bool
 53    generated: bool
 54    matchable: bool
 55    deprecated: Optional[bool]
 56
 57
 58class SchemaToDict(TypedDict, total=False):
 59    label: str
 60    plural: str
 61    schemata: List[str]
 62    extends: List[str]
 63    properties: Dict[str, PropertyToDict]
 64    featured: List[str]
 65    required: List[str]
 66    caption: List[str]
 67    edge: EdgeSpec
 68    temporalExtent: TemporalExtentSpec
 69    description: Optional[str]
 70    abstract: bool
 71    hidden: bool
 72    generated: bool
 73    matchable: bool
 74    deprecated: bool
 75
 76
 77class Schema:
 78    """A type definition for a class of entities that have certain properties.
 79
 80    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
 81    parent schemata from which it inherits all of their properties. A schema can also
 82    have descendant child schemata, which, in turn, add further properties. Schemata
 83    are usually accessed via the model, which holds all available definitions.
 84    """
 85
 86    __slots__ = (
 87        "model",
 88        "name",
 89        "_label",
 90        "_plural",
 91        "_description",
 92        "_hash",
 93        "uri",
 94        "abstract",
 95        "hidden",
 96        "generated",
 97        "matchable",
 98        "featured",
 99        "required",
100        "deprecated",
101        "caption",
102        "edge",
103        "_edge_label",
104        "edge_directed",
105        "edge_source",
106        "edge_target",
107        "edge_caption",
108        "temporal_start",
109        "temporal_end",
110        "_extends",
111        "extends",
112        "schemata",
113        "names",
114        "descendants",
115        "properties",
116        "_matchable_schemata",
117    )
118
119    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
120        #: Machine-readable name of the schema, used for identification.
121        self.name = name
122        self.model = model
123        self._label = data.get("label", name)
124        self._plural = data.get("plural", self.label)
125        self._description = data.get("description")
126        self._hash = hash("<Schema(%r)>" % name)
127
128        #: RDF identifier for this schema when it is transformed to a triple term.
129        self.uri = URIRef(cast(str, data.get("rdf", NS[name])))
130
131        #: Do not store or emit entities of this type, it is used only for
132        #: inheritance.
133        self.abstract = as_bool(data.get("abstract"), False)
134
135        #: This schema is deprecated and should not be used.
136        self.deprecated = as_bool(data.get("deprecated", False))
137
138        #: Hide this schema in listings.
139        self.hidden = as_bool(data.get("hidden"), False)
140        self.hidden = self.hidden and not self.abstract
141
142        #: Entities with this type are generated by the system - for example, via
143        #: `ingest-file`. The user should not be offered an option to create them
144        #: in the interface.
145        self.generated = as_bool(data.get("generated"), False)
146
147        #: Try to perform fuzzy matching. Fuzzy similarity search does not
148        #: make sense for entities which have a lot of similar names, such
149        #: as land plots, assets etc.
150        self.matchable = as_bool(data.get("matchable"), True)
151
152        #: Mark a set of properties as important, i.e. they should be shown
153        #: first, or in an abridged view of the entity. In Aleph, these properties
154        #: are included in tabular entity listings.
155        self.featured = ensure_list(data.get("featured", []))
156
157        #: Mark a set of properties as required. This is applied only when
158        #: an entity is created by the user - bulk created entities will
159        #: slip through even if it is technically invalid.
160        self.required = ensure_list(data.get("required", []))
161
162        #: Mark a set of properties to be used for the entity's caption.
163        #: They will be checked in order and the first existent value will
164        #: be used.
165        self.caption = ensure_list(data.get("caption", []))
166
167        # A transform of the entity into an edge for its representation in
168        # the context of a property graph representation like Neo4J/Gephi.
169        edge = data.get("edge", {})
170        self.edge_source = edge.get("source")
171        self.edge_target = edge.get("target")
172
173        #: Flag to indicate if this schema should be represented by an edge (rather than
174        #: a node) when the data is converted into a property graph.
175        self.edge: bool = self.edge_source is not None and self.edge_target is not None
176        self.edge_caption = ensure_list(edge.get("caption", []))
177        self._edge_label = edge.get("label", self._label)
178
179        #: Flag to indicate if the edge should be presented as directed to the user,
180        #: e.g. by showing an error at the target end of the edge.
181        self.edge_directed = as_bool(edge.get("directed", True))
182
183        #: Specify which properties should be used to represent this schema in a
184        #: timeline.
185        temporal_extent = data.get("temporalExtent", {})
186        self.temporal_start = set(temporal_extent.get("start", []))
187        self.temporal_end = set(temporal_extent.get("end", []))
188
189        #: Direct parent schemata of this schema.
190        self._extends = ensure_list(data.get("extends", []))
191        self.extends: Set["Schema"] = set()
192
193        #: All parents of this schema (including indirect parents and the schema
194        #: itself).
195        self.schemata = set([self])
196
197        #: All names of :attr:`~schemata`.
198        self.names = set([self.name])
199
200        #: Inverse of :attr:`~schemata`, all derived child types of this schema
201        #: and their children.
202        self.descendants: Set["Schema"] = set()
203        self._matchable_schemata: Optional[Set["Schema"]] = None
204
205        #: The full list of properties defined for the entity, including those
206        #: inherited from parent schemata.
207        self.properties: Dict[str, Property] = {}
208        for name, prop in data.get("properties", {}).items():
209            self.properties[name] = Property(self, name, prop)
210
211    def generate(self, model: "Model") -> None:
212        """While loading the schema, this function will validate and
213        load the hierarchy, properties, and flags of the definition."""
214        for extends in self._extends:
215            parent = model.get(extends)
216            if parent is None:
217                raise InvalidData("Invalid extends: %r" % extends)
218            parent.generate(model)
219
220            for name, prop in parent.properties.items():
221                if name not in self.properties:
222                    self.properties[name] = prop
223
224            self.extends.add(parent)
225            for ancestor in parent.schemata:
226                self.schemata.add(ancestor)
227                self.names.add(ancestor.name)
228                ancestor.descendants.add(self)
229
230            self.temporal_start |= parent.temporal_start
231            self.temporal_end |= parent.temporal_end
232
233        for prop in list(self.properties.values()):
234            prop.generate(model)
235
236        for featured in self.featured:
237            if self.get(featured) is None:
238                raise InvalidModel("Missing featured property: %s" % featured)
239
240        for caption in self.caption:
241            prop_ = self.get(caption)
242            if prop_ is None:
243                raise InvalidModel("Missing caption property: %s" % caption)
244            if prop_.type == registry.entity:
245                raise InvalidModel("Caption cannot be entity: %s" % caption)
246
247        for required in self.required:
248            if self.get(required) is None:
249                raise InvalidModel("Missing required property: %s" % required)
250
251        if self.edge:
252            if self.source_prop is None:
253                msg = "Missing edge source: %s" % self.edge_source
254                raise InvalidModel(msg)
255
256            if self.target_prop is None:
257                msg = "Missing edge target: %s" % self.edge_target
258                raise InvalidModel(msg)
259
260    def _add_reverse(
261        self, model: "Model", data: ReverseSpec, other: Property
262    ) -> Property:
263        name = data.get("name")
264        if name is None:
265            raise InvalidModel("Unnamed reverse: %s" % other)
266
267        prop = self.get(name)
268        if prop is None:
269            spec: PropertySpec = {
270                "label": data.get("label"),
271                "type": registry.entity.name,
272                "reverse": {"name": other.name},
273                "range": other.schema.name,
274                "hidden": data.get("hidden", other.hidden),
275            }
276            prop = Property(self, name, spec)
277            prop.stub = True
278            prop.generate(model)
279            self.properties[name] = prop
280        return prop
281
282    @property
283    def label(self) -> str:
284        """User-facing name of the schema."""
285        return gettext(self._label)
286
287    @property
288    def plural(self) -> str:
289        """Name of the schema to be used in plural constructions."""
290        return gettext(self._plural)
291
292    @property
293    def description(self) -> Optional[str]:
294        """A longer description of the semantics of the schema."""
295        return gettext(self._description)
296
297    @property
298    def edge_label(self) -> Optional[str]:
299        """Description label for edges derived from entities of this schema."""
300        return gettext(self._edge_label)
301
302    @property
303    def source_prop(self) -> Optional[Property]:
304        """The entity property to be used as an edge source."""
305        return self.get(self.edge_source)
306
307    @property
308    def target_prop(self) -> Optional[Property]:
309        """The entity property to be used as an edge target."""
310        return self.get(self.edge_target)
311
312    @property
313    def temporal_start_props(self) -> Set[Property]:
314        """The entity properties to be used as the start when representing the entity
315        in a timeline."""
316        props = [self.get(prop_name) for prop_name in self.temporal_start]
317        return set([prop for prop in props if prop is not None])
318
319    @property
320    def temporal_end_props(self) -> Set[Property]:
321        """The entity properties to be used as the end when representing the entity
322        in a timeline."""
323        props = [self.get(prop_name) for prop_name in self.temporal_end]
324        return set([prop for prop in props if prop is not None])
325
326    @property
327    def sorted_properties(self) -> List[Property]:
328        """All properties of the schema in the order in which they should be shown
329        to the user (alphabetically, with captions and featured properties first)."""
330        return sorted(
331            self.properties.values(),
332            key=lambda p: (
333                p.name not in self.caption,
334                p.name not in self.featured,
335                p.label,
336            ),
337        )
338
339    @property
340    def matchable_schemata(self) -> Set["Schema"]:
341        """Return the set of schemata to which it makes sense to compare with this
342        schema. For example, it makes sense to compare a legal entity with a company,
343        but it does not make sense to compare a car and a person."""
344        if self._matchable_schemata is None:
345            self._matchable_schemata = set()
346            if self.matchable:
347                # This is used by the cross-referencer to determine what
348                # other schemata should be considered for matches. For
349                # example, a Company may be compared to a Legal Entity,
350                # but it makes no sense to compare it to an Aircraft.
351                candidates = set(self.schemata)
352                candidates.update(self.descendants)
353                for schema in candidates:
354                    if schema.matchable:
355                        self._matchable_schemata.add(schema)
356        return self._matchable_schemata
357
358    def can_match(self, other: "Schema") -> bool:
359        """Check if an schema can match with another schema."""
360        return other in self.matchable_schemata
361
362    @lru_cache(maxsize=None)
363    def is_a(self, other: Union[str, "Schema"]) -> bool:
364        """Check if the schema or one of its parents is the same as the given
365        candidate ``other``."""
366        if not isinstance(other, str):
367            other = other.name
368        return other in self.names
369
370    def get(self, name: Optional[str]) -> Optional[Property]:
371        """Retrieve a property defined for this schema by its name."""
372        if name is None:
373            return None
374        return self.properties.get(name)
375
376    def validate(self, data: Any) -> Optional[str]:
377        """Validate a dictionary against the given schema.
378        This will also drop keys which are not valid as properties.
379        """
380        errors = {}
381        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
382        for name, prop in self.properties.items():
383            values = ensure_list(properties.get(name, []))
384            error = prop.validate(values)
385            if error is None and not len(values):
386                if prop.name in self.required:
387                    error = gettext("Required")
388            if error is not None:
389                errors[name] = error
390        if len(errors):
391            msg = gettext("Entity validation failed")
392            raise InvalidData(msg, errors={"properties": errors})
393        return None
394
395    def to_dict(self) -> SchemaToDict:
396        """Return schema metadata, including all properties, in a serializable form."""
397        data: SchemaToDict = {
398            "label": self.label,
399            "plural": self.plural,
400            "schemata": list(sorted(self.names)),
401            "extends": list(sorted([e.name for e in self.extends])),
402        }
403        if self.edge_source and self.edge_target and self.edge_label:
404            data["edge"] = {
405                "source": self.edge_source,
406                "target": self.edge_target,
407                "caption": self.edge_caption,
408                "label": self.edge_label,
409                "directed": self.edge_directed,
410            }
411        start_props = [
412            prop.name for prop in self.temporal_start_props if prop.schema == self
413        ]
414        end_props = [
415            prop.name for prop in self.temporal_end_props if prop.schema == self
416        ]
417        if start_props or end_props:
418            data["temporalExtent"] = {
419                "start": sorted(start_props),
420                "end": sorted(end_props),
421            }
422        if len(self.featured):
423            data["featured"] = self.featured
424        if len(self.required):
425            data["required"] = self.required
426        if len(self.caption):
427            data["caption"] = self.caption
428        if self.description:
429            data["description"] = self.description
430        if self.abstract:
431            data["abstract"] = True
432        if self.hidden:
433            data["hidden"] = True
434        if self.generated:
435            data["generated"] = True
436        if self.matchable:
437            data["matchable"] = True
438        if self.deprecated:
439            data["deprecated"] = True
440        properties: Dict[str, PropertyToDict] = {}
441        for name, prop in self.properties.items():
442            if prop.schema == self:
443                properties[name] = prop.to_dict()
444        data["properties"] = properties
445        return data
446
447    def __eq__(self, other: Any) -> bool:
448        """Compare two schemata (via hash)."""
449        try:
450            return self._hash == hash(other)
451        except AttributeError:
452            return False
453
454    def __lt__(self, other: Any) -> bool:
455        return self.name.__lt__(other.name)
456
457    def __hash__(self) -> int:
458        try:
459            return self._hash
460        except AttributeError:
461            return super().__hash__()
462
463    def __repr__(self) -> str:
464        return "<Schema(%r)>" % self.name
class EdgeSpec(typing.TypedDict):
26class EdgeSpec(TypedDict, total=False):
27    source: str
28    target: str
29    caption: List[str]
30    label: str
31    directed: bool
source: str
target: str
caption: List[str]
label: str
directed: bool
class TemporalExtentSpec(typing.TypedDict):
34class TemporalExtentSpec(TypedDict, total=False):
35    start: List[str]
36    end: List[str]
start: List[str]
end: List[str]
class SchemaSpec(typing.TypedDict):
39class SchemaSpec(TypedDict, total=False):
40    label: str
41    plural: str
42    schemata: List[str]
43    extends: List[str]
44    properties: Dict[str, PropertySpec]
45    featured: List[str]
46    required: List[str]
47    caption: List[str]
48    edge: EdgeSpec
49    temporalExtent: TemporalExtentSpec
50    description: Optional[str]
51    rdf: Optional[str]
52    abstract: bool
53    hidden: bool
54    generated: bool
55    matchable: bool
56    deprecated: Optional[bool]
label: str
plural: str
schemata: List[str]
extends: List[str]
properties: Dict[str, followthemoney.property.PropertySpec]
featured: List[str]
required: List[str]
caption: List[str]
edge: EdgeSpec
temporalExtent: TemporalExtentSpec
description: Optional[str]
rdf: Optional[str]
abstract: bool
hidden: bool
generated: bool
matchable: bool
deprecated: Optional[bool]
class SchemaToDict(typing.TypedDict):
59class SchemaToDict(TypedDict, total=False):
60    label: str
61    plural: str
62    schemata: List[str]
63    extends: List[str]
64    properties: Dict[str, PropertyToDict]
65    featured: List[str]
66    required: List[str]
67    caption: List[str]
68    edge: EdgeSpec
69    temporalExtent: TemporalExtentSpec
70    description: Optional[str]
71    abstract: bool
72    hidden: bool
73    generated: bool
74    matchable: bool
75    deprecated: bool
label: str
plural: str
schemata: List[str]
extends: List[str]
featured: List[str]
required: List[str]
caption: List[str]
edge: EdgeSpec
temporalExtent: TemporalExtentSpec
description: Optional[str]
abstract: bool
hidden: bool
generated: bool
matchable: bool
deprecated: bool
class Schema:
 78class Schema:
 79    """A type definition for a class of entities that have certain properties.
 80
 81    Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple
 82    parent schemata from which it inherits all of their properties. A schema can also
 83    have descendant child schemata, which, in turn, add further properties. Schemata
 84    are usually accessed via the model, which holds all available definitions.
 85    """
 86
 87    __slots__ = (
 88        "model",
 89        "name",
 90        "_label",
 91        "_plural",
 92        "_description",
 93        "_hash",
 94        "uri",
 95        "abstract",
 96        "hidden",
 97        "generated",
 98        "matchable",
 99        "featured",
100        "required",
101        "deprecated",
102        "caption",
103        "edge",
104        "_edge_label",
105        "edge_directed",
106        "edge_source",
107        "edge_target",
108        "edge_caption",
109        "temporal_start",
110        "temporal_end",
111        "_extends",
112        "extends",
113        "schemata",
114        "names",
115        "descendants",
116        "properties",
117        "_matchable_schemata",
118    )
119
120    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
121        #: Machine-readable name of the schema, used for identification.
122        self.name = name
123        self.model = model
124        self._label = data.get("label", name)
125        self._plural = data.get("plural", self.label)
126        self._description = data.get("description")
127        self._hash = hash("<Schema(%r)>" % name)
128
129        #: RDF identifier for this schema when it is transformed to a triple term.
130        self.uri = URIRef(cast(str, data.get("rdf", NS[name])))
131
132        #: Do not store or emit entities of this type, it is used only for
133        #: inheritance.
134        self.abstract = as_bool(data.get("abstract"), False)
135
136        #: This schema is deprecated and should not be used.
137        self.deprecated = as_bool(data.get("deprecated", False))
138
139        #: Hide this schema in listings.
140        self.hidden = as_bool(data.get("hidden"), False)
141        self.hidden = self.hidden and not self.abstract
142
143        #: Entities with this type are generated by the system - for example, via
144        #: `ingest-file`. The user should not be offered an option to create them
145        #: in the interface.
146        self.generated = as_bool(data.get("generated"), False)
147
148        #: Try to perform fuzzy matching. Fuzzy similarity search does not
149        #: make sense for entities which have a lot of similar names, such
150        #: as land plots, assets etc.
151        self.matchable = as_bool(data.get("matchable"), True)
152
153        #: Mark a set of properties as important, i.e. they should be shown
154        #: first, or in an abridged view of the entity. In Aleph, these properties
155        #: are included in tabular entity listings.
156        self.featured = ensure_list(data.get("featured", []))
157
158        #: Mark a set of properties as required. This is applied only when
159        #: an entity is created by the user - bulk created entities will
160        #: slip through even if it is technically invalid.
161        self.required = ensure_list(data.get("required", []))
162
163        #: Mark a set of properties to be used for the entity's caption.
164        #: They will be checked in order and the first existent value will
165        #: be used.
166        self.caption = ensure_list(data.get("caption", []))
167
168        # A transform of the entity into an edge for its representation in
169        # the context of a property graph representation like Neo4J/Gephi.
170        edge = data.get("edge", {})
171        self.edge_source = edge.get("source")
172        self.edge_target = edge.get("target")
173
174        #: Flag to indicate if this schema should be represented by an edge (rather than
175        #: a node) when the data is converted into a property graph.
176        self.edge: bool = self.edge_source is not None and self.edge_target is not None
177        self.edge_caption = ensure_list(edge.get("caption", []))
178        self._edge_label = edge.get("label", self._label)
179
180        #: Flag to indicate if the edge should be presented as directed to the user,
181        #: e.g. by showing an error at the target end of the edge.
182        self.edge_directed = as_bool(edge.get("directed", True))
183
184        #: Specify which properties should be used to represent this schema in a
185        #: timeline.
186        temporal_extent = data.get("temporalExtent", {})
187        self.temporal_start = set(temporal_extent.get("start", []))
188        self.temporal_end = set(temporal_extent.get("end", []))
189
190        #: Direct parent schemata of this schema.
191        self._extends = ensure_list(data.get("extends", []))
192        self.extends: Set["Schema"] = set()
193
194        #: All parents of this schema (including indirect parents and the schema
195        #: itself).
196        self.schemata = set([self])
197
198        #: All names of :attr:`~schemata`.
199        self.names = set([self.name])
200
201        #: Inverse of :attr:`~schemata`, all derived child types of this schema
202        #: and their children.
203        self.descendants: Set["Schema"] = set()
204        self._matchable_schemata: Optional[Set["Schema"]] = None
205
206        #: The full list of properties defined for the entity, including those
207        #: inherited from parent schemata.
208        self.properties: Dict[str, Property] = {}
209        for name, prop in data.get("properties", {}).items():
210            self.properties[name] = Property(self, name, prop)
211
212    def generate(self, model: "Model") -> None:
213        """While loading the schema, this function will validate and
214        load the hierarchy, properties, and flags of the definition."""
215        for extends in self._extends:
216            parent = model.get(extends)
217            if parent is None:
218                raise InvalidData("Invalid extends: %r" % extends)
219            parent.generate(model)
220
221            for name, prop in parent.properties.items():
222                if name not in self.properties:
223                    self.properties[name] = prop
224
225            self.extends.add(parent)
226            for ancestor in parent.schemata:
227                self.schemata.add(ancestor)
228                self.names.add(ancestor.name)
229                ancestor.descendants.add(self)
230
231            self.temporal_start |= parent.temporal_start
232            self.temporal_end |= parent.temporal_end
233
234        for prop in list(self.properties.values()):
235            prop.generate(model)
236
237        for featured in self.featured:
238            if self.get(featured) is None:
239                raise InvalidModel("Missing featured property: %s" % featured)
240
241        for caption in self.caption:
242            prop_ = self.get(caption)
243            if prop_ is None:
244                raise InvalidModel("Missing caption property: %s" % caption)
245            if prop_.type == registry.entity:
246                raise InvalidModel("Caption cannot be entity: %s" % caption)
247
248        for required in self.required:
249            if self.get(required) is None:
250                raise InvalidModel("Missing required property: %s" % required)
251
252        if self.edge:
253            if self.source_prop is None:
254                msg = "Missing edge source: %s" % self.edge_source
255                raise InvalidModel(msg)
256
257            if self.target_prop is None:
258                msg = "Missing edge target: %s" % self.edge_target
259                raise InvalidModel(msg)
260
261    def _add_reverse(
262        self, model: "Model", data: ReverseSpec, other: Property
263    ) -> Property:
264        name = data.get("name")
265        if name is None:
266            raise InvalidModel("Unnamed reverse: %s" % other)
267
268        prop = self.get(name)
269        if prop is None:
270            spec: PropertySpec = {
271                "label": data.get("label"),
272                "type": registry.entity.name,
273                "reverse": {"name": other.name},
274                "range": other.schema.name,
275                "hidden": data.get("hidden", other.hidden),
276            }
277            prop = Property(self, name, spec)
278            prop.stub = True
279            prop.generate(model)
280            self.properties[name] = prop
281        return prop
282
283    @property
284    def label(self) -> str:
285        """User-facing name of the schema."""
286        return gettext(self._label)
287
288    @property
289    def plural(self) -> str:
290        """Name of the schema to be used in plural constructions."""
291        return gettext(self._plural)
292
293    @property
294    def description(self) -> Optional[str]:
295        """A longer description of the semantics of the schema."""
296        return gettext(self._description)
297
298    @property
299    def edge_label(self) -> Optional[str]:
300        """Description label for edges derived from entities of this schema."""
301        return gettext(self._edge_label)
302
303    @property
304    def source_prop(self) -> Optional[Property]:
305        """The entity property to be used as an edge source."""
306        return self.get(self.edge_source)
307
308    @property
309    def target_prop(self) -> Optional[Property]:
310        """The entity property to be used as an edge target."""
311        return self.get(self.edge_target)
312
313    @property
314    def temporal_start_props(self) -> Set[Property]:
315        """The entity properties to be used as the start when representing the entity
316        in a timeline."""
317        props = [self.get(prop_name) for prop_name in self.temporal_start]
318        return set([prop for prop in props if prop is not None])
319
320    @property
321    def temporal_end_props(self) -> Set[Property]:
322        """The entity properties to be used as the end when representing the entity
323        in a timeline."""
324        props = [self.get(prop_name) for prop_name in self.temporal_end]
325        return set([prop for prop in props if prop is not None])
326
327    @property
328    def sorted_properties(self) -> List[Property]:
329        """All properties of the schema in the order in which they should be shown
330        to the user (alphabetically, with captions and featured properties first)."""
331        return sorted(
332            self.properties.values(),
333            key=lambda p: (
334                p.name not in self.caption,
335                p.name not in self.featured,
336                p.label,
337            ),
338        )
339
340    @property
341    def matchable_schemata(self) -> Set["Schema"]:
342        """Return the set of schemata to which it makes sense to compare with this
343        schema. For example, it makes sense to compare a legal entity with a company,
344        but it does not make sense to compare a car and a person."""
345        if self._matchable_schemata is None:
346            self._matchable_schemata = set()
347            if self.matchable:
348                # This is used by the cross-referencer to determine what
349                # other schemata should be considered for matches. For
350                # example, a Company may be compared to a Legal Entity,
351                # but it makes no sense to compare it to an Aircraft.
352                candidates = set(self.schemata)
353                candidates.update(self.descendants)
354                for schema in candidates:
355                    if schema.matchable:
356                        self._matchable_schemata.add(schema)
357        return self._matchable_schemata
358
359    def can_match(self, other: "Schema") -> bool:
360        """Check if an schema can match with another schema."""
361        return other in self.matchable_schemata
362
363    @lru_cache(maxsize=None)
364    def is_a(self, other: Union[str, "Schema"]) -> bool:
365        """Check if the schema or one of its parents is the same as the given
366        candidate ``other``."""
367        if not isinstance(other, str):
368            other = other.name
369        return other in self.names
370
371    def get(self, name: Optional[str]) -> Optional[Property]:
372        """Retrieve a property defined for this schema by its name."""
373        if name is None:
374            return None
375        return self.properties.get(name)
376
377    def validate(self, data: Any) -> Optional[str]:
378        """Validate a dictionary against the given schema.
379        This will also drop keys which are not valid as properties.
380        """
381        errors = {}
382        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
383        for name, prop in self.properties.items():
384            values = ensure_list(properties.get(name, []))
385            error = prop.validate(values)
386            if error is None and not len(values):
387                if prop.name in self.required:
388                    error = gettext("Required")
389            if error is not None:
390                errors[name] = error
391        if len(errors):
392            msg = gettext("Entity validation failed")
393            raise InvalidData(msg, errors={"properties": errors})
394        return None
395
396    def to_dict(self) -> SchemaToDict:
397        """Return schema metadata, including all properties, in a serializable form."""
398        data: SchemaToDict = {
399            "label": self.label,
400            "plural": self.plural,
401            "schemata": list(sorted(self.names)),
402            "extends": list(sorted([e.name for e in self.extends])),
403        }
404        if self.edge_source and self.edge_target and self.edge_label:
405            data["edge"] = {
406                "source": self.edge_source,
407                "target": self.edge_target,
408                "caption": self.edge_caption,
409                "label": self.edge_label,
410                "directed": self.edge_directed,
411            }
412        start_props = [
413            prop.name for prop in self.temporal_start_props if prop.schema == self
414        ]
415        end_props = [
416            prop.name for prop in self.temporal_end_props if prop.schema == self
417        ]
418        if start_props or end_props:
419            data["temporalExtent"] = {
420                "start": sorted(start_props),
421                "end": sorted(end_props),
422            }
423        if len(self.featured):
424            data["featured"] = self.featured
425        if len(self.required):
426            data["required"] = self.required
427        if len(self.caption):
428            data["caption"] = self.caption
429        if self.description:
430            data["description"] = self.description
431        if self.abstract:
432            data["abstract"] = True
433        if self.hidden:
434            data["hidden"] = True
435        if self.generated:
436            data["generated"] = True
437        if self.matchable:
438            data["matchable"] = True
439        if self.deprecated:
440            data["deprecated"] = True
441        properties: Dict[str, PropertyToDict] = {}
442        for name, prop in self.properties.items():
443            if prop.schema == self:
444                properties[name] = prop.to_dict()
445        data["properties"] = properties
446        return data
447
448    def __eq__(self, other: Any) -> bool:
449        """Compare two schemata (via hash)."""
450        try:
451            return self._hash == hash(other)
452        except AttributeError:
453            return False
454
455    def __lt__(self, other: Any) -> bool:
456        return self.name.__lt__(other.name)
457
458    def __hash__(self) -> int:
459        try:
460            return self._hash
461        except AttributeError:
462            return super().__hash__()
463
464    def __repr__(self) -> str:
465        return "<Schema(%r)>" % self.name

A type definition for a class of entities that have certain properties.

Schemata are arranged in a multi-rooted hierarchy: each schema can have multiple parent schemata from which it inherits all of their properties. A schema can also have descendant child schemata, which, in turn, add further properties. Schemata are usually accessed via the model, which holds all available definitions.

Schema( model: followthemoney.model.Model, name: str, data: SchemaSpec)
120    def __init__(self, model: "Model", name: str, data: SchemaSpec) -> None:
121        #: Machine-readable name of the schema, used for identification.
122        self.name = name
123        self.model = model
124        self._label = data.get("label", name)
125        self._plural = data.get("plural", self.label)
126        self._description = data.get("description")
127        self._hash = hash("<Schema(%r)>" % name)
128
129        #: RDF identifier for this schema when it is transformed to a triple term.
130        self.uri = URIRef(cast(str, data.get("rdf", NS[name])))
131
132        #: Do not store or emit entities of this type, it is used only for
133        #: inheritance.
134        self.abstract = as_bool(data.get("abstract"), False)
135
136        #: This schema is deprecated and should not be used.
137        self.deprecated = as_bool(data.get("deprecated", False))
138
139        #: Hide this schema in listings.
140        self.hidden = as_bool(data.get("hidden"), False)
141        self.hidden = self.hidden and not self.abstract
142
143        #: Entities with this type are generated by the system - for example, via
144        #: `ingest-file`. The user should not be offered an option to create them
145        #: in the interface.
146        self.generated = as_bool(data.get("generated"), False)
147
148        #: Try to perform fuzzy matching. Fuzzy similarity search does not
149        #: make sense for entities which have a lot of similar names, such
150        #: as land plots, assets etc.
151        self.matchable = as_bool(data.get("matchable"), True)
152
153        #: Mark a set of properties as important, i.e. they should be shown
154        #: first, or in an abridged view of the entity. In Aleph, these properties
155        #: are included in tabular entity listings.
156        self.featured = ensure_list(data.get("featured", []))
157
158        #: Mark a set of properties as required. This is applied only when
159        #: an entity is created by the user - bulk created entities will
160        #: slip through even if it is technically invalid.
161        self.required = ensure_list(data.get("required", []))
162
163        #: Mark a set of properties to be used for the entity's caption.
164        #: They will be checked in order and the first existent value will
165        #: be used.
166        self.caption = ensure_list(data.get("caption", []))
167
168        # A transform of the entity into an edge for its representation in
169        # the context of a property graph representation like Neo4J/Gephi.
170        edge = data.get("edge", {})
171        self.edge_source = edge.get("source")
172        self.edge_target = edge.get("target")
173
174        #: Flag to indicate if this schema should be represented by an edge (rather than
175        #: a node) when the data is converted into a property graph.
176        self.edge: bool = self.edge_source is not None and self.edge_target is not None
177        self.edge_caption = ensure_list(edge.get("caption", []))
178        self._edge_label = edge.get("label", self._label)
179
180        #: Flag to indicate if the edge should be presented as directed to the user,
181        #: e.g. by showing an error at the target end of the edge.
182        self.edge_directed = as_bool(edge.get("directed", True))
183
184        #: Specify which properties should be used to represent this schema in a
185        #: timeline.
186        temporal_extent = data.get("temporalExtent", {})
187        self.temporal_start = set(temporal_extent.get("start", []))
188        self.temporal_end = set(temporal_extent.get("end", []))
189
190        #: Direct parent schemata of this schema.
191        self._extends = ensure_list(data.get("extends", []))
192        self.extends: Set["Schema"] = set()
193
194        #: All parents of this schema (including indirect parents and the schema
195        #: itself).
196        self.schemata = set([self])
197
198        #: All names of :attr:`~schemata`.
199        self.names = set([self.name])
200
201        #: Inverse of :attr:`~schemata`, all derived child types of this schema
202        #: and their children.
203        self.descendants: Set["Schema"] = set()
204        self._matchable_schemata: Optional[Set["Schema"]] = None
205
206        #: The full list of properties defined for the entity, including those
207        #: inherited from parent schemata.
208        self.properties: Dict[str, Property] = {}
209        for name, prop in data.get("properties", {}).items():
210            self.properties[name] = Property(self, name, prop)
name
model
uri
abstract
deprecated
hidden
generated
matchable
featured
required
caption
edge_source
edge_target
edge: bool
edge_caption
edge_directed
temporal_start
temporal_end
extends: Set[Schema]
schemata
names
descendants: Set[Schema]
properties: Dict[str, followthemoney.property.Property]
def generate(self, model: followthemoney.model.Model) -> None:
212    def generate(self, model: "Model") -> None:
213        """While loading the schema, this function will validate and
214        load the hierarchy, properties, and flags of the definition."""
215        for extends in self._extends:
216            parent = model.get(extends)
217            if parent is None:
218                raise InvalidData("Invalid extends: %r" % extends)
219            parent.generate(model)
220
221            for name, prop in parent.properties.items():
222                if name not in self.properties:
223                    self.properties[name] = prop
224
225            self.extends.add(parent)
226            for ancestor in parent.schemata:
227                self.schemata.add(ancestor)
228                self.names.add(ancestor.name)
229                ancestor.descendants.add(self)
230
231            self.temporal_start |= parent.temporal_start
232            self.temporal_end |= parent.temporal_end
233
234        for prop in list(self.properties.values()):
235            prop.generate(model)
236
237        for featured in self.featured:
238            if self.get(featured) is None:
239                raise InvalidModel("Missing featured property: %s" % featured)
240
241        for caption in self.caption:
242            prop_ = self.get(caption)
243            if prop_ is None:
244                raise InvalidModel("Missing caption property: %s" % caption)
245            if prop_.type == registry.entity:
246                raise InvalidModel("Caption cannot be entity: %s" % caption)
247
248        for required in self.required:
249            if self.get(required) is None:
250                raise InvalidModel("Missing required property: %s" % required)
251
252        if self.edge:
253            if self.source_prop is None:
254                msg = "Missing edge source: %s" % self.edge_source
255                raise InvalidModel(msg)
256
257            if self.target_prop is None:
258                msg = "Missing edge target: %s" % self.edge_target
259                raise InvalidModel(msg)

While loading the schema, this function will validate and load the hierarchy, properties, and flags of the definition.

label: str
283    @property
284    def label(self) -> str:
285        """User-facing name of the schema."""
286        return gettext(self._label)

User-facing name of the schema.

plural: str
288    @property
289    def plural(self) -> str:
290        """Name of the schema to be used in plural constructions."""
291        return gettext(self._plural)

Name of the schema to be used in plural constructions.

description: Optional[str]
293    @property
294    def description(self) -> Optional[str]:
295        """A longer description of the semantics of the schema."""
296        return gettext(self._description)

A longer description of the semantics of the schema.

edge_label: Optional[str]
298    @property
299    def edge_label(self) -> Optional[str]:
300        """Description label for edges derived from entities of this schema."""
301        return gettext(self._edge_label)

Description label for edges derived from entities of this schema.

source_prop: Optional[followthemoney.property.Property]
303    @property
304    def source_prop(self) -> Optional[Property]:
305        """The entity property to be used as an edge source."""
306        return self.get(self.edge_source)

The entity property to be used as an edge source.

target_prop: Optional[followthemoney.property.Property]
308    @property
309    def target_prop(self) -> Optional[Property]:
310        """The entity property to be used as an edge target."""
311        return self.get(self.edge_target)

The entity property to be used as an edge target.

temporal_start_props: Set[followthemoney.property.Property]
313    @property
314    def temporal_start_props(self) -> Set[Property]:
315        """The entity properties to be used as the start when representing the entity
316        in a timeline."""
317        props = [self.get(prop_name) for prop_name in self.temporal_start]
318        return set([prop for prop in props if prop is not None])

The entity properties to be used as the start when representing the entity in a timeline.

temporal_end_props: Set[followthemoney.property.Property]
320    @property
321    def temporal_end_props(self) -> Set[Property]:
322        """The entity properties to be used as the end when representing the entity
323        in a timeline."""
324        props = [self.get(prop_name) for prop_name in self.temporal_end]
325        return set([prop for prop in props if prop is not None])

The entity properties to be used as the end when representing the entity in a timeline.

sorted_properties: List[followthemoney.property.Property]
327    @property
328    def sorted_properties(self) -> List[Property]:
329        """All properties of the schema in the order in which they should be shown
330        to the user (alphabetically, with captions and featured properties first)."""
331        return sorted(
332            self.properties.values(),
333            key=lambda p: (
334                p.name not in self.caption,
335                p.name not in self.featured,
336                p.label,
337            ),
338        )

All properties of the schema in the order in which they should be shown to the user (alphabetically, with captions and featured properties first).

matchable_schemata: Set[Schema]
340    @property
341    def matchable_schemata(self) -> Set["Schema"]:
342        """Return the set of schemata to which it makes sense to compare with this
343        schema. For example, it makes sense to compare a legal entity with a company,
344        but it does not make sense to compare a car and a person."""
345        if self._matchable_schemata is None:
346            self._matchable_schemata = set()
347            if self.matchable:
348                # This is used by the cross-referencer to determine what
349                # other schemata should be considered for matches. For
350                # example, a Company may be compared to a Legal Entity,
351                # but it makes no sense to compare it to an Aircraft.
352                candidates = set(self.schemata)
353                candidates.update(self.descendants)
354                for schema in candidates:
355                    if schema.matchable:
356                        self._matchable_schemata.add(schema)
357        return self._matchable_schemata

Return the set of schemata with which it makes sense to compare this schema. For example, it makes sense to compare a legal entity with a company, but it does not make sense to compare a car and a person.

def can_match(self, other: Schema) -> bool:
359    def can_match(self, other: "Schema") -> bool:
360        """Check if an schema can match with another schema."""
361        return other in self.matchable_schemata

Check if a schema can match with another schema.

@lru_cache(maxsize=None)
def is_a(self, other: Union[str, Schema]) -> bool:
363    @lru_cache(maxsize=None)
364    def is_a(self, other: Union[str, "Schema"]) -> bool:
365        """Check if the schema or one of its parents is the same as the given
366        candidate ``other``."""
367        if not isinstance(other, str):
368            other = other.name
369        return other in self.names

Check if the schema or one of its parents is the same as the given candidate other.

def get(self, name: Optional[str]) -> Optional[followthemoney.property.Property]:
371    def get(self, name: Optional[str]) -> Optional[Property]:
372        """Retrieve a property defined for this schema by its name."""
373        if name is None:
374            return None
375        return self.properties.get(name)

Retrieve a property defined for this schema by its name.

def validate(self, data: Any) -> Optional[str]:
377    def validate(self, data: Any) -> Optional[str]:
378        """Validate a dictionary against the given schema.
379        This will also drop keys which are not valid as properties.
380        """
381        errors = {}
382        properties = cast(Dict[str, Any], ensure_dict(data.get("properties")))
383        for name, prop in self.properties.items():
384            values = ensure_list(properties.get(name, []))
385            error = prop.validate(values)
386            if error is None and not len(values):
387                if prop.name in self.required:
388                    error = gettext("Required")
389            if error is not None:
390                errors[name] = error
391        if len(errors):
392            msg = gettext("Entity validation failed")
393            raise InvalidData(msg, errors={"properties": errors})
394        return None

Validate a dictionary against the given schema. Keys which are not valid as properties of the schema are ignored.

def to_dict(self) -> SchemaToDict:
396    def to_dict(self) -> SchemaToDict:
397        """Return schema metadata, including all properties, in a serializable form."""
398        data: SchemaToDict = {
399            "label": self.label,
400            "plural": self.plural,
401            "schemata": list(sorted(self.names)),
402            "extends": list(sorted([e.name for e in self.extends])),
403        }
404        if self.edge_source and self.edge_target and self.edge_label:
405            data["edge"] = {
406                "source": self.edge_source,
407                "target": self.edge_target,
408                "caption": self.edge_caption,
409                "label": self.edge_label,
410                "directed": self.edge_directed,
411            }
412        start_props = [
413            prop.name for prop in self.temporal_start_props if prop.schema == self
414        ]
415        end_props = [
416            prop.name for prop in self.temporal_end_props if prop.schema == self
417        ]
418        if start_props or end_props:
419            data["temporalExtent"] = {
420                "start": sorted(start_props),
421                "end": sorted(end_props),
422            }
423        if len(self.featured):
424            data["featured"] = self.featured
425        if len(self.required):
426            data["required"] = self.required
427        if len(self.caption):
428            data["caption"] = self.caption
429        if self.description:
430            data["description"] = self.description
431        if self.abstract:
432            data["abstract"] = True
433        if self.hidden:
434            data["hidden"] = True
435        if self.generated:
436            data["generated"] = True
437        if self.matchable:
438            data["matchable"] = True
439        if self.deprecated:
440            data["deprecated"] = True
441        properties: Dict[str, PropertyToDict] = {}
442        for name, prop in self.properties.items():
443            if prop.schema == self:
444                properties[name] = prop.to_dict()
445        data["properties"] = properties
446        return data

Return schema metadata, including all properties, in a serializable form.