followthemoney.model

  1import os
  2import yaml
  3from functools import lru_cache
  4from typing import TYPE_CHECKING, Any
  5from typing import Dict, Generator, Iterator, Optional, Set, TypedDict, Union
  6
  7from followthemoney.types import registry
  8from followthemoney.types.common import PropertyType, PropertyTypeToDict
  9from followthemoney.schema import Schema, SchemaToDict
 10from followthemoney.property import Property
 11from followthemoney.exc import InvalidModel, InvalidData
 12
 13if TYPE_CHECKING:
 14    from followthemoney.proxy import EntityProxy
 15    from followthemoney.mapping import QueryMapping
 16
 17
class ModelToDict(TypedDict):
    """Serialised form of the whole model, as returned by ``Model.to_dict()``."""

    # Serialised schemata, keyed by schema name.
    schemata: Dict[str, SchemaToDict]
    # Serialised property types, keyed by type name.
    types: Dict[str, PropertyTypeToDict]
 21
 22
 23class Model(object):
 24    """A collection of all the schemata available in followthemoney. The model
 25    provides some helper functions to find schemata, properties or to instantiate
 26    entity proxies based on the schema metadata."""
 27
 28    _instance: Optional["Model"] = None
 29
 30    __slots__ = ("path", "schemata", "properties", "qnames")
 31
 32    def __init__(self, path: str) -> None:
 33        self.path = path
 34
 35        #: A mapping with all schemata, organised by their name.
 36        self.schemata: Dict[str, Schema] = {}
 37
 38        #: All properties defined in the model.
 39        self.properties: Set[Property] = set()
 40        self.qnames: Dict[str, Property] = {}
 41        for path, _, filenames in os.walk(self.path):
 42            for filename in filenames:
 43                self._load(os.path.join(path, filename))
 44        self.generate()
 45
 46    @classmethod
 47    def instance(cls) -> "Model":
 48        if cls._instance is None:
 49            model_path = os.path.dirname(__file__)
 50            model_path = os.path.join(model_path, "schema")
 51            model_path = os.environ.get("FTM_MODEL_PATH", model_path)
 52            cls._instance = cls(model_path)
 53        return cls._instance
 54
 55    def generate(self) -> None:
 56        """Loading the model is a weird process because the schemata reference
 57        each other in complex ways, so the generation process cannot be fully
 58        run as schemata are being instantiated. Hence this process needs to be
 59        called once all schemata are loaded to finalise dereferencing the
 60        schemata."""
 61        for schema in self:
 62            schema.generate(self)
 63        for prop in self.properties:
 64            self.qnames[prop.qname] = prop
 65            for schema in prop.schema.descendants:
 66                if prop.name not in schema.properties:
 67                    schema.properties[prop.name] = prop
 68
 69    def _load(self, filepath: str) -> None:
 70        with open(filepath, "r", encoding="utf-8") as fh:
 71            data = yaml.safe_load(fh)
 72            if not isinstance(data, dict):
 73                raise InvalidModel("Model file is not a mapping: %s" % filepath)
 74            for name, config in data.items():
 75                self.schemata[name] = Schema(self, name, config)
 76
 77    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
 78        """Get a schema object based on a schema name. If the input is already
 79        a schema object, it will just be returned."""
 80        if isinstance(name, str):
 81            return self.schemata.get(name)
 82        return name
 83
 84    def get_qname(self, qname: str) -> Optional[Property]:
 85        """Get a property object based on a qualified name (i.e. schema:property)."""
 86        return self.qnames.get(qname)
 87
 88    def __getitem__(self, name: str) -> Schema:
 89        """Same as get(), but throws an exception when the given name does not exist."""
 90        schema = self.get(name)
 91        if schema is None:
 92            raise KeyError("No such schema: %s" % name)
 93        return schema
 94
 95    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
 96        """Return all the schemata which have a property of the given type."""
 97        schemata = set()
 98        for schema in self.schemata.values():
 99            for prop in schema.properties.values():
