Spark OAI Harvester
A library for harvesting records and sets from OAI-PMH repositories to Spark SQL.
Repository
The Spark OAI Harvester is currently housed within a larger application called "Ingestion3". Ingestion3 is an ETL system for cultural heritage metadata, and is under active development.
Github repo for Ingestion3: https://github.com/dpla/ingestion3
Spark OAI Harvester package: https://github.com/dpla/ingestion3/tree/develop/src/main/scala/dpla/ingestion3/harvesters/oai
Requirements
- Spark 2.0.1
- SBT (necessary to build from source)
Building from source
You can get this library from the DPLA Ingestion3 github repo. To build a JAR file, run sbt package
from the project root. To execute the test suite, run sbt test
.
Linking
We do not currently offer SBT or Maven coordinates for this library, but may do so in the future.
Using with spark-shell
or spark-submit
Scala
$ ./bin/spark-shell --packages "org.apache.httpcomponents:fluent-hc:4.5.2" --jars [PATH TO DPLA INGESTION3 JAR]
Python
$ ./bin/pyspark --packages "org.apache.httpcomponents:fluent-hc:4.5.2" --jars [PATH TO DPLA INGESTION3 JAR]
Packages and jars can also be used with spark-submit
.
Features
- Partitioning. The harvester strategically partitions data during the harvest to make the process more efficient. For example, when harvesting records from multiple sets, it runs HTTP request in parallel. The results of the OAI harvest are loaded into a Spark DataFrame, a distributed collection of data.
- Harvest sets and records. You can harvest OAI sets or records. If you harvest records, this harvester will also return data about both the records and any sets to which they belong.
- Specify which sets to harvest records from. You can tell this harvester which OAI sets to include or exclude in a records request.
- Flow control. Often, OAI harvests are comprised of a series of HTTP requests and responses. In this case, each individual response contains a partial list of the requested data along with a resumption token, which is used to compose subsequent request. This harvester handles the flow control. You need only compose the initial request; the harvester will compose all necessary subsequent requests and return a complete set of data. For a standard records or sets harvest, the HTTP requests must be executed sequentially. However, in the case of a records request from known sets, the harvester can initiate an independent series of sequential requests for each set. These request series can run in parallel, speeding up the harvest process.
- Error handling. This harvester performs as much of the harvest as possible. It returns any errors encountered during the process along with successfully harvested data. This allows you to retain and examine records, sets, and errors from a partially successful harvest.
Options
Use the following options to compose your harvest.
Option | Obligation | Usage |
---|---|---|
endpoint | Required. | The base URL for the OAI repository. |
verb | Required. | "ListSets" to harvest only sets; "ListRecords" to harvest records and any sets to which the records may belong. Case-sensitive. |
metadataPrefix | Required when verb="ListRecords" ; prohibited when verb="ListSets" . | The the metadata format in OAI-PMH requests issued to the repository. |
| Optional when verb="ListRecords" ; cannot be used in conjunction with either setlist or blacklist . | "True" to harvest records from all sets. Default is "false" . Case-insensitive. Results will include all sets and all their records. This will only return records that belong to at least one set; records that do not belong to any set will not be included in the results. |
setlist | Optional when verb="ListRecords" ; cannot be used in conjunction with either harvestAllSets or blacklist . | Comma-separated lists of sets to include in the harvest. Use the OAI setSpec to identify a set. Results will include all sets in the setlist and all their records. |
blacklist | Optional when verb="ListRecords" ; cannot be used in conjunction with either harvestAllSets or setlist . | Comma-separated lists of sets to exclude from the harvest. Use the OAI setSpec to identify a set. Results will include all sets not in the blacklist and all their records. Records that do not belong to any set will not be included in the results. |
Record harvests that that include the option harvestAllSets
, setlist
, or blacklist
generally run faster than those that do not. This is because a known list of sets allows the harvester to distribute the process of making HTTP requests across multiple nodes. Therefore, it is advantageous to use these options wherever possible.
Schema
This harvester returns a DataFrame which includes three main response types: set
, record
, and error
. Each row represents one of the three response types. The basic structure of a row is as follows:
Value in set column | Value in | Value in error column | |
---|---|---|---|
Row representing a set: | Structured set | null | null |
Row representing a record: | null | Structured record | null |
Row representing an error: | null | null | Structured error |
Structured set
Name | DataType | Definition |
---|---|---|
id | String | OAI setSpec . |
document | String | Complete node describing this set. This can be parsed into XML. |
setSource | Structured source data | Source of the id and document . |
Structured record
Name | DataType | Definition |
---|---|---|
id | String | OAI identifier . |
document | String | Complete node describing this record. This can be parsed into XML. |
setIds | Array[String] | OAI setSpec for all sets to which this record belongs. |
recordSource | Structured source data | Source of the id , document , and setIds . |
If a record is repeated in multiple sources, it will appear in multiple rows, each with a different recordSource
. This can happen if a record belongs to more than one set.
Structured error
Name | DataType | Definition |
---|---|---|
message | String | Description of the error. |
errorSource | Structured source data | The source of the message. |
Structured source
Name | DataType | Definition |
---|---|---|
queryParams | Map[String, String] | The parameters used to construct the OAI request. |
url | String | The URL of the OAI request. Null if an error occurred before the URL was instantiated. |
text | String | Full, un-parsed response to the OAI request. Null if an error occurred before the text response was received. |
Complete schema
root |-- set: struct (nullable = true) | |-- id: string (nullable = true) | |-- document: string (nullable = true) | |-- setSource: struct (nullable = true) | | |-- queryParams: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- url: string (nullable = true) | | |-- text: string (nullable = true) |-- record: struct (nullable = true) | |-- id: string (nullable = true) | |-- document: string (nullable = true) | |-- setIds: array (nullable = true) | | |-- element: string (containsNull = true) | |-- recordSource: struct (nullable = true) | | |-- queryParams: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- url: string (nullable = true) | | |-- text: string (nullable = true) |-- error: struct (nullable = true) | |-- message: string (nullable = true) | |-- errorSource: struct (nullable = true) | | |-- queryParams: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- url: string (nullable = true) | | |-- text: string (nullable = true)
Examples
Scala
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ val spark = SparkSession.builder().master("local").getOrCreate() val df = spark.read .format("dpla.ingestion3.harvesters.oai") .option("endpoint", "http://my-oai-repository.edu") .option("verb", "ListRecords") .option("metadataPrefix", "mods") .option("setlist", "set1, set2, set3") .load() val records = df.select("record.*").where("record is not null") val sets = df.select("set.*").where("record is not null") val errors = df.select("error.*").where("error is not null")
Python
from pyspark.sql import SparkSession spark = SparkSession.builder.master("local").getOrCreate() df = spark.read .format("dpla.ingestion3.harvesters.oai") .option("endpoint", "http://my-oai-repository.edu") .option("verb", "ListRecords") .option("metadataPrefix", "mods") .option("setlist", "set1, set2, set3") .load() records = df.select("record.*").where("record is not null") sets = df.select("set.*").where("record is not null") errors = df.select("error.*").where("error is not null")
Contributors
The Spark OAI Harvester is developed and maintained by the Digital Public Library of America. If you are interested in contributing, please visit our github repo or email us at tech@dp.la.
Feedback
We welcome your feedback on this project. Email us at tech@dp.la.