followthemoney.model

  1import os
  2import yaml
  3from functools import lru_cache
  4from typing import Any, Dict, Generator, Iterator, Optional, Set, TypedDict, Union
  5
  6from followthemoney.types import registry
  7from followthemoney.types.common import PropertyType, PropertyTypeToDict
  8from followthemoney.schema import Schema, SchemaToDict
  9from followthemoney.property import Property
 10from followthemoney.mapping import QueryMapping
 11from followthemoney.proxy import EntityProxy
 12from followthemoney.exc import InvalidModel, InvalidData
 13
 14
 15class ModelToDict(TypedDict):
 16    schemata: Dict[str, SchemaToDict]
 17    types: Dict[str, PropertyTypeToDict]
 18
 19
 20class Model(object):
 21    """A collection of all the schemata available in followthemoney. The model
 22    provides some helper functions to find schemata, properties or to instantiate
 23    entity proxies based on the schema metadata."""
 24
 25    __slots__ = ("path", "schemata", "properties", "qnames")
 26
 27    def __init__(self, path: str) -> None:
 28        self.path = path
 29
 30        #: A mapping with all schemata, organised by their name.
 31        self.schemata: Dict[str, Schema] = {}
 32
 33        #: All properties defined in the model.
 34        self.properties: Set[Property] = set()
 35        self.qnames: Dict[str, Property] = {}
 36        for path, _, filenames in os.walk(self.path):
 37            for filename in filenames:
 38                self._load(os.path.join(path, filename))
 39        self.generate()
 40
 41    def generate(self) -> None:
 42        """Loading the model is a weird process because the schemata reference
 43        each other in complex ways, so the generation process cannot be fully
 44        run as schemata are being instantiated. Hence this process needs to be
 45        called once all schemata are loaded to finalise dereferencing the
 46        schemata."""
 47        for schema in self:
 48            schema.generate(self)
 49        for prop in self.properties:
 50            self.qnames[prop.qname] = prop
 51            for schema in prop.schema.descendants:
 52                if prop.name not in schema.properties:
 53                    schema.properties[prop.name] = prop
 54
 55    def _load(self, filepath: str) -> None:
 56        with open(filepath, "r", encoding="utf-8") as fh:
 57            data = yaml.safe_load(fh)
 58            if not isinstance(data, dict):
 59                raise InvalidModel("Model file is not a mapping: %s" % filepath)
 60            for name, config in data.items():
 61                self.schemata[name] = Schema(self, name, config)
 62
 63    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
 64        """Get a schema object based on a schema name. If the input is already
 65        a schema object, it will just be returned."""
 66        if isinstance(name, str):
 67            return self.schemata.get(name)
 68        return name
 69
 70    def get_qname(self, qname: str) -> Optional[Property]:
 71        """Get a property object based on a qualified name (i.e. schema:property)."""
 72        return self.qnames.get(qname)
 73
 74    def __getitem__(self, name: str) -> Schema:
 75        """Same as get(), but throws an exception when the given name does not exist."""
 76        schema = self.get(name)
 77        if schema is None:
 78            raise KeyError("No such schema: %s" % name)
 79        return schema
 80
 81    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
 82        """Return all the schemata which have a property of the given type."""
 83        schemata = set()
 84        for schema in self.schemata.values():
 85            for prop in schema.properties.values():
 86                if prop.type == type_:
 87                    schemata.add(schema)
 88        return schemata
 89
 90    def make_mapping(
 91        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
 92    ) -> QueryMapping:
 93        """Parse a mapping that applies (tabular) source data to the model."""
 94        return QueryMapping(self, mapping, key_prefix=key_prefix)
 95
 96    def map_entities(
 97        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
 98    ) -> Generator[EntityProxy, None, None]:
 99        """Given a mapping, yield a series of entities from the data source."""