100                if prop.type == type_:
101                    schemata.add(schema)
102        return schemata
103
104    def make_mapping(
105        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
106    ) -> "QueryMapping":
107        """Parse a mapping that applies (tabular) source data to the model."""
108        from followthemoney.mapping import QueryMapping
109
110        return QueryMapping(self, mapping, key_prefix=key_prefix)
111
112    def map_entities(
113        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
114    ) -> Generator["EntityProxy", None, None]:
115        """Given a mapping, yield a series of entities from the data source."""
116        gen = self.make_mapping(mapping, key_prefix=key_prefix)
117        for record in gen.source.records:
118            for entity in gen.map(record).values():
119                yield entity
120
121    @lru_cache(maxsize=None)
122    def common_schema(
123        self, left: Union[str, Schema], right: Union[str, Schema]
124    ) -> Schema:
125        """Select the most narrow of two schemata.
126
127        When indexing data from a dataset, an entity may be declared as a
128        LegalEntity in one query, and as a Person in another. This function
129        will select the most specific of two schemata offered. In the example,
130        that would be Person.
131        """
132        left_schema = self.get(left) or self.get(right)
133        right_schema = self.get(right) or self.get(left)
134        if left_schema is None or right_schema is None:
135            raise InvalidData("Invalid schema")
136        if left_schema.is_a(right_schema):
137            return left_schema
138        if right_schema.is_a(left_schema):
139            return right_schema
140        # for schema in self.schemata.values():
141        #     if schema.is_a(left) and schema.is_a(right):
142        #         return schema
143        msg = "No common schema: %s and %s"
144        raise InvalidData(msg % (left, right))
145
146    def matchable_schemata(self) -> Set[Schema]:
147        """Return a list of all schemata that are matchable."""
148        return set([s for s in self.schemata.values() if s.matchable])
149
150    def make_entity(
151        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
152    ) -> "EntityProxy":
153        """Instantiate an empty entity proxy of the given schema type."""
154        from followthemoney.proxy import EntityProxy
155
156        schema_ = self.get(schema)
157        if schema_ is None:
158            raise InvalidData("Schema does not exist: %s" % schema)
159        return EntityProxy(schema_, {}, key_prefix=key_prefix)
160
161    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> "EntityProxy":
162        """Create an entity proxy to reflect the entity data in the given
163        dictionary. If ``cleaned`` is disabled, all property values are
164        fully re-validated and normalised. Use this if handling input data
165        from an untrusted source."""
166        from followthemoney.proxy import EntityProxy
167
168        if isinstance(data, EntityProxy):
169            return data
170        return EntityProxy.from_dict(data, cleaned=cleaned)
171
172    def to_dict(self) -> ModelToDict:
173        """Return metadata for all schemata and properties, in a serializable form."""
174        return {
175            "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
176            "types": {t.name: t.to_dict() for t in registry.types},
177        }
178
179    def __iter__(self) -> Iterator[Schema]:
180        """Iterate across all schemata."""
181        return iter(self.schemata.values())
class ModelToDict(typing.TypedDict):
class ModelToDict(TypedDict):
    """Serialised form of the whole model, as returned by ``Model.to_dict()``."""

    # Serialised schemata, keyed by schema name.
    schemata: Dict[str, SchemaToDict]
    # Serialised property types, keyed by type name.
    types: Dict[str, PropertyTypeToDict]
