followthemoney.model

  1import os
  2import yaml
  3from typing import Any, Dict, Generator, Iterator, Optional, Set, TypedDict, Union
  4
  5from followthemoney.types import registry
  6from followthemoney.types.common import PropertyType, PropertyTypeToDict
  7from followthemoney.schema import Schema, SchemaToDict
  8from followthemoney.property import Property
  9from followthemoney.mapping import QueryMapping
 10from followthemoney.proxy import EntityProxy
 11from followthemoney.exc import InvalidModel, InvalidData
 12
 13
class ModelToDict(TypedDict):
    """Serialisable description of a model, as returned by ``Model.to_dict()``."""

    # Schema descriptions, keyed by schema name.
    schemata: Dict[str, SchemaToDict]
    # Property type descriptions, keyed by type name.
    types: Dict[str, PropertyTypeToDict]
 17
 18
 19class Model(object):
 20    """A collection of all the schemata available in followthemoney. The model
 21    provides some helper functions to find schemata, properties or to instantiate
 22    entity proxies based on the schema metadata."""
 23
 24    __slots__ = ("path", "schemata", "properties", "qnames")
 25
 26    def __init__(self, path: str) -> None:
 27        self.path = path
 28
 29        #: A mapping with all schemata, organised by their name.
 30        self.schemata: Dict[str, Schema] = {}
 31
 32        #: All properties defined in the model.
 33        self.properties: Set[Property] = set()
 34        self.qnames: Dict[str, Property] = {}
 35        for (path, _, filenames) in os.walk(self.path):
 36            for filename in filenames:
 37                self._load(os.path.join(path, filename))
 38        self.generate()
 39
 40    def generate(self) -> None:
 41        """Loading the model is a weird process because the schemata reference
 42        each other in complex ways, so the generation process cannot be fully
 43        run as schemata are being instantiated. Hence this process needs to be
 44        called once all schemata are loaded to finalise dereferencing the
 45        schemata."""
 46        for schema in self:
 47            schema.generate(self)
 48        for prop in self.properties:
 49            self.qnames[prop.qname] = prop
 50            for schema in prop.schema.descendants:
 51                if prop.name not in schema.properties:
 52                    schema.properties[prop.name] = prop
 53
 54    def _load(self, filepath: str) -> None:
 55        with open(filepath, "r", encoding="utf-8") as fh:
 56            data = yaml.safe_load(fh)
 57            if not isinstance(data, dict):
 58                raise InvalidModel("Model file is not a mapping: %s" % filepath)
 59            for name, config in data.items():
 60                self.schemata[name] = Schema(self, name, config)
 61
 62    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
 63        """Get a schema object based on a schema name. If the input is already
 64        a schema object, it will just be returned."""
 65        if isinstance(name, str):
 66            return self.schemata.get(name)
 67        return name
 68
 69    def get_qname(self, qname: str) -> Optional[Property]:
 70        """Get a property object based on a qualified name (i.e. schema:property)."""
 71        return self.qnames.get(qname)
 72
 73    def __getitem__(self, name: str) -> Schema:
 74        """Same as get(), but throws an exception when the given name does not exist."""
 75        schema = self.get(name)
 76        if schema is None:
 77            raise KeyError("No such schema: %s" % name)
 78        return schema
 79
 80    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
 81        """Return all the schemata which have a property of the given type."""
 82        schemata = set()
 83        for schema in self.schemata.values():
 84            for prop in schema.properties.values():
 85                if prop.type == type_:
 86                    schemata.add(schema)
 87        return schemata
 88
 89    def make_mapping(
 90        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
 91    ) -> QueryMapping:
 92        """Parse a mapping that applies (tabular) source data to the model."""
 93        return QueryMapping(self, mapping, key_prefix=key_prefix)
 94
 95    def map_entities(
 96        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
 97    ) -> Generator[EntityProxy, None, None]:
 98        """Given a mapping, yield a series of entities from the data source."""
 99        gen = self.make_mapping(mapping, key_prefix=key_prefix)