100        gen = self.make_mapping(mapping, key_prefix=key_prefix)
101        for record in gen.source.records:
102            for entity in gen.map(record).values():
103                yield entity
104
105    @lru_cache(maxsize=None)
106    def common_schema(
107        self, left: Union[str, Schema], right: Union[str, Schema]
108    ) -> Schema:
109        """Select the most narrow of two schemata.
110
111        When indexing data from a dataset, an entity may be declared as a
112        LegalEntity in one query, and as a Person in another. This function
113        will select the most specific of two schemata offered. In the example,
114        that would be Person.
115        """
116        left_schema = self.get(left) or self.get(right)
117        right_schema = self.get(right) or self.get(left)
118        if left_schema is None or right_schema is None:
119            raise InvalidData("Invalid schema")
120        if left_schema.is_a(right_schema):
121            return left_schema
122        if right_schema.is_a(left_schema):
123            return right_schema
124        # for schema in self.schemata.values():
125        #     if schema.is_a(left) and schema.is_a(right):
126        #         return schema
127        msg = "No common schema: %s and %s"
128        raise InvalidData(msg % (left, right))
129
130    def make_entity(
131        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
132    ) -> EntityProxy:
133        """Instantiate an empty entity proxy of the given schema type."""
134        return EntityProxy(self, {"schema": schema}, key_prefix=key_prefix)
135
136    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
137        """Create an entity proxy to reflect the entity data in the given
138        dictionary. If ``cleaned`` is disabled, all property values are
139        fully re-validated and normalised. Use this if handling input data
140        from an untrusted source."""
141        if isinstance(data, EntityProxy):
142            return data
143        return EntityProxy.from_dict(self, data, cleaned=cleaned)
144
145    def to_dict(self) -> ModelToDict:
146        """Return metadata for all schemata and properties, in a serializable form."""
147        return {
148            "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
149            "types": {t.name: t.to_dict() for t in registry.types},
150        }
151
152    def __iter__(self) -> Iterator[Schema]:
153        """Iterate across all schemata."""
154        return iter(self.schemata.values())
class ModelToDict(typing.TypedDict):
class ModelToDict(TypedDict):
    """Typed shape of a serialised model: schemata plus property types."""

    # Serialised schemata, keyed by string.
    schemata: Dict[str, SchemaToDict]
    # Serialised property types, keyed by string.
    types: Dict[str, PropertyTypeToDict]
schemata: Dict[str, followthemoney.schema.SchemaToDict]
class Model:
 21class Model(object):
 22    """A collection of all the schemata available in followthemoney. The model
 23    provides some helper functions to find schemata, properties or to instantiate
 24    entity proxies based on the schema metadata."""
 25
 26    __slots__ = ("path", "schemata", "properties", "qnames")
 27
 28    def __init__(self, path: str) -> None:
 29        self.path = path
 30
 31        #: A mapping with all schemata, organised by their name.
 32        self.schemata: Dict[str, Schema] = {}
 33
 34        #: All properties defined in the model.
 35        self.properties: Set[Property] = set()
 36        self.qnames: Dict[str, Property] = {}
 37        for path, _, filenames in os.walk(self.path):
 38            for filename in filenames:
 39                self._load(os.path.join(path, filename))
 40        self.generate()
 41
 42    def generate(self) -> None:
 43        """Loading the model is a weird process because the schemata reference
 44        each other in complex ways, so the generation process cannot be fully
 45        run as schemata are being instantiated. Hence this process needs to be
 46        called once all schemata are loaded to finalise dereferencing the
 47        schemata."""
 48        for schema in self:
 49            schema.generate(self)
 50        for prop in self.properties:
 51            self.qnames[prop.qname] = prop
 52            for schema in prop.schema.descendants:
 53                if prop.name not in schema.properties:
 54                    schema.properties[prop.name] = prop
 55
 56    def _load(self, filepath: str) -> None:
 57        with open(filepath, "r", encoding="utf-8") as fh:
 58            data = yaml.safe_load(fh)
 59            if not isinstance(data, dict):
 60                raise InvalidModel("Model file is not a mapping: %s" % filepath)
 61            for name, config in data.items():
 62                self.schemata[name] = Schema(self, name, config)
 63
 64    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
 65        """Get a schema object based on a schema name. If the input is already
 66        a schema object, it will just be returned."""
 67        if isinstance(name, str):
 68            return self.schemata.get(name)
 69        return name
 70
 71    def get_qname(self, qname: str) -> Optional[Property]:
 72        """Get a property object based on a qualified name (i.e. schema:property)."""
 73        return self.qnames.get(qname)
 74
 75    def __getitem__(self, name: str) -> Schema:
 76        """Same as get(), but throws an exception when the given name does not exist."""
 77        schema = self.get(name)
 78        if schema is None:
 79            raise KeyError("No such schema: %s" % name)
 80        return schema
 81
 82    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
 83        """Return all the schemata which have a property of the given type."""
 84        schemata = set()
 85        for schema in self.schemata.values():
 86            for prop in schema.properties.values():
 87                if prop.type == type_:
 88                    schemata.add(schema)
 89        return schemata
 90
 91    def make_mapping(
 92        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
 93    ) -> QueryMapping:
 94        """Parse a mapping that applies (tabular) source data to the model."""
 95        return QueryMapping(self, mapping, key_prefix=key_prefix)
 96
 97    def map_entities(
 98        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
 99    ) -> Generator[EntityProxy, None, None]:
