Queue-based Entity URI Iterators

See Krikri issue https://issues.dp.la/issues/8019.

To let an ingestion worker process recover from a failure, and to allow for multiple worker processes or threads someday working on the same record set, we are going to try updating the record-iterator methods in Krikri to use queues for the lists of entity URIs that need processing. Entity URIs are the identifiers that we use for records.

We currently query our database for the list of record URIs that we need for each Activity. We query for the list of records that were touched by the "generator activity" that the current Activity is picking up from. If the current Activity fails, we have to query the database all over again and process that list from the first element. This does not allow us to resume our progress at an offset partway through the list of records.

We aim to use a number of entity-URI queues to solve this problem. When a harvest Activity is run, it should be able to push the URIs of records it has seen onto a "processed" queue. A mapping Activity typically follows, and it's always been the case that it knows what harvester Activity has created the records it will process. With that knowledge, we can have it work through the "processed" queue that was populated in the last step, without querying the database. For each record processed, the worker process can move the record ID to an "in progress" queue or data structure. In the event that the worker crashes while working on that record, a worker that resumes the job can retry any "in progress" records before picking up again with its usual source queue. Records that are successfully processed can be pushed onto another queue for the current Activity as was done earlier by the Harvester, and the sequence can iterate with the enrichment and indexing. Records that fail to be processed due to recoverable errors (as opposed to worker crashes) can be pushed onto a "failed" queue for further analysis and reporting.
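The flow described above can be illustrated with a small simulation. This is only a sketch under stated assumptions: Python deques stand in for Redis lists, the class name `ActivityQueues`, the example URIs, and the choice of `ValueError` as the "recoverable error" are all hypothetical, and a `RuntimeError` is used to simulate a worker crash. It is not Krikri code.

```python
from collections import deque


class ActivityQueues:
    """In-memory stand-in for one Activity's queues (hypothetical names)."""

    def __init__(self, source):
        self.todo = source          # "processed" queue of the generator Activity
        self.processing = deque()   # URIs a worker has claimed but not finished
        self.processed = deque()    # URIs this Activity handled successfully
        self.failed = deque()       # URIs that hit a recoverable error

    def recover(self):
        """On (re)start, return claimed-but-unfinished URIs to the front of todo."""
        while self.processing:
            self.todo.appendleft(self.processing.pop())

    def run(self, process_record):
        """Drain the to-do queue one URI at a time."""
        self.recover()
        while self.todo:
            uri = self.todo.popleft()
            self.processing.append(uri)      # claim before doing any work
            try:
                process_record(uri)
            except ValueError:               # stand-in for a recoverable error
                self.failed.append(uri)
            else:
                self.processed.append(uri)
            # Reached only if the worker did not crash on this record.
            self.processing.remove(uri)


crashed_once = set()


def map_record(uri):
    """Hypothetical mapping step: record/2 is unmappable, record/3 crashes once."""
    if uri.endswith("/2"):
        raise ValueError("unmappable record")
    if uri.endswith("/3") and uri not in crashed_once:
        crashed_once.add(uri)
        raise RuntimeError("simulated worker crash")


# The harvest's "processed" queue becomes the mapping Activity's source.
harvested = deque(f"urn:example:record/{n}" for n in range(1, 5))
mapping = ActivityQueues(harvested)

try:
    mapping.run(map_record)
except RuntimeError:
    pass  # the worker died; record/3 is still in mapping.processing

mapping.run(map_record)  # a restarted worker retries record/3, then continues
```

After the second run, records 1, 3, and 4 sit in `mapping.processed` and record 2 sits in `mapping.failed`, with no database query needed to resume. One subtlety the sketch glosses over: draining the source queue consumes it, so a real implementation would have to decide whether the generator Activity's queue must survive for later reuse.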

There may be some subtleties to work out with the proposal above, but that's what we are going to try, and this document can be updated if need be.

Following is the scope for this work, which was arrived at in a meeting attended by the Tech Team and the Data Services Coordinator on 2016-09-28.


  • Will use Redis lists.

    • Redis has a rich set of commands for the manipulations we need, and its documentation covers creating queues of the kind we need

      • E.g. the BRPOPLPUSH command (the blocking variant of RPOPLPUSH)

  • We’ll probably use a Behavior pattern (similar to what is done with SoftwareAgents and their entity behaviors) for the Redis-based iterator

  • We will focus on enqueueing entity URIs (equivalent to record IDs) rather than entities.

  • We need to modify software agents to save their processed or failed record URIs to the appropriate queue for the activity.

    • Probably best handled in each SoftwareAgent’s process_record method, where the SoftwareAgent has a queue-platform-specific Behavior for writing the record URI to the “processed” or “failed” queue.

  • We need to handle the case where a worker fails in the middle of processing a record. Each record URI should be popped from the “todo” queue and pushed onto a “processing” queue before work on it begins. When a job is restarted, any URIs left in the old “processing” queue need to be returned to the “todo” queue.

  • We will have to decide whether to save and retrieve record URIs from Redis one at a time or in chunks. The ticket should explain which strategy is chosen and why. Consider performance: saving in chunks may mean less network communication with Redis, but more time serializing and deserializing encoded list elements, because Redis lists are lists of strings. We probably need one-at-a-time for the sequence: 1) BRPOPLPUSH an item from “todo” to a worker’s own “processing” queue; 2) move the item from “processing” to “processed” or “failed”.

  • Necessary shims for functions that Hudson Molonglo will need available (set these up first, before the Redis implementation is available):

    • Activity: Restart job

    • Activity: Number of records processed

    • Activity: Number of errors

    • more?

  • The iterator will check to see if there’s an entity-ids queue for the Activity and, if not, will run a provenance query to populate it. We’ll treat this as backup in case the queue is removed; we’ll assume that, under normal conditions, the harvest will create a “processed” queue, etc.

  • We will focus on sequential activity processes in this first iteration, and not on concurrent workers for a single IDs queue. However, we will design for the eventual possibility of multiple workers for an IDs queue. Consider the step above to populate the queue: a mutex can be used to make sure two workers don’t both run the provenance query, which is again easy with Redis.

  • The need may emerge to break out Redis to its own EC2 instance if we run into memory usage or performance problems with it being colocated on the ingestion_app box. We’ll deal with that in a new ticket if we have to. We can start with upsizing the ingestion_app instances, which shouldn’t require a ticket. We can also use Amazon ElastiCache for Redis.

  • Estimated time of completion: last week of October, barring more unforeseen distractions.

  • Tickets (updated / created following discussion of the above):

We’re keeping the “Recover failed harvest” ticket because it’s different; the problems vary depending on the provider’s platform and it's about recovery from endpoint problems on the provider end, not crashes on our end.
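
Two items from the scope list above, the fallback that populates a missing queue from a provenance query under a mutex and the count shims, can be sketched as follows. This is a minimal illustration, not the planned implementation. The key scheme (`activity:<id>:todo` and so on), the function names, and the `FakeRedis` class are hypothetical; `FakeRedis` is an in-memory stand-in that mirrors only the redis-py method names used here (`set` with `nx=True`, `exists`, `rpush`, `llen`, `delete`) so that the example runs without a Redis server.

```python
class FakeRedis:
    """Tiny in-memory stand-in implementing only the calls used below."""

    def __init__(self):
        self.data = {}

    def set(self, key, value, nx=False):
        if nx and key in self.data:
            return None                  # like SET ... NX when the key exists
        self.data[key] = value
        return True

    def exists(self, key):
        return int(key in self.data)

    def rpush(self, key, *values):
        self.data.setdefault(key, []).extend(values)
        return len(self.data[key])

    def llen(self, key):
        return len(self.data.get(key, []))

    def delete(self, key):
        return int(self.data.pop(key, None) is not None)


def ensure_queue(client, activity_id, provenance_query):
    """Populate the to-do queue from provenance if it is missing.

    Returns True only if this caller ran the query.
    """
    queue = f"activity:{activity_id}:todo"            # hypothetical key scheme
    lock = f"activity:{activity_id}:populate-lock"    # hypothetical mutex key
    if client.exists(queue):
        return False                 # normal case: the harvest created it
    if not client.set(lock, "1", nx=True):
        return False                 # another worker holds the mutex
    try:
        if client.exists(queue):     # re-check: someone may have just finished
            return False
        uris = list(provenance_query(activity_id))
        if uris:
            client.rpush(queue, *uris)
        return True
    finally:
        client.delete(lock)          # a real lock would also need an expiry


def activity_counts(client, activity_id):
    """Shim for 'number of records processed' and 'number of errors'."""
    return {
        "processed": client.llen(f"activity:{activity_id}:processed"),
        "errors": client.llen(f"activity:{activity_id}:failed"),
    }


calls = []


def fake_provenance_query(activity_id):
    """Stand-in for the provenance query; records that it was called."""
    calls.append(activity_id)
    return ["urn:example:record/1", "urn:example:record/2"]


client = FakeRedis()
first = ensure_queue(client, 42, fake_provenance_query)    # runs the query
second = ensure_queue(client, 42, fake_provenance_query)   # queue exists: no-op
counts = activity_counts(client, 42)
```

One caveat worth carrying into the ticket: Redis deletes a list key once the list is empty, so a fully drained queue is indistinguishable from a missing one when testing with `exists`. A real implementation would probably need a separate marker to avoid re-running the provenance query after a queue has been worked through.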