followthemoney.util

import os
import logging
from hashlib import sha1
from babel import Locale
from gettext import translation

from threading import local
from typing import cast, Dict, Any, List, Optional, TypeVar, Union, Sequence
from normality import stringify
from normality.cleaning import compose_nfc
from normality.cleaning import remove_unsafe_chars
from normality.encoding import DEFAULT_ENCODING
from banal import is_mapping, unique_list, ensure_list

MEGABYTE = 1024 * 1024
DEFAULT_LOCALE = "en"

T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")

PathLike = Union[str, os.PathLike[str]]
i18n_path = os.path.join(os.path.dirname(__file__), "translations")
state = local()
log = logging.getLogger(__name__)

def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
    if not hasattr(state, "translation"):
        set_model_locale(Locale.parse(DEFAULT_LOCALE))
    return cast(str, state.translation.gettext(*args, **kwargs))

def defer(text: str) -> str:
    return text

def set_model_locale(locale: Locale) -> None:
    state.locale = locale
    state.translation = translation(
        "followthemoney", i18n_path, [str(locale)], fallback=True
    )

def get_locale() -> Locale:
    if not hasattr(state, "locale"):
        return Locale.parse(DEFAULT_LOCALE)
    return Locale.parse(state.locale)

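A minimal usage sketch for the locale helpers above, with an illustrative locale and message string: set_model_locale() installs a gettext translation on the thread-local state, gettext() resolves messages against it (falling back to the original string when no catalogue is found), and get_locale() reports the active locale.

    from babel import Locale
    from followthemoney.util import set_model_locale, get_locale, gettext

    set_model_locale(Locale.parse("de"))   # illustrative locale
    assert str(get_locale()) == "de"
    # With fallback=True, untranslated messages come back unchanged:
    text = gettext("An example message")   # hypothetical message string
    assert isinstance(text, str)
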
def get_env_list(name: str, default: List[str] = []) -> List[str]:
    value = stringify(os.environ.get(name))
    if value is not None:
        values = value.split(":")
        if len(values):
            return values
    return default

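A sketch of get_env_list() with a hypothetical environment variable: values are split on ":", and the default is returned when the variable is unset or empty.

    import os
    from followthemoney.util import get_env_list

    os.environ["FTM_EXAMPLE_PATHS"] = "alpha:beta"   # hypothetical variable name
    assert get_env_list("FTM_EXAMPLE_PATHS") == ["alpha", "beta"]
    assert get_env_list("FTM_EXAMPLE_MISSING", ["fallback"]) == ["fallback"]
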
def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
    text = stringify(text, encoding_default=encoding)
    if text is None:
        return None
    try:
        text = compose_nfc(text)
    except (SystemError, Exception) as ex:
        log.warning("Cannot NFC text: %s", ex)
        return None
    text = remove_unsafe_chars(text)
    if text is None:
        return None
    byte_text = text.encode(DEFAULT_ENCODING, "replace")
    return cast(str, byte_text.decode(DEFAULT_ENCODING, "replace"))

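A sketch of sanitize_text() behaviour with illustrative inputs: the value is coerced to text, normalised to NFC, stripped of unsafe control characters, and None is returned for values that cannot be turned into text.

    from followthemoney.util import sanitize_text

    # U+0301 (combining acute) is composed into a single character by NFC:
    assert sanitize_text("Cafe\u0301") == "Café"
    assert sanitize_text(None) is None
    assert sanitize_text("") is None   # stringify() turns empty strings into None
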
def value_list(value: Union[T, Sequence[T]]) -> List[T]:
    if not isinstance(value, (str, bytes)):
        try:
            return [v for v in cast(Sequence[T], value)]
        except TypeError:
            pass
    return [cast(T, value)]

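A sketch of value_list(): sequences are copied into a list, while strings, bytes and other non-iterable values are wrapped in a single-element list.

    from followthemoney.util import value_list

    assert value_list(["a", "b"]) == ["a", "b"]
    assert value_list(("a", "b")) == ["a", "b"]
    assert value_list("name") == ["name"]   # strings are not iterated character-wise
    assert value_list(None) == [None]
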
def key_bytes(key: Any) -> bytes:
    """Convert the given data to a value appropriate for hashing."""
    if isinstance(key, bytes):
        return key
    text = stringify(key)
    if text is None:
        return b""
    return text.encode("utf-8")

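A sketch of key_bytes(), which normalises arbitrary values into bytes for hashing; None and empty values become the empty byte string.

    from followthemoney.util import key_bytes

    assert key_bytes(b"acme") == b"acme"   # illustrative values
    assert key_bytes("acme") == b"acme"
    assert key_bytes(42) == b"42"
    assert key_bytes(None) == b""
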
def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
    """Join all the non-null arguments using sep."""
    texts: List[str] = []
    for part in parts:
        text = stringify(part)
        if text is not None:
            texts.append(text)
    if not len(texts):
        return None
    return sep.join(texts)

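A sketch of join_text() with made-up parts: null-ish parts are dropped, the rest are joined with sep, and None is returned when nothing remains.

    from followthemoney.util import join_text

    assert join_text("Jane", None, "Doe") == "Jane Doe"
    assert join_text("Berlin", "Germany", sep=", ") == "Berlin, Germany"
    assert join_text(None, None) is None
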
def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID."""
    if is_mapping(obj):
        obj = obj.get("id")
    else:
        try:
            obj = obj.id
        except AttributeError:
            pass
    return stringify(obj)

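A sketch of get_entity_id() with illustrative inputs: it accepts a mapping carrying an "id" key, an object exposing an .id attribute, or a plain ID value. The Proxyish class below is a hypothetical stand-in for an entity proxy object.

    from followthemoney.util import get_entity_id

    class Proxyish:
        """Hypothetical stand-in for an entity proxy object."""
        id = "entity-1"

    assert get_entity_id({"id": "entity-1"}) == "entity-1"
    assert get_entity_id(Proxyish()) == "entity-1"
    assert get_entity_id("entity-1") == "entity-1"
    assert get_entity_id({}) is None
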
def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    digest = sha1()
    if key_prefix:
        digest.update(key_bytes(key_prefix))
    base = digest.digest()
    for part in parts:
        digest.update(key_bytes(part))
    if digest.digest() == base:
        return None
    return digest.hexdigest()

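A sketch of make_entity_id() with made-up parts: the result is a deterministic SHA-1 hex digest over the non-empty parts, optionally namespaced by key_prefix; when no part contributes any bytes, None is returned.

    from followthemoney.util import make_entity_id

    first = make_entity_id("acme", "inc")          # illustrative parts
    assert first is not None and len(first) == 40  # SHA-1 hex digest
    assert make_entity_id("acme", "inc") == first  # deterministic
    assert make_entity_id("acme", "inc", key_prefix="company") != first
    assert make_entity_id(None, "") is None        # nothing to hash
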
def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """When merging two entities, make lists of all the duplicate context
    keys."""
    combined = {}
    keys = [*left.keys(), *right.keys()]
    for key in set(keys):
        if key in ("caption",):
            continue
        lval: List[V] = [i for i in ensure_list(left.get(key)) if i is not None]
        rval: List[V] = [i for i in ensure_list(right.get(key)) if i is not None]
        combined[key] = unique_list([*lval, *rval])
    return combined

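A sketch of merge_context() with made-up context dictionaries: every key except "caption" is kept, values from both sides are flattened into lists, and duplicates are collapsed.

    from followthemoney.util import merge_context

    left = {"dataset": "companies", "caption": "ACME Inc."}   # illustrative contexts
    right = {"dataset": "companies", "origin": ["import"]}
    merged = merge_context(left, right)
    assert merged["dataset"] == ["companies"]   # duplicate values collapsed
    assert merged["origin"] == ["import"]
    assert "caption" not in merged              # caption is never merged
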
def dampen(short: int, long: int, text: str) -> float:
    length = len(text) - short
    baseline = max(1.0, (long - short))
    return max(0, min(1.0, (length / baseline)))

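A sketch of dampen(), which maps the length of text onto a score between 0.0 and 1.0: lengths at or below short score 0, lengths at or above long score 1, and lengths in between scale linearly.

    from followthemoney.util import dampen

    assert dampen(2, 10, "ab") == 0.0        # at the lower bound
    assert dampen(2, 10, "abcdef") == 0.5    # halfway between the bounds
    assert dampen(2, 10, "a" * 12) == 1.0    # clamped at the upper bound
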
def shortest(*texts: str) -> str:
    return min(texts, key=len)


def longest(*texts: str) -> str:
    return max(texts, key=len)
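A sketch of shortest() and longest(), illustrated with made-up name variants:

    from followthemoney.util import shortest, longest

    names = ("Siemens", "Siemens Aktiengesellschaft")
    assert shortest(*names) == "Siemens"
    assert longest(*names) == "Siemens Aktiengesellschaft"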