followthemoney.model
import os
import yaml
from functools import lru_cache
from typing import Any, Dict, Generator, Iterator, Optional, Set, TypedDict, Union

from followthemoney.types import registry
from followthemoney.types.common import PropertyType, PropertyTypeToDict
from followthemoney.schema import Schema, SchemaToDict
from followthemoney.property import Property
from followthemoney.mapping import QueryMapping
from followthemoney.proxy import EntityProxy
from followthemoney.exc import InvalidModel, InvalidData


class ModelToDict(TypedDict):
    """Shape of the dictionary returned by ``Model.to_dict()``."""

    # Schema descriptions, keyed by schema name.
    schemata: Dict[str, SchemaToDict]
    # Property type descriptions, keyed by type name.
    types: Dict[str, PropertyTypeToDict]


class Model(object):
    """A collection of all the schemata available in followthemoney. The model
    provides some helper functions to find schemata, properties or to instantiate
    entity proxies based on the schema metadata."""

    __slots__ = ("path", "schemata", "properties", "qnames")

    def __init__(self, path: str) -> None:
        """Load every file found below ``path`` and finalise the model.

        Each file is handed to ``_load()``; once all of them are read,
        ``generate()`` is called to resolve references between schemata.
        """
        self.path = path

        #: A mapping with all schemata, organised by their name.
        self.schemata: Dict[str, Schema] = {}

        #: All properties defined in the model.
        # NOTE(review): nothing in this module adds to this set; presumably
        # it is filled while schemata are constructed - confirm.
        self.properties: Set[Property] = set()
        #: Properties keyed by their ``qname``; filled in by ``generate()``.
        self.qnames: Dict[str, Property] = {}
        # The loop variable is not called ``path`` so that it does not
        # rebind the constructor argument.
        for dirpath, _, filenames in os.walk(self.path):
            for filename in filenames:
                self._load(os.path.join(dirpath, filename))
        self.generate()

    def generate(self) -> None:
        """Loading the model is a weird process because the schemata reference
        each other in complex ways, so the generation process cannot be fully
        run as schemata are being instantiated. Hence this process needs to be
        called once all schemata are loaded to finalise dereferencing the
        schemata."""
        for schema in self:
            schema.generate(self)
        for prop in self.properties:
            self.qnames[prop.qname] = prop
            # Copy the property onto each descendant of its schema, unless
            # that descendant already has a property of the same name.
            for descendant in prop.schema.descendants:
                if prop.name not in descendant.properties:
                    descendant.properties[prop.name] = prop

    def _load(self, filepath: str) -> None:
        """Read one YAML file and register each schema it defines.

        Raises:
            InvalidModel: if the parsed document is not a mapping.
        """
        with open(filepath, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
        if not isinstance(data, dict):
            raise InvalidModel("Model file is not a mapping: %s" % filepath)
        for name, config in data.items():
            self.schemata[name] = Schema(self, name, config)

    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
        """Get a schema object based on a schema name. If the input is already
        a schema object, it will just be returned."""
        if not isinstance(name, str):
            return name
        return self.schemata.get(name)

    def get_qname(self, qname: str) -> Optional[Property]:
        """Get a property object based on a qualified name (i.e. schema:property)."""
        return self.qnames.get(qname)

    def __getitem__(self, name: str) -> Schema:
        """Same as get(), but throws an exception when the given name does not exist."""
        schema = self.get(name)
        if schema is None:
            raise KeyError("No such schema: %s" % name)
        return schema

    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
        """Return all the schemata which have a property of the given type."""
        return {
            schema
            for schema in self.schemata.values()
            # any() stops at the first matching property of each schema.
            if any(prop.type == type_ for prop in schema.properties.values())
        }

    def make_mapping(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> QueryMapping:
        """Parse a mapping that applies (tabular) source data to the model."""
        return QueryMapping(self, mapping, key_prefix=key_prefix)

    def map_entities(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> Generator[EntityProxy, None, None]:
        """Given a mapping, yield a series of entities from the data source."""
        gen = self.make_mapping(mapping, key_prefix=key_prefix)
        for record in gen.source.records:
            yield from gen.map(record).values()

    # NOTE(review): lru_cache on a method keeps ``self`` and both arguments
    # referenced for as long as the cache lives - acceptable if models are
    # long-lived; confirm.
    @lru_cache(maxsize=None)
    def common_schema(
        self, left: Union[str, Schema], right: Union[str, Schema]
    ) -> Schema:
        """Select the most narrow of two schemata.

        When indexing data from a dataset, an entity may be declared as a
        LegalEntity in one query, and as a Person in another. This function
        will select the most specific of two schemata offered. In the example,
        that would be Person.

        If only one of the two arguments resolves to a schema, that schema
        is used for both sides and is therefore returned.

        Raises:
            InvalidData: if neither argument resolves to a schema, or if
                neither schema ``is_a`` the other.
        """
        left_schema = self.get(left) or self.get(right)
        right_schema = self.get(right) or self.get(left)
        if left_schema is None or right_schema is None:
            raise InvalidData("Invalid schema")
        if left_schema.is_a(right_schema):
            return left_schema
        if right_schema.is_a(left_schema):
            return right_schema
        raise InvalidData("No common schema: %s and %s" % (left, right))

    def make_entity(
        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
    ) -> EntityProxy:
        """Instantiate an empty entity proxy of the given schema type."""
        return EntityProxy(self, {"schema": schema}, key_prefix=key_prefix)

    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
        """Create an entity proxy to reflect the entity data in the given
        dictionary. If ``cleaned`` is disabled, all property values are
        fully re-validated and normalised. Use this if handling input data
        from an untrusted source.

        If ``data`` is already an ``EntityProxy`` it is returned as-is.
        """
        if isinstance(data, EntityProxy):
            return data
        return EntityProxy.from_dict(self, data, cleaned=cleaned)

    def to_dict(self) -> ModelToDict:
        """Return metadata for all schemata and properties, in a serializable form."""
        return {
            "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
            "types": {t.name: t.to_dict() for t in registry.types},
        }

    def __iter__(self) -> Iterator[Schema]:
        """Iterate across all schemata."""
        return iter(self.schemata.values())
class ModelToDict(TypedDict):
    """Shape of the dictionary returned by ``Model.to_dict()``."""

    # Schema descriptions, keyed by schema name.
    schemata: Dict[str, SchemaToDict]
    # Property type descriptions, keyed by type name.
    types: Dict[str, PropertyTypeToDict]
class Model(object):
    """A collection of all the schemata available in followthemoney. The model
    provides some helper functions to find schemata, properties or to instantiate
    entity proxies based on the schema metadata."""

    __slots__ = ("path", "schemata", "properties", "qnames")

    def __init__(self, path: str) -> None:
        """Load every file found below ``path`` and finalise the model.

        Each file is handed to ``_load()``; once all of them are read,
        ``generate()`` is called to resolve references between schemata.
        """
        self.path = path

        #: A mapping with all schemata, organised by their name.
        self.schemata: Dict[str, Schema] = {}

        #: All properties defined in the model.
        # NOTE(review): nothing in this class adds to this set; presumably
        # it is filled while schemata are constructed - confirm.
        self.properties: Set[Property] = set()
        #: Properties keyed by their ``qname``; filled in by ``generate()``.
        self.qnames: Dict[str, Property] = {}
        # The loop variable is not called ``path`` so that it does not
        # rebind the constructor argument.
        for dirpath, _, filenames in os.walk(self.path):
            for filename in filenames:
                self._load(os.path.join(dirpath, filename))
        self.generate()

    def generate(self) -> None:
        """Loading the model is a weird process because the schemata reference
        each other in complex ways, so the generation process cannot be fully
        run as schemata are being instantiated. Hence this process needs to be
        called once all schemata are loaded to finalise dereferencing the
        schemata."""
        for schema in self:
            schema.generate(self)
        for prop in self.properties:
            self.qnames[prop.qname] = prop
            # Copy the property onto each descendant of its schema, unless
            # that descendant already has a property of the same name.
            for descendant in prop.schema.descendants:
                if prop.name not in descendant.properties:
                    descendant.properties[prop.name] = prop

    def _load(self, filepath: str) -> None:
        """Read one YAML file and register each schema it defines.

        Raises:
            InvalidModel: if the parsed document is not a mapping.
        """
        with open(filepath, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
        if not isinstance(data, dict):
            raise InvalidModel("Model file is not a mapping: %s" % filepath)
        for name, config in data.items():
            self.schemata[name] = Schema(self, name, config)

    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
        """Get a schema object based on a schema name. If the input is already
        a schema object, it will just be returned."""
        if not isinstance(name, str):
            return name
        return self.schemata.get(name)

    def get_qname(self, qname: str) -> Optional[Property]:
        """Get a property object based on a qualified name (i.e. schema:property)."""
        return self.qnames.get(qname)

    def __getitem__(self, name: str) -> Schema:
        """Same as get(), but throws an exception when the given name does not exist."""
        schema = self.get(name)
        if schema is None:
            raise KeyError("No such schema: %s" % name)
        return schema

    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
        """Return all the schemata which have a property of the given type."""
        return {
            schema
            for schema in self.schemata.values()
            # any() stops at the first matching property of each schema.
            if any(prop.type == type_ for prop in schema.properties.values())
        }

    def make_mapping(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> QueryMapping:
        """Parse a mapping that applies (tabular) source data to the model."""
        return QueryMapping(self, mapping, key_prefix=key_prefix)

    def map_entities(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> Generator[EntityProxy, None, None]:
        """Given a mapping, yield a series of entities from the data source."""
        gen = self.make_mapping(mapping, key_prefix=key_prefix)
        for record in gen.source.records:
            yield from gen.map(record).values()

    # NOTE(review): lru_cache on a method keeps ``self`` and both arguments
    # referenced for as long as the cache lives - acceptable if models are
    # long-lived; confirm.
    @lru_cache(maxsize=None)
    def common_schema(
        self, left: Union[str, Schema], right: Union[str, Schema]
    ) -> Schema:
        """Select the most narrow of two schemata.

        When indexing data from a dataset, an entity may be declared as a
        LegalEntity in one query, and as a Person in another. This function
        will select the most specific of two schemata offered. In the example,
        that would be Person.

        If only one of the two arguments resolves to a schema, that schema
        is used for both sides and is therefore returned.

        Raises:
            InvalidData: if neither argument resolves to a schema, or if
                neither schema ``is_a`` the other.
        """
        left_schema = self.get(left) or self.get(right)
        right_schema = self.get(right) or self.get(left)
        if left_schema is None or right_schema is None:
            raise InvalidData("Invalid schema")
        if left_schema.is_a(right_schema):
            return left_schema
        if right_schema.is_a(left_schema):
            return right_schema
        raise InvalidData("No common schema: %s and %s" % (left, right))

    def make_entity(
        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
    ) -> EntityProxy:
        """Instantiate an empty entity proxy of the given schema type."""
        return EntityProxy(self, {"schema": schema}, key_prefix=key_prefix)

    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
        """Create an entity proxy to reflect the entity data in the given
        dictionary. If ``cleaned`` is disabled, all property values are
        fully re-validated and normalised. Use this if handling input data
        from an untrusted source.

        If ``data`` is already an ``EntityProxy`` it is returned as-is.
        """
        if isinstance(data, EntityProxy):
            return data
        return EntityProxy.from_dict(self, data, cleaned=cleaned)

    def to_dict(self) -> ModelToDict:
        """Return metadata for all schemata and properties, in a serializable form."""
        return {
            "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
            "types": {t.name: t.to_dict() for t in registry.types},
        }

    def __iter__(self) -> Iterator[Schema]:
        """Iterate across all schemata."""
        return iter(self.schemata.values())
A collection of all the schemata available in followthemoney. The model provides some helper functions to find schemata, properties or to instantiate entity proxies based on the schema metadata.
def __init__(self, path: str) -> None:
    """Load every file found below ``path`` and finalise the model.

    Each file is handed to ``_load()``; once all of them are read,
    ``generate()`` is called to resolve references between schemata.
    """
    self.path = path

    #: A mapping with all schemata, organised by their name.
    self.schemata: Dict[str, Schema] = {}

    #: All properties defined in the model.
    # NOTE(review): nothing in this method adds to this set; presumably
    # it is filled while schemata are constructed - confirm.
    self.properties: Set[Property] = set()
    #: Properties keyed by qualified name (see ``get_qname()``).
    self.qnames: Dict[str, Property] = {}
    # The loop variable is not called ``path`` so that it does not
    # rebind the constructor argument.
    for dirpath, _, filenames in os.walk(self.path):
        for filename in filenames:
            self._load(os.path.join(dirpath, filename))
    self.generate()
def generate(self) -> None:
    """Loading the model is a weird process because the schemata reference
    each other in complex ways, so the generation process cannot be fully
    run as schemata are being instantiated. Hence this process needs to be
    called once all schemata are loaded to finalise dereferencing the
    schemata."""
    for schema in self:
        schema.generate(self)
    for prop in self.properties:
        # Index every property by its qualified name.
        self.qnames[prop.qname] = prop
        # Copy the property onto each descendant of its schema, unless
        # that descendant already has a property of the same name.
        for descendant in prop.schema.descendants:
            if prop.name not in descendant.properties:
                descendant.properties[prop.name] = prop
Loading the model is a weird process because the schemata reference each other in complex ways, so the generation process cannot be fully run as schemata are being instantiated. Hence this process needs to be called once all schemata are loaded to finalise dereferencing the schemata.
def get(self, name: Union[str, Schema]) -> Optional[Schema]:
    """Get a schema object based on a schema name. If the input is already
    a schema object, it will just be returned.

    Returns ``None`` when ``name`` is a string that is not a key of
    ``self.schemata``.
    """
    if not isinstance(name, str):
        # Anything that is not a string is passed through untouched.
        return name
    return self.schemata.get(name)
Get a schema object based on a schema name. If the input is already a schema object, it will just be returned.
def get_qname(self, qname: str) -> Optional[Property]:
    """Get a property object based on a qualified name (i.e. schema:property).

    Returns ``None`` when ``qname`` is not a key of ``self.qnames``.
    """
    return self.qnames.get(qname)
Get a property object based on a qualified name (i.e. schema:property).
def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
    """Return all the schemata which have a property of the given type.

    A schema is included when at least one entry of its ``properties``
    mapping has a ``type`` equal to ``type_``.
    """
    return {
        schema
        for schema in self.schemata.values()
        # any() stops at the first matching property of each schema.
        if any(prop.type == type_ for prop in schema.properties.values())
    }
Return all the schemata which have a property of the given type.
def make_mapping(
    self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
) -> QueryMapping:
    """Parse a mapping that applies (tabular) source data to the model.

    Both arguments are forwarded unchanged to ``QueryMapping`` together
    with this model.
    """
    return QueryMapping(self, mapping, key_prefix=key_prefix)
Parse a mapping that applies (tabular) source data to the model.
def map_entities(
    self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
) -> Generator[EntityProxy, None, None]:
    """Given a mapping, yield a series of entities from the data source.

    The mapping is built with ``make_mapping()``; every record of its
    source is then mapped and the resulting values are yielded in order.
    """
    gen = self.make_mapping(mapping, key_prefix=key_prefix)
    for record in gen.source.records:
        yield from gen.map(record).values()
Given a mapping, yield a series of entities from the data source.
# NOTE(review): lru_cache on a method keeps ``self`` and both arguments
# referenced for as long as the cache lives - acceptable if models are
# long-lived; confirm.
@lru_cache(maxsize=None)
def common_schema(
    self, left: Union[str, Schema], right: Union[str, Schema]
) -> Schema:
    """Select the most narrow of two schemata.

    When indexing data from a dataset, an entity may be declared as a
    LegalEntity in one query, and as a Person in another. This function
    will select the most specific of two schemata offered. In the example,
    that would be Person.

    If only one of the two arguments resolves to a schema, that schema
    is used for both sides and is therefore returned.

    Raises:
        InvalidData: if neither argument resolves to a schema, or if
            neither schema ``is_a`` the other.
    """
    left_schema = self.get(left) or self.get(right)
    right_schema = self.get(right) or self.get(left)
    if left_schema is None or right_schema is None:
        raise InvalidData("Invalid schema")
    if left_schema.is_a(right_schema):
        return left_schema
    if right_schema.is_a(left_schema):
        return right_schema
    raise InvalidData("No common schema: %s and %s" % (left, right))
Select the most narrow of two schemata.
When indexing data from a dataset, an entity may be declared as a LegalEntity in one query, and as a Person in another. This function will select the most specific of two schemata offered. In the example, that would be Person.
def make_entity(
    self, schema: Union[str, Schema], key_prefix: Optional[str] = None
) -> EntityProxy:
    """Instantiate an empty entity proxy of the given schema type.

    The proxy is built from a dictionary whose only key is ``"schema"``.
    """
    return EntityProxy(self, {"schema": schema}, key_prefix=key_prefix)
Instantiate an empty entity proxy of the given schema type.
def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
    """Create an entity proxy to reflect the entity data in the given
    dictionary. If ``cleaned`` is disabled, all property values are
    fully re-validated and normalised. Use this if handling input data
    from an untrusted source.

    If ``data`` is already an ``EntityProxy`` it is returned as-is.
    """
    if isinstance(data, EntityProxy):
        return data
    return EntityProxy.from_dict(self, data, cleaned=cleaned)
Create an entity proxy to reflect the entity data in the given dictionary. If `cleaned` is disabled, all property values are fully re-validated and normalised. Use this if handling input data from an untrusted source.
def to_dict(self) -> ModelToDict:
    """Return metadata for all schemata and properties, in a serializable form.

    The result has two keys: ``"schemata"`` maps each schema's name to
    its ``to_dict()`` output, and ``"types"`` does the same for every
    entry of ``registry.types``.
    """
    return {
        "schemata": {s.name: s.to_dict() for s in self.schemata.values()},
        "types": {t.name: t.to_dict() for t in registry.types},
    }
Return metadata for all schemata and properties, in a serializable form.