
import logging
import os
from gettext import translation
from hashlib import sha1
from threading import local
from typing import cast, Dict, Any, List, Optional, TypeVar, Union, Sequence

from babel import Locale
from banal import is_mapping, unique_list, ensure_list
from normality import stringify
from normality.cleaning import compose_nfc
from normality.cleaning import remove_unsafe_chars
from normality.encoding import DEFAULT_ENCODING
 15MEGABYTE = 1024 * 1024
 17ENTITY_ID_LEN = 200
 19T = TypeVar("T")
 20K = TypeVar("K")
 21V = TypeVar("V")
 23PathLike = Union[str, os.PathLike[str]]
 24i18n_path = os.path.join(os.path.dirname(__file__), "translations")
 25state = local()
 26log = logging.getLogger(__name__)
 29def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
 30    if not hasattr(state, "translation"):
 31        set_model_locale(Locale.parse(DEFAULT_LOCALE))
 32    return cast(str, state.translation.gettext(*args, **kwargs))
 35def defer(text: str) -> str:
 36    return text
 39def set_model_locale(locale: Locale) -> None:
 40    state.locale = locale
 41    state.translation = translation(
 42        "followthemoney", i18n_path, [str(locale)], fallback=True
 43    )
 46def get_locale() -> Locale:
 47    if not hasattr(state, "locale"):
 48        return Locale.parse(DEFAULT_LOCALE)
 49    return Locale.parse(state.locale)
 52def get_env_list(name: str, default: List[str] = []) -> List[str]:
 53    value = stringify(os.environ.get(name))
 54    if value is not None:
 55        values = value.split(":")
 56        if len(values):
 57            return values
 58    return default
 61def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
 62    text = stringify(text, encoding_default=encoding)
 63    if text is None:
 64        return None
 65    try:
 66        text = compose_nfc(text)
 67    except (SystemError, Exception) as ex:
 68        log.warning("Cannot NFC text: %s", ex)
 69        return None
 70    text = remove_unsafe_chars(text)
 71    if text is None:
 72        return None
 73    byte_text = text.encode(DEFAULT_ENCODING, "replace")
 74    return cast(str, byte_text.decode(DEFAULT_ENCODING, "replace"))
 77def value_list(value: Union[T, Sequence[T]]) -> List[T]:
 78    if not isinstance(value, (str, bytes)):
 79        try:
 80            return [v for v in cast(Sequence[T], value)]
 81        except TypeError:
 82            pass
 83    return [cast(T, value)]
 86def key_bytes(key: Any) -> bytes:
 87    """Convert the given data to a value appropriate for hashing."""
 88    if isinstance(key, bytes):
 89        return key
 90    text = stringify(key)
 91    if text is None:
 92        return b""
 93    return text.encode("utf-8")
 96def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
 97    """Join all the non-null arguments using sep."""
 98    texts: List[str] = []
 99    for part in parts:
100        text = stringify(part)
101        if text is not None:
102            texts.append(text)
103    if not len(texts):
104        return None
105    return sep.join(texts)
def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID.

    Mappings are read via their ``"id"`` key, other objects via their ``id``
    attribute. An object without that attribute is used as the ID value
    itself. The result is passed through ``stringify``.
    """
    if is_mapping(obj):
        obj = obj.get("id")
    else:
        try:
            obj = obj.id
        except AttributeError:
            # No ``id`` attribute: fall through and stringify the object
            # itself.
            pass
    return stringify(obj)
def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    """Derive a SHA-1 hex ID from ``parts``, optionally seeded by ``key_prefix``.

    Every part is converted with ``key_bytes`` and fed into the hash
    back-to-back, with no separator. Returns ``None`` when the parts add no
    bytes at all, i.e. the digest still equals the prefix-only digest.
    """
    hasher = sha1()
    if key_prefix:
        hasher.update(key_bytes(key_prefix))
    # Digest before any part is hashed, used to detect "no content".
    prefix_only = hasher.digest()
    for chunk in map(key_bytes, parts):
        hasher.update(chunk)
    if hasher.digest() == prefix_only:
        return None
    return hasher.hexdigest()
def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """When merging two entities, make lists of all the duplicate context
    keys.

    For every key present in either mapping except ``"caption"``, the values
    from both sides are turned into lists, ``None`` entries are dropped, and
    the concatenation is passed through ``unique_list``.
    """
    merged = {}
    for key in set([*left, *right]):
        if key in ("caption",):
            continue
        left_values: List[V] = [
            item for item in ensure_list(left.get(key)) if item is not None
        ]
        right_values: List[V] = [
            item for item in ensure_list(right.get(key)) if item is not None
        ]
        merged[key] = unique_list(left_values + right_values)
    return merged
def dampen(short: int, long: int, text: str) -> float:
    """Map the length of ``text`` onto the range 0.0 to 1.0.

    Args:
        short: Length at or below which the result is 0.0.
        long: Length at or above which the result is 1.0.
        text: The string whose length is scored.

    Returns:
        ``(len(text) - short) / (long - short)`` clamped to [0.0, 1.0],
        always as a float. The divisor is never smaller than 1.0.
    """
    length = len(text) - short
    # Guard against a zero or negative span between the two bounds.
    baseline = max(1.0, float(long - short))
    # Float bounds keep the result a float even when it is clamped to zero.
    return max(0.0, min(1.0, length / baseline))
def shortest(*texts: str) -> str:
    """Return the shortest of the given strings.

    On ties the earliest argument wins. Raises ``ValueError`` when called
    without arguments, as ``min`` does for an empty sequence.
    """
    winner = min(texts, key=len)
    return winner
def longest(*texts: str) -> str:
    """Return the longest of the given strings.

    On ties the earliest argument wins. Raises ``ValueError`` when called
    without arguments, as ``max`` does for an empty sequence.
    """
    winner = max(texts, key=len)
    return winner
MEGABYTE = 1048576
PathLike = typing.Union[str, os.PathLike[str]]
i18n_path = '/opt/hostedtoolcache/Python/3.10.16/x64/lib/python3.10/site-packages/followthemoney/translations'
state = <_thread._local object>
log = <Logger followthemoney.util (WARNING)>
def gettext(*args: Optional[str], **kwargs: Dict[str, str]) -> str:
    """Translate a message with the current thread's translation catalog.

    If no catalog has been stored on the thread-local ``state`` yet, one is
    installed for ``DEFAULT_LOCALE`` first. All arguments are forwarded
    unchanged to the catalog's ``gettext`` method.
    """
    catalog_missing = not hasattr(state, "translation")
    if catalog_missing:
        # Lazily initialise this thread's catalog on first use.
        set_model_locale(Locale.parse(DEFAULT_LOCALE))
    translated = state.translation.gettext(*args, **kwargs)
    return cast(str, translated)
def defer(text: str) -> str:
    """Return ``text`` unchanged.

    NOTE(review): presumably a marker that lets message strings be collected
    for translation while the actual lookup happens later -- confirm.
    """
    unchanged = text
    return unchanged
def set_model_locale(locale: Locale) -> None:
    """Store ``locale`` and its translation catalog on the thread-local state.

    The catalog is looked up in the "followthemoney" domain under
    ``i18n_path``. ``fallback=True`` asks ``gettext.translation`` to return a
    null catalog rather than raise when no matching catalog file is found.
    """
    state.locale = locale
    languages = [str(locale)]
    catalog = translation(
        "followthemoney",
        localedir=i18n_path,
        languages=languages,
        fallback=True,
    )
    state.translation = catalog
def get_locale() -> Locale:
    """Return the locale active for the current thread.

    Falls back to parsing ``DEFAULT_LOCALE`` when no locale has been stored
    on the thread-local ``state``; the fallback is not stored.
    """
    if hasattr(state, "locale"):
        return Locale.parse(state.locale)
    return Locale.parse(DEFAULT_LOCALE)
def get_env_list(name: str, default: Optional[List[str]] = None) -> List[str]:
    """Read a colon-separated list from the environment variable ``name``.

    Args:
        name: Name of the environment variable to read.
        default: Values to use when the variable yields no text. Omitting it
            is equivalent to passing an empty list.

    Returns:
        The variable's value split on ``":"``, or a fresh copy of ``default``
        when ``stringify`` returns ``None`` for the variable's value.
    """
    value = stringify(os.environ.get(name))
    if value is not None:
        # str.split always yields at least one element, so no emptiness
        # check is needed here.
        return value.split(":")
    # Hand out a new list each time so callers can never mutate a shared
    # default object.
    return [] if default is None else list(default)
def sanitize_text(text: Any, encoding: str = DEFAULT_ENCODING) -> Optional[str]:
    """Clean ``text`` into a safe string, or return ``None`` if that fails.

    The value is stringified (using ``encoding`` as the default for byte
    input), NFC-composed, stripped of unsafe characters and finally
    round-tripped through ``DEFAULT_ENCODING`` with the "replace" error
    handler. ``None`` is returned when any step yields no text or when NFC
    composition raises.
    """
    cleaned = stringify(text, encoding_default=encoding)
    if cleaned is None:
        return None
    try:
        cleaned = compose_nfc(cleaned)
    except Exception as ex:  # also covers SystemError, a subclass of Exception
        log.warning("Cannot NFC text: %s", ex)
        return None
    cleaned = remove_unsafe_chars(cleaned)
    if cleaned is None:
        return None
    # Encode and decode again so unencodable characters get replaced.
    encoded = cleaned.encode(DEFAULT_ENCODING, "replace")
    decoded = encoded.decode(DEFAULT_ENCODING, "replace")
    return cast(str, decoded)
def value_list(value: Union[T, Sequence[T]]) -> List[T]:
    """Return ``value`` as a list.

    Strings and bytes are treated as single values and wrapped in a
    one-element list. Anything else that can be iterated is expanded into a
    list of its items; values that cannot be iterated are wrapped as well.
    """
    if isinstance(value, (str, bytes)):
        return [cast(T, value)]
    try:
        return list(cast(Sequence[T], value))
    except TypeError:
        # Not iterable: treat it as a single value.
        return [cast(T, value)]
def key_bytes(key: Any) -> bytes:
    """Convert the given data to a value appropriate for hashing.

    Bytes are returned as-is. Any other value is passed through
    ``stringify`` and UTF-8 encoded; if ``stringify`` yields ``None`` the
    result is an empty byte string.
    """
    if isinstance(key, bytes):
        return key
    as_text = stringify(key)
    return b"" if as_text is None else as_text.encode("utf-8")

Convert the given data to a value appropriate for hashing.

def join_text(*parts: Any, sep: str = " ") -> Optional[str]:
    """Join all the non-null arguments using sep.

    Each part is passed through ``stringify``; parts for which it returns
    ``None`` are skipped. Returns ``None`` when nothing is left to join.
    """
    texts: List[str] = [
        text for text in map(stringify, parts) if text is not None
    ]
    if not texts:
        return None
    return sep.join(texts)

Join all the non-null arguments using sep.

def get_entity_id(obj: Any) -> Optional[str]:
    """Given an entity-ish object, try to get the ID.

    Mappings are read via their ``"id"`` key, other objects via their ``id``
    attribute. An object without that attribute is used as the ID value
    itself. The result is passed through ``stringify``.
    """
    if is_mapping(obj):
        obj = obj.get("id")
    else:
        try:
            obj = obj.id
        except AttributeError:
            # No ``id`` attribute: fall through and stringify the object
            # itself.
            pass
    return stringify(obj)

Given an entity-ish object, try to get the ID.

def make_entity_id(*parts: Any, key_prefix: Optional[str] = None) -> Optional[str]:
    """Derive a SHA-1 hex ID from ``parts``, optionally seeded by ``key_prefix``.

    Every part is converted with ``key_bytes`` and fed into the hash
    back-to-back, with no separator. Returns ``None`` when the parts add no
    bytes at all, i.e. the digest still equals the prefix-only digest.
    """
    hasher = sha1()
    if key_prefix:
        hasher.update(key_bytes(key_prefix))
    # Digest before any part is hashed, used to detect "no content".
    prefix_only = hasher.digest()
    for chunk in map(key_bytes, parts):
        hasher.update(chunk)
    if hasher.digest() == prefix_only:
        return None
    return hasher.hexdigest()
def merge_context(left: Dict[K, V], right: Dict[K, V]) -> Dict[K, List[V]]:
    """When merging two entities, make lists of all the duplicate context
    keys.

    For every key present in either mapping except ``"caption"``, the values
    from both sides are turned into lists, ``None`` entries are dropped, and
    the concatenation is passed through ``unique_list``.
    """
    merged = {}
    for key in set([*left, *right]):
        if key in ("caption",):
            continue
        left_values: List[V] = [
            item for item in ensure_list(left.get(key)) if item is not None
        ]
        right_values: List[V] = [
            item for item in ensure_list(right.get(key)) if item is not None
        ]
        merged[key] = unique_list(left_values + right_values)
    return merged

When merging two entities, make lists of all the duplicate context keys.

def dampen(short: int, long: int, text: str) -> float:
    """Map the length of ``text`` onto the range 0.0 to 1.0.

    Args:
        short: Length at or below which the result is 0.0.
        long: Length at or above which the result is 1.0.
        text: The string whose length is scored.

    Returns:
        ``(len(text) - short) / (long - short)`` clamped to [0.0, 1.0],
        always as a float. The divisor is never smaller than 1.0.
    """
    length = len(text) - short
    # Guard against a zero or negative span between the two bounds.
    baseline = max(1.0, float(long - short))
    # Float bounds keep the result a float even when it is clamped to zero.
    return max(0.0, min(1.0, length / baseline))
def shortest(*texts: str) -> str:
    """Return the shortest of the given strings.

    On ties the earliest argument wins. Raises ``ValueError`` when called
    without arguments, as ``min`` does for an empty sequence.
    """
    winner = min(texts, key=len)
    return winner
def longest(*texts: str) -> str:
    """Return the longest of the given strings.

    On ties the earliest argument wins. Raises ``ValueError`` when called
    without arguments, as ``max`` does for an empty sequence.
    """
    winner = max(texts, key=len)
    return winner