followthemoney.util

  1import os
  2import logging
  3from hashlib import sha1
  4from babel import Locale
  5from gettext import translation
  6
  7from threading import local
  8from typing import cast, Dict, Any, List, Optional, TypeVar, Union, Sequence
  9from normality import stringify
 10from normality.cleaning import compose_nfc
 11from normality.cleaning import remove_unsafe_chars
 12from normality.encoding import DEFAULT_ENCODING
 13from banal import is_mapping, unique_list, ensure_list
 14
# Number of bytes in one megabyte (2**20).
MEGABYTE = 1024 * 1024
# Locale identifier used when no locale has been set on the current thread.
DEFAULT_LOCALE = "en"
# NOTE(review): presumably the maximum length of an entity ID; it is not
# referenced in the visible part of this module - confirm against callers.
ENTITY_ID_LEN = 200

# Generic type variables used in the signatures of the helper functions.
T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")

# Anything accepted as a filesystem path: a plain string or an os.PathLike.
PathLike = Union[str, os.PathLike[str]]
# Directory of gettext catalogs, located next to this module file.
i18n_path = os.path.join(os.path.dirname(__file__), "translations")
# Thread-local storage; holds the `locale` and `translation` attributes that
# set_model_locale() assigns.
state = local()
# Module-level logger.
log = logging.getLogger(__name__)
 27
 28
 29def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
 30    if not hasattr(state, "translation"):
 31        set_model_locale(Locale.parse(DEFAULT_LOCALE))
 32    return cast(str, state.translation.gettext(*args, **kwargs))
 33
 34
 35def defer(text: str) -> str:
 36    return text
 37
 38
 39def set_model_locale(locale: Locale) -> None:
 40    state.locale = locale
 41    state.translation = translation(
 42        "followthemoney", i18n_path, [str(locale)], fallback=True
 43    )
 44
 45
 46def get_locale() -> Locale:
 47    if not hasattr(state, "locale"):
 48        return Locale.parse(DEFAULT_LOCALE)
 49    return Locale.parse(state.locale)
 50
 51
 52def get_env_list(name: str, default: List[str] = []) -> List[str]:
 53    value = stringify(os.environ.get(name))
 54    if value is not None:
 55        values = value.split(":")
 56        if len(values):
 57            return values
 58    return default
 59
 60
 61def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
 62    text = stringify(text, encoding_default=encoding)
 63    if text is None:
 64        return None
 65    try:
 66        text = compose_nfc(text)
 67    except (SystemError, Exception) as ex:
 68        log.warning("Cannot NFC text: %s", ex)
 69        return None
 70    text = remove_unsafe_chars(text)
 71    if text is None:
 72        return None
 73    byte_text = text.encode(DEFAULT_ENCODING, "replace")
 74    return cast(str, byte_text.decode(DEFAULT_ENCODING, "replace"))
 75
 76
 77def value_list(value: Union[T, Sequence[T]]) -> List[T]:
 78    if not isinstance(value, (str, bytes)):
 79        try:
 80            return [v for v in cast(Sequence[T], value)]
 81        except TypeError:
 82            pass
 83    return [cast(T, value)]
 84
 85
 86def key_bytes(key: Any) -> bytes:
 87    """Convert the given data to a value appropriate for hashing."""
 88    if isinstance(key, bytes):
 89        return key
 90    text = stringify(key)
 91    if text is None:
 92        return b""
 93    return text.encode("utf-8")
 94
 95
 96def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
 97    """Join all the non-null arguments using sep."""
 98    texts: List[str] = []
 99    for part in parts:
100        text = stringify(part)
101        if text is not None:
102            texts.append(text)
103    if not len(texts):
104        return None
105    return sep.join(texts)
106
107
def const_case(text: str) -> str:
    """Upper-case ``text`` and turn every space into an underscore."""
    words = text.upper().split(" ")
    return "_".join(words)
