"""followthemoney.util

Utility helpers shared across followthemoney: locale/gettext handling,
text sanitisation, entity ID derivation and small merge helpers.
"""
import os
import logging
from hashlib import sha1
from babel import Locale
from gettext import translation

from threading import local
from typing import cast, Dict, Any, List, Optional, TypeVar, Union, Sequence
from normality import stringify
from normality.cleaning import compose_nfc
from normality.cleaning import remove_unsafe_chars
from normality.encoding import DEFAULT_ENCODING
from banal import is_mapping, unique_list, ensure_list

MEGABYTE = 1024 * 1024
DEFAULT_LOCALE = "en"

T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")
try:
    # Work-around for Python 3.8 backward compat:
    PathLike = Union[str, os.PathLike[str]]
except TypeError:
    PathLike = Union[str, os.PathLike]  # type: ignore

i18n_path = os.path.join(os.path.dirname(__file__), "translations")
# Locale/translation state is kept thread-local so concurrent consumers
# can use different model locales without interfering.
state = local()
log = logging.getLogger(__name__)


def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
    """Translate a message using the thread-local translation catalog.

    Lazily initialises the catalog with the default locale on first use.
    """
    if not hasattr(state, "translation"):
        set_model_locale(Locale.parse(DEFAULT_LOCALE))
    return cast(str, state.translation.gettext(*args, **kwargs))


def defer(text: str) -> str:
    """Mark a string for deferred translation (runtime no-op)."""
    return text


def set_model_locale(locale: Locale) -> None:
    """Install the translation catalog for ``locale`` in thread-local state."""
    state.locale = locale
    state.translation = translation(
        "followthemoney", i18n_path, [str(locale)], fallback=True
    )


def get_locale() -> Locale:
    """Return the active model locale, falling back to the default."""
    if not hasattr(state, "locale"):
        return Locale.parse(DEFAULT_LOCALE)
    return Locale.parse(state.locale)


def get_env_list(name: str, default: Optional[List[str]] = None) -> List[str]:
    """Read a colon-separated list from the environment variable ``name``.

    Returns ``default`` (or a fresh empty list) when the variable is unset
    or empty.
    """
    value = stringify(os.environ.get(name))
    if value is not None:
        values = value.split(":")
        if len(values):
            return values
    # NOTE: the previous ``default: List[str] = []`` shared one mutable
    # list across all calls; return a fresh list instead.
    return [] if default is None else default


def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
    """Normalise ``text`` to clean NFC unicode, or ``None`` if impossible.

    Applies NFC composition, strips unsafe control characters and forces
    the result through the default encoding with replacement.
    """
    text = stringify(text, encoding_default=encoding)
    if text is None:
        return None
    try:
        text = compose_nfc(text)
    except Exception as ex:
        # ``Exception`` already subsumes the previously-listed SystemError.
        log.warning("Cannot NFC text: %s", ex)
        return None
    text = remove_unsafe_chars(text)
    if text is None:
        return None
    byte_text = text.encode(DEFAULT_ENCODING, "replace")
    return cast(str, byte_text.decode(DEFAULT_ENCODING, "replace"))


def value_list(value: Union[T, Sequence[T]]) -> List[T]:
    """Return ``value`` as a list, treating str/bytes as scalar values."""
    if not isinstance(value, (str, bytes)):
        try:
            return [v for v in cast(Sequence[T], value)]
        except TypeError:
            pass
    return [cast(T, value)]


def key_bytes(key: Any) -> bytes:
    """Convert the given data to a value appropriate for hashing."""
    if isinstance(key, bytes):
        return key
    text = stringify(key)
    if text is None:
        return b""
    return text.encode("utf-8")


def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
    """Join all the non-null arguments using sep."""
    texts: List[str] = []
    for part in parts:
        text = stringify(part)
        if text is not None:
            texts.append(text)
    if not len(texts):
        return None
    return sep.join(texts)


