followthemoney.util

import os
import logging
from hashlib import sha1
from babel import Locale
from gettext import translation

from threading import local
from typing import cast, Dict, Any, List, Optional, TypeVar, Union, Sequence
from normality import stringify
from normality.cleaning import compose_nfc
from normality.cleaning import remove_unsafe_chars
from normality.encoding import DEFAULT_ENCODING
from banal import is_mapping, unique_list, ensure_list

MEGABYTE = 1024 * 1024
DEFAULT_LOCALE = "en"
ENTITY_ID_LEN = 200

T = TypeVar("T")
K = TypeVar("K")
V = TypeVar("V")

PathLike = Union[str, os.PathLike[str]]
i18n_path = os.path.join(os.path.dirname(__file__), "translations")
state = local()
log = logging.getLogger(__name__)

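# Translation helpers: gettext() lazily installs a thread-local gettext
# translation for DEFAULT_LOCALE on first use; set_model_locale() switches it,
# falling back to the untranslated message when no catalogue exists for the
# requested locale, and get_locale() reports the active locale (or the default
# if none has been set).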
def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
    if not hasattr(state, "translation"):
        set_model_locale(Locale.parse(DEFAULT_LOCALE))
    return cast(str, state.translation.gettext(*args, **kwargs))


def defer(text: str) -> str:
    return text


def set_model_locale(locale: Locale) -> None:
    state.locale = locale
    state.translation = translation(
        "followthemoney", i18n_path, [str(locale)], fallback=True
    )


def get_locale() -> Locale:
    if not hasattr(state, "locale"):
        return Locale.parse(DEFAULT_LOCALE)
    return Locale.parse(state.locale)

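# get_env_list() reads a colon-separated environment variable, e.g.
# NAME="eng:fra:deu" yields ["eng", "fra", "deu"]; an unset or empty variable
# yields the given default.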
def get_env_list(name: str, default: List[str] = []) -> List[str]:
    value = stringify(os.environ.get(name))
    if value is not None:
        values = value.split(":")
        if len(values):
            return values
    return default

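# sanitize_text() coerces any value to text, applies NFC normalisation, strips
# unsafe control characters and re-encodes via UTF-8 with replacement; it
# returns None when NFC composition fails or nothing printable survives.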
def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
    text = stringify(text, encoding_default=encoding)
    if text is None:
        return None
    try:
        text = compose_nfc(text)
    except (SystemError, Exception) as ex:
        log.warning("Cannot NFC text: %s", ex)
        return None
    text = remove_unsafe_chars(text)
    if text is None:
        return None
    byte_text = text.encode(DEFAULT_ENCODING, "replace")
    return cast(str, byte_text.decode(DEFAULT_ENCODING, "replace"))

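# value_list() normalises a value to a list: sequences are expanded, scalars
# are wrapped in a single-element list; str and bytes count as scalars and are
# not iterated character by character.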
def value_list(value: Union[T, Sequence[T]]) -> List[T]:
    if not isinstance(value, (str, bytes)):
        try:
            return [v for v in cast(Sequence[T], value)]
        except TypeError:
            pass
    return [cast(T, value)]

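# key_bytes() prepares arbitrary values for hashing, e.g. key_bytes(44) == b"44"
# and key_bytes(None) == b""; bytes pass through unchanged.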
def key_bytes(key: Any) -> bytes:
    """Convert the given data to a value appropriate for hashing."""
    if isinstance(key, bytes):
        return key
    text = stringify(key)
    if text is None:
        return b""
    return text.encode("utf-8")

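# join_text() drops null-ish parts before joining, e.g.
# join_text("Jane", None, "Doe") == "Jane Doe"; it returns None if no part
# survives.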
def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
    """Join all the non-null arguments using sep."""
    texts: List[str] = []
    for part in parts:
        text = stringify(part)
        if text is not None:
            texts.append(text)
    if not len(texts):
        return None
    return sep.join(texts)

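# get_entity_id() accepts a mapping with an "id" key, an object with an .id
# attribute, or a plain value, e.g. get_entity_id({"id": "a1"}) == "a1" and
# get_entity_id("a1") == "a1"; it returns None when no ID can be found.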
def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID."""
    if is_mapping(obj):
        obj = obj.get("id")
    else:
        try:
            obj = obj.id
        except AttributeError:
            pass
    return stringify(obj)

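# make_entity_id() hashes the key_prefix (if any) and all parts with SHA1 and
# returns the hex digest; it returns None when none of the parts contributes
# any bytes (all None or empty), so a prefix alone never produces an ID.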
def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    digest = sha1()
    if key_prefix:
        digest.update(key_bytes(key_prefix))
    base = digest.digest()
    for part in parts:
        digest.update(key_bytes(part))
    if digest.digest() == base:
        return None
    return digest.hexdigest()

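# merge_context() combines the context of two entity copies, e.g.
# merge_context({"sources": "registry"}, {"sources": "leak", "note": None})
# == {"sources": ["registry", "leak"], "note": []}; the "caption" key is
# always skipped.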
def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """When merging two entities, make lists of all the duplicate context
    keys."""
    combined = {}
    keys = [*left.keys(), *right.keys()]
    for key in set(keys):
        if key in ("caption",):
            continue
        lval: List[V] = [i for i in ensure_list(left.get(key)) if i is not None]
        rval: List[V] = [i for i in ensure_list(right.get(key)) if i is not None]
        combined[key] = unique_list([*lval, *rval])
    return combined

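# dampen() scales len(text) linearly between the short and long bounds and
# clamps the result to [0.0, 1.0]: texts of at most `short` characters score
# 0.0, texts of at least `long` characters score 1.0, e.g.
# dampen(2, 10, "abcdef") == 0.5.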
def dampen(short: int, long: int, text: str) -> float:
    length = len(text) - short
    baseline = max(1.0, (long - short))
    return max(0, min(1.0, (length / baseline)))


def shortest(*texts: str) -> str:
    return min(texts, key=len)


def longest(*texts: str) -> str:
    return max(texts, key=len)
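
A minimal usage sketch (illustrative, not part of the module source), assuming followthemoney is installed, showing how the ID, context and text helpers combine:

from followthemoney.util import join_text, make_entity_id, merge_context

# Derive a deterministic ID from hashable key parts; the prefix namespaces it.
entity_id = make_entity_id("acme", "de", key_prefix="company")
assert entity_id is not None and len(entity_id) == 40  # hex SHA1 digest

# Merge the auxiliary context of two copies of the same record.
combined = merge_context(
    {"sources": "registry", "caption": "ACME GmbH"},
    {"sources": "leak", "note": None},
)
# combined == {"sources": ["registry", "leak"], "note": []}; "caption" is skipped

# Build a display label, ignoring missing fragments.
label = join_text("ACME", None, "GmbH")  # "ACME GmbH"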