Workflow Context

Getty vocabularies function as the authoritative backbone for controlled terminology in museum collection management. The Art & Architecture Thesaurus (AAT) standardizes object classification, material composition, and stylistic periods. The Thesaurus of Geographic Names (TGN) anchors provenance, creation sites, and cultural jurisdiction. Integrating these vocabularies into a digital asset pipeline requires deterministic mapping and strict URI resolution. When digital surrogates are ingested, metadata normalization must occur before rights assignment or public API exposure. Establishing a Core Architecture & Collection Taxonomy that treats Getty URIs as immutable identifiers prevents downstream fragmentation. Pipeline stages must resolve preferred labels, capture hierarchical paths, and enforce jurisdictional restrictions before assets transition from staging to production.
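
Treating Getty URIs as immutable identifiers is easier when every incoming variant is reduced to one canonical form before it is stored or used as a cache key. The following is a minimal sketch of that normalization step. The function name `canonical_getty_uri` is hypothetical, and the accepted URI shapes (an `http` or `https` scheme, an optional `page/` segment, an optional file extension) are assumptions about what legacy exports might contain, not an exhaustive list.

```python
import re

# Assumption: canonical concept URIs have the form
# http://vocab.getty.edu/{aat|tgn}/{numeric id}
_GETTY_URI = re.compile(
    r"^https?://vocab\.getty\.edu/(?:page/)?(?P<vocab>aat|tgn)/(?P<id>\d+)"
    r"(?:\.[a-z]+)?/?$",
    re.IGNORECASE,
)


def canonical_getty_uri(raw: str) -> str:
    """Reduce an AAT/TGN URI variant to a single canonical identifier.

    Raises ValueError for anything that is not recognisably an AAT or TGN
    concept URI, so malformed values fail at ingestion rather than downstream.
    """
    match = _GETTY_URI.match(raw.strip())
    if match is None:
        raise ValueError(f"Not a recognised AAT/TGN URI: {raw!r}")
    return f"http://vocab.getty.edu/{match['vocab'].lower()}/{match['id']}"
```

Running this before any lookup means that a `.json` link, an HTML page link, and the bare identifier all share a single cache entry and a single database key.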

Pipeline Architecture & Data Flow

Ingestion begins with raw metadata extraction from legacy CMS exports or batch uploads. The pipeline isolates candidate terms and queues them for asynchronous resolution. A local cache intercepts duplicate requests to minimize external HTTP traffic. Resolved payloads undergo schema validation against institutional requirements. Validated records route to a transformation layer for LIDO serialization. The final output feeds directly into IIIF manifest generation and access control routing. This linear progression guarantees that unverified terminology never reaches public endpoints.
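
One way to make the cache intercept duplicate requests even while they are still in flight is to cache the pending lookup rather than the finished result. The sketch below illustrates the idea with a hypothetical `DedupingCache` class; the injected `fetch` callable stands in for whatever function performs the real HTTP request and is an assumption of this example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class DedupingCache:
    """Share one lookup per URI so concurrent duplicates trigger a single fetch."""

    def __init__(self, fetch: Callable[[str], Awaitable[Any]]) -> None:
        self._fetch = fetch
        self._tasks: Dict[str, "asyncio.Task[Any]"] = {}

    async def get(self, uri: str) -> Any:
        task = self._tasks.get(uri)
        if task is None:
            # First request for this URI: start the fetch and remember the task
            # so later callers await the same result instead of refetching.
            task = asyncio.create_task(self._fetch(uri))
            self._tasks[uri] = task
        try:
            return await task
        except Exception:
            # Do not cache failures; the next caller gets a fresh attempt.
            self._tasks.pop(uri, None)
            raise
```

Because the task is stored before the first `await`, two coroutines asking for the same URI in the same batch share one request, which a cache populated only after completion would not guarantee.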

flowchart TD
    U["AAT / TGN URI"] --> Cache{"In local cache?"}
    Cache -->|yes| Ret["Return GettyTerm"]
    Cache -->|no| F["Fetch .json via aiohttp"]
    F --> R{"HTTP 429?"}
    R -->|yes| B["Exponential backoff"]
    B --> F
    R -->|no| P["Parse skos:Concept<br/>gvp:broaderPreferred"]
    P --> V["Validate GettyTerm"]
    V --> Ret

Python Implementation

Production ingestion pipelines should resolve AAT/TGN terms asynchronously. Batching requests bounds concurrency and reduces the risk of being throttled by the Getty LOD endpoints. The following implementation uses aiohttp for concurrent resolution and pydantic for strict schema enforcement. It includes exponential backoff on HTTP 429 responses and a batch processor with a configurable batch size.

python
import asyncio
import aiohttp
import logging
from typing import List, Dict, Optional, Any
from pydantic import BaseModel, Field, HttpUrl, field_validator
from datetime import datetime, timezone

logger = logging.getLogger("aat_tgn_pipeline")

class GettyTerm(BaseModel):
    uri: HttpUrl
    preferred_label: str
    term_type: str
    hierarchical_path: Optional[str] = None
    resolved_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    @field_validator("term_type")
    @classmethod
    def validate_type(cls, v: str) -> str:
        if v.upper() not in {"AAT", "TGN"}:
            raise ValueError("term_type must be 'AAT' or 'TGN'")
        return v.upper()

class GettyResolver:
    def __init__(self, batch_size: int = 20, max_retries: int = 3):
        self.batch_size = batch_size
        self.max_retries = max_retries
        self.session: Optional[aiohttp.ClientSession] = None
        self.cache: Dict[str, GettyTerm] = {}

    async def __aenter__(self) -> "GettyResolver":
        self.session = aiohttp.ClientSession(
            headers={"Accept": "application/json", "User-Agent": "MuseumPipeline/1.0"}
        )
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        if self.session:
            await self.session.close()

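    async def _fetch_with_retry(self, uri: str) -> Dict[str, Any]:
        if self.session is None:
            raise RuntimeError("Open the resolver with 'async with GettyResolver()' first")
        json_uri = f"{uri.rstrip('/')}.json"
        for attempt in range(self.max_retries):
            try:
                async with self.session.get(json_uri, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                    if resp.status == 429:
                        # Rate limited: wait 1s, 2s, 4s, ... before the next attempt.
                        await asyncio.sleep(2 ** attempt)
                        continue
                    resp.raise_for_status()
                    # content_type=None also accepts JSON served as e.g. application/ld+json.
                    return await resp.json(content_type=None)
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.warning("Attempt %d failed for %s: %s", attempt + 1, uri, e)
                if attempt == self.max_retries - 1:
                    raise
                # Back off before retrying transient network errors as well.
                await asyncio.sleep(2 ** attempt)
        raise RuntimeError(f"Max retries exceeded for {uri}")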

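    @staticmethod
    def _first_value(node: Any, default: str = "") -> str:
        """Return the first literal from a JSON-LD value given as a list, dict, or string."""
        first = node[0] if isinstance(node, list) and node else node
        if isinstance(first, dict):
            return str(first.get("@value", default))
        return first if isinstance(first, str) else default

    async def resolve_term(self, uri: str) -> GettyTerm:
        if uri in self.cache:
            return self.cache[uri]

        payload = await self._fetch_with_retry(uri)
        # Assumes a JSON-LD payload with an "@graph" array and compact skos:/gvp: keys.
        graph = payload.get("@graph", [])
        concept_data = next(
            (c for c in graph
             if "skos:Concept" in (c["@type"] if isinstance(c.get("@type"), list) else [c.get("@type")])),
            None
        )

        if not concept_data:
            raise ValueError(f"No concept data found for {uri}")

        # Match on the path segment so unrelated URIs are rejected rather than labelled TGN.
        lowered = uri.lower()
        if "/aat/" in lowered:
            term_type = "AAT"
        elif "/tgn/" in lowered:
            term_type = "TGN"
        else:
            raise ValueError(f"Not an AAT or TGN URI: {uri}")

        # gvp:broaderPreferred may be a bare reference ({"@id": ...}) with no embedded label.
        broader = concept_data.get("gvp:broaderPreferred")
        broader = broader[0] if isinstance(broader, list) and broader else broader
        broader_label = self._first_value(broader.get("skos:prefLabel")) if isinstance(broader, dict) else ""

        term = GettyTerm(
            uri=uri,
            preferred_label=self._first_value(concept_data.get("skos:prefLabel"), "Unknown"),
            term_type=term_type,
            hierarchical_path=broader_label or None,
        )
        self.cache[uri] = term
        return term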

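    async def resolve_batch(self, uris: List[str]) -> List[GettyTerm]:
        # De-duplicate while preserving order so each URI is fetched at most once.
        unique_uris = list(dict.fromkeys(uris))
        valid_terms: List[GettyTerm] = []
        # Resolve in chunks of batch_size so concurrency against the endpoint stays bounded.
        for start in range(0, len(unique_uris), self.batch_size):
            chunk = unique_uris[start:start + self.batch_size]
            results = await asyncio.gather(
                *(self.resolve_term(u) for u in chunk), return_exceptions=True
            )
            for uri, r in zip(chunk, results):
                if isinstance(r, GettyTerm):
                    valid_terms.append(r)
                else:
                    logger.error("Resolution failed for %s: %s", uri, r)
        return valid_terms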

LIDO Mapping & IIIF Manifest Generation

Resolved terms must align with LIDO schema requirements for cross-institutional interoperability. AAT concepts map to <objectWorkType> and <materialsTech> elements, each carrying the concept URI alongside its preferred label. TGN place identifiers populate <place> nodes, typically nested inside <eventPlace>. This mapping keeps records aligned with LIDO documentation standards. See Designing Museum Object Schemas for structural alignment patterns. The IIIF Presentation API 3.0 treats metadata entries as display-oriented label and value pairs and does not itself require controlled URIs, so each resolved term becomes a discrete label and value pair within the manifest, with the Getty URI carried as a link in the value or through a seeAlso reference. Automated serialization prevents manual transcription errors and preserves semantic relationships.
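
The mapping described above can be sketched with the standard library alone. The helper names below (`lido_object_work_type`, `lido_place`, `iiif_metadata_entry`) are hypothetical, and the element layout is an assumption based on LIDO 1.0 conventions (a `conceptID` or `placeID` with `lido:type` and `lido:source` attributes, followed by a display term); verify it against the schema version your institution targets.

```python
import html
import xml.etree.ElementTree as ET
from typing import Dict, List

LIDO_NS = "http://www.lido-schema.org"
ET.register_namespace("lido", LIDO_NS)


def _q(name: str) -> str:
    """Qualify a tag or attribute name with the LIDO namespace."""
    return f"{{{LIDO_NS}}}{name}"


def _add_id(parent: ET.Element, tag: str, uri: str, source: str) -> None:
    """Append an identifier element that carries the controlled-vocabulary URI."""
    node = ET.SubElement(parent, _q(tag), {_q("type"): "URI", _q("source"): source})
    node.text = uri


def lido_object_work_type(uri: str, label: str) -> ET.Element:
    """Build an objectWorkType element from a resolved AAT concept."""
    element = ET.Element(_q("objectWorkType"))
    _add_id(element, "conceptID", uri, "AAT")
    ET.SubElement(element, _q("term")).text = label
    return element


def lido_place(uri: str, label: str) -> ET.Element:
    """Build a place element from a resolved TGN place."""
    element = ET.Element(_q("place"))
    _add_id(element, "placeID", uri, "TGN")
    name_set = ET.SubElement(element, _q("namePlaceSet"))
    ET.SubElement(name_set, _q("appellationValue")).text = label
    return element


def iiif_metadata_entry(
    field: str, label: str, uri: str, lang: str = "en"
) -> Dict[str, Dict[str, List[str]]]:
    """Build one IIIF metadata entry whose value links the label to its URI."""
    link = f'<a href="{html.escape(uri, quote=True)}">{html.escape(label)}</a>'
    return {"label": {lang: [field]}, "value": {lang: [link]}}
```

Passing either element to `ET.tostring(element, encoding="unicode")` yields the serialized fragment. The IIIF helper embeds the URI as an HTML anchor because metadata values are intended for display; a `seeAlso` reference is the alternative when the link should be machine-readable.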

Production Hardening & Compliance Routing

Getty endpoints may throttle heavy clients, so the pipeline should reuse a single HTTP session and treat HTTP 429 responses as a signal to back off. A shared Redis cache reduces external dependency during peak ingestion windows and, unlike an in-process dictionary, survives worker restarts. Circuit breakers isolate failing vocabulary services from the core pipeline. Jurisdictional flags derived from TGN place identifiers trigger automated access tiering. Restricted geographic data routes through embargo workflows before public exposure. This architecture aligns with Mapping LIDO to Internal Databases for persistent storage. Monitoring must track resolution latency, cache hit ratios, and schema validation failures. Following Getty LOD best practices, such as caching responses and sending an identifying User-Agent header, keeps the integration stable as a long-term consumer of the endpoints.
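
A circuit breaker of the kind mentioned above can be quite small. The following is a minimal sketch, not a production implementation: the class name, the default threshold of five failures, and the 30-second cool-down are all assumptions chosen for illustration, and the clock is injectable so the behaviour can be tested without waiting.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Stop calling a failing service until a cool-down period has passed."""

    def __init__(
        self,
        failure_threshold: int = 5,
        reset_after: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow(self) -> bool:
        """Return True if a request may be attempted right now."""
        if self._opened_at is None:
            return True  # closed: traffic flows normally
        # Open: block until the cool-down elapses, then allow trial requests.
        return self._clock() - self._opened_at >= self.reset_after

    def record_success(self) -> None:
        """Close the breaker after a successful call."""
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        """Count a failure and open the breaker once the threshold is reached."""
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()
```

A resolver would call `allow()` before each outbound request, skip or queue the term when it returns False, and report the outcome through `record_success()` or `record_failure()`. A failed trial request after the cool-down reopens the breaker immediately because the failure count is still at or above the threshold.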

Conclusion

Treating Getty URIs as immutable, cache-keyed identifiers rather than ephemeral labels prevents vocabulary drift from propagating into downstream indexes. The async resolver with in-process caching and exponential backoff handles both normal operation and Getty rate-limit events predictably. Placing AAT and TGN resolutions in the corresponding LIDO elements (<objectWorkType>, <materialsTech>, <place>) makes the controlled vocabulary output immediately consumable by cross-institutional harvesters.