followthemoney.util
import os
import logging
from hashlib import sha1
from babel import Locale
from gettext import translation

from threading import local
from typing import cast, Dict, Any, List, Optional, TypeVar, Union, Sequence
from normality import stringify
from normality.cleaning import compose_nfc
from normality.cleaning import remove_unsafe_chars
from normality.encoding import DEFAULT_ENCODING
from banal import is_mapping, unique_list, ensure_list

MEGABYTE = 1024 * 1024
DEFAULT_LOCALE = "en"

T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")

PathLike = Union[str, os.PathLike[str]]
i18n_path = os.path.join(os.path.dirname(__file__), "translations")
state = local()
log = logging.getLogger(__name__)


def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
    if not hasattr(state, "translation"):
        set_model_locale(Locale.parse(DEFAULT_LOCALE))
    return cast(str, state.translation.gettext(*args, **kwargs))


def defer(text: str) -> str:
    return text


def set_model_locale(locale: Locale) -> None:
    state.locale = locale
    state.translation = translation(
        "followthemoney", i18n_path, [str(locale)], fallback=True
    )


def get_locale() -> Locale:
    if not hasattr(state, "locale"):
        return Locale.parse(DEFAULT_LOCALE)
    return Locale.parse(state.locale)


def get_env_list(name: str, default: List[str] = []) -> List[str]:
    value = stringify(os.environ.get(name))
    if value is not None:
        values = value.split(":")
        if len(values):
            return values
    return default


def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
    text = stringify(text, encoding_default=encoding)
    if text is None:
        return None
    try:
        text = compose_nfc(text)
    except (SystemError, Exception) as ex:
        log.warning("Cannot NFC text: %s", ex)
        return None
    text = remove_unsafe_chars(text)
    if text is None:
        return None
    byte_text = text.encode(DEFAULT_ENCODING, "replace")
    return cast(str, byte_text.decode(DEFAULT_ENCODING, "replace"))


def value_list(value: Union[T, Sequence[T]]) -> List[T]:
    if not isinstance(value, (str, bytes)):
        try:
            return [v for v in cast(Sequence[T], value)]
        except TypeError:
            pass
    return [cast(T, value)]


def key_bytes(key: Any) -> bytes:
    """Convert the given data to a value appropriate for hashing."""
    if isinstance(key, bytes):
        return key
    text = stringify(key)
    if text is None:
        return b""
    return text.encode("utf-8")


def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
    """Join all the non-null arguments using sep."""
    texts: List[str] = []
    for part in parts:
        text = stringify(part)
        if text is not None:
            texts.append(text)
    if not len(texts):
        return None
    return sep.join(texts)


def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID."""
    if is_mapping(obj):
        obj = obj.get("id")
    else:
        try:
            obj = obj.id
        except AttributeError:
            pass
    return stringify(obj)


def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    digest = sha1()
    if key_prefix:
        digest.update(key_bytes(key_prefix))
    base = digest.digest()
    for part in parts:
        digest.update(key_bytes(part))
    if digest.digest() == base:
        return None
    return digest.hexdigest()


def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """When merging two entities, make lists of all the duplicate context
    keys."""
    combined = {}
    keys = [*left.keys(), *right.keys()]
    for key in set(keys):
        if key in ("caption",):
            continue
        lval: List[V] = [i for i in ensure_list(left.get(key)) if i is not None]
        rval: List[V] = [i for i in ensure_list(right.get(key)) if i is not None]
        combined[key] = unique_list([*lval, *rval])
    return combined


def dampen(short: int, long: int, text: str) -> float:
    length = len(text) - short
    baseline = max(1.0, (long - short))
    return max(0, min(1.0, (length / baseline)))


def shortest(*texts: str) -> str:
    return min(texts, key=len)


def longest(*texts: str) -> str:
    return max(texts, key=len)
MEGABYTE = 1048576
DEFAULT_LOCALE = 'en'
PathLike = typing.Union[str, os.PathLike[str]]
i18n_path = '/opt/hostedtoolcache/Python/3.10.15/x64/lib/python3.10/site-packages/followthemoney/translations'
state = <_thread._local object>
log = <Logger followthemoney.util (WARNING)>
def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
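A minimal usage sketch; whether the string is actually translated depends on which message catalogs ship under translations/ for the active locale, so the comment only describes the fallback behaviour:

from followthemoney.util import gettext

gettext("Person")  # translated label for the active locale, or the
                   # input string unchanged when no catalog entry matches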
def defer(text: str) -> str:
def set_model_locale(locale: babel.core.Locale) -> None:
def get_locale() -> babel.core.Locale:
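An illustrative round trip; note that the locale lives in a threading.local, so each thread carries its own setting:

from babel import Locale
from followthemoney.util import set_model_locale, get_locale

set_model_locale(Locale.parse("de"))
get_locale()   # Locale('de')
# A thread that never called set_model_locale falls back to the default 'en'.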
def get_env_list(name: str, default: List[str] = []) -> List[str]:
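A sketch of the colon-separated parsing; the environment variable names below are made up for the example:

import os
from followthemoney.util import get_env_list

os.environ["FTM_EXAMPLE_SOURCES"] = "companies:sanctions:peps"
get_env_list("FTM_EXAMPLE_SOURCES")               # ['companies', 'sanctions', 'peps']
get_env_list("FTM_UNSET_VARIABLE", ["fallback"])  # ['fallback'] (the default)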
def sanitize_text(text: Any, encoding: str = 'utf-8') -> Optional[str]:
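Roughly what to expect; the trimming and character filtering are delegated to the normality helpers, so treat the outputs as approximate:

from followthemoney.util import sanitize_text

sanitize_text(None)                      # None
sanitize_text("  Ada\u0000 Lovelace  ")  # 'Ada Lovelace' (whitespace trimmed,
                                         # unsafe control characters removed)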
def value_list(value: Union[~T, Sequence[~T]]) -> List[~T]:
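Illustrative behaviour: sequences are unpacked, but strings and bytes are treated as scalar values:

from followthemoney.util import value_list

value_list(["a", "b"])   # ['a', 'b']
value_list(("a", "b"))   # ['a', 'b']
value_list("abc")        # ['abc'], not exploded into characters
value_list(42)           # [42]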
def key_bytes(key: Any) -> bytes:
Convert the given data to a value appropriate for hashing.
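A short sketch of the conversions involved:

from followthemoney.util import key_bytes

key_bytes(b"raw")     # b'raw', bytes pass through untouched
key_bytes("Müller")   # b'M\xc3\xbcller', text is UTF-8 encoded
key_bytes(42)         # b'42', other values are stringified first
key_bytes(None)       # b'', missing values hash as the empty string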
def join_text(*parts: Any, sep: str = ' ') -> Optional[str]:
Join all the non-null arguments using sep.
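For example:

from followthemoney.util import join_text

join_text("Jane", None, "Doe")       # 'Jane Doe'
join_text("Acme", "Ltd.", sep=", ")  # 'Acme, Ltd.'
join_text(None, None)                # None, nothing left to join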
def get_entity_id(obj: Any) -> Optional[str]:
Given an entity-ish object, try to get the ID.
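It accepts plain IDs, mappings with an "id" key, and objects exposing an .id attribute; the FakeProxy class below is only a stand-in for the example:

from followthemoney.util import get_entity_id

class FakeProxy:
    id = "entity-1"

get_entity_id("entity-1")          # 'entity-1'
get_entity_id({"id": "entity-1"})  # 'entity-1'
get_entity_id(FakeProxy())         # 'entity-1'
get_entity_id(None)                # None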
def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
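A usage sketch: the digest is deterministic for the same parts and prefix, and the function returns None when no part contributes any bytes. The values below are placeholders:

from followthemoney.util import make_entity_id

make_entity_id("gb", "07495895", key_prefix="company")  # stable 40-character SHA1 hex digest
make_entity_id(None, "")                                # None, nothing was hashed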
def merge_context(left: Dict[~K, ~V], right: Dict[~K, ~V]) -> Dict[~K, List[~V]]:
When merging two entities, make lists of all the duplicate context keys.
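An illustrative merge (key order may vary, and the "caption" key is deliberately skipped):

from followthemoney.util import merge_context

left = {"dataset": "gb_companies", "origin": "crawl"}
right = {"dataset": "gb_companies", "note": "manual edit"}
merge_context(left, right)
# {'dataset': ['gb_companies'], 'origin': ['crawl'], 'note': ['manual edit']}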
def dampen(short: int, long: int, text: str) -> float:
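The score grows linearly with the text length between the short and long bounds and is clamped to the range [0, 1]; for example:

from followthemoney.util import dampen

dampen(3, 10, "ab")        # 0, at or below the lower bound
dampen(3, 10, "abcdefg")   # about 0.57, i.e. (7 - 3) / (10 - 3)
dampen(3, 10, "a" * 20)    # 1.0, clamped at the upper bound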
def shortest(*texts: str) -> str:
def longest(*texts: str) -> str:
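Both helpers simply compare by character length; ties resolve to the first argument with that length:

from followthemoney.util import shortest, longest

shortest("Smith", "Smith-Jones")  # 'Smith'
longest("Smith", "Smith-Jones")   # 'Smith-Jones'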