100        """Given a mapping, yield a series of entities from the data source."""
101        gen = self.make_mapping(mapping, key_prefix=key_prefix)
102        for record in gen.source.records:
103            for entity in gen.map(record).values():
104                yield entity
105
106    @lru_cache(maxsize=None)
107    def common_schema(
108        self, left: Union[str, Schema], right: Union[str, Schema]
109    ) -> Schema:
110        """Select the most narrow of two schemata.
111
112        When indexing data from a dataset, an entity may be declared as a
113        LegalEntity in one query, and as a Person in another. This function
114        will select the most specific of two schemata offered. In the example,
115        that would be Person.
116        """
117        left_schema = self.get(left) or self.get(right)
118        right_schema = self.get(right) or self.get(left)
119        if left_schema is None or right_schema is None:
120            raise InvalidData("Invalid schema")
121        if left_schema.is_a(right_schema):
122            return left_schema
123        if right_schema.is_a(left_schema):
124            return right_schema
125        # for schema in self.schemata.values():
126        #     if schema.is_a(left) and schema.is_a(right):
127        #         return schema
128        msg = "No common schema: %s and %s"
129        raise InvalidData(msg % (left, right))
130
131    def make_entity(
132        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
133    ) -> EntityProxy:
134        """Instantiate an empty entity proxy of the given schema type."""
135        return EntityProxy(self, {"schema": schema}, key_prefix=key_prefix)
136
137    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
138        """Create an entity proxy to reflect the entity data in the given
139        dictionary. If ``cleaned`` is disabled, all property values are
140        fully re-validated and normalised. Use this if handling input data
141        from an untrusted source."""
142        if isinstance(data, EntityProxy):
143            return data
144        return EntityProxy.from_dict(self, data, cleaned=cleaned)
145
146    def to_dict(self) -> ModelToDict:
147        """Return metadata for all schemata and properties, in a serializable form."""
148        return {
149            "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
150            "types": {t.name: t.to_dict() for t in registry.types},
151        }
152
153    def __iter__(self) -> Iterator[Schema]:
154        """Iterate across all schemata."""
155        return iter(self.schemata.values())

A collection of all the schemata available in followthemoney. The model provides some helper functions to find schemata, properties or to instantiate entity proxies based on the schema metadata.

Model(path: str)
28    def __init__(self, path: str) -> None:
29        self.path = path
30
31        #: A mapping with all schemata, organised by their name.
32        self.schemata: Dict[str, Schema] = {}
33
34        #: All properties defined in the model.
35        self.properties: Set[Property] = set()
36        self.qnames: Dict[str, Property] = {}
37        for path, _, filenames in os.walk(self.path):
38            for filename in filenames:
39                self._load(os.path.join(path, filename))
40        self.generate()
path
schemata: Dict[str, followthemoney.schema.Schema]
def generate(self) -> None:
42    def generate(self) -> None:
43        """Loading the model is a weird process because the schemata reference
44        each other in complex ways, so the generation process cannot be fully
45        run as schemata are being instantiated. Hence this process needs to be
46        called once all schemata are loaded to finalise dereferencing the
47        schemata."""
48        for schema in self:
49            schema.generate(self)
50        for prop in self.properties:
51            self.qnames[prop.qname] = prop
52            for schema in prop.schema.descendants:
53                if prop.name not in schema.properties:
54                    schema.properties[prop.name] = prop

Loading the model is a weird process because the schemata reference each other in complex ways, so the generation process cannot be fully run as schemata are being instantiated. Hence this process needs to be called once all schemata are loaded to finalise dereferencing the schemata.

def get( self, name: Union[str, followthemoney.schema.Schema]) -> Optional[followthemoney.schema.Schema]:
64    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
65        """Get a schema object based on a schema name. If the input is already
66        a schema object, it will just be returned."""
67        if isinstance(name, str):
68            return self.schemata.get(name)
69        return name

Get a schema object based on a schema name. If the input is already a schema object, it will just be returned.

