Spark OAI Harvester

Spark OAI Harvester

A library for harvesting records and sets from OAI-PMH repositories to Spark SQL.

 

Repository

The Spark OAI Harvester is currently housed within a larger application called "Ingestion3".  Ingestion3 is an ETL system for cultural heritage metadata, and is under active development.

Github repo for Ingestion3https://github.com/dpla/ingestion3

Spark OAI Harvester package: https://github.com/dpla/ingestion3/tree/develop/src/main/scala/dpla/ingestion3/harvesters/oai

Requirements

  • Spark 2.0.1

  • SBT (necessary to build from source)

Building from source

You can get this library from the DPLA Ingestion3 github repo.  To build a JAR file, run sbt package from the project root.  To execute the test suite, run sbt test.

Linking

We do not currently offer SBT or Maven coordinates for this library, but may do so in the future.

Using with spark-shell or spark-submit

Scala

$ ./bin/spark-shell --packages "org.apache.httpcomponents:fluent-hc:4.5.2" --jars [PATH TO DPLA INGESTION3 JAR]

Python

$ ./bin/pyspark --packages "org.apache.httpcomponents:fluent-hc:4.5.2" --jars [PATH TO DPLA INGESTION3 JAR]


Packages and jars can also be used with spark-submit.

Features

  • Partitioning.  The harvester strategically partitions data during the harvest to make the process more efficient.  For example, when harvesting records from multiple sets, it runs HTTP request in parallel.  The results of the OAI harvest are loaded into a Spark DataFrame, a distributed collection of data.

  • Harvest sets and records.  You can harvest OAI sets or records.  If you harvest records, this harvester will also return data about both the records and any sets to which they belong.

  • Specify which sets to harvest records from.  You can tell this harvester which OAI sets to include or exclude in a records request.

  • Flow control.  Often, OAI harvests are comprised of a series of HTTP requests and responses.  In this case, each individual response contains a partial list of the requested data along with a resumption token, which is used to compose subsequent request.  This harvester handles the flow control.  You need only compose the initial request; the harvester will compose all necessary subsequent requests and return a complete set of data.  For a standard records or sets harvest, the HTTP requests must be executed sequentially.  However, in the case of a records request from known sets, the harvester can initiate an independent series of sequential requests for each set.  These request series can run in parallel, speeding up the harvest process.

  • Error handling.  This harvester performs as much of the harvest as possible.  It returns any errors encountered during the process along with successfully harvested data.  This allows you to retain and examine records, sets, and errors from a partially successful harvest.

Options

Use the following options to compose your harvest.

Option

Obligation

Usage

Option

Obligation

Usage

endpoint

Required.

The base URL for the OAI repository.

verb

Required.

"ListSets" to harvest only sets; "ListRecords" to harvest records and any sets to which the records may belong. Case-sensitive.

metadataPrefix

Required when verb="ListRecords"; prohibited when verb="ListSets".

The the metadata format in OAI-PMH requests issued to the repository.

harvestAllSets

Optional when verb="ListRecords"; cannot be used in conjunction with either setlist or blacklist.

"True" to harvest records from all sets. Default is "false". Case-insensitive. Results will include all sets and all their records. This will only return records that belong to at least one set; records that do not belong to any set will not be included in the results.

setlist

Optional when verb="ListRecords"; cannot be used in conjunction with either harvestAllSets or blacklist.

Comma-separated lists of sets to include in the harvest. Use the OAI setSpec to identify a set. Results will include all sets in the setlist and all their records.

blacklist

Optional when verb="ListRecords"; cannot be used in conjunction with either harvestAllSets or setlist.

Comma-separated lists of sets to exclude from the harvest. Use the OAI setSpec to identify a set. Results will include all sets not in the blacklist and all their records. Records that do not belong to any set will not be included in the results.

 

Record harvests that that include the option harvestAllSets, setlist, or blacklist generally run faster than those that do not.  This is because a known list of sets allows the harvester to distribute the process of making HTTP requests across multiple nodes.  Therefore, it is advantageous to use these options wherever possible.

Schema

This harvester returns a DataFrame which includes three main response types: set, record, and error.  Each row represents one of the three response types.  The basic structure of a row is as follows:

 

Value in set column

Value in record column

Value in error column

 

Value in set column

Value in record column

Value in error column

Row representing a set:

Structured set

null

null

Row representing a record:

null

Structured record

null

Row representing an error:

null

null

Structured error


Structured set

Name

DataType

Definition

Name

DataType

Definition

id

String

OAI setSpec.

document

String

Complete node describing this set. This can be parsed into XML.

setSource

Structured source data

Source of the id and document.

Structured record

Name

DataType

Definition

Name

DataType

Definition

id

String

OAI identifier.

document

String

Complete node describing this record. This can be parsed into XML.

setIds

Array[String]

OAI setSpec for all sets to which this record belongs.

recordSource

Structured source data

Source of the id, document, and setIds.


If a record is repeated in multiple sources, it will appear in multiple rows, each with a different recordSource.  This can happen if a record belongs to more than one set.

Structured error

Name

DataType

Definition

Name

DataType

Definition

message

String

Description of the error.

errorSource

Structured source data

The source of the message.


Structured source

Name

DataType

Definition

Name

DataType

Definition

queryParams

Map[String, String]

The parameters used to construct the OAI request.

url

String

The URL of the OAI request. Null if an error occurred before the URL was instantiated.

text

String

Full, un-parsed response to the OAI request. Null if an error occurred before the text response was received.


Complete schema

root |-- set: struct (nullable = true) | |-- id: string (nullable = true) | |-- document: string (nullable = true) | |-- setSource: struct (nullable = true) | | |-- queryParams: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- url: string (nullable = true) | | |-- text: string (nullable = true) |-- record: struct (nullable = true) | |-- id: string (nullable = true) | |-- document: string (nullable = true) | |-- setIds: array (nullable = true) | | |-- element: string (containsNull = true) | |-- recordSource: struct (nullable = true) | | |-- queryParams: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- url: string (nullable = true) | | |-- text: string (nullable = true) |-- error: struct (nullable = true) | |-- message: string (nullable = true) | |-- errorSource: struct (nullable = true) | | |-- queryParams: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- url: string (nullable = true) | | |-- text: string (nullable = true)

Examples

Scala

import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ val spark = SparkSession.builder().master("local").getOrCreate() val df = spark.read .format("dpla.ingestion3.harvesters.oai") .option("endpoint", "http://my-oai-repository.edu") .option("verb", "ListRecords") .option("metadataPrefix", "mods") .option("setlist", "set1, set2, set3") .load() val records = df.select("record.*").where("record is not null") val sets = df.select("set.*").where("record is not null") val errors = df.select("error.*").where("error is not null")

Python

from pyspark.sql import SparkSession spark = SparkSession.builder.master("local").getOrCreate() df = spark.read .format("dpla.ingestion3.harvesters.oai") .option("endpoint", "http://my-oai-repository.edu") .option("verb", "ListRecords") .option("metadataPrefix", "mods") .option("setlist", "set1, set2, set3") .load() records = df.select("record.*").where("record is not null") sets = df.select("set.*").where("record is not null") errors = df.select("error.*").where("error is not null")

Contributors

The Spark OAI Harvester is developed and maintained by the Digital Public Library of America.  If you are interested in contributing, please visit our github repo or email us at tech@dp.la.

Feedback

We welcome your feedback on this project.  Email us at tech@dp.la.