100        for record in gen.source.records:
101            for entity in gen.map(record).values():
102                yield entity
103
104    def common_schema(
105        self, left: Union[str, Schema], right: Union[str, Schema]
106    ) -> Schema:
107        """Select the most narrow of two schemata.
108
109        When indexing data from a dataset, an entity may be declared as a
110        LegalEntity in one query, and as a Person in another. This function
111        will select the most specific of two schemata offered. In the example,
112        that would be Person.
113        """
114        left_schema = self.get(left) or self.get(right)
115        right_schema = self.get(right) or self.get(left)
116        if left_schema is None or right_schema is None:
117            raise InvalidData("Invalid schema")
118        if left_schema.is_a(right_schema):
119            return left_schema
120        if right_schema.is_a(left_schema):
121            return right_schema
122        # for schema in self.schemata.values():
123        #     if schema.is_a(left) and schema.is_a(right):
124        #         return schema
125        msg = "No common schema: %s and %s"
126        raise InvalidData(msg % (left, right))
127
128    def make_entity(
129        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
130    ) -> EntityProxy:
131        """Instantiate an empty entity proxy of the given schema type."""
132        return EntityProxy(self, {"schema": schema}, key_prefix=key_prefix)
133
134    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
135        """Create an entity proxy to reflect the entity data in the given
136        dictionary. If ``cleaned`` is disabled, all property values are
137        fully re-validated and normalised. Use this if handling input data
138        from an untrusted source."""
139        if isinstance(data, EntityProxy):
140            return data
141        return EntityProxy.from_dict(self, data, cleaned=cleaned)
142
143    def to_dict(self) -> ModelToDict:
144        """Return metadata for all schemata and properties, in a serializable form."""
145        return {
146            "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
147            "types": {t.name: t.to_dict() for t in registry.types},
148        }
149
150    def __iter__(self) -> Iterator[Schema]:
151        """Iterate across all schemata."""
152        return iter(self.schemata.values())
class ModelToDict(typing.TypedDict):
class ModelToDict(TypedDict):
    """Serialisable description of a model, as returned by ``Model.to_dict()``."""

    # Schema descriptions, keyed by schema name.
    schemata: Dict[str, SchemaToDict]
    # Property type descriptions, keyed by type name.
    types: Dict[str, PropertyTypeToDict]
schemata: Dict[str, followthemoney.schema.SchemaToDict]
class Model:
 20class Model(object):
 21    """A collection of all the schemata available in followthemoney. The model
 22    provides some helper functions to find schemata, properties or to instantiate
 23    entity proxies based on the schema metadata."""
 24
 25    __slots__ = ("path", "schemata", "properties", "qnames")
 26
 27    def __init__(self, path: str) -> None:
 28        self.path = path
 29
 30        #: A mapping with all schemata, organised by their name.
 31        self.schemata: Dict[str, Schema] = {}
 32
 33        #: All properties defined in the model.
 34        self.properties: Set[Property] = set()
 35        self.qnames: Dict[str, Property] = {}
 36        for (path, _, filenames) in os.walk(self.path):
 37            for filename in filenames:
 38                self._load(os.path.join(path, filename))
 39        self.generate()
 40
 41    def generate(self) -> None:
 42        """Loading the model is a weird process because the schemata reference
 43        each other in complex ways, so the generation process cannot be fully
 44        run as schemata are being instantiated. Hence this process needs to be
 45        called once all schemata are loaded to finalise dereferencing the
 46        schemata."""
 47        for schema in self:
 48            schema.generate(self)
 49        for prop in self.properties:
 50            self.qnames[prop.qname] = prop
 51            for schema in prop.schema.descendants:
 52                if prop.name not in schema.properties:
 53                    schema.properties[prop.name] = prop
 54
 55    def _load(self, filepath: str) -> None:
 56        with open(filepath, "r", encoding="utf-8") as fh:
 57            data = yaml.safe_load(fh)
 58            if not isinstance(data, dict):
 59                raise InvalidModel("Model file is not a mapping: %s" % filepath)
 60            for name, config in data.items():
 61                self.schemata[name] = Schema(self, name, config)
 62
 63    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
 64        """Get a schema object based on a schema name. If the input is already
 65        a schema object, it will just be returned."""
 66        if isinstance(name, str):
 67            return self.schemata.get(name)
 68        return name
 69
 70    def get_qname(self, qname: str) -> Optional[Property]:
 71        """Get a property object based on a qualified name (i.e. schema:property)."""
 72        return self.qnames.get(qname)
 73
 74    def __getitem__(self, name: str) -> Schema:
 75        """Same as get(), but throws an exception when the given name does not exist."""
 76        schema = self.get(name)
 77        if schema is None:
 78            raise KeyError("No such schema: %s" % name)
 79        return schema
 80
 81    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
 82        """Return all the schemata which have a property of the given type."""
 83        schemata = set()
 84        for schema in self.schemata.values():
 85            for prop in schema.properties.values():
 86                if prop.type == type_:
 87                    schemata.add(schema)
 88        return schemata
 89
 90    def make_mapping(
 91        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
 92    ) -> QueryMapping:
 93        """Parse a mapping that applies (tabular) source data to the model."""
 94        return QueryMapping(self, mapping, key_prefix=key_prefix)
 95
 96    def map_entities(
 97        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
 98    ) -> Generator[EntityProxy, None, None]:
 99        """Given a mapping, yield a series of entities from the data source."""
