followthemoney.util
import os
import logging
from hashlib import sha1
from babel import Locale
from gettext import translation

from threading import local
from typing import cast, Dict, Any, List, Optional, TypeVar, Union
from normality import stringify
from normality.cleaning import compose_nfc
from normality.cleaning import remove_unsafe_chars
from normality.encoding import DEFAULT_ENCODING
from banal import is_mapping, unique_list, ensure_list

MEGABYTE = 1024 * 1024
DEFAULT_LOCALE = "en"
ENTITY_ID_LEN = 200

T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")

PathLike = Union[str, os.PathLike[str]]
# Directory holding the translation catalogs, located next to this module.
i18n_path = os.path.join(os.path.dirname(__file__), "translations")
# Thread-local holder for the active locale and its translation object.
state = local()
log = logging.getLogger(__name__)


def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
    """Translate a message using the translation stored on the thread-local
    state. If no translation has been set up for this thread yet, the
    default locale is installed first."""
    if not hasattr(state, "translation"):
        set_model_locale(Locale.parse(DEFAULT_LOCALE))
    return cast(str, state.translation.gettext(*args, **kwargs))


def defer(text: str) -> str:
    """Return the given text unchanged."""
    # NOTE(review): presumably a marker so message strings can be collected
    # for translation without translating them at definition time - confirm.
    return text


def set_model_locale(locale: Locale) -> None:
    """Store the locale on the thread-local state and load the matching
    ``followthemoney`` translation catalog from ``i18n_path``."""
    state.locale = locale
    # fallback=True: a missing catalog yields a null translation object
    # instead of raising an error.
    state.translation = translation(
        "followthemoney", i18n_path, [str(locale)], fallback=True
    )


def get_locale() -> Locale:
    """Return the locale set for the current thread, or the default locale
    if none has been set."""
    if not hasattr(state, "locale"):
        return Locale.parse(DEFAULT_LOCALE)
    return Locale.parse(state.locale)


def get_env_list(name: str, default: List[str] = []) -> List[str]:
    """Read the environment variable ``name`` and split it on ``:``.

    When the variable yields no usable string, a copy of ``default`` is
    returned."""
    value = stringify(os.environ.get(name))
    if value is not None:
        # str.split with an explicit separator always returns at least one
        # element, so no emptiness check is needed here.
        return value.split(":")
    # Return a copy so that callers mutating the result cannot alter the
    # shared default list (or a list passed in by another caller).
    return list(default)


