Spark OAI Harvester

A library for harvesting records and sets from OAI-PMH repositories to Spark SQL.


Repository

The Spark OAI Harvester is currently housed within a larger application called "Ingestion3".  Ingestion3 is an ETL system for cultural heritage metadata, and is under active development.

GitHub repo for Ingestion3: https://github.com/dpla/ingestion3

Spark OAI Harvester package: https://github.com/dpla/ingestion3/tree/develop/src/main/scala/dpla/ingestion3/harvesters/oai

Requirements

  • Spark 2.0.1
  • SBT (necessary to build from source)

Building from source

You can get this library from the DPLA Ingestion3 GitHub repo.  To build a JAR file, run sbt package from the project root.  To execute the test suite, run sbt test.

Linking

We do not currently offer SBT or Maven coordinates for this library, but may do so in the future.

Using with spark-shell or spark-submit

Scala

$ ./bin/spark-shell --packages "org.apache.httpcomponents:fluent-hc:4.5.2" --jars [PATH TO DPLA INGESTION3 JAR]

Python

$ ./bin/pyspark --packages "org.apache.httpcomponents:fluent-hc:4.5.2" --jars [PATH TO DPLA INGESTION3 JAR]


Packages and jars can also be used with spark-submit.

Features

  • Partitioning.  The harvester strategically partitions data during the harvest to make the process more efficient.  For example, when harvesting records from multiple sets, it runs HTTP requests in parallel.  The results of the OAI harvest are loaded into a Spark DataFrame, a distributed collection of data.
  • Harvest sets and records.  You can harvest OAI sets or records.  If you harvest records, this harvester will also return data about both the records and any sets to which they belong.  (See the sketch after this list for a minimal sets-only harvest.)
  • Specify which sets to harvest records from.  You can tell this harvester which OAI sets to include or exclude in a records request.
  • Flow control.  OAI harvests often consist of a series of HTTP requests and responses.  In this case, each individual response contains a partial list of the requested data along with a resumption token, which is used to compose the subsequent request.  This harvester handles the flow control.  You need only compose the initial request; the harvester will compose all necessary subsequent requests and return a complete set of data.  For a standard records or sets harvest, the HTTP requests must be executed sequentially.  However, in the case of a records request from known sets, the harvester can initiate an independent series of sequential requests for each set.  These request series can run in parallel, speeding up the harvest process.
  • Error handling.  This harvester performs as much of the harvest as possible.  It returns any errors encountered during the process along with successfully harvested data.  This allows you to retain and examine records, sets, and errors from a partially successful harvest.
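
For example, a sets-only harvest needs nothing more than an endpoint and the "ListSets" verb.  The snippet below is a minimal sketch in the spirit of the Examples section: the endpoint URL is a placeholder, and the format and option names are the ones documented under Options.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

// Sets-only harvest: no metadataPrefix is used with the "ListSets" verb.
val setsDf = spark.read
    .format("dpla.ingestion3.harvesters.oai")
    .option("endpoint", "http://my-oai-repository.edu")  // placeholder endpoint
    .option("verb", "ListSets")
    .load()

// Keep only the rows that describe sets.
val sets = setsDf.select("set.*").where("set is not null")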

Options

Use the following options to compose your harvest.

  • endpoint (Required): The base URL for the OAI repository.
  • verb (Required): "ListSets" to harvest only sets; "ListRecords" to harvest records and any sets to which the records may belong. Case-sensitive.
  • metadataPrefix (Required when verb="ListRecords"; prohibited when verb="ListSets"): The metadata format for OAI-PMH requests issued to the repository.
  • harvestAllSets (Optional when verb="ListRecords"; cannot be used in conjunction with either setlist or blacklist): "True" to harvest records from all sets. Default is "false". Case-insensitive. Results will include all sets and all their records. This will only return records that belong to at least one set; records that do not belong to any set will not be included in the results.
  • setlist (Optional when verb="ListRecords"; cannot be used in conjunction with either harvestAllSets or blacklist): Comma-separated list of sets to include in the harvest. Use the OAI setSpec to identify a set. Results will include all sets in the setlist and all their records.
  • blacklist (Optional when verb="ListRecords"; cannot be used in conjunction with either harvestAllSets or setlist): Comma-separated list of sets to exclude from the harvest. Use the OAI setSpec to identify a set. Results will include all sets not in the blacklist and all their records. Records that do not belong to any set will not be included in the results.


Record harvests that include the option harvestAllSets, setlist, or blacklist generally run faster than those that do not.  This is because a known list of sets allows the harvester to distribute the process of making HTTP requests across multiple nodes.  Therefore, it is advantageous to use these options wherever possible.
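
As an illustration, the harvest below uses harvestAllSets so that the harvester can issue an independent request series per set.  This is a sketch only: the endpoint URL is a placeholder, and the metadataPrefix is just an example of a prefix a repository might support.

// Harvest records from every set, letting the harvester parallelize per-set requests.
val allSetsDf = spark.read
    .format("dpla.ingestion3.harvesters.oai")
    .option("endpoint", "http://my-oai-repository.edu")  // placeholder endpoint
    .option("verb", "ListRecords")
    .option("metadataPrefix", "oai_dc")                  // example prefix; use one your repository supports
    .option("harvestAllSets", "true")
    .load()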

Schema

This harvester returns a DataFrame that includes three response types: set, record, and error.  Each row represents exactly one of the three response types.  The basic structure of a row is as follows:


  • Row representing a set: the set column contains a structured set; the record and error columns are null.
  • Row representing a record: the record column contains a structured record; the set and error columns are null.
  • Row representing an error: the error column contains a structured error; the set and record columns are null.


Structured set

  • id (String): OAI setSpec.
  • document (String): Complete node describing this set. This can be parsed into XML.
  • setSource (Structured source data): Source of the id and document.

Structured record

  • id (String): OAI identifier.
  • document (String): Complete node describing this record. This can be parsed into XML.
  • setIds (Array[String]): OAI setSpec for all sets to which this record belongs.
  • recordSource (Structured source data): Source of the id, document, and setIds.


If a record is repeated in multiple sources, it will appear in multiple rows, each with a different recordSource.  This can happen if a record belongs to more than one set.
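
If you want each record only once, regardless of how many sets it belongs to, one option is to de-duplicate on the OAI identifier after flattening the record column.  A minimal sketch, assuming df is the DataFrame returned by a records harvest:

// Flatten the record rows, then keep one row per OAI identifier.
// Note that dropDuplicates keeps an arbitrary row among the duplicates.
val records = df.select("record.*").where("record is not null")
val distinctRecords = records.dropDuplicates("id")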

Structured error

  • message (String): Description of the error.
  • errorSource (Structured source data): The source of the message.
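
For example, you might pull the error messages out of a partially successful harvest together with the URLs that produced them.  A minimal sketch, assuming df is the DataFrame returned by the harvester:

// Collect the error rows along with the request URLs that produced them.
val errorReport = df
    .where("error is not null")
    .select("error.message", "error.errorSource.url")

errorReport.show(false)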


Structured source

  • queryParams (Map[String, String]): The parameters used to construct the OAI request.
  • url (String): The URL of the OAI request. Null if an error occurred before the URL was instantiated.
  • text (String): Full, un-parsed response to the OAI request. Null if an error occurred before the text response was received.
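
Because queryParams, url, and text are ordinary columns, they can be inspected with standard DataFrame operations.  A sketch, assuming df is the DataFrame returned by a records harvest:

// Look up the request parameters and URL that produced each record.
val recordRequests = df
    .where("record is not null")
    .select("record.id", "record.recordSource.url", "record.recordSource.queryParams")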


Complete schema

root
 |-- set: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- document: string (nullable = true)
 |    |-- setSource: struct (nullable = true)
 |    |    |-- queryParams: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- record: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- document: string (nullable = true)
 |    |-- setIds: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- recordSource: struct (nullable = true)
 |    |    |-- queryParams: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- error: struct (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- errorSource: struct (nullable = true)
 |    |    |-- queryParams: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- text: string (nullable = true)


Examples

Scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local").getOrCreate()


val df = spark.read
    .format("dpla.ingestion3.harvesters.oai")
    .option("endpoint", "http://my-oai-repository.edu")
    .option("verb", "ListRecords")
    .option("metadataPrefix", "mods")
    .option("setlist", "set1, set2, set3")
    .load()

val records = df.select("record.*").where("record is not null")
val sets = df.select("set.*").where("set is not null")
val errors = df.select("error.*").where("error is not null")
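
Because the document column holds the raw node as a string, it can be parsed after the harvest with an ordinary XML library.  The snippet below is a sketch, not part of the harvester's API; it assumes the scala-xml module is available on the classpath and that records is the DataFrame defined above.

import scala.xml.XML

// Parse the first harvested record's document into an XML element.
// (head will throw if the harvest returned no records.)
val firstDocument = records.select("document").head.getString(0)
val recordXml = XML.loadString(firstDocument)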

Python

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()


df = (spark.read
    .format("dpla.ingestion3.harvesters.oai")
    .option("endpoint", "http://my-oai-repository.edu")
    .option("verb", "ListRecords")
    .option("metadataPrefix", "mods")
    .option("setlist", "set1, set2, set3")
    .load())


records = df.select("record.*").where("record is not null")
sets = df.select("set.*").where("set is not null")
errors = df.select("error.*").where("error is not null")

Contributors

The Spark OAI Harvester is developed and maintained by the Digital Public Library of America.  If you are interested in contributing, please visit our GitHub repo or email us at tech@dp.la.

Feedback

We welcome your feedback on this project.  Email us at tech@dp.la.