Workflow Context

Ingesting LIDO payloads into internal collection management systems requires a deterministic, state-aware pipeline. The architecture must decouple XML parsing from database persistence to prevent partial writes. This transformation layer operates as the central routing node within the Core Architecture & Collection Taxonomy framework. Raw harvest streams are converted into transactional database operations through isolated phases.

The pipeline enforces three distinct stages: extraction, transformation, and routing. Extraction handles memory-efficient XML traversal. Transformation normalizes controlled vocabularies and coerces data types. Routing evaluates access rights and queues validated records for batch execution. Treating LIDO as an immutable input stream guarantees auditability across high-volume digitization backlogs.

flowchart LR
    X["LIDO XML stream"] --> E["Extraction<br/>iterparse · clear()"]
    E --> T["Transformation<br/>flatten · vocab resolve"]
    T --> Ro{"Rights routing"}
    Ro -->|public domain| I["Index / publish"]
    Ro -->|restricted| Em["Embargo workflow"]
    I --> U["Async upsert<br/>ON CONFLICT"]
    Em --> U

Streaming Extraction Architecture

Do not load the full DOM during production LIDO ingestion. A fully parsed tree grows linearly with file size, which causes instability during bulk harvests. The extraction engine must use lxml.etree.iterparse to process <lido:lido> record elements sequentially. Provided each processed element is released, memory stays bounded by the largest single record rather than by payload volume.

python
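import io
import logging
from dataclasses import dataclass, field
from typing import Any, BinaryIO, Iterator

from lxml import etree

logger = logging.getLogger("lido_pipeline")


@dataclass
class LidoRecord:
    object_id: str
    title: str
    creation_date: str | None = None
    rights_status: str = "unknown"
    media_count: int = 0
    raw_xml: str = field(default="", repr=False)


# The LIDO namespace URI has no trailing slash; with one, the tag filter
# below would match nothing and the generator would silently yield no records.
LIDO_NS = "http://www.lido-schema.org"
NS = {"lido": LIDO_NS}


def extract_lido_records(source: bytes | str | BinaryIO) -> Iterator[dict[str, Any]]:
    """Yield one flat dict per <lido:lido> record without building a full tree.

    Pass a file path or an open binary stream for large harvests. Raw bytes
    are accepted for convenience, but they are already fully resident in memory.
    """
    if isinstance(source, bytes):
        source = io.BytesIO(source)
    context = etree.iterparse(source, events=("end",), tag=f"{{{LIDO_NS}}}lido")
    for _, elem in context:
        try:
            # LIDO attributes are namespace-qualified, hence @lido:type.
            obj_id = (
                elem.findtext(".//lido:recordID[@lido:type='local']", namespaces=NS) or ""
            ).strip()
            if not obj_id:
                # A shared placeholder ID would make unrelated records collide
                # on the object_id upsert key, so skip and log instead.
                logger.warning("Skipping LIDO record without a local recordID")
                continue
            title = elem.findtext(".//lido:titleSet/lido:appellationValue", namespaces=NS)
            rights = elem.findtext(".//lido:rightsType/lido:term", namespaces=NS)
            creation_date = elem.findtext(
                ".//lido:eventDate/lido:date/lido:earliestDate", namespaces=NS
            )
            media = len(elem.findall(".//lido:resourceWrap/lido:resourceSet", namespaces=NS))
            yield {
                "object_id": obj_id,
                "title": title or "Untitled",
                "creation_date": creation_date,
                "rights_status": rights or "unknown",
                "media_count": media,
                "raw_xml": etree.tostring(elem, encoding="unicode"),
            }
        finally:
            # Free the processed element and its now-unreachable siblings so the
            # memory footprint stays flat across multi-gigabyte harvests.
            elem.clear()
            while elem.getprevious() is not None:
                del elem.getparent()[0]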

The generator yields one flat dictionary per record. When the consumer requests the next record, the generator clears the processed element and deletes its already-processed siblings from the parent, so the parser tree never accumulates. Memory stays flat across multi-gigabyte harvest cycles as long as downstream stages do not retain the yielded dictionaries.
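Downstream stages usually consume such a generator in fixed-size batches, so that only one batch of dictionaries is resident at a time. The following is a minimal sketch of that idea. The helper name `batched_records` and the default batch size of 500 are assumptions for illustration and do not come from the pipeline itself.

```python
from itertools import islice
from typing import Any, Iterable, Iterator


def batched_records(
    records: Iterable[dict[str, Any]], batch_size: int = 500
) -> Iterator[list[dict[str, Any]]]:
    """Group a lazy record stream into lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        # islice pulls at most batch_size items, so the source generator is
        # only advanced as far as the current batch requires.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

Because each batch is a plain list, it can be handed to the transformation stage and then discarded before the next batch is pulled from the parser.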

Transformation & Vocabulary Normalization

LIDO nodes rarely align directly with internal relational schemas. The transformation phase maps hierarchical XML paths to flat column definitions. Field coercion handles date formats, numeric ranges, and multilingual text fallbacks. This stage must reference your internal schema definitions to guarantee type safety.

Controlled vocabulary resolution occurs during transformation. LIDO <lido:subjectConcept> and <lido:place> elements contain raw strings or external URIs. These must be resolved against institutional authority files. Proper normalization requires cross-referencing with Implementing Getty AAT & TGN to map legacy terms to current identifiers.

The transformation engine applies deterministic fallback logic. Missing dates default to NULL rather than placeholder strings. Unresolved vocabulary terms route to a quarantine table for curator review. This prevents silent data corruption during automated ingestion.

Async Routing & Database Persistence

Validated records enter the routing layer for rights evaluation and batch execution. The pipeline evaluates <lido:rightsType> against institutional access policies. Public domain records proceed to immediate indexing. Restricted materials trigger embargo workflows and audit logging.

python
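async def process_batch(records: list[LidoRecord], db_pool: Any) -> None:
    """Upsert one batch atomically; db_pool is assumed to be an asyncpg pool."""
    if not records:
        return
    # Every harvested column is refreshed on conflict so that a re-ingested
    # record cannot keep a stale creation date or rights status.
    query = """
        INSERT INTO collection_objects
            (object_id, title, creation_date, rights_status, media_count)
        VALUES ($1, $2, $3, $4, $5)
        ON CONFLICT (object_id) DO UPDATE SET
            title = EXCLUDED.title,
            creation_date = EXCLUDED.creation_date,
            rights_status = EXCLUDED.rights_status,
            media_count = EXCLUDED.media_count,
            updated_at = NOW()
    """
    params = [
        (r.object_id, r.title, r.creation_date, r.rights_status, r.media_count)
        for r in records
    ]
    async with db_pool.acquire() as conn:
        # The transaction commits when this block exits without an exception.
        async with conn.transaction():
            await conn.executemany(query, params)
    # Log only after the transaction block has exited, that is, after COMMIT.
    logger.info("Batch committed: %d records", len(records))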

Database operations execute within explicit transaction boundaries. The ON CONFLICT clause ensures idempotent upserts during re-ingestion cycles. Connection pooling prevents resource exhaustion under concurrent load. This pattern aligns with Designing Museum Object Schemas for consistent primary key management.

Post-insert validation triggers downstream metadata synchronization. Normalized records are compared against baseline Dublin Core profiles. Discrepancies generate reconciliation tickets for technical staff. The validation workflow is detailed in Validating Dublin Core Against CollectionBase.

Production Hardening & IIIF Alignment

Production deployments require strict error isolation and retry logic. Transient network failures must not corrupt batch state. Implement exponential backoff for database connections. Log parsing failures with full XML context for forensic review.
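One way to express the retry policy is a small wrapper around any awaitable operation. This is a sketch under assumptions: the name `with_backoff`, the default exception types, and the delay values are illustrative choices, and a real deployment would list the transient exceptions raised by its own database driver.

```python
import asyncio
import logging
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
logger = logging.getLogger("lido_pipeline")


async def with_backoff(
    operation: Callable[[], Awaitable[T]],
    *,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
) -> T:
    """Await operation(), retrying the listed exceptions with capped, jittered backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except retry_on as exc:
            if attempt == max_attempts:
                raise  # retries exhausted: surface the last error unchanged
            # Double the delay per attempt, cap it, then apply jitter so that
            # concurrent workers do not all retry at the same instant.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            delay *= random.uniform(0.5, 1.0)
            logger.warning("Attempt %d failed (%s); retrying in %.2fs", attempt, exc, delay)
            await asyncio.sleep(delay)
    raise RuntimeError("unreachable")  # satisfies static type checkers
```

A call such as `await with_backoff(lambda: process_batch(records, db_pool))` retries the whole batch. That is safe here because a failed attempt rolls back its transaction and the upsert can be replayed.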

IIIF manifest generation depends on accurate media mapping. The media_count field drives downstream presentation routing. It is derived from <lido:resourceWrap>/<lido:resourceSet> digital surrogates, not from related works. Each resource set must resolve to a persistent identifier. The IIIF Presentation API 3.0 specification governs how these references translate to viewer payloads.
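To make the mapping concrete, the sketch below builds a minimal Presentation API 3.0 manifest with one canvas per resolved image. It assumes each resource set has already been resolved to a persistent image URL with known pixel dimensions. `ImageResource`, `build_manifest`, and the URL layout under `base_url` are hypothetical, and a production manifest would normally carry more properties (rights, metadata, image services).

```python
from dataclasses import dataclass
from typing import Any
from urllib.parse import quote

IIIF_CONTEXT = "http://iiif.io/api/presentation/3/context.json"


@dataclass(frozen=True)
class ImageResource:
    url: str  # persistent identifier of the digital surrogate
    width: int
    height: int
    media_type: str = "image/jpeg"


def build_manifest(
    base_url: str, object_id: str, title: str, images: list[ImageResource]
) -> dict[str, Any]:
    """Return a minimal IIIF Presentation 3.0 manifest as a plain dict."""
    object_url = f"{base_url}/{quote(object_id, safe='')}"
    canvases = []
    for index, image in enumerate(images, start=1):
        canvas_id = f"{object_url}/canvas/{index}"
        body = {
            "id": image.url,
            "type": "Image",
            "format": image.media_type,
            "width": image.width,
            "height": image.height,
        }
        annotation = {
            "id": f"{canvas_id}/annotation",
            "type": "Annotation",
            "motivation": "painting",  # the image is the canvas content
            "body": body,
            "target": canvas_id,
        }
        page = {"id": f"{canvas_id}/page", "type": "AnnotationPage", "items": [annotation]}
        canvases.append({
            "id": canvas_id,
            "type": "Canvas",
            "width": image.width,
            "height": image.height,
            "items": [page],
        })
    return {
        "@context": IIIF_CONTEXT,
        "id": f"{object_url}/manifest",
        "type": "Manifest",
        "label": {"none": [title]},  # "none": language of the title is not recorded
        "items": canvases,
    }
```

With this layout, the number of canvases in `items` equals the number of resolved images, which gives a simple consistency check against media_count.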

Enforce memory limits at the OS level, for example with ulimit -v or a container memory cap, and tune Python's garbage collection thresholds with gc.set_threshold for allocation-heavy parsing. Stream-based processing keeps peak heap usage low during harvest windows, although it does not rule out fragmentation entirely. The official asyncio documentation provides patterns for graceful task cancellation and event loop management.

Conclusion

The three-stage pipeline (iterparse extraction, vocabulary-resolved transformation, and idempotent async upsert) converts deeply nested LIDO XML into flat, indexed relational rows without loading the entire harvest into memory. Calling elem.clear() and deleting processed siblings after each record is yielded is the specific technique that holds memory flat across multi-gigabyte LIDO streams. Omitting it causes the parser's internal tree to accumulate every processed element until the file is exhausted.