A library for harvesting records and sets from OAI-PMH repositories to Spark SQL.
The Spark OAI Harvester is currently housed within a larger application called "Ingestion3". Ingestion3 is an ETL system for cultural heritage metadata, and is under active development.
GitHub repo for Ingestion3: https://github.com/dpla/ingestion3
Spark OAI Harvester package: https://github.com/dpla/ingestion3/tree/develop/src/main/scala/dpla/ingestion3/harvesters/oai
You can get this library from the DPLA Ingestion3 GitHub repo. To build a JAR file, run `sbt package` from the project root. To execute the test suite, run `sbt test`.
We do not currently offer SBT or Maven coordinates for this library, but may do so in the future.
Using `spark-shell` or `spark-submit`:

    $ ./bin/spark-shell --packages "org.apache.httpcomponents:fluent-hc:4.5.2" --jars [PATH TO DPLA INGESTION3 JAR]
    $ ./bin/pyspark --packages "org.apache.httpcomponents:fluent-hc:4.5.2" --jars [PATH TO DPLA INGESTION3 JAR]

Packages and jars can also be used with `spark-submit`.
Use the following options to compose your harvest.
Option | Obligation | Usage |
---|---|---|
endpoint | Required. | The base URL for the OAI repository. |
verb | Required. | "ListSets" to harvest only sets; "ListRecords" to harvest records and any sets to which the records may belong. Case-sensitive. |
outputDir | Required. | Location to save output. This should be a local path; Amazon S3 may be supported in the future. |
metadataPrefix | Required when `verb="ListRecords"`; prohibited when `verb="ListSets"`. | The metadata format in OAI-PMH requests issued to the repository. |
harvestAllSets | Optional when `verb="ListRecords"`; cannot be used in conjunction with either `setlist` or `blacklist`. | "True" to harvest records from all sets. Default is "false". Case-insensitive. Results will include all sets and all their records. This will only return records that belong to at least one set; records that do not belong to any set will not be included in the results. |
setlist | Optional when `verb="ListRecords"`; cannot be used in conjunction with either `harvestAllSets` or `blacklist`. | Comma-separated list of sets to include in the harvest. Use the OAI `setSpec` to identify a set. Results will include all sets in the setlist and all their records. |
blacklist | Optional when `verb="ListRecords"`; cannot be used in conjunction with either `harvestAllSets` or `setlist`. | Comma-separated list of sets to exclude from the harvest. Use the OAI `setSpec` to identify a set. Results will include all sets not in the blacklist and all their records. Records that do not belong to any set will not be included in the results. |
provider | Required. | The name of the source of the records. |
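
The rules in the table above can be checked before a harvest is started. The following is a minimal sketch in plain Python, not part of the harvester itself: the function name `validate_harvest_options` is hypothetical, and it assumes that `harvestAllSets` only counts as "in use" when its value is "true" (case-insensitive).

```python
def validate_harvest_options(options):
    """Return a list of problems found in a dict of harvest options.

    Hypothetical helper: it only mirrors the rules stated in the options table.
    """
    problems = []

    # Options the table marks as always required.
    for name in ("endpoint", "verb", "outputDir", "provider"):
        if not options.get(name):
            problems.append(f"missing required option: {name}")

    # The verb is case-sensitive, so compare it exactly.
    verb = options.get("verb")
    if verb and verb not in ("ListSets", "ListRecords"):
        problems.append("verb must be 'ListSets' or 'ListRecords'")

    has_prefix = bool(options.get("metadataPrefix"))
    if verb == "ListRecords" and not has_prefix:
        problems.append("metadataPrefix is required when verb is ListRecords")
    if verb == "ListSets" and has_prefix:
        problems.append("metadataPrefix is prohibited when verb is ListSets")

    # Assumption: harvestAllSets is only "in use" when set to true.
    all_sets = str(options.get("harvestAllSets", "false")).lower() == "true"
    in_use = [
        name
        for name, used in (
            ("harvestAllSets", all_sets),
            ("setlist", bool(options.get("setlist"))),
            ("blacklist", bool(options.get("blacklist"))),
        )
        if used
    ]
    if len(in_use) > 1:
        problems.append("mutually exclusive options used together: " + ", ".join(in_use))

    return problems


options = {
    "endpoint": "http://my-oai-repository.edu",
    "verb": "ListRecords",
    "outputDir": "/tmp/harvest",  # hypothetical local path
    "provider": "example",        # hypothetical provider name
    "metadataPrefix": "mods",
    "setlist": "set1,set2",
}
print(validate_harvest_options(options))
```

An empty list means the combination is consistent with the table; each string in a non-empty list names one rule that was broken.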
Record harvests that include the option `harvestAllSets`, `setlist`, or `blacklist` generally run faster than those that do not. This is because a known list of sets allows the harvester to distribute the process of making HTTP requests across multiple nodes. Therefore, it is advantageous to use these options wherever possible.
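
To illustrate why a known list of sets helps, the sketch below builds one independent starting request per set. It is an illustration only and not the harvester's actual code: the function name `requests_per_set` is hypothetical, trimming whitespace around set names is an assumption, and the query parameters (`verb`, `metadataPrefix`, `set`) are the standard OAI-PMH ones.

```python
from urllib.parse import urlencode


def requests_per_set(endpoint, metadata_prefix, set_specs):
    """Build one initial ListRecords request URL per set (hypothetical helper)."""
    urls = []
    for set_spec in set_specs:
        params = {
            "verb": "ListRecords",
            "metadataPrefix": metadata_prefix,
            "set": set_spec,
        }
        urls.append(endpoint + "?" + urlencode(params))
    return urls


# Assumption: whitespace around the comma-separated set names is not significant.
setlist = "set1, set2, set3"
set_specs = [s.strip() for s in setlist.split(",") if s.strip()]

urls = requests_per_set("http://my-oai-repository.edu", "mods", set_specs)
for url in urls:
    print(url)
```

Each URL starts its own sequence of requests, so the sequences can be handed to different workers. A harvest with no set list has a single starting request, and each following page depends on the resumption token returned by the previous one.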
This harvester returns a DataFrame which includes three main response types: `set`, `record`, and `error`. Each row represents one of the three response types. The basic structure of a row is as follows:
Row type | Value in set column | Value in record column | Value in error column |
---|---|---|---|
Row representing a set | Structured set | null | null |
Row representing a record | null | Structured record | null |
Row representing an error | null | null | Structured error |
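
The row structure above can be sketched with plain dictionaries, where exactly one of the three columns is populated per row. This is an illustrative sketch: the function name `row_type` and the sample rows are made up, and the dictionaries stand in for rows collected from the DataFrame.

```python
def row_type(row):
    """Return 'set', 'record', or 'error' for a row given as a dict."""
    populated = [name for name in ("set", "record", "error") if row.get(name) is not None]
    if len(populated) != 1:
        raise ValueError(f"expected exactly one populated column, found {populated}")
    return populated[0]


# Hypothetical sample rows, one per response type.
rows = [
    {"set": {"id": "set1"}, "record": None, "error": None},
    {"set": None, "record": {"id": "oai:example:1"}, "error": None},
    {"set": None, "record": None, "error": {"message": "timeout"}},
]

# Count how many rows of each type the harvest returned.
counts = {}
for row in rows:
    kind = row_type(row)
    counts[kind] = counts.get(kind, 0) + 1
print(counts)
```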

Structured set:

Name | DataType | Definition |
---|---|---|
id | String | OAI `setSpec`. |
document | String | Complete node describing this set. This can be parsed into XML. |
setSource | Structured source data | Source of the `id` and `document`. |
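
Since `document` is stored as a string, it has to be parsed before its fields can be read. The sketch below uses Python's standard `xml.etree.ElementTree`. The sample document is made up for illustration, and it assumes the node carries the OAI-PMH namespace declaration; a document string without it would need plain element names instead.

```python
import xml.etree.ElementTree as ET

# The OAI-PMH 2.0 namespace, mapped to a prefix for use in element lookups.
OAI_NS = {"oai": "http://www.openarchives.org/OAI/2.0/"}

# Hypothetical example of a set node as it might appear in the document column.
sample_document = (
    '<set xmlns="http://www.openarchives.org/OAI/2.0/">'
    "<setSpec>set1</setSpec>"
    "<setName>Example Collection</setName>"
    "</set>"
)


def parse_set_document(document):
    """Parse a set node string and pull out its setSpec and setName."""
    node = ET.fromstring(document)
    return {
        "setSpec": node.findtext("oai:setSpec", namespaces=OAI_NS),
        "setName": node.findtext("oai:setName", namespaces=OAI_NS),
    }


parsed = parse_set_document(sample_document)
print(parsed)
```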

Structured record:

Name | DataType | Definition |
---|---|---|
id | String | OAI `identifier`. |
document | String | Complete node describing this record. This can be parsed into XML. |
setIds | Array[String] | OAI `setSpec` for all sets to which this record belongs. |
recordSource | Structured source data | Source of the `id`, `document`, and `setIds`. |
If a record is repeated in multiple sources, it will appear in multiple rows, each with a different `recordSource`. This can happen if a record belongs to more than one set.
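
If repeated records are not wanted downstream, rows with the same `id` can be merged. The following is a rough sketch over plain dictionaries rather than a DataFrame: the function name `merge_repeated_records` and the sample data are hypothetical, and it assumes that rows sharing an `id` carry the same `document`, so only the first copy is kept.

```python
def merge_repeated_records(record_rows):
    """Group record structs by id, collecting every source URL seen for that id."""
    merged = {}
    for record in record_rows:
        # Keep the first document seen for each id; later rows only add their source.
        entry = merged.setdefault(
            record["id"],
            {"document": record["document"], "setIds": record["setIds"], "sourceUrls": []},
        )
        entry["sourceUrls"].append(record["recordSource"]["url"])
    return merged


# Hypothetical rows: the same record harvested once through each of its two sets.
record_rows = [
    {
        "id": "oai:example:1",
        "document": "<record>...</record>",
        "setIds": ["set1", "set2"],
        "recordSource": {"url": "http://my-oai-repository.edu?verb=ListRecords&set=set1"},
    },
    {
        "id": "oai:example:1",
        "document": "<record>...</record>",
        "setIds": ["set1", "set2"],
        "recordSource": {"url": "http://my-oai-repository.edu?verb=ListRecords&set=set2"},
    },
]

merged = merge_repeated_records(record_rows)
print(len(merged), merged["oai:example:1"]["sourceUrls"])
```

Keeping the list of source URLs preserves the information about where each copy came from, which a simple drop of duplicate rows would discard.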

Structured error:

Name | DataType | Definition |
---|---|---|
message | String | Description of the error. |
errorSource | Structured source data | The source of the message. |

Structured source data:

Name | DataType | Definition |
---|---|---|
queryParams | Map[String, String] | The parameters used to construct the OAI request. |
url | String | The URL of the OAI request. Null if an error occurred before the URL was instantiated. |
text | String | Full, un-parsed response to the OAI request. Null if an error occurred before the text response was received. |

Full schema of the returned DataFrame:

    root
     |-- set: struct (nullable = true)
     |    |-- id: string (nullable = true)
     |    |-- document: string (nullable = true)
     |    |-- setSource: struct (nullable = true)
     |    |    |-- queryParams: map (nullable = true)
     |    |    |    |-- key: string
     |    |    |    |-- value: string (valueContainsNull = true)
     |    |    |-- url: string (nullable = true)
     |    |    |-- text: string (nullable = true)
     |-- record: struct (nullable = true)
     |    |-- id: string (nullable = true)
     |    |-- document: string (nullable = true)
     |    |-- setIds: array (nullable = true)
     |    |    |-- element: string (containsNull = true)
     |    |-- recordSource: struct (nullable = true)
     |    |    |-- queryParams: map (nullable = true)
     |    |    |    |-- key: string
     |    |    |    |-- value: string (valueContainsNull = true)
     |    |    |-- url: string (nullable = true)
     |    |    |-- text: string (nullable = true)
     |-- error: struct (nullable = true)
     |    |-- message: string (nullable = true)
     |    |-- errorSource: struct (nullable = true)
     |    |    |-- queryParams: map (nullable = true)
     |    |    |    |-- key: string
     |    |    |    |-- value: string (valueContainsNull = true)
     |    |    |-- url: string (nullable = true)
     |    |    |-- text: string (nullable = true)

Scala:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().master("local").getOrCreate()

    // Load the harvest as a DataFrame using the OAI data source.
    val df = spark.read
      .format("dpla.ingestion3.harvesters.oai")
      .option("endpoint", "http://my-oai-repository.edu")
      .option("verb", "ListRecords")
      .option("metadataPrefix", "mods")
      .option("setlist", "set1, set2, set3")
      .load()

    // Filter on each struct column first, then flatten its fields.
    val records = df.where("record is not null").select("record.*")
    val sets = df.where("set is not null").select("set.*")
    val errors = df.where("error is not null").select("error.*")

Python:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local").getOrCreate()

    # Load the harvest as a DataFrame using the OAI data source.
    # The parentheses let the method chain span several lines.
    df = (
        spark.read
        .format("dpla.ingestion3.harvesters.oai")
        .option("endpoint", "http://my-oai-repository.edu")
        .option("verb", "ListRecords")
        .option("metadataPrefix", "mods")
        .option("setlist", "set1, set2, set3")
        .load()
    )

    # Filter on each struct column first, then flatten its fields.
    records = df.where("record is not null").select("record.*")
    sets = df.where("set is not null").select("set.*")
    errors = df.where("error is not null").select("error.*")

The Spark OAI Harvester is developed and maintained by the Digital Public Library of America. If you are interested in contributing, please visit our GitHub repo or email us at tech@dp.la.
We welcome your feedback on this project. Email us at tech@dp.la.