followthemoney.model
import os
import yaml
from typing import Any, Dict, Generator, Iterator, Optional, Set, TypedDict, Union

from followthemoney.types import registry
from followthemoney.types.common import PropertyType, PropertyTypeToDict
from followthemoney.schema import Schema, SchemaToDict
from followthemoney.property import Property
from followthemoney.mapping import QueryMapping
from followthemoney.proxy import EntityProxy
from followthemoney.exc import InvalidModel, InvalidData


class ModelToDict(TypedDict):
    """Serialisable description of a model, as returned by ``Model.to_dict``."""

    # Schema metadata, keyed by schema name.
    schemata: Dict[str, SchemaToDict]
    # Property type metadata, keyed by type name.
    types: Dict[str, PropertyTypeToDict]


class Model(object):
    """A collection of all the schemata available in followthemoney.

    The model offers helpers to look up schemata and properties, and to
    instantiate entity proxies based on the schema metadata.
    """

    __slots__ = ("path", "schemata", "properties", "qnames")

    def __init__(self, path: str) -> None:
        """Load every schema file found below ``path`` and finalise the model."""
        self.path = path

        #: All schemata, indexed by schema name.
        self.schemata: Dict[str, Schema] = {}

        #: All properties defined in the model.
        self.properties: Set[Property] = set()
        #: Properties indexed by qualified name; filled in by ``generate()``.
        self.qnames: Dict[str, Property] = {}

        # Every file in the tree is handed to the loader, whatever its name.
        for directory, _, filenames in os.walk(self.path):
            for filename in filenames:
                self._load(os.path.join(directory, filename))
        self.generate()

    def generate(self) -> None:
        """Finalise the model once all schemata have been loaded.

        Schemata reference each other, so they cannot be fully resolved while
        they are still being instantiated. This second pass lets each schema
        generate itself against the model, indexes all properties by qualified
        name and hands each property down to the descendants of its schema.
        """
        for schema in self:
            schema.generate(self)
        for prop in self.properties:
            self.qnames[prop.qname] = prop
            for descendant in prop.schema.descendants:
                # A property already registered under that name wins.
                if prop.name in descendant.properties:
                    continue
                descendant.properties[prop.name] = prop

    def _load(self, filepath: str) -> None:
        """Parse one YAML file and register every schema it defines.

        Raises ``InvalidModel`` if the document is not a mapping.
        """
        with open(filepath, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
        if not isinstance(data, dict):
            raise InvalidModel("Model file is not a mapping: %s" % filepath)
        for name, config in data.items():
            self.schemata[name] = Schema(self, name, config)

    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
        """Resolve a schema name to its schema object.

        Non-string input is returned unchanged; an unknown name gives ``None``.
        """
        if not isinstance(name, str):
            return name
        return self.schemata.get(name)

    def get_qname(self, qname: str) -> Optional[Property]:
        """Look up a property by its qualified name (i.e. schema:property)."""
        prop = self.qnames.get(qname)
        return prop

    def __getitem__(self, name: str) -> Schema:
        """Same as get(), but raise ``KeyError`` when the name does not exist."""
        schema = self.get(name)
        if schema is not None:
            return schema
        raise KeyError("No such schema: %s" % name)

    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
        """Return all schemata that have at least one property of the given type."""
        return {
            schema
            for schema in self.schemata.values()
            if any(prop.type == type_ for prop in schema.properties.values())
        }

    def make_mapping(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> QueryMapping:
        """Parse a mapping that applies (tabular) source data to the model."""
        query = QueryMapping(self, mapping, key_prefix=key_prefix)
        return query

    def map_entities(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> Generator[EntityProxy, None, None]:
        """Given a mapping, yield a series of entities from the data source."""
        query = self.make_mapping(mapping, key_prefix=key_prefix)
        for record in query.source.records:
            yield from query.map(record).values()

    def common_schema(
        self, left: Union[str, Schema], right: Union[str, Schema]
    ) -> Schema:
        """Select the most narrow of two schemata.

        When indexing data from a dataset, an entity may be declared as a
        LegalEntity in one query, and as a Person in another. This function
        will select the most specific of two schemata offered. In the example,
        that would be Person.

        Raises ``InvalidData`` if neither argument resolves to a schema, or if
        neither schema is a specialisation of the other.
        """
        resolved_left = self.get(left)
        resolved_right = self.get(right)
        # If only one side resolves, it stands in for the other side as well.
        left_schema = resolved_left or resolved_right
        right_schema = resolved_right or resolved_left
        if left_schema is None or right_schema is None:
            raise InvalidData("Invalid schema")
        candidates = (
            (left_schema, right_schema),
            (right_schema, left_schema),
        )
        for narrow, broad in candidates:
            if narrow.is_a(broad):
                return narrow
        # NOTE: no search for a third schema that specialises both is attempted.
        raise InvalidData("No common schema: %s and %s" % (left, right))

    def make_entity(
        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
    ) -> EntityProxy:
        """Instantiate an empty entity proxy of the given schema type."""
        data = {"schema": schema}
        return EntityProxy(self, data, key_prefix=key_prefix)

    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
        """Create an entity proxy to reflect the entity data in the given
        dictionary. If ``cleaned`` is disabled, all property values are
        fully re-validated and normalised. Use this if handling input data
        from an untrusted source.

        Input that already is an entity proxy is returned unchanged.
        """
        if not isinstance(data, EntityProxy):
            return EntityProxy.from_dict(self, data, cleaned=cleaned)
        return data

    def to_dict(self) -> ModelToDict:
        """Return metadata for all schemata and properties, in a serializable form."""
        schemata = {schema.name: schema.to_dict() for schema in self.schemata.values()}
        types = {type_.name: type_.to_dict() for type_ in registry.types}
        return {"schemata": schemata, "types": types}

    def __iter__(self) -> Iterator[Schema]:
        """Iterate across all schemata."""
        all_schemata = self.schemata.values()
        return iter(all_schemata)
class ModelToDict(TypedDict):
    """Serialisable description of a model: schema and type metadata."""

    # Schema metadata, keyed by schema name.
    schemata: Dict[str, SchemaToDict]
    # Property type metadata, keyed by type name.
    types: Dict[str, PropertyTypeToDict]
class Model(object):
    """A collection of all the schemata available in followthemoney.

    The model offers helpers to look up schemata and properties, and to
    instantiate entity proxies based on the schema metadata.
    """

    __slots__ = ("path", "schemata", "properties", "qnames")

    def __init__(self, path: str) -> None:
        """Load every schema file found below ``path`` and finalise the model."""
        self.path = path

        #: All schemata, indexed by schema name.
        self.schemata: Dict[str, Schema] = {}

        #: All properties defined in the model.
        self.properties: Set[Property] = set()
        #: Properties indexed by qualified name; filled in by ``generate()``.
        self.qnames: Dict[str, Property] = {}

        # Every file in the tree is handed to the loader, whatever its name.
        for directory, _, filenames in os.walk(self.path):
            for filename in filenames:
                self._load(os.path.join(directory, filename))
        self.generate()

    def generate(self) -> None:
        """Finalise the model once all schemata have been loaded.

        Schemata reference each other, so they cannot be fully resolved while
        they are still being instantiated. This second pass lets each schema
        generate itself against the model, indexes all properties by qualified
        name and hands each property down to the descendants of its schema.
        """
        for schema in self:
            schema.generate(self)
        for prop in self.properties:
            self.qnames[prop.qname] = prop
            for descendant in prop.schema.descendants:
                # A property already registered under that name wins.
                if prop.name in descendant.properties:
                    continue
                descendant.properties[prop.name] = prop

    def _load(self, filepath: str) -> None:
        """Parse one YAML file and register every schema it defines.

        Raises ``InvalidModel`` if the document is not a mapping.
        """
        with open(filepath, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
        if not isinstance(data, dict):
            raise InvalidModel("Model file is not a mapping: %s" % filepath)
        for name, config in data.items():
            self.schemata[name] = Schema(self, name, config)

    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
        """Resolve a schema name to its schema object.

        Non-string input is returned unchanged; an unknown name gives ``None``.
        """
        if not isinstance(name, str):
            return name
        return self.schemata.get(name)

    def get_qname(self, qname: str) -> Optional[Property]:
        """Look up a property by its qualified name (i.e. schema:property)."""
        prop = self.qnames.get(qname)
        return prop

    def __getitem__(self, name: str) -> Schema:
        """Same as get(), but raise ``KeyError`` when the name does not exist."""
        schema = self.get(name)
        if schema is not None:
            return schema
        raise KeyError("No such schema: %s" % name)

    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
        """Return all schemata that have at least one property of the given type."""
        return {
            schema
            for schema in self.schemata.values()
            if any(prop.type == type_ for prop in schema.properties.values())
        }

    def make_mapping(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> QueryMapping:
        """Parse a mapping that applies (tabular) source data to the model."""
        query = QueryMapping(self, mapping, key_prefix=key_prefix)
        return query

    def map_entities(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> Generator[EntityProxy, None, None]:
        """Given a mapping, yield a series of entities from the data source."""
        query = self.make_mapping(mapping, key_prefix=key_prefix)
        for record in query.source.records:
            yield from query.map(record).values()

    def common_schema(
        self, left: Union[str, Schema], right: Union[str, Schema]
    ) -> Schema:
        """Select the most narrow of two schemata.

        When indexing data from a dataset, an entity may be declared as a
        LegalEntity in one query, and as a Person in another. This function
        will select the most specific of two schemata offered. In the example,
        that would be Person.

        Raises ``InvalidData`` if neither argument resolves to a schema, or if
        neither schema is a specialisation of the other.
        """
        resolved_left = self.get(left)
        resolved_right = self.get(right)
        # If only one side resolves, it stands in for the other side as well.
        left_schema = resolved_left or resolved_right
        right_schema = resolved_right or resolved_left
        if left_schema is None or right_schema is None:
            raise InvalidData("Invalid schema")
        candidates = (
            (left_schema, right_schema),
            (right_schema, left_schema),
        )
        for narrow, broad in candidates:
            if narrow.is_a(broad):
                return narrow
        # NOTE: no search for a third schema that specialises both is attempted.
        raise InvalidData("No common schema: %s and %s" % (left, right))

    def make_entity(
        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
    ) -> EntityProxy:
        """Instantiate an empty entity proxy of the given schema type."""
        data = {"schema": schema}
        return EntityProxy(self, data, key_prefix=key_prefix)

    def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
        """Create an entity proxy to reflect the entity data in the given
        dictionary. If ``cleaned`` is disabled, all property values are
        fully re-validated and normalised. Use this if handling input data
        from an untrusted source.

        Input that already is an entity proxy is returned unchanged.
        """
        if not isinstance(data, EntityProxy):
            return EntityProxy.from_dict(self, data, cleaned=cleaned)
        return data

    def to_dict(self) -> ModelToDict:
        """Return metadata for all schemata and properties, in a serializable form."""
        schemata = {schema.name: schema.to_dict() for schema in self.schemata.values()}
        types = {type_.name: type_.to_dict() for type_ in registry.types}
        return {"schemata": schemata, "types": types}

    def __iter__(self) -> Iterator[Schema]:
        """Iterate across all schemata."""
        all_schemata = self.schemata.values()
        return iter(all_schemata)
A collection of all the schemata available in followthemoney. The model provides some helper functions to find schemata, properties or to instantiate entity proxies based on the schema metadata.
def __init__(self, path: str) -> None:
    """Load every schema file found below ``path`` and finalise the model."""
    self.path = path

    #: All schemata, indexed by schema name.
    self.schemata: Dict[str, Schema] = {}

    #: All properties defined in the model.
    self.properties: Set[Property] = set()
    #: Properties indexed by qualified name.
    self.qnames: Dict[str, Property] = {}

    # Every file in the tree is handed to the loader, whatever its name.
    for directory, _, filenames in os.walk(self.path):
        for filename in filenames:
            self._load(os.path.join(directory, filename))
    self.generate()
def generate(self) -> None:
    """Finalise the model once all schemata have been loaded.

    Schemata reference each other, so they cannot be fully resolved while
    they are still being instantiated. This second pass lets each schema
    generate itself against the model, indexes all properties by qualified
    name and hands each property down to the descendants of its schema.
    """
    for schema in self:
        schema.generate(self)
    for prop in self.properties:
        self.qnames[prop.qname] = prop
        for descendant in prop.schema.descendants:
            # A property already registered under that name wins.
            if prop.name in descendant.properties:
                continue
            descendant.properties[prop.name] = prop
Loading the model is a weird process because the schemata reference each other in complex ways, so the generation process cannot be fully run as schemata are being instantiated. Hence this process needs to be called once all schemata are loaded to finalise dereferencing the schemata.
def get(self, name: Union[str, Schema]) -> Optional[Schema]:
    """Resolve a schema name to its schema object.

    Non-string input is returned unchanged; an unknown name gives ``None``.
    """
    if not isinstance(name, str):
        return name
    return self.schemata.get(name)
Get a schema object based on a schema name. If the input is already a schema object, it will just be returned.
def get_qname(self, qname: str) -> Optional[Property]:
    """Look up a property by its qualified name (i.e. schema:property).

    Returns ``None`` when no property is registered under that name.
    """
    prop = self.qnames.get(qname)
    return prop
Get a property object based on a qualified name (i.e. schema:property).
def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
    """Return all schemata that have at least one property of the given type."""
    return {
        schema
        for schema in self.schemata.values()
        if any(prop.type == type_ for prop in schema.properties.values())
    }
Return all the schemata which have a property of the given type.
def make_mapping(
    self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
) -> QueryMapping:
    """Parse a mapping that applies (tabular) source data to the model.

    The mapping and ``key_prefix`` are passed straight to ``QueryMapping``.
    """
    query = QueryMapping(self, mapping, key_prefix=key_prefix)
    return query
Parse a mapping that applies (tabular) source data to the model.
def map_entities(
    self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
) -> Generator[EntityProxy, None, None]:
    """Given a mapping, yield a series of entities from the data source.

    Each record of the mapping's source is mapped in turn, and every entity
    produced for that record is yielded before the next record is read.
    """
    query = self.make_mapping(mapping, key_prefix=key_prefix)
    for record in query.source.records:
        yield from query.map(record).values()
Given a mapping, yield a series of entities from the data source.
def common_schema(
    self, left: Union[str, Schema], right: Union[str, Schema]
) -> Schema:
    """Select the most narrow of two schemata.

    When indexing data from a dataset, an entity may be declared as a
    LegalEntity in one query, and as a Person in another. This function
    will select the most specific of two schemata offered. In the example,
    that would be Person.

    Raises ``InvalidData`` if neither argument resolves to a schema, or if
    neither schema is a specialisation of the other.
    """
    resolved_left = self.get(left)
    resolved_right = self.get(right)
    # If only one side resolves, it stands in for the other side as well.
    left_schema = resolved_left or resolved_right
    right_schema = resolved_right or resolved_left
    if left_schema is None or right_schema is None:
        raise InvalidData("Invalid schema")
    candidates = (
        (left_schema, right_schema),
        (right_schema, left_schema),
    )
    for narrow, broad in candidates:
        if narrow.is_a(broad):
            return narrow
    # NOTE: no search for a third schema that specialises both is attempted.
    raise InvalidData("No common schema: %s and %s" % (left, right))
Select the most narrow of two schemata.
When indexing data from a dataset, an entity may be declared as a LegalEntity in one query, and as a Person in another. This function will select the most specific of two schemata offered. In the example, that would be Person.
def make_entity(
    self, schema: Union[str, Schema], key_prefix: Optional[str] = None
) -> EntityProxy:
    """Instantiate an empty entity proxy of the given schema type.

    The proxy is built from a data dictionary holding only the ``schema`` key.
    """
    data = {"schema": schema}
    return EntityProxy(self, data, key_prefix=key_prefix)
Instantiate an empty entity proxy of the given schema type.
def get_proxy(self, data: Dict[str, Any], cleaned: bool = True) -> EntityProxy:
    """Create an entity proxy to reflect the entity data in the given
    dictionary. If ``cleaned`` is disabled, all property values are
    fully re-validated and normalised. Use this if handling input data
    from an untrusted source.

    Input that already is an entity proxy is returned unchanged.
    """
    if not isinstance(data, EntityProxy):
        return EntityProxy.from_dict(self, data, cleaned=cleaned)
    return data
Create an entity proxy to reflect the entity data in the given dictionary. If `cleaned` is disabled, all property values are fully re-validated and normalised. Use this if handling input data from an untrusted source.
def to_dict(self) -> ModelToDict:
    """Return metadata for all schemata and properties, in a serializable form.

    The result has two keys: ``schemata`` maps each schema name to that
    schema's ``to_dict()`` output, and ``types`` does the same for every type
    in the registry.
    """
    schemata = {schema.name: schema.to_dict() for schema in self.schemata.values()}
    types = {type_.name: type_.to_dict() for type_ in registry.types}
    return {"schemata": schemata, "types": types}
Return metadata for all schemata and properties, in a serializable form.