111
112
def get_entity_id(obj: Any) -> Optional[str]:
    """Extract an ID from an entity-like object and stringify it.

    Mappings are asked for their ``"id"`` key, other objects for their
    ``id`` attribute. An object without such an attribute is itself treated
    as the ID.
    """
    if is_mapping(obj):
        return stringify(obj.get("id"))
    return stringify(getattr(obj, "id", obj))
123
124
def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    """Derive a SHA-1 hex digest from ``key_prefix`` and ``parts``.

    The prefix (if truthy) and then each part are converted with
    ``key_bytes`` and fed to the hash in order. ``None`` is returned when
    the parts leave the digest unchanged compared to the prefix alone,
    i.e. when they contribute no data.
    """
    hasher = sha1()
    if key_prefix:
        hasher.update(key_bytes(key_prefix))
    # Snapshot of the hash before any part is added, used to detect
    # whether the parts contributed anything.
    prefix_only = hasher.digest()
    for chunk in map(key_bytes, parts):
        hasher.update(chunk)
    if hasher.digest() == prefix_only:
        return None
    return hasher.hexdigest()
135
136
def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """Combine the context dictionaries of two entities being merged.

    For every key present in either input (except ``"caption"``, which is
    skipped), the values from ``left`` and then ``right`` are collected
    into one list, ``None`` entries are dropped and the result is passed
    through ``unique_list``.
    """

    def _values(source: Dict[K, V], key: K) -> List[V]:
        # Values of `source` under `key` as a list, without None entries.
        return [item for item in ensure_list(source.get(key)) if item is not None]

    merged = {}
    for key in {*left.keys(), *right.keys()}:
        if key in ("caption",):
            continue
        merged[key] = unique_list([*_values(left, key), *_values(right, key)])
    return merged
149
150
def dampen(short: int, long: int, text: str) -> float:
    """Scale the length of ``text`` linearly onto the range 0.0 to 1.0.

    Args:
        short: Length at (or below) which the result is 0.0.
        long: Length at (or above) which the result is 1.0.
        text: The string whose length is measured.

    Returns:
        ``(len(text) - short) / (long - short)`` clamped to ``[0.0, 1.0]``.
        The result is always a ``float``.
    """
    length = len(text) - short
    # A span below 1 (including long <= short) is treated as 1 so the
    # division is always defined.
    baseline = max(1.0, (long - short))
    # Clamp with 0.0 rather than 0 so the lower bound is a float as well.
    return max(0.0, min(1.0, (length / baseline)))
155
156
def shortest(*texts: str) -> str:
    """Return the argument with the fewest characters.

    On a tie the earliest such argument wins. Calling this without any
    arguments raises ``ValueError``.
    """
    return min(texts, key=lambda text: len(text))
159
160
def longest(*texts: str) -> str:
    """Return the argument with the most characters.

    On a tie the earliest such argument wins. Calling this without any
    arguments raises ``ValueError``.
    """
    return max(texts, key=lambda text: len(text))