def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID."""
    if is_mapping(obj):
        obj = obj.get("id")
    else:
        try:
            obj = obj.id
        except AttributeError:
            pass
    return stringify(obj)


def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    """Derive a deterministic SHA1-based entity ID from the given parts.

    Returns ``None`` when no part contributed any bytes (so an ID is
    never built from the prefix alone).
    """
    digest = sha1()
    if key_prefix:
        digest.update(key_bytes(key_prefix))
    base = digest.digest()
    for part in parts:
        digest.update(key_bytes(part))
    if digest.digest() == base:
        return None
    return digest.hexdigest()


def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """When merging two entities, make lists of all the duplicate context
    keys."""
    combined = {}
    keys = [*left.keys(), *right.keys()]
    for key in set(keys):
        # "caption" is singular by contract and must not become a list.
        if key in ("caption",):
            continue
        lval: List[V] = [i for i in ensure_list(left.get(key)) if i is not None]
        rval: List[V] = [i for i in ensure_list(right.get(key)) if i is not None]
        combined[key] = unique_list([*lval, *rval])
    return combined


def dampen(short: int, long: int, text: str) -> float:
    """Scale ``len(text)`` into [0.0, 1.0] between the short/long bounds."""
    length = len(text) - short
    baseline = max(1.0, (long - short))
    # Clamp with 0.0 (not int 0) so the annotated float return holds.
    return max(0.0, min(1.0, (length / baseline)))


def shortest(*texts: str) -> str:
    """Return the shortest of the given strings."""
    return min(texts, key=len)


def longest(*texts: str) -> str:
    """Return the longest of the given strings."""
    return max(texts, key=len)
MEGABYTE =
1048576
DEFAULT_LOCALE =
'en'
i18n_path =
'/opt/hostedtoolcache/Python/3.10.14/x64/lib/python3.10/site-packages/followthemoney/translations'
state =
<_thread._local object>
log =
<Logger followthemoney.util (WARNING)>
def
gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
def
defer(text: str) -> str:
def
set_model_locale(locale: babel.core.Locale) -> None:
def
get_locale() -> babel.core.Locale:
def
get_env_list(name: str, default: List[str] = []) -> List[str]:
def
sanitize_text(text: Any, encoding: str = 'utf-8') -> Optional[str]:
65def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]: 66 text = stringify(text, encoding_default=encoding) 67 if text is None: 68 return None 69 try: 70 text = compose_nfc(text) 71 except (SystemError, Exception) as ex: 72 log.warning("Cannot NFC text: %s", ex) 73 return None 74 text = remove_unsafe_chars(text) 75 if text is None: 76 return None 77 byte_text = text.encode(DEFAULT_ENCODING, "replace") 78 return cast(str, byte_text.decode(DEFAULT_ENCODING, "replace"))
def
value_list(value: Union[~T, Sequence[~T]]) -> List[~T]:
def
key_bytes(key: Any) -> bytes:
90def key_bytes(key: Any) -> bytes: 91 """Convert the given data to a value appropriate for hashing.""" 92 if isinstance(key, bytes): 93 return key 94 text = stringify(key) 95 if text is None: 96 return b"" 97 return text.encode("utf-8")
Convert the given data to a value appropriate for hashing.
def
join_text(*parts: Any, sep: str = ' ') -> Optional[str]:
100def join_text(*parts: Any, sep: str = " ") -> Optional[str]: 101 """Join all the non-null arguments using sep.""" 102 texts: List[str] = [] 103 for part in parts: 104 text = stringify(part) 105 if text is not None: 106 texts.append(text) 107 if not len(texts): 108 return None 109 return sep.join(texts)
Join all the non-null arguments using sep.
def
get_entity_id(obj: Any) -> Optional[str]:
112def get_entity_id(obj: Any) -> Optional[str]: 113 """Given an entity-ish object, try to get the ID.""" 114 if is_mapping(obj): 115 obj = obj.get("id") 116 else: 117 try: 118 obj = obj.id 119 except AttributeError: 120 pass 121 return stringify(obj)
Given an entity-ish object, try to get the ID.
def
make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
124def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]: 125 digest = sha1() 126 if key_prefix: 127 digest.update(key_bytes(key_prefix)) 128 base = digest.digest() 129 for part in parts: 130 digest.update(key_bytes(part)) 131 if digest.digest() == base: 132 return None 133 return digest.hexdigest()
def
merge_context(left: Dict[~K, ~V], right: Dict[~K, ~V]) -> Dict[~K, List[~V]]:
136def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]: 137 """When merging two entities, make lists of all the duplicate context 138 keys.""" 139 combined = {} 140 keys = [*left.keys(), *right.keys()] 141 for key in set(keys): 142 if key in ("caption",): 143 continue 144 lval: List[V] = [i for i in ensure_list(left.get(key)) if i is not None] 145 rval: List[V] = [i for i in ensure_list(right.get(key)) if i is not None] 146 combined[key] = unique_list([*lval, *rval]) 147 return combined
When merging two entities, make lists of all the duplicate context keys.
def
dampen(short: int, long: int, text: str) -> float:
def
shortest(*texts: str) -> str:
def
longest(*texts: str) -> str: