Workflow Context
Museum digital asset pipelines fail when object schemas function as static documentation rather than executable contracts. Production-grade schemas must enforce structural consistency at ingestion, route assets based on intellectual property constraints, and normalize controlled vocabularies before reaching the DAMS. This architecture relies on strict type enforcement and deterministic field mapping. These patterns directly inform Core Architecture & Collection Taxonomy decisions across institutional systems. When curators submit batch records or vendors push CSV exports, the pipeline must validate, transform, and route without manual intervention. The schema separates structural requirements from semantic enrichment. This separation enables automated high-throughput ingestion while preserving provenance and rights metadata.
Schema Architecture & Type Enforcement
Define the ingestion contract using Pydantic v2 for native JSON Schema generation and runtime validation. The model must enforce mandatory routing fields while deferring optional enrichment to downstream normalization stages. Strict mode prevents silent coercion of malformed data. Extra field rejection ensures schema drift does not propagate to production databases.
import logging
from datetime import date
from enum import Enum
from typing import Literal, Optional

from pydantic import BaseModel, ConfigDict, Field, field_validator

logger = logging.getLogger("museum_pipeline.schema")


class RightsCategory(str, Enum):
    PUBLIC_DOMAIN = "public_domain"
    IN_COPYRIGHT = "in_copyright"
    ORPHAN_WORK = "orphan_work"
    RESTRICTED = "restricted"


class MuseumObject(BaseModel):
    # strict=True disables type coercion; extra="forbid" rejects unknown keys.
    model_config = ConfigDict(
        strict=True,
        extra="forbid",
        json_schema_extra={"lido_mapping": "recordWrap"},
    )

    accession_number: str = Field(pattern=r"^[A-Za-z]{2,4}-\d{4}-\d{1,6}$")
    object_name: str = Field(min_length=2, max_length=255)
    # Dates opt out of strict mode so ISO 8601 strings parse into date objects.
    creation_date: Optional[date] = Field(default=None, strict=False)
    medium: Optional[str] = None
    # Strict mode accepts only enum instances from Python input, so this field
    # also opts out in order to accept raw strings such as "public_domain".
    rights_category: RightsCategory = Field(strict=False)
    embargo_date: Optional[date] = Field(default=None, strict=False)
    source_system: Literal["tms", "collectiveaccess", "manual_entry"]

    @field_validator("accession_number")
    @classmethod
    def normalize_accession(cls, v: str) -> str:
        # Runs after the pattern check and canonicalizes the value to uppercase.
        return v.strip().upper()

This configuration aligns with Mapping LIDO to Internal Databases by embedding explicit mapping metadata. Strict mode rejects silent coercion on the string fields, while the date fields and rights_category opt out with strict=False. ISO 8601 strings from CSV and JSON exports are therefore parsed into date objects, and raw strings such as "public_domain" are converted to RightsCategory members rather than rejected. The enum opt-out matters because strict mode accepts only enum instances when validating Python dictionaries with model_validate, although it does accept enum strings from JSON input. Because Pydantic evaluates Field(pattern=...) before after-mode field validators (the default mode), the accession pattern is case-insensitive ([A-Za-z]); the normalize_accession validator then canonicalizes the value to uppercase before persistence.
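To see the contract normalize and reject input, the following sketch validates a few hand-written records against a trimmed copy of the model. The sample records and the error_fields helper are invented for illustration. It also assumes records arrive as plain dictionaries of strings, which is why rights_category is declared with strict=False here.

```python
from datetime import date
from enum import Enum
from typing import Literal, Optional

from pydantic import BaseModel, ConfigDict, Field, ValidationError, field_validator


class RightsCategory(str, Enum):
    PUBLIC_DOMAIN = "public_domain"
    RESTRICTED = "restricted"


class MuseumObject(BaseModel):
    # Trimmed copy of the ingestion model, kept small for the demonstration.
    model_config = ConfigDict(strict=True, extra="forbid")

    accession_number: str = Field(pattern=r"^[A-Za-z]{2,4}-\d{4}-\d{1,6}$")
    object_name: str = Field(min_length=2, max_length=255)
    creation_date: Optional[date] = Field(default=None, strict=False)
    rights_category: RightsCategory = Field(strict=False)
    source_system: Literal["tms", "collectiveaccess", "manual_entry"]

    @field_validator("accession_number")
    @classmethod
    def normalize_accession(cls, v: str) -> str:
        return v.strip().upper()


def error_fields(record: dict) -> list:
    """Hypothetical helper: names of the fields that failed validation."""
    try:
        MuseumObject.model_validate(record)
    except ValidationError as exc:
        return sorted(str(err["loc"][0]) for err in exc.errors())
    return []


# Invented sample record, shaped like a row from a CSV export.
good = {
    "accession_number": "mma-1999-42",
    "object_name": "Amphora",
    "creation_date": "1999-05-01",
    "rights_category": "public_domain",
    "source_system": "tms",
}

obj = MuseumObject.model_validate(good)
print(obj.accession_number, obj.creation_date, obj.rights_category)

# An unknown key and a malformed accession number are both rejected.
print(error_fields({**good, "donor": "unknown"}))
print(error_fields({**good, "accession_number": "1999.42"}))
```

Collecting only the failing field names keeps the rejection report compact. The full ValidationError carries the error type and message for each field when more detail is needed.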
Async Batch Processing & Validation Pipeline
High-volume ingestion requires non-blocking execution to prevent I/O bottlenecks during database writes or external vocabulary lookups. The processor chunks incoming records, schedules each chunk's validations together, and isolates failures without halting the batch. Fixed-size chunks bound the number of in-flight tasks, which keeps memory use and throughput predictable under load.
import asyncio
from typing import Any, Dict, List

from pydantic import ValidationError


async def validate_and_route(record: Dict[str, Any]) -> Dict[str, Any]:
    try:
        validated = MuseumObject.model_validate(record)
    except ValidationError as e:
        # Expected failure: keep the input so the record can be quarantined.
        logger.warning("Validation failed for record %s: %s", record.get("accession_number"), e)
        return {"status": "failed", "error": str(e), "input": record}
    return {"status": "valid", "data": validated.model_dump(mode="json")}


async def process_batch(records: List[Dict[str, Any]], batch_size: int = 50) -> Dict[str, Any]:
    valid_objects = []
    failed_records = []
    for i in range(0, len(records), batch_size):
        chunk = records[i:i + batch_size]
        tasks = [validate_and_route(record) for record in chunk]
        # return_exceptions=True keeps one unexpected error from aborting the chunk.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                failed_records.append({"status": "failed", "error": str(result)})
            elif result["status"] == "valid":
                valid_objects.append(result["data"])
            else:
                failed_records.append(result)
    return {
        "valid_count": len(valid_objects),
        "failed_count": len(failed_records),
        "payloads": valid_objects,
        "failed": failed_records,
    }

The asyncio.gather pattern schedules each chunk's coroutines together while preserving error isolation. Note that model_validate is synchronous and CPU-bound, so the concurrency pays off only once validate_and_route awaits I/O such as vocabulary lookups or database writes. Chunks are independent units of work, so they can be distributed across worker nodes for horizontal scaling. Failed records are returned under the failed key so the caller can route them to a quarantine queue for manual review.
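The quarantine step mentioned above is not shown in the source. A minimal sketch, assuming an in-process asyncio.Queue stands in for whatever queue the institution actually uses, might look like this. The quarantine_failures function and the sample results are hypothetical.

```python
import asyncio
from typing import Any, Dict, List


async def quarantine_failures(
    results: List[Dict[str, Any]], queue: "asyncio.Queue[Dict[str, Any]]"
) -> List[Dict[str, Any]]:
    """Push failed results onto the quarantine queue and return the valid payloads."""
    valid = []
    for result in results:
        if result.get("status") == "valid":
            valid.append(result["data"])
        else:
            # Failed results keep their error and input for manual review.
            await queue.put(result)
    return valid


async def main():
    queue: "asyncio.Queue[Dict[str, Any]]" = asyncio.Queue()
    # Invented results in the shape returned by validate_and_route.
    results = [
        {"status": "valid", "data": {"accession_number": "AB-2020-1"}},
        {
            "status": "failed",
            "error": "rights_category: field required",
            "input": {"accession_number": "AB-2020-2"},
        },
    ]
    valid = await quarantine_failures(results, queue)
    return valid, queue.qsize()


valid, quarantined = asyncio.run(main())
print(len(valid), "valid;", quarantined, "quarantined")
```

In production the in-memory queue would likely be replaced by a durable one, so that quarantined records survive a worker restart.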
Rights Routing & Access Control Integration
Intellectual property constraints dictate downstream visibility and IIIF manifest generation. The schema routes objects based on rights_category and embargo_date values. This routing layer prevents unauthorized exposure of restricted assets. Public domain records that are not under an active embargo bypass review queues and proceed directly to the delivery tier.
flowchart TD
    A["Validated MuseumObject"] --> B{"rights_category<br/>== RESTRICTED?"}
    B -->|yes| S["secure_storage"]
    B -->|no| C{"embargo_date<br/>in the future?"}
    C -->|yes| E["embargo_queue"]
    C -->|no| D{"rights_category<br/>== PUBLIC_DOMAIN?"}
    D -->|yes| P["public_api"]
    D -->|no| R["rights_review"]

def determine_routing_path(obj: MuseumObject) -> str:
    # Checks run in priority order: restriction, then embargo, then public domain.
    if obj.rights_category == RightsCategory.RESTRICTED:
        return "secure_storage"
    if obj.embargo_date and obj.embargo_date > date.today():
        return "embargo_queue"
    if obj.rights_category == RightsCategory.PUBLIC_DOMAIN:
        return "public_api"
    return "rights_review"

This deterministic routing aligns with institutional security boundaries by enforcing access tiers immediately after validation. The checks run in order, so a restricted object never reaches the embargo or public tiers, and an embargoed public-domain object waits in embargo_queue. The logic executes synchronously within the validation pipeline and performs no I/O, so routing adds no meaningful latency.
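The precedence of these checks is easiest to confirm with a small table of cases. The sketch below is an illustration rather than the pipeline's own test suite. ObjectStub is a hypothetical dataclass stand-in for the validated model, and the route function takes today as a parameter (an assumption made here so the cases do not depend on the wall clock).

```python
from dataclasses import dataclass
from datetime import date
from enum import Enum
from typing import Optional


class RightsCategory(str, Enum):
    PUBLIC_DOMAIN = "public_domain"
    IN_COPYRIGHT = "in_copyright"
    ORPHAN_WORK = "orphan_work"
    RESTRICTED = "restricted"


@dataclass
class ObjectStub:
    """Hypothetical stand-in exposing only the two fields that routing reads."""
    rights_category: RightsCategory
    embargo_date: Optional[date] = None


def route(obj: ObjectStub, today: date) -> str:
    # Same precedence as determine_routing_path, with the current date injected.
    if obj.rights_category == RightsCategory.RESTRICTED:
        return "secure_storage"
    if obj.embargo_date and obj.embargo_date > today:
        return "embargo_queue"
    if obj.rights_category == RightsCategory.PUBLIC_DOMAIN:
        return "public_api"
    return "rights_review"


TODAY = date(2024, 1, 1)
FUTURE = date(2030, 1, 1)
PAST = date(2020, 1, 1)

cases = [
    (ObjectStub(RightsCategory.RESTRICTED, FUTURE), "secure_storage"),   # restriction wins
    (ObjectStub(RightsCategory.PUBLIC_DOMAIN, FUTURE), "embargo_queue"), # embargo beats public
    (ObjectStub(RightsCategory.PUBLIC_DOMAIN, PAST), "public_api"),      # expired embargo
    (ObjectStub(RightsCategory.PUBLIC_DOMAIN), "public_api"),
    (ObjectStub(RightsCategory.ORPHAN_WORK), "rights_review"),
    (ObjectStub(RightsCategory.IN_COPYRIGHT, PAST), "rights_review"),
]

for obj, expected in cases:
    assert route(obj, TODAY) == expected, (obj, expected)
print("all routing cases pass")
```

Injecting the date is one way to keep embargo tests stable over time. The original function's call to date.today() behaves the same way at runtime.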
Vocabulary Normalization & Downstream Mapping
Controlled vocabulary resolution occurs after structural validation. Medium and object classification fields require mapping to Getty AAT and TGN identifiers. The pipeline queues unnormalized strings for asynchronous enrichment. This two-stage process prevents ingestion delays during external API calls.
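Normalized terms integrate directly with Implementing Getty AAT & TGN workflows. The schema retains original curator input in a separate raw_medium field for audit trails. Enriched identifiers populate the aat_id field upon successful resolution. Neither field is declared on the MuseumObject ingestion model shown earlier, which forbids extra fields, so both belong to the enriched record produced by this second stage.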
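The second stage can be sketched as follows. This is a minimal illustration under stated assumptions: EnrichedMedium, normalize_term, and lookup_aat are hypothetical names, the lookup is an in-memory dictionary rather than a call to a Getty service, and the identifiers are placeholders, not real AAT IDs.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional

# Placeholder identifiers for illustration only; these are NOT real AAT IDs.
LOCAL_VOCAB: Dict[str, str] = {
    "oil on canvas": "aat:PLACEHOLDER-0001",
    "bronze": "aat:PLACEHOLDER-0002",
}


@dataclass
class EnrichedMedium:
    raw_medium: str               # curator input, preserved verbatim for audit
    aat_id: Optional[str] = None  # filled only when resolution succeeds


def normalize_term(raw: str) -> str:
    """Case-fold and collapse whitespace so lookups tolerate minor variation."""
    return " ".join(raw.casefold().split())


async def lookup_aat(term: str) -> Optional[str]:
    """Hypothetical lookup; a real one would await an external vocabulary service."""
    await asyncio.sleep(0)  # yield control, as a network call would
    return LOCAL_VOCAB.get(term)


async def enrich(raw_terms: List[str]) -> List[EnrichedMedium]:
    # Resolve all terms concurrently; unresolved terms keep aat_id=None.
    ids = await asyncio.gather(*(lookup_aat(normalize_term(t)) for t in raw_terms))
    return [EnrichedMedium(raw_medium=t, aat_id=i) for t, i in zip(raw_terms, ids)]


enriched = asyncio.run(enrich(["  Oil on  Canvas ", "Bronze", "unknown alloy"]))
unresolved = [e.raw_medium for e in enriched if e.aat_id is None]
print(enriched)
print("needs review:", unresolved)
```

Keeping raw_medium untouched means a later vocabulary update can re-run resolution against the original curator wording.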
Production Deployment Checklist
Validate schema versions in CI before merging. Enforce JSON Schema output for frontend contract testing. Monitor validation failure rates using structured logging. Review embargo routing rules quarterly to reflect updated copyright assessments.
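For the failure-rate monitoring item, one possible shape is a JSON log line per batch. The sketch below uses only the standard library. The JsonFormatter class, the log_batch_metrics helper, the logger name, and the metric field names are assumptions made for illustration.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),
        }
        # Metrics passed via `extra` are attached to the record as an attribute.
        payload.update(getattr(record, "metrics", {}))
        return json.dumps(payload, sort_keys=True)


def log_batch_metrics(logger: logging.Logger, valid_count: int, failed_count: int) -> float:
    total = valid_count + failed_count
    rate = failed_count / total if total else 0.0
    logger.info(
        "batch_validated",
        extra={"metrics": {
            "valid_count": valid_count,
            "failed_count": failed_count,
            "failure_rate": round(rate, 4),
        }},
    )
    return rate


# Write to an in-memory stream here; a deployment would use a real handler.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
metrics_logger = logging.getLogger("museum_pipeline.metrics")
metrics_logger.setLevel(logging.INFO)
metrics_logger.addHandler(handler)
metrics_logger.propagate = False

rate = log_batch_metrics(metrics_logger, valid_count=48, failed_count=2)
line = json.loads(stream.getvalue())
print(line)
```

Emitting counts alongside the rate lets a log aggregator recompute the rate across batches instead of averaging per-batch ratios.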
Schema evolution requires backward compatibility testing. Legacy CMS exports often contain deprecated field names. Declare per-field aliases with Field(alias=...) (enabling populate_by_name=True in ConfigDict) to bridge version gaps. This strategy minimizes disruption during system migrations.
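As a sketch of the alias approach, the model below accepts both a legacy export's field names and the current ones. The legacy names title and materials are hypothetical, chosen only to illustrate the mechanism, and LegacyCompatibleObject is not part of the pipeline shown earlier.

```python
from typing import Optional

from pydantic import BaseModel, ConfigDict, Field


class LegacyCompatibleObject(BaseModel):
    # populate_by_name=True lets input use either the alias or the field name.
    model_config = ConfigDict(extra="forbid", populate_by_name=True)

    # "title" and "materials" are assumed legacy names, for illustration only.
    object_name: str = Field(alias="title", min_length=2, max_length=255)
    medium: Optional[str] = Field(default=None, alias="materials")


# A legacy export using deprecated field names.
legacy = LegacyCompatibleObject.model_validate(
    {"title": "Amphora", "materials": "terracotta"}
)

# A current export using the canonical field names.
current = LegacyCompatibleObject.model_validate({"object_name": "Amphora"})

# model_dump() emits canonical field names regardless of the input spelling.
print(legacy.model_dump())
print(current.model_dump())
```

Because serialization defaults to the field names, downstream consumers see one canonical shape even while both spellings are accepted at ingestion.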
Final outputs must conform to How to Structure JSON-LD for Museum Objects specifications. The validated payload serves as the canonical source for semantic web serialization.
Conclusion
A well-designed schema is the ingestion pipeline’s most important boundary. Strict Pydantic models with explicit field constraints, rights-category enums, and deterministic routing logic eliminate the manual triage cycles that plague ad-hoc import scripts. The schema-first approach means that schema drift is caught at the validation boundary — not after it has propagated to the database.