MEGABYTE = 1048576
DEFAULT_LOCALE = 'en'
ENTITY_ID_LEN = 200
PathLike = typing.Union[str, os.PathLike[str]]
i18n_path = '/opt/hostedtoolcache/Python/3.10.16/x64/lib/python3.10/site-packages/followthemoney/translations'
state = <_thread._local object>
log = <Logger followthemoney.util (WARNING)>
def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
30def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
31    if not hasattr(state, "translation"):
32        set_model_locale(Locale.parse(DEFAULT_LOCALE))
33    return cast(str, state.translation.gettext(*args, **kwargs))
def defer(text: str) -> str:
36def defer(text: str) -> str:
37    return text
def set_model_locale(locale: babel.core.Locale) -> None:
40def set_model_locale(locale: Locale) -> None:
41    state.locale = locale
42    state.translation = translation(
43        "followthemoney", i18n_path, [str(locale)], fallback=True
44    )
def get_locale() -> babel.core.Locale:
47def get_locale() -> Locale:
48    if not hasattr(state, "locale"):
49        return Locale.parse(DEFAULT_LOCALE)
50    return Locale.parse(state.locale)
def get_env_list(name: str, default: List[str] = []) -> List[str]:
53def get_env_list(name: str, default: List[str] = []) -> List[str]:
54    value = stringify(os.environ.get(name))
55    if value is not None:
56        values = value.split(":")
57        if len(values):
58            return values
59    return default
def sanitize_text(text: Any, encoding: str = 'utf-8') -> Optional[str]:
62def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
63    text = stringify(text, encoding_default=encoding)
64    if text is None:
65        return None
66    try:
67        text = compose_nfc(text)
68    except (SystemError, Exception) as ex:
69        log.warning("Cannot NFC text: %s", ex)
70        return None
71    text = remove_unsafe_chars(text)
72    if text is None:
73        return None
74    byte_text = text.encode(DEFAULT_ENCODING, "replace")
75    return cast(str, byte_text.decode(DEFAULT_ENCODING, "replace"))
def value_list(value: Union[~T, Sequence[~T]]) -> List[~T]:
78def value_list(value: Union[T, Sequence[T]]) -> List[T]:
79    if not isinstance(value, (str, bytes)):
80        try:
81            return [v for v in cast(Sequence[T], value)]
82        except TypeError:
83            pass
84    return [cast(T, value)]
def key_bytes(key: Any) -> bytes:
87def key_bytes(key: Any) -> bytes:
88    """Convert the given data to a value appropriate for hashing."""
89    if isinstance(key, bytes):
90        return key
91    text = stringify(key)
92    if text is None:
93        return b""
94    return text.encode("utf-8")

Convert the given data to a value appropriate for hashing.

def join_text(*parts: Any, sep: str = ' ') -> Optional[str]:
 97def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
 98    """Join all the non-null arguments using sep."""
 99    texts: List[str] = []
100    for part in parts:
101        text = stringify(part)
102        if text is not None:
103            texts.append(text)
104    if not len(texts):
105        return None
106    return sep.join(texts)

Join all the non-null arguments using sep.

def const_case(text: str) -> str:
109def const_case(text: str) -> str:
110    """Convert the given text to a constant case."""
111    return text.upper().replace(" ", "_")

Convert the given text to a constant case.

def get_entity_id(obj: Any) -> Optional[str]:
114def get_entity_id(obj: Any) -> Optional[str]:
115    """Given an entity-ish object, try to get the ID."""
116    if is_mapping(obj):
117        obj = obj.get("id")
118    else:
119        try:
120            obj = obj.id
121        except AttributeError:
122            pass
123    return stringify(obj)

Given an entity-ish object, try to get the ID.

def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
126def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
127    digest = sha1()
128    if key_prefix:
129        digest.update(key_bytes(key_prefix))
130    base = digest.digest()
131    for part in parts:
132        digest.update(key_bytes(part))
133    if digest.digest() == base:
134        return None
135    return digest.hexdigest()
def merge_context(left: Dict[~K, ~V], right: Dict[~K, ~V]) -> Dict[~K, List[~V]]:
138def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
139    """When merging two entities, make lists of all the duplicate context
140    keys."""
141    combined = {}
142    keys = [*left.keys(), *right.keys()]
143    for key in set(keys):
144        if key in ("caption",):
145            continue
146        lval: List[V] = [i for i in ensure_list(left.get(key)) if i is not None]
147        rval: List[V] = [i for i in ensure_list(right.get(key)) if i is not None]
148        combined[key] = unique_list([*lval, *rval])
149    return combined

When merging two entities, make lists of all the duplicate context keys.

def dampen(short: int, long: int, text: str) -> float:
152def dampen(short: int, long: int, text: str) -> float:
153    length = len(text) - short
154    baseline = max(1.0, (long - short))
155    return max(0, min(1.0, (length / baseline)))
def shortest(*texts: str) -> str:
158def shortest(*texts: str) -> str:
159    return min(texts, key=len)
def longest(*texts: str) -> str:
162def longest(*texts: str) -> str:
163    return max(texts, key=len)