followthemoney.model
import os
import yaml
from functools import lru_cache
from typing import TYPE_CHECKING, Any
from typing import Dict, Generator, Iterator, Optional, Set, TypedDict, Union

from followthemoney.types import registry
from followthemoney.types.common import PropertyType, PropertyTypeToDict
from followthemoney.schema import Schema, SchemaToDict
from followthemoney.property import Property
from followthemoney.exc import InvalidModel, InvalidData

if TYPE_CHECKING:
    from followthemoney.proxy import EntityProxy
    from followthemoney.mapping import QueryMapping


class ModelToDict(TypedDict):
    """Serialised form of the model, as produced by ``Model.to_dict()``."""

    # Schema metadata keyed by schema name.
    schemata: Dict[str, SchemaToDict]
    # Property type metadata keyed by type name.
    types: Dict[str, PropertyTypeToDict]


class Model(object):
    """A collection of all the schemata available in followthemoney. The model
    provides some helper functions to find schemata, properties or to instantiate
    entity proxies based on the schema metadata."""

    # Shared instance, created lazily by ``instance()``.
    _instance: Optional["Model"] = None

    __slots__ = ("path", "schemata", "properties", "qnames")

    def __init__(self, path: str) -> None:
        """Load every file found below ``path`` as a schema definition file
        and then finalise the model by calling ``generate()``."""
        self.path = path

        #: A mapping with all schemata, organised by their name.
        self.schemata: Dict[str, Schema] = {}

        #: All properties defined in the model.
        # NOTE(review): never filled in by this class directly; presumably
        # populated while the schemata are built - confirm in Schema/Property.
        self.properties: Set[Property] = set()
        #: Properties indexed by their qualified name, filled by generate().
        self.qnames: Dict[str, Property] = {}
        for dirpath, _, filenames in os.walk(self.path):
            for filename in filenames:
                self._load(os.path.join(dirpath, filename))
        self.generate()

    @classmethod
    def instance(cls) -> "Model":
        """Return the shared model, creating it on first use. The schema
        directory defaults to ``schema`` next to this module and can be
        overridden with the ``FTM_MODEL_PATH`` environment variable."""
        if cls._instance is None:
            default_path = os.path.join(os.path.dirname(__file__), "schema")
            cls._instance = cls(os.environ.get("FTM_MODEL_PATH", default_path))
        return cls._instance

    def generate(self) -> None:
        """Loading the model is a weird process because the schemata reference
        each other in complex ways, so the generation process cannot be fully
        run as schemata are being instantiated. Hence this process needs to be
        called once all schemata are loaded to finalise dereferencing the
        schemata."""
        for schema in self:
            schema.generate(self)
        for prop in self.properties:
            self.qnames[prop.qname] = prop
            # Hand the property down to every descendant of the schema that
            # declares it, without overriding a property of the same name.
            for descendant in prop.schema.descendants:
                descendant.properties.setdefault(prop.name, prop)

    def _load(self, filepath: str) -> None:
        """Parse one YAML file and register each top-level key as a schema.
        Raises ``InvalidModel`` if the document is not a mapping."""
        with open(filepath, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
        if not isinstance(data, dict):
            raise InvalidModel("Model file is not a mapping: %s" % filepath)
        for name, config in data.items():
            self.schemata[name] = Schema(self, name, config)

    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
        """Get a schema object based on a schema name. If the input is already
        a schema object, it will just be returned. Unknown names yield
        ``None``."""
        if not isinstance(name, str):
            return name
        return self.schemata.get(name)

    def get_qname(self, qname: str) -> Optional[Property]:
        """Get a property object based on a qualified name (i.e.
        schema:property), or ``None`` if no such property is known."""
        prop = self.qnames.get(qname)
        return prop

    def __getitem__(self, name: str) -> Schema:
        """Same as get(), but throws an exception when the given name does not exist."""
        schema = self.get(name)
        if schema is not None:
            return schema
        raise KeyError("No such schema: %s" % name)

    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
        """Return all the schemata which have a property of the given type."""
        # any() stops at the first matching property of each schema.
        return {
            schema
            for schema in self.schemata.values()
            if any(prop.type == type_ for prop in schema.properties.values())
        }

    def make_mapping(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> "QueryMapping":
        """Parse a mapping that applies (tabular) source data to the model."""
        # Imported at call time rather than at module level.
        from followthemoney.mapping import QueryMapping

        query = QueryMapping(self, mapping, key_prefix=key_prefix)
        return query

    def map_entities(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> Generator["EntityProxy", None, None]:
        """Given a mapping, yield a series of entities from the data source."""
        query = self.make_mapping(mapping, key_prefix=key_prefix)
        records = query.source.records
        for record in records:
            mapped = query.map(record)
            for proxy in mapped.values():
                yield proxy

    # The cache is unbounded and keyed on (self, left, right), so all three
    # must be hashable.
    @lru_cache(maxsize=None)
    def common_schema(
        self, left: Union[str, Schema], right: Union[str, Schema]
    ) -> Schema:
        """Select the most narrow of two schemata.

        When indexing data from a dataset, an entity may be declared as a
        LegalEntity in one query, and as a Person in another. This function
        will select the most specific of two schemata offered. In the example,
        that would be Person.

        If only one of the two can be resolved it stands in for the other.
        ``InvalidData`` is raised when neither resolves, or when neither
        schema is a specialisation of the other.
        """
        first = self.get(left) or self.get(right)
        second = self.get(right) or self.get(left)
        if first is None or second is None:
            raise InvalidData("Invalid schema")
        for candidate, other in ((first, second), (second, first)):
            if candidate.is_a(other):
                return candidate
        raise InvalidData("No common schema: %s and %s" % (left, right))

    def matchable_schemata(self) -> Set[Schema]:
        """Return the set of all schemata that are matchable."""
        return {schema for schema in self.schemata.values() if schema.matchable}

    def make_entity(
        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
    ) -> "EntityProxy":
        """Instantiate an empty entity proxy of the given schema type.
        Raises ``InvalidData`` if the schema cannot be resolved."""
        from followthemoney.proxy import EntityProxy

        resolved = self.get(schema)
        if resolved is not None:
            return EntityProxy(resolved, {}, key_prefix=key_prefix)
        raise InvalidData("Schema does not exist: %s" % schema)

    def get_proxy(
        self, data: Union[Dict[str, Any], "EntityProxy"], cleaned: bool = True
    ) -> "EntityProxy":
        """Create an entity proxy to reflect the entity data in the given
        dictionary. If ``cleaned`` is disabled, all property values are
        fully re-validated and normalised. Use this if handling input data
        from an untrusted source.

        If ``data`` already is an entity proxy it is returned unchanged and
        ``cleaned`` has no effect."""
        from followthemoney.proxy import EntityProxy

        if isinstance(data, EntityProxy):
            return data
        return EntityProxy.from_dict(data, cleaned=cleaned)

    def to_dict(self) -> ModelToDict:
        """Return metadata for all schemata and properties, in a serializable form."""
        schemata: Dict[str, SchemaToDict] = {}
        for schema in self.schemata.values():
            schemata[schema.name] = schema.to_dict()
        types: Dict[str, PropertyTypeToDict] = {}
        for type_ in registry.types:
            types[type_.name] = type_.to_dict()
        return {"schemata": schemata, "types": types}

    def __iter__(self) -> Iterator[Schema]:
        """Iterate across all schemata."""
        return iter(self.schemata.values())
class ModelToDict(TypedDict):
    """Serialised form of the model, as produced by ``Model.to_dict()``."""

    # Schema metadata keyed by schema name.
    schemata: Dict[str, SchemaToDict]
    # Property type metadata keyed by type name.
    types: Dict[str, PropertyTypeToDict]
class Model(object):
    """A collection of all the schemata available in followthemoney. The model
    provides some helper functions to find schemata, properties or to instantiate
    entity proxies based on the schema metadata."""

    # Shared instance, created lazily by ``instance()``.
    _instance: Optional["Model"] = None

    __slots__ = ("path", "schemata", "properties", "qnames")

    def __init__(self, path: str) -> None:
        """Load every file found below ``path`` as a schema definition file
        and then finalise the model by calling ``generate()``."""
        self.path = path

        #: A mapping with all schemata, organised by their name.
        self.schemata: Dict[str, Schema] = {}

        #: All properties defined in the model.
        # NOTE(review): never filled in by this class directly; presumably
        # populated while the schemata are built - confirm in Schema/Property.
        self.properties: Set[Property] = set()
        #: Properties indexed by their qualified name, filled by generate().
        self.qnames: Dict[str, Property] = {}
        for dirpath, _, filenames in os.walk(self.path):
            for filename in filenames:
                self._load(os.path.join(dirpath, filename))
        self.generate()

    @classmethod
    def instance(cls) -> "Model":
        """Return the shared model, creating it on first use. The schema
        directory defaults to ``schema`` next to this module and can be
        overridden with the ``FTM_MODEL_PATH`` environment variable."""
        if cls._instance is None:
            default_path = os.path.join(os.path.dirname(__file__), "schema")
            cls._instance = cls(os.environ.get("FTM_MODEL_PATH", default_path))
        return cls._instance

    def generate(self) -> None:
        """Loading the model is a weird process because the schemata reference
        each other in complex ways, so the generation process cannot be fully
        run as schemata are being instantiated. Hence this process needs to be
        called once all schemata are loaded to finalise dereferencing the
        schemata."""
        for schema in self:
            schema.generate(self)
        for prop in self.properties:
            self.qnames[prop.qname] = prop
            # Hand the property down to every descendant of the schema that
            # declares it, without overriding a property of the same name.
            for descendant in prop.schema.descendants:
                descendant.properties.setdefault(prop.name, prop)

    def _load(self, filepath: str) -> None:
        """Parse one YAML file and register each top-level key as a schema.
        Raises ``InvalidModel`` if the document is not a mapping."""
        with open(filepath, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
        if not isinstance(data, dict):
            raise InvalidModel("Model file is not a mapping: %s" % filepath)
        for name, config in data.items():
            self.schemata[name] = Schema(self, name, config)

    def get(self, name: Union[str, Schema]) -> Optional[Schema]:
        """Get a schema object based on a schema name. If the input is already
        a schema object, it will just be returned. Unknown names yield
        ``None``."""
        if not isinstance(name, str):
            return name
        return self.schemata.get(name)

    def get_qname(self, qname: str) -> Optional[Property]:
        """Get a property object based on a qualified name (i.e.
        schema:property), or ``None`` if no such property is known."""
        prop = self.qnames.get(qname)
        return prop

    def __getitem__(self, name: str) -> Schema:
        """Same as get(), but throws an exception when the given name does not exist."""
        schema = self.get(name)
        if schema is not None:
            return schema
        raise KeyError("No such schema: %s" % name)

    def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
        """Return all the schemata which have a property of the given type."""
        # any() stops at the first matching property of each schema.
        return {
            schema
            for schema in self.schemata.values()
            if any(prop.type == type_ for prop in schema.properties.values())
        }

    def make_mapping(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> "QueryMapping":
        """Parse a mapping that applies (tabular) source data to the model."""
        # Imported at call time rather than at module level.
        from followthemoney.mapping import QueryMapping

        query = QueryMapping(self, mapping, key_prefix=key_prefix)
        return query

    def map_entities(
        self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
    ) -> Generator["EntityProxy", None, None]:
        """Given a mapping, yield a series of entities from the data source."""
        query = self.make_mapping(mapping, key_prefix=key_prefix)
        records = query.source.records
        for record in records:
            mapped = query.map(record)
            for proxy in mapped.values():
                yield proxy

    # The cache is unbounded and keyed on (self, left, right), so all three
    # must be hashable.
    @lru_cache(maxsize=None)
    def common_schema(
        self, left: Union[str, Schema], right: Union[str, Schema]
    ) -> Schema:
        """Select the most narrow of two schemata.

        When indexing data from a dataset, an entity may be declared as a
        LegalEntity in one query, and as a Person in another. This function
        will select the most specific of two schemata offered. In the example,
        that would be Person.

        If only one of the two can be resolved it stands in for the other.
        ``InvalidData`` is raised when neither resolves, or when neither
        schema is a specialisation of the other.
        """
        first = self.get(left) or self.get(right)
        second = self.get(right) or self.get(left)
        if first is None or second is None:
            raise InvalidData("Invalid schema")
        for candidate, other in ((first, second), (second, first)):
            if candidate.is_a(other):
                return candidate
        raise InvalidData("No common schema: %s and %s" % (left, right))

    def matchable_schemata(self) -> Set[Schema]:
        """Return the set of all schemata that are matchable."""
        return {schema for schema in self.schemata.values() if schema.matchable}

    def make_entity(
        self, schema: Union[str, Schema], key_prefix: Optional[str] = None
    ) -> "EntityProxy":
        """Instantiate an empty entity proxy of the given schema type.
        Raises ``InvalidData`` if the schema cannot be resolved."""
        from followthemoney.proxy import EntityProxy

        resolved = self.get(schema)
        if resolved is not None:
            return EntityProxy(resolved, {}, key_prefix=key_prefix)
        raise InvalidData("Schema does not exist: %s" % schema)

    def get_proxy(
        self, data: Union[Dict[str, Any], "EntityProxy"], cleaned: bool = True
    ) -> "EntityProxy":
        """Create an entity proxy to reflect the entity data in the given
        dictionary. If ``cleaned`` is disabled, all property values are
        fully re-validated and normalised. Use this if handling input data
        from an untrusted source.

        If ``data`` already is an entity proxy it is returned unchanged and
        ``cleaned`` has no effect."""
        from followthemoney.proxy import EntityProxy

        if isinstance(data, EntityProxy):
            return data
        return EntityProxy.from_dict(data, cleaned=cleaned)

    def to_dict(self) -> ModelToDict:
        """Return metadata for all schemata and properties, in a serializable form."""
        schemata: Dict[str, SchemaToDict] = {}
        for schema in self.schemata.values():
            schemata[schema.name] = schema.to_dict()
        types: Dict[str, PropertyTypeToDict] = {}
        for type_ in registry.types:
            types[type_.name] = type_.to_dict()
        return {"schemata": schemata, "types": types}

    def __iter__(self) -> Iterator[Schema]:
        """Iterate across all schemata."""
        return iter(self.schemata.values())
A collection of all the schemata available in followthemoney. The model provides some helper functions to find schemata, properties or to instantiate entity proxies based on the schema metadata.
def __init__(self, path: str) -> None:
    """Load every file found below ``path`` as a schema definition file
    and then finalise the model by calling ``generate()``."""
    self.path = path

    #: A mapping with all schemata, organised by their name.
    self.schemata: Dict[str, Schema] = {}

    #: All properties defined in the model.
    # NOTE(review): not filled in here directly; presumably populated while
    # the schemata are built - confirm in Schema/Property.
    self.properties: Set[Property] = set()
    #: Properties indexed by their qualified name.
    self.qnames: Dict[str, Property] = {}
    for dirpath, _, filenames in os.walk(self.path):
        for filename in filenames:
            self._load(os.path.join(dirpath, filename))
    self.generate()
def generate(self) -> None:
    """Loading the model is a weird process because the schemata reference
    each other in complex ways, so the generation process cannot be fully
    run as schemata are being instantiated. Hence this process needs to be
    called once all schemata are loaded to finalise dereferencing the
    schemata."""
    for schema in self:
        schema.generate(self)
    for prop in self.properties:
        self.qnames[prop.qname] = prop
        # Hand the property down to every descendant of the schema that
        # declares it, without overriding a property of the same name.
        for descendant in prop.schema.descendants:
            descendant.properties.setdefault(prop.name, prop)
Loading the model is a weird process because the schemata reference each other in complex ways, so the generation process cannot be fully run as schemata are being instantiated. Hence this process needs to be called once all schemata are loaded to finalise dereferencing the schemata.
def get(self, name: Union[str, Schema]) -> Optional[Schema]:
    """Resolve a schema name to its schema object. Anything that is not a
    string is assumed to be a schema already and is handed back untouched;
    unknown names yield ``None``."""
    if not isinstance(name, str):
        return name
    return self.schemata.get(name)
Get a schema object based on a schema name. If the input is already a schema object, it will just be returned.
def get_qname(self, qname: str) -> Optional[Property]:
    """Look up a property by its qualified name (i.e. schema:property).
    Returns ``None`` if no property is registered under that name."""
    prop = self.qnames.get(qname)
    return prop
Get a property object based on a qualified name (i.e. schema:property).
def get_type_schemata(self, type_: PropertyType) -> Set[Schema]:
    """Return all the schemata which have a property of the given type.
    The result is empty if no schema has such a property."""
    # any() stops at the first matching property of each schema instead of
    # scanning the remaining properties once the schema is already selected.
    return {
        schema
        for schema in self.schemata.values()
        if any(prop.type == type_ for prop in schema.properties.values())
    }
Return all the schemata which have a property of the given type.
def make_mapping(
    self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
) -> "QueryMapping":
    """Build a ``QueryMapping`` for this model from a mapping description
    that applies (tabular) source data to the model."""
    # Imported at call time rather than at module level.
    from followthemoney.mapping import QueryMapping

    query = QueryMapping(self, mapping, key_prefix=key_prefix)
    return query
Parse a mapping that applies (tabular) source data to the model.
def map_entities(
    self, mapping: Dict[str, Any], key_prefix: Optional[str] = None
) -> Generator["EntityProxy", None, None]:
    """Given a mapping, yield a series of entities from the data source.
    Every source record is mapped in turn and each entity produced for it
    is yielded."""
    query = self.make_mapping(mapping, key_prefix=key_prefix)
    records = query.source.records
    for record in records:
        mapped = query.map(record)
        for proxy in mapped.values():
            yield proxy
Given a mapping, yield a series of entities from the data source.
# The cache is unbounded and keyed on (self, left, right), so all three must
# be hashable.
@lru_cache(maxsize=None)
def common_schema(
    self, left: Union[str, Schema], right: Union[str, Schema]
) -> Schema:
    """Select the most narrow of two schemata.

    When indexing data from a dataset, an entity may be declared as a
    LegalEntity in one query, and as a Person in another. This function
    will select the most specific of two schemata offered. In the example,
    that would be Person.

    If only one of the two can be resolved it stands in for the other.
    ``InvalidData`` is raised when neither resolves, or when neither
    schema is a specialisation of the other.
    """
    first = self.get(left) or self.get(right)
    second = self.get(right) or self.get(left)
    if first is None or second is None:
        raise InvalidData("Invalid schema")
    # Try left-is-a-right first, then the reverse, mirroring argument order.
    for candidate, other in ((first, second), (second, first)):
        if candidate.is_a(other):
            return candidate
    raise InvalidData("No common schema: %s and %s" % (left, right))
Select the most narrow of two schemata.
When indexing data from a dataset, an entity may be declared as a LegalEntity in one query, and as a Person in another. This function will select the most specific of two schemata offered. In the example, that would be Person.
def matchable_schemata(self) -> Set[Schema]:
    """Return the set of all schemata that are matchable, i.e. those whose
    ``matchable`` attribute is truthy."""
    return {schema for schema in self.schemata.values() if schema.matchable}
Return the set of all schemata that are matchable.
def make_entity(
    self, schema: Union[str, Schema], key_prefix: Optional[str] = None
) -> "EntityProxy":
    """Create an empty entity proxy for the given schema, which may be
    passed by name or as a schema object. Raises ``InvalidData`` if the
    schema cannot be resolved."""
    # Imported at call time rather than at module level.
    from followthemoney.proxy import EntityProxy

    resolved = self.get(schema)
    if resolved is not None:
        return EntityProxy(resolved, {}, key_prefix=key_prefix)
    raise InvalidData("Schema does not exist: %s" % schema)
Instantiate an empty entity proxy of the given schema type.
def get_proxy(
    self, data: Union[Dict[str, Any], "EntityProxy"], cleaned: bool = True
) -> "EntityProxy":
    """Create an entity proxy to reflect the entity data in the given
    dictionary. If ``cleaned`` is disabled, all property values are
    fully re-validated and normalised. Use this if handling input data
    from an untrusted source.

    If ``data`` already is an entity proxy it is returned unchanged and
    ``cleaned`` has no effect."""
    # Imported at call time rather than at module level.
    from followthemoney.proxy import EntityProxy

    if isinstance(data, EntityProxy):
        return data
    return EntityProxy.from_dict(data, cleaned=cleaned)
Create an entity proxy to reflect the entity data in the given dictionary. If ``cleaned`` is disabled, all property values are fully re-validated and normalised. Use this if handling input data from an untrusted source.
def to_dict(self) -> ModelToDict:
    """Describe the model in serializable form: metadata for every schema
    under ``"schemata"`` and for every registered property type under
    ``"types"``, each keyed by name."""
    schemata: Dict[str, SchemaToDict] = {}
    for schema in self.schemata.values():
        schemata[schema.name] = schema.to_dict()
    types: Dict[str, PropertyTypeToDict] = {}
    for type_ in registry.types:
        types[type_.name] = type_.to_dict()
    return {"schemata": schemata, "types": types}
Return metadata for all schemata and properties, in a serializable form.