followthemoney.util

  1import os
  2import logging
  3from hashlib import sha1
  4from babel import Locale
  5from gettext import translation
  6
  7from threading import local
  8from typing import cast, Dict, Any, List, Optional, TypeVar, Union
  9from normality import stringify
 10from normality.cleaning import compose_nfc
 11from normality.cleaning import remove_unsafe_chars
 12from normality.encoding import DEFAULT_ENCODING
 13from banal import is_mapping, unique_list, ensure_list
 14
 15MEGABYTE = 1024 * 1024
 16DEFAULT_LOCALE = "en"
 17ENTITY_ID_LEN = 200
 18
 19T = TypeVar("T")
 20K = TypeVar("K")
 21V = TypeVar("V")
 22
 23PathLike = Union[str, os.PathLike[str]]
 24i18n_path = os.path.join(os.path.dirname(__file__), "translations")
 25state = local()
 26log = logging.getLogger(__name__)
 27
 28
 29def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
 30    if not hasattr(state, "translation"):
 31        set_model_locale(Locale.parse(DEFAULT_LOCALE))
 32    return cast(str, state.translation.gettext(*args, **kwargs))
 33
 34
 35def defer(text: str) -> str:
 36    return text
 37
 38
 39def set_model_locale(locale: Locale) -> None:
 40    state.locale = locale
 41    state.translation = translation(
 42        "followthemoney", i18n_path, [str(locale)], fallback=True
 43    )
 44
 45
 46def get_locale() -> Locale:
 47    if not hasattr(state, "locale"):
 48        return Locale.parse(DEFAULT_LOCALE)
 49    return Locale.parse(state.locale)
 50
 51
 52def get_env_list(name: str, default: List[str] = []) -> List[str]:
 53    value = stringify(os.environ.get(name))
 54    if value is not None:
 55        values = value.split(":")
 56        if len(values):
 57            return values
 58    return default
 59
 60
 61def sanitize_text(value: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
 62    text = stringify(value, encoding_default=encoding)
 63    if text is None:
 64        return None
 65    try:
 66        text = compose_nfc(text)
 67        if text is None:
 68            return None
 69    except (SystemError, Exception) as ex:
 70        log.warning("Cannot NFC text: %s", ex)
 71        return None
 72    text = remove_unsafe_chars(text)
 73    if text is None:
 74        return None
 75    byte_text = text.encode(DEFAULT_ENCODING, "replace")
 76    return byte_text.decode(DEFAULT_ENCODING, "replace")
 77
 78
 79def key_bytes(key: Any) -> bytes:
 80    """Convert the given data to a value appropriate for hashing."""
 81    if isinstance(key, bytes):
 82        return key
 83    text = stringify(key)
 84    if text is None:
 85        return b""
 86    return text.encode("utf-8")
 87
 88
 89def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
 90    """Join all the non-null arguments using sep."""
 91    texts: List[str] = []
 92    for part in parts:
 93        text = stringify(part)
 94        if text is not None:
 95            texts.append(text)
 96    if not len(texts):
 97        return None
 98    return sep.join(texts)
 99
100
def const_case(text: str) -> str:
    """Upper-case ``text`` and turn every space into an underscore."""
    shouted = text.upper()
    return "_".join(shouted.split(" "))
104
105
def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID.

    Mappings are read through ``obj.get("id")``; other objects through
    their ``id`` attribute. An object without such an attribute is itself
    treated as the ID. The result is passed through ``stringify``.
    """
    if is_mapping(obj):
        candidate = obj.get("id")
    else:
        # Fall back to the object itself when it has no ``id`` attribute.
        candidate = getattr(obj, "id", obj)
    return stringify(candidate)
116
117
def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    """Derive a SHA-1 hex digest from the given parts.

    A truthy ``key_prefix`` is hashed first, followed by the ``key_bytes``
    form of every part in order. Returns ``None`` when the parts leave the
    digest unchanged, i.e. they contributed no bytes to the hash.
    """
    hasher = sha1()
    if key_prefix:
        hasher.update(key_bytes(key_prefix))
    # Digest of the prefix alone; used to detect that no part added data.
    prefix_only = hasher.digest()
    hasher.update(b"".join(key_bytes(part) for part in parts))
    if hasher.digest() == prefix_only:
        return None
    return hasher.hexdigest()
128
129
def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """Combine the context of two entities into lists of unique values.

    Every key found in either mapping, except ``"caption"``, is mapped to
    the de-duplicated list of its non-``None`` values, with the values of
    ``left`` placed before those of ``right``.
    """
    skipped = ("caption",)
    merged = {}
    for key in set([*left, *right]):
        if key in skipped:
            continue
        values: List[V] = []
        for source in (left, right):
            values.extend(i for i in ensure_list(source.get(key)) if i is not None)
        merged[key] = unique_list(values)
    return merged
142
143
def dampen(short: int, long: int, text: str) -> float:
    """Scale the length of ``text`` to a score between 0.0 and 1.0.

    Args:
        short: Length at or below which the score is ``0.0``.
        long: For ``long > short``, length at or above which the score is
            ``1.0``; lengths in between are interpolated linearly.
        text: The text whose length is scored.

    Returns:
        A float in ``[0.0, 1.0]``. The result is always a ``float``, also
        when it is clamped at the lower bound.
    """
    length = len(text) - short
    # Never divide by less than 1.0, which also covers ``long <= short``.
    baseline = max(1.0, float(long - short))
    return max(0.0, min(1.0, length / baseline))
148
149
def shortest(*texts: str) -> str:
    """Return the argument with the fewest characters.

    On ties the earliest such argument is returned; calling without any
    arguments raises ``ValueError`` (from ``min``).
    """
    winner = min(texts, key=len)
    return winner
152
153
def longest(*texts: str) -> str:
    """Return the argument with the most characters.

    On ties the earliest such argument is returned; calling without any
    arguments raises ``ValueError`` (from ``max``).
    """
    winner = max(texts, key=len)
    return winner
MEGABYTE = 1048576
DEFAULT_LOCALE = 'en'
ENTITY_ID_LEN = 200
PathLike = typing.Union[str, os.PathLike[str]]
i18n_path = '/opt/hostedtoolcache/Python/3.12.11/x64/lib/python3.12/site-packages/followthemoney/translations'
state = <_thread._local object>
log = <Logger followthemoney.util (WARNING)>
def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
    """Translate a message using the current thread's translation catalog.

    If no catalog has been stored on the thread-local ``state`` yet, the
    default locale is installed first. All arguments are passed through
    unchanged to the catalog's ``gettext`` method.
    """
    try:
        catalog = state.translation
    except AttributeError:
        # First use on this thread: install the default locale's catalog.
        set_model_locale(Locale.parse(DEFAULT_LOCALE))
        catalog = state.translation
    return cast(str, catalog.gettext(*args, **kwargs))
def defer(text: str) -> str:
    """Return ``text`` unchanged.

    NOTE(review): presumably a marker for strings that are translated
    later — confirm against the callers.
    """
    unchanged = text
    return unchanged
def set_model_locale(locale: Locale) -> None:
    """Store ``locale`` on the thread-local state and load its catalog.

    The catalog for the ``followthemoney`` domain is looked up under
    ``i18n_path``. Because ``fallback=True`` is passed, a missing catalog
    yields a ``NullTranslations`` object instead of raising.
    """
    state.locale = locale
    languages = [str(locale)]
    state.translation = translation(
        "followthemoney",
        localedir=i18n_path,
        languages=languages,
        fallback=True,
    )
def get_locale() -> Locale:
    """Return the locale stored for this thread, or the default locale.

    Whatever is found (the stored locale or ``DEFAULT_LOCALE``) is passed
    through ``Locale.parse`` before being returned.
    """
    current = getattr(state, "locale", DEFAULT_LOCALE)
    return Locale.parse(current)
def get_env_list(name: str, default: Optional[List[str]] = None) -> List[str]:
    """Read a colon-separated list from the environment variable ``name``.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is unset or when
            ``stringify`` reduces its value to ``None``. If omitted, a new
            empty list is returned on every call, so callers can never
            share (and accidentally mutate) a common default object.

    Returns:
        The variable's value split on ``":"``, or the default.
    """
    value = stringify(os.environ.get(name))
    if value is not None:
        # str.split always returns at least one element, so the result
        # needs no emptiness check.
        return value.split(":")
    return [] if default is None else default
def sanitize_text(value: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
    """Turn ``value`` into cleaned text, or ``None`` if that is not possible.

    The value is stringified (``encoding`` is handed to ``stringify`` as
    ``encoding_default``), NFC-composed and stripped of unsafe characters.
    ``None`` is returned as soon as any of these steps yields ``None`` or
    when NFC composition raises. Finally the text is round-tripped through
    ``DEFAULT_ENCODING`` with the ``"replace"`` error handler.
    """
    raw = stringify(value, encoding_default=encoding)
    if raw is None:
        return None
    try:
        composed = compose_nfc(raw)
    except Exception as ex:  # SystemError is already an Exception subclass
        log.warning("Cannot NFC text: %s", ex)
        return None
    if composed is None:
        return None
    cleaned = remove_unsafe_chars(composed)
    if cleaned is None:
        return None
    # Encoding and decoding with "replace" substitutes anything that the
    # default encoding cannot represent.
    encoded = cleaned.encode(DEFAULT_ENCODING, "replace")
    return encoded.decode(DEFAULT_ENCODING, "replace")
def key_bytes(key: Any) -> bytes:
    """Convert the given data to a value appropriate for hashing.

    ``bytes`` input is returned as-is. Anything else is stringified and
    encoded as UTF-8; values that stringify to ``None`` become ``b""``.
    """
    if not isinstance(key, bytes):
        as_text = stringify(key)
        return b"" if as_text is None else as_text.encode("utf-8")
    return key

Convert the given data to a value appropriate for hashing.

def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
    """Join all the non-null arguments using sep.

    Each part is stringified first; parts that stringify to ``None`` are
    dropped. Returns ``None`` when nothing remains to be joined.
    """
    present: List[str] = [
        text for text in (stringify(part) for part in parts) if text is not None
    ]
    return sep.join(present) if present else None

Join all the non-null arguments using sep.

def const_case(text: str) -> str:
    """Upper-case ``text`` and turn every space into an underscore."""
    shouted = text.upper()
    return "_".join(shouted.split(" "))

Convert the given text to a constant case.

def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID.

    Mappings are read through ``obj.get("id")``; other objects through
    their ``id`` attribute. An object without such an attribute is itself
    treated as the ID. The result is passed through ``stringify``.
    """
    if is_mapping(obj):
        candidate = obj.get("id")
    else:
        # Fall back to the object itself when it has no ``id`` attribute.
        candidate = getattr(obj, "id", obj)
    return stringify(candidate)

Given an entity-ish object, try to get the ID.

def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    """Derive a SHA-1 hex digest from the given parts.

    A truthy ``key_prefix`` is hashed first, followed by the ``key_bytes``
    form of every part in order. Returns ``None`` when the parts leave the
    digest unchanged, i.e. they contributed no bytes to the hash.
    """
    hasher = sha1()
    if key_prefix:
        hasher.update(key_bytes(key_prefix))
    # Digest of the prefix alone; used to detect that no part added data.
    prefix_only = hasher.digest()
    hasher.update(b"".join(key_bytes(part) for part in parts))
    if hasher.digest() == prefix_only:
        return None
    return hasher.hexdigest()
def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """Combine the context of two entities into lists of unique values.

    Every key found in either mapping, except ``"caption"``, is mapped to
    the de-duplicated list of its non-``None`` values, with the values of
    ``left`` placed before those of ``right``.
    """
    skipped = ("caption",)
    merged = {}
    for key in set([*left, *right]):
        if key in skipped:
            continue
        values: List[V] = []
        for source in (left, right):
            values.extend(i for i in ensure_list(source.get(key)) if i is not None)
        merged[key] = unique_list(values)
    return merged

When merging two entities, make lists of all the duplicate context keys.

def dampen(short: int, long: int, text: str) -> float:
    """Scale the length of ``text`` to a score between 0.0 and 1.0.

    Args:
        short: Length at or below which the score is ``0.0``.
        long: For ``long > short``, length at or above which the score is
            ``1.0``; lengths in between are interpolated linearly.
        text: The text whose length is scored.

    Returns:
        A float in ``[0.0, 1.0]``. The result is always a ``float``, also
        when it is clamped at the lower bound.
    """
    length = len(text) - short
    # Never divide by less than 1.0, which also covers ``long <= short``.
    baseline = max(1.0, float(long - short))
    return max(0.0, min(1.0, length / baseline))
def shortest(*texts: str) -> str:
    """Return the argument with the fewest characters.

    On ties the earliest such argument is returned; calling without any
    arguments raises ``ValueError`` (from ``min``).
    """
    winner = min(texts, key=len)
    return winner
def longest(*texts: str) -> str:
    """Return the argument with the most characters.

    On ties the earliest such argument is returned; calling without any
    arguments raises ``ValueError`` (from ``max``).
    """
    winner = max(texts, key=len)
    return winner