A library for harvesting records and sets from OAI-PMH repositories to Spark SQL.


Repository

The Spark OAI Harvester is currently housed within a larger application called "Ingestion3".  Ingestion3 is an ETL system for cultural heritage metadata, and is under active development.

Github repo for Ingestion3https://github.com/dpla/ingestion3

Spark OAI Harvester package: https://github.com/dpla/ingestion3/tree/develop/src/main/scala/dpla/ingestion3/harvesters/oai

Requirements

Building from source

You can get this library from the DPLA Ingestion3 github repo.  To build a JAR file, run sbt package from the project root.  To execute the test suite, run sbt test.

Linking

We do not currently offer SBT or Maven coordinates for this library, but may do so in the future.

Using with spark-shell or spark-submit

Scala

$ ./bin/spark-shell --packages "org.apache.httpcomponents:fluent-hc:4.5.2" --jars [PATH TO DPLA INGESTION3 JAR]

Python

$ ./bin/pyspark --packages "org.apache.httpcomponents:fluent-hc:4.5.2" --jars [PATH TO DPLA INGESTION3 JAR]


Packages and jars can also be used with spark-submit.

Features

Options

Use the following options to compose your harvest.

OptionObligationUsage
endpointRequired.The base URL for the OAI repository.
verbRequired."ListSets" to harvest only sets; "ListRecords" to harvest records and any sets to which the records may belong. Case-sensitive.
metadataPrefixRequired when verb="ListRecords"; prohibited when verb="ListSets".The the metadata format in OAI-PMH requests issued to the repository.

harvestAllSets

Optional when verb="ListRecords"; cannot be used in conjunction with either setlist or blacklist."True" to harvest records from all sets. Default is "false". Case-insensitive. Results will include all sets and all their records. This will only return records that belong to at least one set; records that do not belong to any set will not be included in the results.
setlistOptional when verb="ListRecords"; cannot be used in conjunction with either harvestAllSets or blacklist.Comma-separated lists of sets to include in the harvest. Use the OAI setSpec to identify a set. Results will include all sets in the setlist and all their records.
blacklistOptional when verb="ListRecords"; cannot be used in conjunction with either harvestAllSets or setlist.Comma-separated lists of sets to exclude from the harvest. Use the OAI setSpec to identify a set. Results will include all sets not in the blacklist and all their records. Records that do not belong to any set will not be included in the results.


Record harvests that that include the option harvestAllSets, setlist, or blacklist generally run faster than those that do not.  This is because a known list of sets allows the harvester to distribute the process of making HTTP requests across multiple nodes.  Therefore, it is advantageous to use these options wherever possible.

Schema

This harvester returns a DataFrame which includes three main response types: set, record, and error.  Each row represents one of the three response types.  The basic structure of a row is as follows:


Value in set column

Value in record column

Value in error column
Row representing a set:Structured setnullnull
Row representing a record:nullStructured recordnull
Row representing an error:nullnullStructured error


Structured set

NameDataTypeDefinition
idStringOAI setSpec.
documentStringComplete node describing this set. This can be parsed into XML.
setSourceStructured source dataSource of the id and document.

Structured record

NameDataTypeDefinition
idStringOAI identifier.
documentStringComplete node describing this record. This can be parsed into XML.
setIdsArray[String]OAI setSpec for all sets to which this record belongs.
recordSourceStructured source dataSource of the id, document, and setIds.


If a record is repeated in multiple sources, it will appear in multiple rows, each with a different recordSource.  This can happen if a record belongs to more than one set.

Structured error

NameDataTypeDefinition
messageStringDescription of the error.
errorSourceStructured source dataThe source of the message.


Structured source

NameDataTypeDefinition
queryParamsMap[String, String]The parameters used to construct the OAI request.
urlStringThe URL of the OAI request. Null if an error occurred before the URL was instantiated.
textStringFull, un-parsed response to the OAI request. Null if an error occurred before the text response was received.


Complete schema

root
 |-- set: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- document: string (nullable = true)
 |    |-- setSource: struct (nullable = true)
 |    |    |-- queryParams: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- record: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- document: string (nullable = true)
 |    |-- setIds: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- recordSource: struct (nullable = true)
 |    |    |-- queryParams: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- text: string (nullable = true)
 |-- error: struct (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- errorSource: struct (nullable = true)
 |    |    |-- queryParams: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- url: string (nullable = true)
 |    |    |-- text: string (nullable = true)


Examples

Scala

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local").getOrCreate()


val df = spark.read
    .format("dpla.ingestion3.harvesters.oai")
    .option("endpoint", "http://my-oai-repository.edu")
    .option("verb", "ListRecords")
    .option("metadataPrefix", "mods")
    .option("setlist", "set1, set2, set3")
    .load()

val records = df.select("record.*").where("record is not null")
val sets = df.select("set.*").where("record is not null")
val errors = df.select("error.*").where("error is not null")

Python

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()


df = spark.read
    .format("dpla.ingestion3.harvesters.oai")
    .option("endpoint", "http://my-oai-repository.edu")
    .option("verb", "ListRecords")
    .option("metadataPrefix", "mods")
    .option("setlist", "set1, set2, set3")
    .load()


records = df.select("record.*").where("record is not null")
sets = df.select("set.*").where("record is not null")
errors = df.select("error.*").where("error is not null")

Contributors

The Spark OAI Harvester is developed and maintained by the Digital Public Library of America.  If you are interested in contributing, please visit our github repo or email us at tech@dp.la.

Feedback

We welcome your feedback on this project.  Email us at tech@dp.la.