schemata: Dict[str, followthemoney.schema.SchemaToDict]
class Model:
 24class Model(object):
 25    """A collection of all the schemata available in followthemoney. The model
 26    provides some helper functions to find schemata, properties or to instantiate
 27    entity proxies based on the schema metadata."""
 28
 29    _instance: Optional["Model"] = None
 30
 31    __slots__ = ("path", "schemata", "properties", "qnames")
 32
 33    def __init__(self, path: str) -> None:
 34        self.path = path
 35
 36        #: A mapping with all schemata, organised by their name.
 37        self.schemata: Dict[str, Schema] = {}
 38
 39        #: All properties defined in the model.
 40        self.properties: Set[Property] = set()
 41        self.qnames: Dict[str, Property] = {}
 42        for path, _, filenames in os.walk(self.path):
 43            for filename in filenames:
 44                self._load(os.path.join(path, filename))
 45        self.generate()
 46
 47    @classmethod
 48    def instance(cls) -> "Model":
 49        if cls._instance is None:
 50            model_path = os.path.dirname(__file__)
 51            model_path = os.path.join(model_path, "schema")
 52            model_path = os.environ.get("FTM_MODEL_PATH", model_path)
 53            cls._instance = cls(model_path)
 54        return cls._instance
 55
 56    def generate(self) -> None:
 57        """Loading the model is a weird process because the schemata reference
 58        each other in complex ways, so the generation process cannot be fully
 59        run as schemata are being instantiated. Hence this process needs to be
 60        called once all schemata are loaded to finalise dereferencing the
 61        schemata."""
 62        for schema in self:
 63            schema.generate(self)
 64        for prop in self.properties:
 65            self.qnames[prop.qname] = prop
 66            for schema in prop.schema.descendants:
 67                if prop.name not in schema.properties:
 68                    schema.properties[prop.name] = prop
 69
 70    def _load(self, filepath: str) -> None:
 71        with open(filepath, "r", encoding="utf-8") as fh:
 72            data = yaml.safe_load(fh)
 73            if not isinstance(data, dict):
 74                raise InvalidModel("Model file is not a mapping: %s" % filepath)
 75            for name, config in data.items():
 76                self.schemata[name] = Schema(self, name, config)
 77
 78    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
 79        """Get a schema object based on a schema name. If the input is already
 80        a schema object, it will just be returned."""
 81        if isinstance(name, str):
 82            return self.schemata.get(name)
 83        return name
 84
 85    def get_qname(self, qname: str) -> Optional[Property]:
 86        """Get a property object based on a qualified name (i.e. schema:property)."""
 87        return self.qnames.get(qname)
 88
 89    def __getitem__(self, name: str) -> Schema:
 90        """Same as get(), but throws an exception when the given name does not exist."""
 91        schema = self.get(name)
 92        if schema is None:
 93            raise KeyError("No such schema: %s" % name)
 94        return schema
 95
 96    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
 97        """Return all the schemata which have a property of the given type."""
 98        schemata = set()
 99        for schema in self.schemata.values():
100            for prop in schema.properties.values():
101                if prop.type == type_:
102                    schemata.add(schema)
103        return schemata
104
105    def make_mapping(
106        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
107    ) -> "QueryMapping":
108        """Parse a mapping that applies (tabular) source data to the model."""
109        from followthemoney.mapping import QueryMapping
110
111        return QueryMapping(self, mapping, key_prefix=key_prefix)
112
113    def map_entities(
114        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
115    ) -> Generator["EntityProxy", None, None]:
116        """Given a mapping, yield a series of entities from the data source."""
117        gen = self.make_mapping(mapping, key_prefix=key_prefix)
118        for record in gen.source.records:
119            for entity in gen.map(record).values():
120                yield entity
121
122    @lru_cache(maxsize=None)
123    def common_schema(
124        self, left: Union[str, Schema], right: Union[str, Schema]
125    ) -> Schema:
126        """Select the most narrow of two schemata.
127
128        When indexing data from a dataset, an entity may be declared as a
129        LegalEntity in one query, and as a Person in another. This function
130        will select the most specific of two schemata offered. In the example,
131        that would be Person.
132        """
133        left_schema = self.get(left) or self.get(right)
134        right_schema = self.get(right) or self.get(left)
135        if left_schema is None or right_schema is None:
136            raise InvalidData("Invalid schema")
137        if left_schema.is_a(right_schema):
138            return left_schema
139        if right_schema.is_a(left_schema):
140            return right_schema
141        # for schema in self.schemata.values():
142        #     if schema.is_a(left) and schema.is_a(right):
143        #         return schema
144        msg = "No common schema: %s and %s"
145        raise InvalidData(msg % (left, right))
146
147    def matchable_schemata(self) -> Set[Schema]:
148        """Return a list of all schemata that are matchable."""
149        return set([s for s in self.schemata.values() if s.matchable])
150
151    def make_entity(
152        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
153    ) -> "EntityProxy":
154        """Instantiate an empty entity proxy of the given schema type."""
155        from followthemoney.proxy import EntityProxy
156
157        schema_ = self.get(schema)
158        if schema_ is None:
159            raise InvalidData("Schema does not exist: %s" % schema)
160        return EntityProxy(schema_, {}, key_prefix=key_prefix)
161
162    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> "EntityProxy":
163        """Create an entity proxy to reflect the entity data in the given
164        dictionary. If ``cleaned`` is disabled, all property values are
165        fully re-validated and normalised. Use this if handling input data
166        from an untrusted source."""
167        from followthemoney.proxy import EntityProxy
168
169        if isinstance(data, EntityProxy):
170            return data
171        return EntityProxy.from_dict(data, cleaned=cleaned)
172
173    def to_dict(self) -> ModelToDict:
174        """Return metadata for all schemata and properties, in a serializable form."""
175        return {
176            "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
177            "types": {t.name: t.to_dict() for t in registry.types},
178        }
179
180    def __iter__(self) -> Iterator[Schema]:
181        """Iterate across all schemata."""
182        return iter(self.schemata.values())

