followthemoney.util

import os
import logging
from hashlib import sha1
from babel import Locale
from gettext import translation

from threading import local
from typing import cast, Dict, Any, List, Optional, TypeVar, Union, Sequence
from normality import stringify
from normality.cleaning import compose_nfc
from normality.cleaning import remove_unsafe_chars
from normality.encoding import DEFAULT_ENCODING
from banal import is_mapping, unique_list, ensure_list

MEGABYTE = 1024 * 1024
DEFAULT_LOCALE = "en"

T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")
try:
    # Work-around for Python 3.8 backward compat:
    PathLike = Union[str, os.PathLike[str]]
except TypeError:
    PathLike = Union[str, os.PathLike]  # type: ignore

i18n_path = os.path.join(os.path.dirname(__file__), "translations")
state = local()
log = logging.getLogger(__name__)


def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
    if not hasattr(state, "translation"):
        set_model_locale(Locale.parse(DEFAULT_LOCALE))
    return cast(str, state.translation.gettext(*args, **kwargs))


def defer(text: str) -> str:
    return text


def set_model_locale(locale: Locale) -> None:
    state.locale = locale
    state.translation = translation(
        "followthemoney", i18n_path, [str(locale)], fallback=True
    )


def get_locale() -> Locale:
    if not hasattr(state, "locale"):
        return Locale.parse(DEFAULT_LOCALE)
    return Locale.parse(state.locale)

def get_env_list(name: str, default: List[str] = []) -> List[str]:
    value = stringify(os.environ.get(name))
    if value is not None:
        values = value.split(":")
        if len(values):
            return values
    return default

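# Usage sketch (editor's illustration, not part of the module source):
# get_env_list reads a colon-separated environment variable, falling back
# to the given default when the variable is unset.
#
#   os.environ["FTM_EXAMPLE"] = "en:de:fr"   # hypothetical variable name
#   get_env_list("FTM_EXAMPLE")              # -> ["en", "de", "fr"]
#   get_env_list("FTM_UNSET", ["en"])        # -> ["en"]
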
def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
    text = stringify(text, encoding_default=encoding)
    if text is None:
        return None
    try:
        text = compose_nfc(text)
    except (SystemError, Exception) as ex:
        log.warning("Cannot NFC text: %s", ex)
        return None
    text = remove_unsafe_chars(text)
    if text is None:
        return None
    byte_text = text.encode(DEFAULT_ENCODING, "replace")
    return cast(str, byte_text.decode(DEFAULT_ENCODING, "replace"))

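# Usage sketch (editor's illustration, not part of the module source):
# sanitize_text coerces arbitrary values to a clean, NFC-normalised
# unicode string, or returns None when nothing usable remains.
#
#   sanitize_text(b"Caf\xc3\xa9")   # -> "Café" (bytes decoded as UTF-8)
#   sanitize_text(42)               # -> "42"
#   sanitize_text(None)             # -> None
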
def value_list(value: Union[T, Sequence[T]]) -> List[T]:
    if not isinstance(value, (str, bytes)):
        try:
            return [v for v in cast(Sequence[T], value)]
        except TypeError:
            pass
    return [cast(T, value)]

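# Usage sketch (editor's illustration, not part of the module source):
# value_list wraps a scalar in a list but passes sequences through,
# treating strings and bytes as scalars rather than character sequences.
#
#   value_list("acme")       # -> ["acme"]
#   value_list(["a", "b"])   # -> ["a", "b"]
#   value_list(None)         # -> [None]
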
def key_bytes(key: Any) -> bytes:
    """Convert the given data to a value appropriate for hashing."""
    if isinstance(key, bytes):
        return key
    text = stringify(key)
    if text is None:
        return b""
    return text.encode("utf-8")

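# Usage sketch (editor's illustration, not part of the module source):
# key_bytes is the hashing helper used by make_entity_id below; null
# inputs contribute no bytes.
#
#   key_bytes(b"raw")   # -> b"raw" (bytes pass through unchanged)
#   key_bytes(42)       # -> b"42"
#   key_bytes(None)     # -> b""
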
def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
    """Join all the non-null arguments using sep."""
    texts: List[str] = []
    for part in parts:
        text = stringify(part)
        if text is not None:
            texts.append(text)
    if not len(texts):
        return None
    return sep.join(texts)

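# Usage sketch (editor's illustration, not part of the module source):
# join_text drops null parts before joining.
#
#   join_text("Jane", None, "Doe")   # -> "Jane Doe"
#   join_text(None, None)            # -> None
#   join_text("1", "2", sep=", ")    # -> "1, 2"
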
def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID."""
    if is_mapping(obj):
        obj = obj.get("id")
    else:
        try:
            obj = obj.id
        except AttributeError:
            pass
    return stringify(obj)

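# Usage sketch (editor's illustration, not part of the module source):
# get_entity_id accepts a mapping with an "id" key, an object with an
# id attribute, or a plain identifier value.
#
#   get_entity_id({"id": "abc123"})   # -> "abc123"
#   get_entity_id("abc123")           # -> "abc123"
#   get_entity_id({})                 # -> None
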
def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    digest = sha1()
    if key_prefix:
        digest.update(key_bytes(key_prefix))
    base = digest.digest()
    for part in parts:
        digest.update(key_bytes(part))
    if digest.digest() == base:
        return None
    return digest.hexdigest()

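# Usage sketch (editor's illustration, not part of the module source):
# make_entity_id derives a deterministic SHA1 hex digest from its parts.
# The "base" digest captured after the optional prefix lets the function
# return None when none of the parts contributed any bytes.
#
#   make_entity_id("us-ofac", "12345")            # -> 40-character hex string
#   make_entity_id("a") == make_entity_id("a")    # -> True
#   make_entity_id(None)                          # -> None
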
def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """When merging two entities, make lists of all the duplicate context
    keys."""
    combined = {}
    keys = [*left.keys(), *right.keys()]
    for key in set(keys):
        if key in ("caption",):
            continue
        lval: List[V] = [i for i in ensure_list(left.get(key)) if i is not None]
        rval: List[V] = [i for i in ensure_list(right.get(key)) if i is not None]
        combined[key] = unique_list([*lval, *rval])
    return combined

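# Usage sketch (editor's illustration, not part of the module source):
# merge_context combines the context of two entities into de-duplicated
# lists per key; the "caption" key is deliberately skipped.
#
#   merge_context({"dataset": "a"}, {"dataset": ["a", "b"], "note": "x"})
#   # -> {"dataset": ["a", "b"], "note": ["x"]}
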
def dampen(short: int, long: int, text: str) -> float:
    length = len(text) - short
    baseline = max(1.0, (long - short))
    return max(0, min(1.0, (length / baseline)))

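# Usage sketch (editor's illustration, not part of the module source):
# dampen scales the length of text linearly between the short and long
# bounds, clamped to the range [0, 1].
#
#   dampen(2, 10, "ab")       # -> 0    (at or below the short bound)
#   dampen(2, 10, "abcdef")   # -> 0.5  ((6 - 2) / (10 - 2))
#   dampen(2, 10, "a" * 20)   # -> 1.0  (clamped at the long bound)
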
def shortest(*texts: str) -> str:
    return min(texts, key=len)


def longest(*texts: str) -> str:
    return max(texts, key=len)