followthemoney.util
import os
import logging
from hashlib import sha1
from babel import Locale
from gettext import translation

from threading import local
from typing import cast, Dict, Any, List, Optional, TypeVar, Union, Sequence
from normality import stringify
from normality.cleaning import compose_nfc
from normality.cleaning import remove_unsafe_chars
from normality.encoding import DEFAULT_ENCODING
from banal import is_mapping, unique_list, ensure_list

MEGABYTE = 1024 * 1024
DEFAULT_LOCALE = "en"
ENTITY_ID_LEN = 200

T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")

PathLike = Union[str, os.PathLike[str]]
i18n_path = os.path.join(os.path.dirname(__file__), "translations")
state = local()
log = logging.getLogger(__name__)


def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
    if not hasattr(state, "translation"):
        set_model_locale(Locale.parse(DEFAULT_LOCALE))
    return cast(str, state.translation.gettext(*args, **kwargs))


def defer(text: str) -> str:
    return text


def set_model_locale(locale: Locale) -> None:
    state.locale = locale
    state.translation = translation(
        "followthemoney", i18n_path, [str(locale)], fallback=True
    )


def get_locale() -> Locale:
    if not hasattr(state, "locale"):
        return Locale.parse(DEFAULT_LOCALE)
    return Locale.parse(state.locale)


def get_env_list(name: str, default: List[str] = []) -> List[str]:
    value = stringify(os.environ.get(name))
    if value is not None:
        values = value.split(":")
        if len(values):
            return values
    return default


def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
    text = stringify(text, encoding_default=encoding)
    if text is None:
        return None
    try:
        text = compose_nfc(text)
    except (SystemError, Exception) as ex:
        log.warning("Cannot NFC text: %s", ex)
        return None
    text = remove_unsafe_chars(text)
    if text is None:
        return None
    byte_text = text.encode(DEFAULT_ENCODING, "replace")
    return cast(str, byte_text.decode(DEFAULT_ENCODING, "replace"))


def value_list(value: Union[T, Sequence[T]]) -> List[T]:
    if not isinstance(value, (str, bytes)):
        try:
            return [v for v in cast(Sequence[T], value)]
        except TypeError:
            pass
    return [cast(T, value)]


def key_bytes(key: Any) -> bytes:
    """Convert the given data to a value appropriate for hashing."""
    if isinstance(key, bytes):
        return key
    text = stringify(key)
    if text is None:
        return b""
    return text.encode("utf-8")


def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
    """Join all the non-null arguments using sep."""
    texts: List[str] = []
    for part in parts:
        text = stringify(part)
        if text is not None:
            texts.append(text)
    if not len(texts):
        return None
    return sep.join(texts)


def const_case(text: str) -> str:
    """Convert the given text to a constant case."""
    return text.upper().replace(" ", "_")


def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID."""
    if is_mapping(obj):
        obj = obj.get("id")
    else:
        try:
            obj = obj.id
        except AttributeError:
            pass
    return stringify(obj)


def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    digest = sha1()
    if key_prefix:
        digest.update(key_bytes(key_prefix))
    base = digest.digest()
    for part in parts:
        digest.update(key_bytes(part))
    if digest.digest() == base:
        return None
    return digest.hexdigest()


def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """When merging two entities, make lists of all the duplicate context
    keys."""
    combined = {}
    keys = [*left.keys(), *right.keys()]
    for key in set(keys):
        if key in ("caption",):
            continue
        lval: List[V] = [i for i in ensure_list(left.get(key)) if i is not None]
        rval: List[V] = [i for i in ensure_list(right.get(key)) if i is not None]
        combined[key] = unique_list([*lval, *rval])
    return combined


def dampen(short: int, long: int, text: str) -> float:
    length = len(text) - short
    baseline = max(1.0, (long - short))
    return max(0, min(1.0, (length / baseline)))


def shortest(*texts: str) -> str:
    return min(texts, key=len)


def longest(*texts: str) -> str:
    return max(texts, key=len)
MEGABYTE = 1048576
DEFAULT_LOCALE = 'en'
ENTITY_ID_LEN = 200
PathLike = typing.Union[str, os.PathLike[str]]
i18n_path = '/opt/hostedtoolcache/Python/3.10.16/x64/lib/python3.10/site-packages/followthemoney/translations'
state = <_thread._local object>
log = <Logger followthemoney.util (WARNING)>
def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
def defer(text: str) -> str:
def set_model_locale(locale: babel.core.Locale) -> None:
def get_locale() -> babel.core.Locale:
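A minimal, illustrative session for the three locale helpers above (set_model_locale, get_locale and gettext); the sample string is made up and assumed to have no entry in the shipped catalogs, so the fallback simply echoes it back:

>>> from babel import Locale
>>> from followthemoney.util import set_model_locale, get_locale, gettext
>>> set_model_locale(Locale.parse("de"))
>>> str(get_locale())
'de'
>>> gettext("Hello, world!")  # assumed untranslated, so the source string is returned
'Hello, world!'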
def get_env_list(name: str, default: List[str] = []) -> List[str]:
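Reads a colon-separated list from the environment, falling back to default when the variable is unset; a sketch using hypothetical variable names:

>>> import os
>>> from followthemoney.util import get_env_list
>>> os.environ["FTM_DEMO_LIST"] = "alpha:beta:gamma"
>>> get_env_list("FTM_DEMO_LIST")
['alpha', 'beta', 'gamma']
>>> get_env_list("FTM_DEMO_UNSET", default=["fallback"])
['fallback']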
def sanitize_text(text: Any, encoding: str = 'utf-8') -> Optional[str]:
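Stringifies arbitrary input, normalises it to NFC and strips unsafe control characters, as read from the module source above; the outputs shown are illustrative:

>>> from followthemoney.util import sanitize_text
>>> sanitize_text(None) is None
True
>>> sanitize_text(42)
'42'
>>> sanitize_text("Fa\u00e7ade\x00")  # the NUL control character is removed
'Façade'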
def value_list(value: Union[~T, Sequence[~T]]) -> List[~T]:
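Wraps a scalar in a list and passes other sequences through, treating strings and bytes as scalars; for example:

>>> from followthemoney.util import value_list
>>> value_list("name")
['name']
>>> value_list(("Alice", "Bob"))
['Alice', 'Bob']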
def key_bytes(key: Any) -> bytes:
Convert the given data to a value appropriate for hashing.
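For example, as implied by the source above:

>>> from followthemoney.util import key_bytes
>>> key_bytes("banana")
b'banana'
>>> key_bytes(42)
b'42'
>>> key_bytes(None)
b''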
def join_text(*parts: Any, sep: str = ' ') -> Optional[str]:
Join all the non-null arguments using sep.
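For example:

>>> from followthemoney.util import join_text
>>> join_text("Jane", None, "Doe")
'Jane Doe'
>>> join_text("Acme", "Ltd.", sep=", ")
'Acme, Ltd.'
>>> join_text(None, None) is None
True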
def const_case(text: str) -> str:
Convert the given text to a constant case.
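For example:

>>> from followthemoney.util import const_case
>>> const_case("bank account")
'BANK_ACCOUNT'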
def get_entity_id(obj: Any) -> Optional[str]:
Given an entity-ish object, try to get the ID.
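Accepts plain IDs, mappings with an id key, or objects with an id attribute; illustrative values:

>>> from followthemoney.util import get_entity_id
>>> get_entity_id({"id": "deadbeef", "schema": "Person"})
'deadbeef'
>>> get_entity_id("deadbeef")
'deadbeef'
>>> get_entity_id({"schema": "Person"}) is None
True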
def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
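Builds a SHA1 hex digest over the stringified parts, optionally seeded with key_prefix; if no part contributes any bytes, the function returns None rather than a constant hash. A sketch with made-up parts:

>>> from followthemoney.util import make_entity_id
>>> entity_id = make_entity_id("gb", "Jane", "Doe", key_prefix="passport")
>>> len(entity_id), all(c in "0123456789abcdef" for c in entity_id)
(40, True)
>>> make_entity_id(None, "") is None
True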
def merge_context(left: Dict[~K, ~V], right: Dict[~K, ~V]) -> Dict[~K, List[~V]]:
When merging two entities, make lists of all the duplicate context keys.
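For example (a single key keeps the output order deterministic; note that the caption key is skipped entirely):

>>> from followthemoney.util import merge_context
>>> merge_context({"origin": "crawler-a"}, {"origin": ["crawler-a", "crawler-b"]})
{'origin': ['crawler-a', 'crawler-b']}
>>> merge_context({"caption": "Jane Doe"}, {"caption": "J. Doe"})
{}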
def dampen(short: int, long: int, text: str) -> float:
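Scales the length of text linearly between short and long characters and clamps the result to [0, 1]; with short=2 and long=10, a six-character string scores (6 - 2) / (10 - 2) = 0.5:

>>> from followthemoney.util import dampen
>>> dampen(2, 10, "banana")
0.5
>>> dampen(2, 10, "a")
0
>>> dampen(2, 10, "x" * 50)
1.0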
def shortest(*texts: str) -> str:
def longest(*texts: str) -> str:
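Both helpers compare by character length; for example:

>>> from followthemoney.util import shortest, longest
>>> shortest("Jane", "Jane Doe", "J. Doe")
'Jane'
>>> longest("Jane", "Jane Doe", "J. Doe")
'Jane Doe'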