A collection of all the schemata available in followthemoney. The model provides some helper functions to find schemata, properties or to instantiate entity proxies based on the schema metadata.

Model(path: str)
33    def __init__(self, path: str) -> None:
34        self.path = path
35
36        #: A mapping with all schemata, organised by their name.
37        self.schemata: Dict[str, Schema] = {}
38
39        #: All properties defined in the model.
40        self.properties: Set[Property] = set()
41        self.qnames: Dict[str, Property] = {}
42        for path, _, filenames in os.walk(self.path):
43            for filename in filenames:
44                self._load(os.path.join(path, filename))
45        self.generate()
path
schemata: Dict[str, followthemoney.schema.Schema]
@classmethod
def instance(cls) -> Model:
47    @classmethod
48    def instance(cls) -> "Model":
49        if cls._instance is None:
50            model_path = os.path.dirname(__file__)
51            model_path = os.path.join(model_path, "schema")
52            model_path = os.environ.get("FTM_MODEL_PATH", model_path)
53            cls._instance = cls(model_path)
54        return cls._instance
def generate(self) -> None:
56    def generate(self) -> None:
57        """Loading the model is a weird process because the schemata reference
58        each other in complex ways, so the generation process cannot be fully
59        run as schemata are being instantiated. Hence this process needs to be
60        called once all schemata are loaded to finalise dereferencing the
61        schemata."""
62        for schema in self:
63            schema.generate(self)
64        for prop in self.properties:
65            self.qnames[prop.qname] = prop
66            for schema in prop.schema.descendants:
67                if prop.name not in schema.properties:
68                    schema.properties[prop.name] = prop

Loading the model is a weird process because the schemata reference each other in complex ways, so the generation process cannot be fully run as schemata are being instantiated. Hence this process needs to be called once all schemata are loaded to finalise dereferencing the schemata.

def get( self, name: Union[str, followthemoney.schema.Schema]) -> Optional[followthemoney.schema.Schema]:
78    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
79        """Get a schema object based on a schema name. If the input is already
80        a schema object, it will just be returned."""
81        if isinstance(name, str):
82            return self.schemata.get(name)
83        return name

Get a schema object based on a schema name. If the input is already a schema object, it will just be returned.

def get_qname(self, qname: str) -> Optional[followthemoney.property.Property]:
85    def get_qname(self, qname: str) -> Optional[Property]:
86        """Get a property object based on a qualified name (i.e. schema:property)."""
87        return self.qnames.get(qname)

Get a property object based on a qualified name (i.e. schema:property).

def get_type_schemata( self, type_: followthemoney.types.common.PropertyType) -> Set[followthemoney.schema.Schema]:
 96    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
 97        """Return all the schemata which have a property of the given type."""
 98        schemata = set()
 99        for schema in self.schemata.values():
100            for prop in schema.properties.values():
101                if prop.type == type_:
102                    schemata.add(schema)
103        return schemata

Return all the schemata which have a property of the given type.