100        gen = self.make_mapping(mapping, key_prefix=key_prefix)
101        for record in gen.source.records:
102            for entity in gen.map(record).values():
103                yield entity
104
105    def common_schema(
106        self, left: Union[str, Schema], right: Union[str, Schema]
107    ) -> Schema:
108        """Select the most narrow of two schemata.
109
110        When indexing data from a dataset, an entity may be declared as a
111        LegalEntity in one query, and as a Person in another. This function
112        will select the most specific of two schemata offered. In the example,
113        that would be Person.
114        """
115        left_schema = self.get(left) or self.get(right)
116        right_schema = self.get(right) or self.get(left)
117        if left_schema is None or right_schema is None:
118            raise InvalidData("Invalid schema")
119        if left_schema.is_a(right_schema):
120            return left_schema
121        if right_schema.is_a(left_schema):
122            return right_schema
123        # for schema in self.schemata.values():
124        #     if schema.is_a(left) and schema.is_a(right):
125        #         return schema
126        msg = "No common schema: %s and %s"
127        raise InvalidData(msg % (left, right))
128
129    def make_entity(
130        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
131    ) -> EntityProxy:
132        """Instantiate an empty entity proxy of the given schema type."""
133        return EntityProxy(self, {"schema": schema}, key_prefix=key_prefix)
134
135    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
136        """Create an entity proxy to reflect the entity data in the given
137        dictionary. If ``cleaned`` is disabled, all property values are
138        fully re-validated and normalised. Use this if handling input data
139        from an untrusted source."""
140        if isinstance(data, EntityProxy):
141            return data
142        return EntityProxy.from_dict(self, data, cleaned=cleaned)
143
144    def to_dict(self) -> ModelToDict:
145        """Return metadata for all schemata and properties, in a serializable form."""
146        return {
147            "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
148            "types": {t.name: t.to_dict() for t in registry.types},
149        }
150
151    def __iter__(self) -> Iterator[Schema]:
152        """Iterate across all schemata."""
153        return iter(self.schemata.values())

A collection of all the schemata available in followthemoney. The model provides some helper functions to find schemata, properties or to instantiate entity proxies based on the schema metadata.

Model(path: str)
27    def __init__(self, path: str) -> None:
28        self.path = path
29
30        #: A mapping with all schemata, organised by their name.
31        self.schemata: Dict[str, Schema] = {}
32
33        #: All properties defined in the model.
34        self.properties: Set[Property] = set()
35        self.qnames: Dict[str, Property] = {}
36        for (path, _, filenames) in os.walk(self.path):
37            for filename in filenames:
38                self._load(os.path.join(path, filename))
39        self.generate()
path
schemata: Dict[str, followthemoney.schema.Schema]
def generate(self) -> None:
41    def generate(self) -> None:
42        """Loading the model is a weird process because the schemata reference
43        each other in complex ways, so the generation process cannot be fully
44        run as schemata are being instantiated. Hence this process needs to be
45        called once all schemata are loaded to finalise dereferencing the
46        schemata."""
47        for schema in self:
48            schema.generate(self)
49        for prop in self.properties:
50            self.qnames[prop.qname] = prop
51            for schema in prop.schema.descendants:
52                if prop.name not in schema.properties:
53                    schema.properties[prop.name] = prop

Loading the model is a weird process because the schemata reference each other in complex ways, so the generation process cannot be fully run as schemata are being instantiated. Hence this process needs to be called once all schemata are loaded to finalise dereferencing the schemata.

def get( self, name: Union[str, followthemoney.schema.Schema]) -> Optional[followthemoney.schema.Schema]:
63    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
64        """Get a schema object based on a schema name. If the input is already
65        a schema object, it will just be returned."""
66        if isinstance(name, str):
67            return self.schemata.get(name)
68        return name