def get_qname(self, qname: str) -> Optional[followthemoney.property.Property]:
71    def get_qname(self, qname: str) -> Optional[Property]:
72        """Get a property object based on a qualified name (i.e. schema:property)."""
73        return self.qnames.get(qname)

Get a property object based on a qualified name (i.e. schema:property).

def get_type_schemata( self, type_: followthemoney.types.common.PropertyType) -> Set[followthemoney.schema.Schema]:
82    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
83        """Return all the schemata which have a property of the given type."""
84        schemata = set()
85        for schema in self.schemata.values():
86            for prop in schema.properties.values():
87                if prop.type == type_:
88                    schemata.add(schema)
89        return schemata

Return all the schemata which have a property of the given type.

def make_mapping( self, mapping: Dict[str, Any], key_prefix: Optional[str] = None) -> followthemoney.mapping.query.QueryMapping:
91    def make_mapping(
92        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
93    ) -> QueryMapping:
94        """Parse a mapping that applies (tabular) source data to the model."""
95        return QueryMapping(self, mapping, key_prefix=key_prefix)

Parse a mapping that applies (tabular) source data to the model.

def map_entities( self, mapping: Dict[str, Any], key_prefix: Optional[str] = None) -> Generator[followthemoney.proxy.EntityProxy, NoneType, NoneType]:
 97    def map_entities(
 98        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
 99    ) -> Generator[EntityProxy, None, None]:
100        """Given a mapping, yield a series of entities from the data source."""
101        gen = self.make_mapping(mapping, key_prefix=key_prefix)
102        for record in gen.source.records:
103            for entity in gen.map(record).values():
104                yield entity

Given a mapping, yield a series of entities from the data source.

@lru_cache(maxsize=None)
def common_schema( self, left: Union[str, followthemoney.schema.Schema], right: Union[str, followthemoney.schema.Schema]) -> followthemoney.schema.Schema:
106    @lru_cache(maxsize=None)
107    def common_schema(
108        self, left: Union[str, Schema], right: Union[str, Schema]
109    ) -> Schema:
110        """Select the most narrow of two schemata.
111
112        When indexing data from a dataset, an entity may be declared as a
113        LegalEntity in one query, and as a Person in another. This function
114        will select the most specific of two schemata offered. In the example,
115        that would be Person.
116        """
117        left_schema = self.get(left) or self.get(right)
118        right_schema = self.get(right) or self.get(left)
119        if left_schema is None or right_schema is None:
120            raise InvalidData("Invalid schema")
121        if left_schema.is_a(right_schema):
122            return left_schema
123        if right_schema.is_a(left_schema):
124            return right_schema
125        # for schema in self.schemata.values():
126        #     if schema.is_a(left) and schema.is_a(right):
127        #         return schema
128        msg = "No common schema: %s and %s"
129        raise InvalidData(msg % (left, right))

Select the most narrow of two schemata.

When indexing data from a dataset, an entity may be declared as a LegalEntity in one query, and as a Person in another. This function will select the most specific of two schemata offered. In the example, that would be Person.

def make_entity( self, schema: Union[str, followthemoney.schema.Schema], key_prefix: Optional[str] = None) -> followthemoney.proxy.EntityProxy:
131    def make_entity(
132        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
133    ) -> EntityProxy:
134        """Instantiate an empty entity proxy of the given schema type."""
135        return EntityProxy(self, {"schema": schema}, key_prefix=key_prefix)

Instantiate an empty entity proxy of the given schema type.

def get_proxy( self, data: Dict[str, Any], cleaned: bool = True) -> followthemoney.proxy.EntityProxy:
137    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
138        """Create an entity proxy to reflect the entity data in the given
139        dictionary. If ``cleaned`` is disabled, all property values are
140        fully re-validated and normalised. Use this if handling input data
141        from an untrusted source."""
142        if isinstance(data, EntityProxy):
143            return data
144        return EntityProxy.from_dict(self, data, cleaned=cleaned)

Create an entity proxy to reflect the entity data in the given dictionary. If cleaned is disabled, all property values are fully re-validated and normalised. Use this if handling input data from an untrusted source.

def to_dict(self) -> ModelToDict:
146    def to_dict(self) -> ModelToDict:
147        """Return metadata for all schemata and properties, in a serializable form."""
148        return {
149            "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
150            "types": {t.name: t.to_dict() for t in registry.types},
151        }

Return metadata for all schemata and properties, in a serializable form.