def make_mapping( self, mapping: Dict[str, Any], key_prefix: Optional[str] = None) -> followthemoney.mapping.query.QueryMapping:
105    def make_mapping(
106        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
107    ) -> "QueryMapping":
108        """Parse a mapping that applies (tabular) source data to the model."""
109        from followthemoney.mapping import QueryMapping
110
111        return QueryMapping(self, mapping, key_prefix=key_prefix)

Parse a mapping that applies (tabular) source data to the model.

def map_entities( self, mapping: Dict[str, Any], key_prefix: Optional[str] = None) -> Generator[followthemoney.proxy.EntityProxy, NoneType, NoneType]:
113    def map_entities(
114        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
115    ) -> Generator["EntityProxy", None, None]:
116        """Given a mapping, yield a series of entities from the data source."""
117        gen = self.make_mapping(mapping, key_prefix=key_prefix)
118        for record in gen.source.records:
119            for entity in gen.map(record).values():
120                yield entity

Given a mapping, yield a series of entities from the data source.

@lru_cache(maxsize=None)
def common_schema( self, left: Union[str, followthemoney.schema.Schema], right: Union[str, followthemoney.schema.Schema]) -> followthemoney.schema.Schema:
122    @lru_cache(maxsize=None)
123    def common_schema(
124        self, left: Union[str, Schema], right: Union[str, Schema]
125    ) -> Schema:
126        """Select the most narrow of two schemata.
127
128        When indexing data from a dataset, an entity may be declared as a
129        LegalEntity in one query, and as a Person in another. This function
130        will select the most specific of two schemata offered. In the example,
131        that would be Person.
132        """
133        left_schema = self.get(left) or self.get(right)
134        right_schema = self.get(right) or self.get(left)
135        if left_schema is None or right_schema is None:
136            raise InvalidData("Invalid schema")
137        if left_schema.is_a(right_schema):
138            return left_schema
139        if right_schema.is_a(left_schema):
140            return right_schema
141        # for schema in self.schemata.values():
142        #     if schema.is_a(left) and schema.is_a(right):
143        #         return schema
144        msg = "No common schema: %s and %s"
145        raise InvalidData(msg % (left, right))

Select the most narrow of two schemata.

When indexing data from a dataset, an entity may be declared as a LegalEntity in one query, and as a Person in another. This function will select the most specific of two schemata offered. In the example, that would be Person.

def matchable_schemata(self) -> Set[followthemoney.schema.Schema]:
147    def matchable_schemata(self) -> Set[Schema]:
148        """Return a list of all schemata that are matchable."""
149        return set([s for s in self.schemata.values() if s.matchable])

Return the set of all schemata that are matchable.

def make_entity( self, schema: Union[str, followthemoney.schema.Schema], key_prefix: Optional[str] = None) -> followthemoney.proxy.EntityProxy:
151    def make_entity(
152        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
153    ) -> "EntityProxy":
154        """Instantiate an empty entity proxy of the given schema type."""
155        from followthemoney.proxy import EntityProxy
156
157        schema_ = self.get(schema)
158        if schema_ is None:
159            raise InvalidData("Schema does not exist: %s" % schema)
160        return EntityProxy(schema_, {}, key_prefix=key_prefix)

Instantiate an empty entity proxy of the given schema type.

def get_proxy( self, data: Dict[str, Any], cleaned: bool = True) -> followthemoney.proxy.EntityProxy:
162    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> "EntityProxy":
163        """Create an entity proxy to reflect the entity data in the given
164        dictionary. If ``cleaned`` is disabled, all property values are
165        fully re-validated and normalised. Use this if handling input data
166        from an untrusted source."""
167        from followthemoney.proxy import EntityProxy
168
169        if isinstance(data, EntityProxy):
170            return data
171        return EntityProxy.from_dict(data, cleaned=cleaned)

Create an entity proxy to reflect the entity data in the given dictionary. If cleaned is disabled, all property values are fully re-validated and normalised. Use this if handling input data from an untrusted source.

def to_dict(self) -> ModelToDict:
173    def to_dict(self) -> ModelToDict:
174        """Return metadata for all schemata and properties, in a serializable form."""
175        return {
176            "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
177            "types": {t.name: t.to_dict() for t in registry.types},
178        }

Return metadata for all schemata and properties, in a serializable form.