def sanitize_text(value: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
    """Convert the value to a string, compose it to NFC, strip unsafe
    characters and make sure the result can be encoded.

    ``encoding`` is only used as the default when decoding byte input.
    Returns None if any step produces no text or NFC composition fails."""
    text = stringify(value, encoding_default=encoding)
    if text is None:
        return None
    try:
        text = compose_nfc(text)
    except Exception as ex:
        log.warning("Cannot NFC text: %s", ex)
        return None
    if text is None:
        return None
    text = remove_unsafe_chars(text)
    if text is None:
        return None
    # Round-trip through bytes so that characters which cannot be encoded
    # are substituted rather than failing later on output.
    byte_text = text.encode(DEFAULT_ENCODING, "replace")
    return byte_text.decode(DEFAULT_ENCODING, "replace")


def key_bytes(key: Any) -> bytes:
    """Convert the given data to a value appropriate for hashing."""
    if isinstance(key, bytes):
        return key
    text = stringify(key)
    if text is None:
        return b""
    return text.encode("utf-8")


def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
    """Join all the non-null arguments using sep."""
    texts: List[str] = []
    for part in parts:
        text = stringify(part)
        if text is not None:
            texts.append(text)
    if not texts:
        return None
    return sep.join(texts)


def const_case(text: str) -> str:
    """Convert the given text to a constant case."""
    return text.upper().replace(" ", "_")


def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID."""
    if is_mapping(obj):
        obj = obj.get("id")
    else:
        try:
            obj = obj.id
        except AttributeError:
            # No ``id`` attribute: treat the object itself as the ID.
            pass
    return stringify(obj)


def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    """Build a SHA1 hex digest from the key prefix and all given parts.

    Returns None when the parts contribute no bytes to the hash, i.e. the
    digest is the same as for the prefix alone."""
    digest = sha1()
    if key_prefix:
        digest.update(key_bytes(key_prefix))
    base = digest.digest()
    for part in parts:
        digest.update(key_bytes(part))
    if digest.digest() == base:
        return None
    return digest.hexdigest()


def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """When merging two entities, make lists of all the duplicate context
    keys."""
    combined: Dict[K, List[V]] = {}
    keys = [*left.keys(), *right.keys()]
    for key in set(keys):
        # The caption is not carried over into the merged context.
        if key in ("caption",):
            continue
        lval: List[V] = [i for i in ensure_list(left.get(key)) if i is not None]
        rval: List[V] = [i for i in ensure_list(right.get(key)) if i is not None]
        combined[key] = unique_list([*lval, *rval])
    return combined


def dampen(short: int, long: int, text: str) -> float:
    """Score the length of ``text`` on a scale from 0.0 to 1.0.

    The result is 0.0 for texts of at most ``short`` characters, 1.0 for
    texts of at least ``long`` characters, and grows linearly in between."""
    length = len(text) - short
    # Guard against a zero or negative span between the two bounds.
    baseline = max(1.0, (long - short))
    # Use float bounds so the result is always a float, as annotated.
    return max(0.0, min(1.0, (length / baseline)))


def shortest(*texts: str) -> str:
    """Return the shortest of the given texts.

    Raises ValueError when called without arguments."""
    return min(texts, key=len)


def longest(*texts: str) -> str:
    """Return the longest of the given texts.

    Raises ValueError when called without arguments."""
    return max(texts, key=len)
MEGABYTE =
1048576
DEFAULT_LOCALE =
'en'
ENTITY_ID_LEN =
200
PathLike =
typing.Union[str, os.PathLike[str]]
i18n_path =
'/opt/hostedtoolcache/Python/3.12.11/x64/lib/python3.12/site-packages/followthemoney/translations'
state =
<_thread._local object>
log =
<Logger followthemoney.util (WARNING)>
def
gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
def
defer(text: str) -> str:
def
set_model_locale(locale: babel.core.Locale) -> None:
def
get_locale() -> babel.core.Locale:
def
get_env_list(name: str, default: List[str] = []) -> List[str]:
def
sanitize_text(value: Any, encoding: str = 'utf-8') -> Optional[str]:
def sanitize_text(value: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
    """Turn the value into NFC-composed text with unsafe characters removed,
    or return None if no text can be produced.

    ``encoding`` is passed to ``stringify`` as the default encoding; the
    final encode/decode round-trip always uses ``DEFAULT_ENCODING``."""
    raw = stringify(value, encoding_default=encoding)
    if raw is None:
        return None
    try:
        composed = compose_nfc(raw)
    except Exception as ex:
        log.warning("Cannot NFC text: %s", ex)
        return None
    if composed is None:
        return None
    cleaned = remove_unsafe_chars(composed)
    if cleaned is None:
        return None
    # Characters that cannot be encoded are substituted, not raised on.
    encoded = cleaned.encode(DEFAULT_ENCODING, "replace")
    return encoded.decode(DEFAULT_ENCODING, "replace")
def
key_bytes(key: Any) -> bytes:
def key_bytes(key: Any) -> bytes:
    """Produce the byte representation of a value for feeding into a hash.

    Bytes are passed through untouched; anything else is stringified and
    encoded as UTF-8, with an empty byte string when there is no text."""
    if isinstance(key, bytes):
        return key
    as_text = stringify(key)
    return b"" if as_text is None else as_text.encode("utf-8")
Convert the given data to a value appropriate for hashing.
def
join_text(*parts: Any, sep: str = ' ') -> Optional[str]:
def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
    """Stringify every argument, drop those without text, and join the rest
    with ``sep``. Returns None when nothing is left to join."""
    candidates = (stringify(part) for part in parts)
    kept: List[str] = [text for text in candidates if text is not None]
    if kept:
        return sep.join(kept)
    return None
Join all the non-null arguments using sep.
def
const_case(text: str) -> str:
def const_case(text: str) -> str:
    """Upper-case the text and turn every space into an underscore."""
    words = text.upper().split(" ")
    # Splitting on a single space keeps empty pieces, so runs of spaces
    # become runs of underscores.
    return "_".join(words)
Convert the given text to a constant case.
def
get_entity_id(obj: Any) -> Optional[str]:
def get_entity_id(obj: Any) -> Optional[str]:
    """Extract an ID from something entity-like.

    Mappings are looked up by their ``"id"`` key, other objects by their
    ``id`` attribute; an object without that attribute is itself used as
    the ID. The result is passed through ``stringify``."""
    if is_mapping(obj):
        return stringify(obj.get("id"))
    # Falls back to the object itself when the attribute lookup fails.
    candidate = getattr(obj, "id", obj)
    return stringify(candidate)
Given an entity-ish object, try to get the ID.
def
make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    """Hash the optional key prefix followed by every part with SHA1 and
    return the hex digest.

    None is returned when the parts add no bytes to the hash, leaving the
    digest equal to that of the prefix alone."""
    hasher = sha1()
    if key_prefix:
        hasher.update(key_bytes(key_prefix))
    # Snapshot of the hash state before any part has been added.
    prefix_only = hasher.digest()
    for part in parts:
        hasher.update(key_bytes(part))
    return None if hasher.digest() == prefix_only else hasher.hexdigest()
def
merge_context(left: Dict[~K, ~V], right: Dict[~K, ~V]) -> Dict[~K, List[~V]]:
def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """Combine the context of two entities being merged: for every key in
    either side, collect the de-duplicated non-null values of both."""
    merged = {}
    for key in set([*left.keys(), *right.keys()]):
        # The caption key is left out of the merged context.
        if key in ("caption",):
            continue
        values: List[V] = []
        # Left-hand values come first, then right-hand ones.
        for source in (left, right):
            values.extend(v for v in ensure_list(source.get(key)) if v is not None)
        merged[key] = unique_list(values)
    return merged
When merging two entities, make lists of all the duplicate context keys.
def
dampen(short: int, long: int, text: str) -> float:
def
shortest(*texts: str) -> str:
def
longest(*texts: str) -> str: