followthemoney.util
import os
import logging
from hashlib import sha1
from babel import Locale
from gettext import translation

from threading import local
from typing import cast, Dict, Any, List, Optional, TypeVar, Union, Sequence
from normality import stringify
from normality.cleaning import compose_nfc
from normality.cleaning import remove_unsafe_chars
from normality.encoding import DEFAULT_ENCODING
from banal import is_mapping, unique_list, ensure_list

MEGABYTE = 1024 * 1024
DEFAULT_LOCALE = "en"
ENTITY_ID_LEN = 200

T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")

PathLike = Union[str, os.PathLike[str]]
i18n_path = os.path.join(os.path.dirname(__file__), "translations")
state = local()
log = logging.getLogger(__name__)


def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
    if not hasattr(state, "translation"):
        set_model_locale(Locale.parse(DEFAULT_LOCALE))
    return cast(str, state.translation.gettext(*args, **kwargs))


def defer(text: str) -> str:
    return text


def set_model_locale(locale: Locale) -> None:
    state.locale = locale
    state.translation = translation(
        "followthemoney", i18n_path, [str(locale)], fallback=True
    )


def get_locale() -> Locale:
    if not hasattr(state, "locale"):
        return Locale.parse(DEFAULT_LOCALE)
    return Locale.parse(state.locale)


def get_env_list(name: str, default: List[str] = []) -> List[str]:
    value = stringify(os.environ.get(name))
    if value is not None:
        values = value.split(":")
        if len(values):
            return values
    return default


def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
    text = stringify(text, encoding_default=encoding)
    if text is None:
        return None
    try:
        text = compose_nfc(text)
    except (SystemError, Exception) as ex:
        log.warning("Cannot NFC text: %s", ex)
        return None
    text = remove_unsafe_chars(text)
    if text is None:
        return None
    byte_text = text.encode(DEFAULT_ENCODING, "replace")
    return cast(str, byte_text.decode(DEFAULT_ENCODING, "replace"))


def value_list(value: Union[T, Sequence[T]]) -> List[T]:
    if not isinstance(value, (str, bytes)):
        try:
            return [v for v in cast(Sequence[T], value)]
        except TypeError:
            pass
    return [cast(T, value)]


def key_bytes(key: Any) -> bytes:
    """Convert the given data to a value appropriate for hashing."""
    if isinstance(key, bytes):
        return key
    text = stringify(key)
    if text is None:
        return b""
    return text.encode("utf-8")


def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
    """Join all the non-null arguments using sep."""
    texts: List[str] = []
    for part in parts:
        text = stringify(part)
        if text is not None:
            texts.append(text)
    if not len(texts):
        return None
    return sep.join(texts)


def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID."""
    if is_mapping(obj):
        obj = obj.get("id")
    else:
        try:
            obj = obj.id
        except AttributeError:
            pass
    return stringify(obj)


def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    digest = sha1()
    if key_prefix:
        digest.update(key_bytes(key_prefix))
    base = digest.digest()
    for part in parts:
        digest.update(key_bytes(part))
    if digest.digest() == base:
        return None
    return digest.hexdigest()


def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """When merging two entities, make lists of all the duplicate context
    keys."""
    combined = {}
    keys = [*left.keys(), *right.keys()]
    for key in set(keys):
        if key in ("caption",):
            continue
        lval: List[V] = [i for i in ensure_list(left.get(key)) if i is not None]
        rval: List[V] = [i for i in ensure_list(right.get(key)) if i is not None]
        combined[key] = unique_list([*lval, *rval])
    return combined


def dampen(short: int, long: int, text: str) -> float:
    length = len(text) - short
    baseline = max(1.0, (long - short))
    return max(0, min(1.0, (length / baseline)))


def shortest(*texts: str) -> str:
    return min(texts, key=len)


def longest(*texts: str) -> str:
    return max(texts, key=len)
MEGABYTE = 1048576
DEFAULT_LOCALE = 'en'
ENTITY_ID_LEN = 200
PathLike = typing.Union[str, os.PathLike[str]]
i18n_path = '/opt/hostedtoolcache/Python/3.10.15/x64/lib/python3.10/site-packages/followthemoney/translations'
state = <_thread._local object>
log = <Logger followthemoney.util (WARNING)>
def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
def defer(text: str) -> str:
def set_model_locale(locale: babel.core.Locale) -> None:
def get_locale() -> babel.core.Locale:
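A quick sketch of how the locale helpers fit together; the "de" locale is only an illustrative choice, and gettext() falls back to the original string when no matching catalogue is installed:

from babel import Locale
from followthemoney.util import set_model_locale, get_locale, gettext

set_model_locale(Locale.parse("de"))   # installs a thread-local translation
assert str(get_locale()) == "de"       # later lookups reuse that locale
label = gettext("Name")                # translated if a "de" catalogue exists,
                                       # otherwise the original string is returned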
def get_env_list(name: str, default: List[str] = []) -> List[str]:
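Reads a colon-separated list from an environment variable, falling back to default when the variable is unset or empty. The FTM_EXTRA_PATHS name below is made up for the example:

import os
from followthemoney.util import get_env_list

os.environ["FTM_EXTRA_PATHS"] = "/data/a:/data/b"   # hypothetical variable
assert get_env_list("FTM_EXTRA_PATHS") == ["/data/a", "/data/b"]
assert get_env_list("FTM_UNSET", default=["fallback"]) == ["fallback"]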
def sanitize_text(text: Any, encoding: str = 'utf-8') -> Optional[str]:
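Normalises arbitrary input into safe text: values are stringified, NFC-composed and stripped of unsafe characters, with None returned when nothing usable remains. A rough sketch; the exact cleaning rules come from normality:

from followthemoney.util import sanitize_text

assert sanitize_text(None) is None
assert sanitize_text(42) == "42"        # non-string values are stringified
cleaned = sanitize_text("Zürich\x00")   # NFC-composed, control characters removed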
def value_list(value: Union[T, Sequence[T]]) -> List[T]:
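Wraps a value into a list, treating strings and bytes as single values rather than iterables:

from followthemoney.util import value_list

assert value_list("banana") == ["banana"]    # strings are not iterated
assert value_list(["a", "b"]) == ["a", "b"]  # sequences become plain lists
assert value_list(5) == [5]                  # non-iterable values are wrapped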
def key_bytes(key: Any) -> bytes:
Convert the given data to a value appropriate for hashing.
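Usage sketch:

from followthemoney.util import key_bytes

assert key_bytes(b"abc") == b"abc"   # bytes pass through untouched
assert key_bytes("abc") == b"abc"    # text is UTF-8 encoded
assert key_bytes(42) == b"42"        # other values are stringified first
assert key_bytes(None) == b""        # null-ish values become empty bytes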
def join_text(*parts: Any, sep: str = ' ') -> Optional[str]:
Join all the non-null arguments using sep.
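Usage sketch:

from followthemoney.util import join_text

assert join_text("Jane", None, "Doe") == "Jane Doe"   # null parts are dropped
assert join_text("Jane", "Doe", sep=", ") == "Jane, Doe"
assert join_text(None, None) is None                  # nothing left to join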
def get_entity_id(obj: Any) -> Optional[str]:
Given an entity-ish object, try to get the ID.
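Usage sketch; the Thing class below is a stand-in for any object exposing an .id attribute:

from followthemoney.util import get_entity_id

class Thing:
    id = "entity-1"   # stand-in for a proxy-like object

assert get_entity_id({"id": "entity-1"}) == "entity-1"   # mappings: the "id" key
assert get_entity_id(Thing()) == "entity-1"              # objects: the .id attribute
assert get_entity_id("entity-1") == "entity-1"           # plain values pass through
assert get_entity_id(None) is None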
def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
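Derives a deterministic SHA1-based entity ID from the given parts, optionally namespaced by key_prefix, and returns None when no part contributes anything hashable. The "acme" and "de" values are illustrative:

from followthemoney.util import make_entity_id

id_a = make_entity_id("acme", "gmbh", key_prefix="de")
id_b = make_entity_id("acme", "gmbh", key_prefix="de")
assert id_a == id_b                      # same parts, same digest
assert make_entity_id(None, "") is None  # no usable parts, no ID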
def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
When merging two entities, make lists of all the duplicate context keys.
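Usage sketch with made-up context keys; note that "caption" is deliberately skipped:

from followthemoney.util import merge_context

left = {"dataset": "companies", "caption": "ACME"}
right = {"dataset": ["registry"], "source_url": "https://example.com"}
merged = merge_context(left, right)
# {'dataset': ['companies', 'registry'], 'source_url': ['https://example.com']}
# (key order may vary; "caption" is dropped)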
def dampen(short: int, long: int, text: str) -> float:
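Maps the length of text onto a 0.0 to 1.0 score: anything no longer than short scores 0, anything at least long characters scores 1, with a linear ramp in between:

from followthemoney.util import dampen

assert dampen(2, 10, "ab") == 0.0        # at or below the short bound
assert dampen(2, 10, "abcdef") == 0.5    # halfway up the ramp
assert dampen(2, 10, "a" * 50) == 1.0    # capped at the long bound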
def shortest(*texts: str) -> str:
def longest(*texts: str) -> str:
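Usage sketch for the two length helpers:

from followthemoney.util import shortest, longest

assert shortest("Jo", "Joanna", "Joan") == "Jo"
assert longest("Jo", "Joanna", "Joan") == "Joanna"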