Skip to content

Entity Fragmentation

Generating graph data is a difficult process. The size of the datasets we want to process using FtM makes it difficult to incrementally build nodes and edges in memory like you would in many data science workflows. Instead, we use a stream-based solution for constructing graph entities. That is why the toolkit supports entity fragments and aggregation.

Example

To illustrate the problem, imagine a table with millions of rows that describes a set of people and the companies they control. Every company can have multiple directors, while each director might control multiple companies:

CompanyID CompanyName DirectorName DirectorIdNo DirectorDoB
A123 Brilliant Amazing Ltd. John Smith PP827817 1979-02-16
A71882 Goldfish Ltd. John Smith PP827817 NULL
A123 Brilliant Amazing Ltd. Jane Doe PP1988299 1983-06-24

When turning this data into FtM, we’d create three entities for each row: a Company, a Person and a Directorship that connects the two.

If we do this row by row, we’d eventually generate three Company entities to represent two actual companies, and three Person entities that describe two distinct people. The generated entities are fragmented, ie. multiple references to the same entity ID are generated at different times in the program.

Another example: a document processing pipeline may first receive a Document and store its checksum and file name, then submit it for a processing to a queue. There, a content extractor may output a body text, and then a separate process performing NLP analysis contributes annotations like the names of individuals and companies mentioned in the document. Again, each step in processing has emitted fragments of the eventual entity.

Of course, we could treat each change to an entity as an update to a normal database backend, like an ElasticSearch index: fetch the existing entity, merge the changes, update the index. This, again, runs into scalability issues and race conditions in parallelized processing environments.

A better solution is to store the generated entity fragments as they are produced and to aggregate (combine) them into full entities when needed - often during a search indexing or data export process.

Entity aggregation

Entity aggregation requires sorting all generated fragments so they can be iterated in sequence of the entity ID that they are a part of. There are several techniques for doing so:

  • In-memory: for small datasets, it can be easiest to load all fragments into memory in order to merge them. The command-line tool ftm aggregate will do this.
  • In a database: the entity fragments are written to a SQL database, and then iterated as the result of a sorting query. The followthemoney-store library allows you to do this in a SQLite or PostgreSQL database.
  • Using flat files: FtM's JSON exporter will emit data that can be sorted using the generic sort command-line tool. The ftm sorted-aggregate command will then aggregate the resulting stream of sorted entity data.
  • Using statements: the statement data model provides an alternative to the idea of fragmentation. Instead of splitting entities into partial entity fragments, they are divided into statements, which are even more granular. Statements can also be aggregated.

Using followthemoney-store

# Generate entities from a CSV file and a mapping:
cat company-registry.csv | ftm map-csv mapping_file.yml > fragments.ijson

# Write the fragments to a table `company_registry`:
cat fragments.ijson | ftm store write -d company_registry

# List the tables in the store:
ftm store list

# Output merged entities:
ftm store iterate -d company_registry

The same functionality can also be used as a Python library:

import os
from ftmstore import get_dataset
# Assume a function that will emit fragments:
from myapp.data import generate_fragments

# If no `database_uri` is given, ftmstore will read connection from
# $FTM_STORE_URI, or create a file called `followthemoney.sqlite` in
# the current directory.
database_uri = os.environ.get('DATABASE_URI')
dataset = get_dataset('myapp_dataset', database_uri=database_uri)
bulk = dataset.bulk()
for idx, proxy in enumerate(generate_fragments()):
    bulk.put(proxy, fragment=idx)
bulk.flush()

# This will print the number of combined entities (ie. DISTINCT id):
print(len(dataset))

# This will return combined entities:
for entity in dataset.iterate():
    print(entity.caption)

# You could also iterate the underlying fragments:
for proxy in dataset.partials():
    print(proxy)

# Note: `dataset.partials()` returns `EntityProxy` objects. The method
# `dataset.fragments()` would return raw Python dictionaries instead.

# All three methods also support the `entity_id` filter, which can also be
# shortened to `get`:
entity = dataset.get(entity_id)

Fragment origins

followthemoney-store is used across the tools built on FtM to capture and aggregate entity fragments. In Aleph, fragments for one entity might be written by different processes: the API, document ingestors, document NER analyzers or a translation backend. It is convenient to be able to flush all entity fragments from a particular origin, while leaving the other fragments intact. For example, this can be used to delete all data uploaded via the bulk API, while leaving document-based data in the same dataset intact.

To support this, ftm-store has the notion of an origin for each fragment. If specified, this can be used to later delete or overwrite subsets of fragments.

cat us_ofac.ijson | ftm store write -d sanctions -o us_ofac
cat eu_eeas.ijson | ftm store write -d sanctions -o eu_eeas

# Will now have entities from both source files:
ftm store iterate -d sanctions | wc -l

# Delete all fragments from the second file:
ftm store delete -d sanctions -o eu_eeas

# Only one source file is left:
ftm store iterate -d sanctions | wc -l