Get a schema object based on a schema name. If the input is already a schema object, it will just be returned.

def get_qname(self, qname: str) -> Optional[followthemoney.property.Property]:
70    def get_qname(self, qname: str) -> Optional[Property]:
71        """Get a property object based on a qualified name (i.e. schema:property)."""
72        return self.qnames.get(qname)

Get a property object based on a qualified name (i.e. schema:property).

def get_type_schemata( self, type_: followthemoney.types.common.PropertyType) -> Set[followthemoney.schema.Schema]:
81    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
82        """Return all the schemata which have a property of the given type."""
83        schemata = set()
84        for schema in self.schemata.values():
85            for prop in schema.properties.values():
86                if prop.type == type_:
87                    schemata.add(schema)
88        return schemata

Return all the schemata which have a property of the given type.

def make_mapping( self, mapping: Dict[str, Any], key_prefix: Optional[str] = None) -> followthemoney.mapping.query.QueryMapping:
90    def make_mapping(
91        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
92    ) -> QueryMapping:
93        """Parse a mapping that applies (tabular) source data to the model."""
94        return QueryMapping(self, mapping, key_prefix=key_prefix)

Parse a mapping that applies (tabular) source data to the model.

def map_entities( self, mapping: Dict[str, Any], key_prefix: Optional[str] = None) -> Generator[followthemoney.proxy.EntityProxy, NoneType, NoneType]:
 96    def map_entities(
 97        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
 98    ) -> Generator[EntityProxy, None, None]:
 99        """Given a mapping, yield a series of entities from the data source."""
100        gen = self.make_mapping(mapping, key_prefix=key_prefix)
101        for record in gen.source.records:
102            for entity in gen.map(record).values():
103                yield entity

Given a mapping, yield a series of entities from the data source.

def common_schema( self, left: Union[str, followthemoney.schema.Schema], right: Union[str, followthemoney.schema.Schema]) -> followthemoney.schema.Schema:
105    def common_schema(
106        self, left: Union[str, Schema], right: Union[str, Schema]
107    ) -> Schema:
108        """Select the most narrow of two schemata.
109
110        When indexing data from a dataset, an entity may be declared as a
111        LegalEntity in one query, and as a Person in another. This function
112        will select the most specific of two schemata offered. In the example,
113        that would be Person.
114        """
115        left_schema = self.get(left) or self.get(right)
116        right_schema = self.get(right) or self.get(left)
117        if left_schema is None or right_schema is None:
118            raise InvalidData("Invalid schema")
119        if left_schema.is_a(right_schema):
120            return left_schema
121        if right_schema.is_a(left_schema):
122            return right_schema
123        # for schema in self.schemata.values():
124        #     if schema.is_a(left) and schema.is_a(right):
125        #         return schema
126        msg = "No common schema: %s and %s"
127        raise InvalidData(msg % (left, right))

Select the most narrow of two schemata.

When indexing data from a dataset, an entity may be declared as a LegalEntity in one query, and as a Person in another. This function will select the most specific of two schemata offered. In the example, that would be Person.

def make_entity( self, schema: Union[str, followthemoney.schema.Schema], key_prefix: Optional[str] = None) -> followthemoney.proxy.EntityProxy:
129    def make_entity(
130        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
131    ) -> EntityProxy:
132        """Instantiate an empty entity proxy of the given schema type."""
133        return EntityProxy(self, {"schema": schema}, key_prefix=key_prefix)

Instantiate an empty entity proxy of the given schema type.

def get_proxy( self, data: Dict[str, Any], cleaned: bool = True) -> followthemoney.proxy.EntityProxy:
135    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
136        """Create an entity proxy to reflect the entity data in the given
137        dictionary. If ``cleaned`` is disabled, all property values are
138        fully re-validated and normalised. Use this if handling input data
139        from an untrusted source."""
140        if isinstance(data, EntityProxy):
141            return data
142        return EntityProxy.from_dict(self, data, cleaned=cleaned)

Create an entity proxy to reflect the entity data in the given dictionary. If cleaned is disabled, all property values are fully re-validated and normalised. Use this if handling input data from an untrusted source.

def to_dict(self) -> ModelToDict:
144    def to_dict(self) -> ModelToDict:
145        """Return metadata for all schemata and properties, in a serializable form."""
146        return {
147            "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
148            "types": {t.name: t.to_dict() for t in registry.types},
149        }

Return metadata for all schemata and properties, in a serializable form.