Workflow Architecture

Museum digital asset pipelines routinely ingest thousands of records with fragmented rights metadata. Manual verification introduces unacceptable latency and compliance exposure. An automated status pipeline normalizes incoming payloads, applies jurisdictional thresholds, and evaluates explicit rights assertions. This evaluation layer functions as the core decision engine within broader Rights Metadata Mapping & Licensing Automation architectures. Every asset passes through a standardized compliance gate before downstream publication.

Schema Normalization & Standards Alignment

Raw collection exports rarely conform to modern interoperability standards. The ingestion layer maps legacy fields to LIDO lido:rights and IIIF Presentation API rights properties. Pydantic v2 enforces schema boundaries during deserialization: implausible year values or unsupported jurisdiction codes raise a ValidationError, and the record is rejected immediately. This validation step keeps malformed records out of the evaluation engine.
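
A minimal sketch of that rejection behavior, assuming Pydantic v2 is installed. The `IngestRecord` model and the `ingest` helper are hypothetical: the model copies two of AssetRecord's constraints, and the helper separates accepted records from rejections instead of letting one bad payload stop the batch.

```python
from datetime import date
from typing import Optional

from pydantic import BaseModel, Field, ValidationError, field_validator


class IngestRecord(BaseModel):
    """Hypothetical subset of AssetRecord, used only to show rejection behavior."""
    object_id: str
    creation_year: Optional[int] = None
    jurisdiction: str = Field(default="US", pattern="^(US|EU|UK|CA)$")

    @field_validator("creation_year")
    @classmethod
    def plausible_year(cls, v: Optional[int]) -> Optional[int]:
        if v is not None and not (1000 <= v <= date.today().year + 10):
            raise ValueError("Year must be within plausible historical range")
        return v


def ingest(payloads: list[dict]) -> tuple[list[IngestRecord], list[tuple[str, str]]]:
    """Split raw payloads into accepted records and (object_id, error type) rejections."""
    accepted: list[IngestRecord] = []
    rejected: list[tuple[str, str]] = []
    for payload in payloads:
        try:
            accepted.append(IngestRecord.model_validate(payload))
        except ValidationError as exc:
            # errors() returns one dict per failing field; keep the first error type.
            rejected.append((str(payload.get("object_id")), exc.errors()[0]["type"]))
    return accepted, rejected


accepted, rejected = ingest([
    {"object_id": "OBJ-010", "creation_year": 1890, "jurisdiction": "US"},
    {"object_id": "OBJ-011", "creation_year": 250, "jurisdiction": "EU"},   # implausible year
    {"object_id": "OBJ-012", "creation_year": 1950, "jurisdiction": "FR"},  # unsupported code
])
print([r.object_id for r in accepted])  # ['OBJ-010']
print(rejected)
```

Rejected payloads keep their object identifier, so they can be reported back to the collection system rather than silently dropped.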

Status determination relies on configurable jurisdictional rulesets. United States law protects works published before 1978 for up to 95 years from publication, so the public domain cutoff advances every January 1, while European directives set the term at the creator's lifespan plus seventy years. Reference tables for statutory durations are maintained by the U.S. Copyright Office. The engine evaluates RightsStatements.org URIs against institutional policy matrices. Detailed field mapping procedures are documented in Mapping RightsStatements.org to Collection Fields. Missing temporal data triggers deterministic fallback chains rather than halting execution.
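
One way to express an institutional policy matrix is as a lookup keyed on the statement code embedded in each RightsStatements.org vocabulary URI (for example `http://rightsstatements.org/vocab/InC/1.0/`). The sketch below is hypothetical: `POLICY_MATRIX`, `statement_code`, and `apply_policy` are illustrative names. The codes (InC, NKC, NoC-US, UND, CNE) are real RightsStatements.org identifiers, but the status each one maps to is an assumed institutional policy, not a RightsStatements.org rule.

```python
from typing import Optional
from urllib.parse import urlparse

# Hypothetical policy matrix: statement code -> (status, routing destination).
# Codes not listed return None and fall through to the jurisdictional rules.
POLICY_MATRIX: dict[str, tuple[str, str]] = {
    "NoC-US": ("PUBLIC_DOMAIN", "open_publish"),
    "NKC": ("PUBLIC_DOMAIN", "open_publish"),      # mirrors the engine's No Known Copyright rule
    "InC": ("COPYRIGHTED", "rights_clearance"),
    "UND": ("AMBIGUOUS", "fallback_chain"),
    "CNE": ("AMBIGUOUS", "fallback_chain"),
}


def statement_code(uri: str) -> Optional[str]:
    """Extract the code from a vocabulary URI such as http://rightsstatements.org/vocab/InC/1.0/."""
    parsed = urlparse(uri)
    if parsed.netloc.lower() != "rightsstatements.org":
        return None
    parts = [p for p in parsed.path.split("/") if p]
    # Only machine-readable vocab URIs (vocab/<code>/<version>) are accepted;
    # human-readable /page/ URLs are ignored.
    if len(parts) == 3 and parts[0] == "vocab":
        return parts[1]
    return None


def apply_policy(uri: Optional[str]) -> Optional[tuple[str, str]]:
    """Return the matrix entry for a statement URI, or None to defer to statutory rules."""
    if not uri:
        return None
    code = statement_code(uri)
    return POLICY_MATRIX.get(code) if code else None


print(apply_policy("http://rightsstatements.org/vocab/NKC/1.0/"))  # ('PUBLIC_DOMAIN', 'open_publish')
print(apply_policy("https://creativecommons.org/licenses/by/4.0/"))  # None
```

Parsing the code out of the path, rather than substring-matching the statement's display label, keeps the lookup exact and makes unknown or non-vocabulary URIs fall through explicitly.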

Deterministic Routing & Dispatch

Computed statuses map directly to downstream workflow queues. Public domain assets route to open-access publication pipelines. Assets carrying explicit open licenses forward to Routing Creative Commons Licenses handlers. Restricted materials or pending donor agreements trigger Implementing Embargo Workflows for curator review. The dispatcher evaluates each record in a single pass, and the payload hash attached to every status lets downstream consumers detect and discard duplicate submissions.

flowchart TD
    A["AssetRecord"] --> D{"Donor<br/>restrictions?"}
    D -->|yes| R["RESTRICTED<br/>curator_review"]
    D -->|no| N{"NoKnownCopyright<br/>URI?"}
    N -->|yes| PD["PUBLIC_DOMAIN<br/>open_publish"]
    N -->|no| J{"Jurisdiction?"}
    J -->|US| USc{"Published before<br/>rolling cutoff?"}
    USc -->|yes| PD
    USc -->|no| C["COPYRIGHTED<br/>rights_clearance"]
    J -->|EU| EUc{"Death year + 70<br/>before now?"}
    EUc -->|yes| PD
    EUc -->|no| AM["AMBIGUOUS<br/>fallback_chain"]
    J -->|other| AM
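
The routing in the diagram reduces to a status-to-queue table, and duplicate suppression can key on the payload hash. The sketch below assumes a hypothetical `SinglePassDispatcher` that keeps seen hashes in an in-memory set. That only deduplicates within one process; workers running in parallel would need a shared persistent store, which is not shown here.

```python
import hashlib
from dataclasses import dataclass, field

# Status -> destination, using the routing labels from the diagram.
ROUTES: dict[str, str] = {
    "PUBLIC_DOMAIN": "open_publish",
    "COPYRIGHTED": "rights_clearance",
    "AMBIGUOUS": "fallback_chain",
    "RESTRICTED": "curator_review",
}


def payload_hash(*fields: object) -> str:
    """Deterministic SHA-256 over the decision-relevant fields."""
    return hashlib.sha256("|".join(map(str, fields)).encode("utf-8")).hexdigest()


@dataclass
class SinglePassDispatcher:
    """Hypothetical in-memory dispatcher that skips payloads it has already routed."""
    seen_hashes: set[str] = field(default_factory=set)
    queues: dict[str, list[str]] = field(default_factory=dict)

    def dispatch(self, object_id: str, status: str, digest: str) -> bool:
        """Route one evaluated record; return False if this exact payload was seen before."""
        if digest in self.seen_hashes:
            return False
        self.seen_hashes.add(digest)
        destination = ROUTES[status]
        self.queues.setdefault(destination, []).append(object_id)
        return True


dispatcher = SinglePassDispatcher()
h = payload_hash("OBJ-001", 1890, None, "US")
print(dispatcher.dispatch("OBJ-001", "PUBLIC_DOMAIN", h))  # True
print(dispatcher.dispatch("OBJ-001", "PUBLIC_DOMAIN", h))  # False (duplicate)
print(dispatcher.queues)  # {'open_publish': ['OBJ-001']}
```

Because the hash covers the input fields rather than the record ID alone, a corrected record with changed dates produces a new hash and is routed again, while an identical resubmission is skipped.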

Production Implementation

Production implementations require asynchronous batch processing to handle high-throughput DAM integrations. The following stack uses Pydantic for schema enforcement, asyncio with a semaphore for controlled concurrency, httpx for asynchronous dispatch, and explicit routing logic.

python
import asyncio
import hashlib
import logging
from datetime import date, datetime, timezone
from typing import Optional, List, Any, Literal
from pydantic import BaseModel, Field, field_validator, ValidationError
import httpx

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger(__name__)

class AssetRecord(BaseModel):
    object_id: str
    title: str
    creator_death_year: Optional[int] = None
    creation_year: Optional[int] = None
    jurisdiction: str = Field(default="US", pattern="^(US|EU|UK|CA)$")
    rights_statement_uri: Optional[str] = None
    donor_restrictions: Optional[str] = None
    iiif_manifest_url: Optional[str] = None

    @field_validator("creation_year", "creator_death_year")
    @classmethod
    def validate_year_range(cls, v: Optional[int]) -> Optional[int]:
        if v is not None and not (1000 <= v <= date.today().year + 10):
            raise ValueError("Year must be within plausible historical range")
        return v

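    def compute_payload_hash(self) -> str:
        # Hash every field that can change the evaluation outcome, so an updated
        # rights URI or donor restriction yields a new idempotency key.
        raw = "|".join(
            str(v) for v in (
                self.object_id, self.creation_year, self.creator_death_year,
                self.jurisdiction, self.rights_statement_uri, self.donor_restrictions,
            )
        )
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()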

class CopyrightStatus(BaseModel):
    object_id: str
    status: Literal["PUBLIC_DOMAIN", "COPYRIGHTED", "AMBIGUOUS", "RESTRICTED"]
    basis: str
    routing_destination: str
    payload_hash: str
    validated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

class CopyrightEngine:
    US_COPYRIGHT_TERM = 95  # years from publication for pre-1978 US works
    EU_LIFE_PLUS = 70

    @classmethod
    def evaluate(cls, record: AssetRecord) -> CopyrightStatus:
        status, basis, destination = cls._apply_rules(record)
        return CopyrightStatus(
            object_id=record.object_id,
            status=status,
            basis=basis,
            routing_destination=destination,
            payload_hash=record.compute_payload_hash()
        )

    @classmethod
    def _apply_rules(cls, record: AssetRecord) -> tuple[str, str, str]:
        current_year = date.today().year

        if record.donor_restrictions:
            return "RESTRICTED", "Donor agreement active", "curator_review"

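        # RightsStatements.org URIs use short codes rather than labels, e.g.
        # http://rightsstatements.org/vocab/NKC/1.0/ for "No Known Copyright".
        if record.rights_statement_uri and "/vocab/NKC/" in record.rights_statement_uri: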
            return "PUBLIC_DOMAIN", "RightsStatements.org assertion", "open_publish"

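        # creation_year is treated as a proxy for the publication year; works
        # that were never published follow different, life-based US terms.
        if record.jurisdiction == "US" and record.creation_year: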
            # The cutoff is not fixed: it advances every January 1 as the
            # 95-year term lapses. Compute it from the current year.
            latest_pd_year = current_year - cls.US_COPYRIGHT_TERM - 1
            if record.creation_year <= latest_pd_year:
                return "PUBLIC_DOMAIN", f"US 95-year term (published ≤ {latest_pd_year})", "open_publish"
            return "COPYRIGHTED", "US 95-year term active", "rights_clearance"

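        if record.jurisdiction == "EU" and record.creator_death_year:
            # Strict <: protection runs through December 31 of the 70th year
            # after death, so the work enters the public domain the year after.
            if record.creator_death_year + cls.EU_LIFE_PLUS < current_year:
                return "PUBLIC_DOMAIN", "EU life+70 expiry", "open_publish"
            # Unexpired EU terms go to the fallback chain, as in the routing diagram.
            return "AMBIGUOUS", "EU life+70 term active", "fallback_chain"

        # Missing dates, or a jurisdiction without a ruleset (UK, CA), never yield
        # a COPYRIGHTED or PUBLIC_DOMAIN assertion without a statutory check.
        return "AMBIGUOUS", "Insufficient temporal data", "fallback_chain"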

class RightsPipeline:
    def __init__(self, batch_size: int = 50, max_concurrency: int = 10):
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrency)
        self.client = httpx.AsyncClient(timeout=15.0)

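    async def process_batch(self, records: List[AssetRecord]) -> List[Any]:
        # Records are evaluated in chunks of batch_size; the semaphore caps
        # in-flight work within each chunk. return_exceptions=True means the
        # result list may hold CopyrightStatus objects or, for any unhandled
        # error, the raised exception, so callers must check the type (see main()).
        results: List[Any] = []
        for start in range(0, len(records), self.batch_size):
            chunk = records[start:start + self.batch_size]
            tasks = [self._process_single(r) for r in chunk]
            results.extend(await asyncio.gather(*tasks, return_exceptions=True))
        return results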

    async def _process_single(self, record: AssetRecord) -> CopyrightStatus:
        async with self.semaphore:
            try:
                status = CopyrightEngine.evaluate(record)
                logger.info(f"Evaluated {record.object_id} -> {status.status}")
                await self._dispatch(status)
                return status
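            except Exception as e:
                # Records arrive already validated, so failures here come from
                # evaluation itself. Do not abort the batch: log the traceback and
                # route the asset to the error queue for manual handling.
                logger.exception(f"Evaluation failed for {record.object_id}: {e}")
                return CopyrightStatus(
                    object_id=record.object_id,
                    status="AMBIGUOUS",
                    basis=f"Evaluation error: {type(e).__name__}",
                    routing_destination="error_queue",
                    payload_hash=record.compute_payload_hash()
                )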

    async def _dispatch(self, status: CopyrightStatus) -> None:
        # Simulate downstream API call or queue push
        endpoint = f"https://workflow.internal/api/v1/rights/{status.routing_destination}"
        try:
            response = await self.client.post(endpoint, json=status.model_dump(mode="json"))
            # post() does not raise on 4xx/5xx responses; surface them as errors too.
            response.raise_for_status()
        except httpx.HTTPError as e:
            logger.warning(f"Dispatch failed for {status.object_id}: {e}")

    async def close(self) -> None:
        await self.client.aclose()

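async def main() -> None:
    raw_records: list[dict[str, Any]] = [
        {"object_id": "OBJ-001", "title": "Landscape Study", "creation_year": 1890, "jurisdiction": "US"},
        {"object_id": "OBJ-002", "title": "Modern Sculpture", "creator_death_year": 1950, "jurisdiction": "EU"},
        {"object_id": "OBJ-003", "title": "Restricted Archive", "donor_restrictions": "Active until 2030", "jurisdiction": "US"},
    ]

    # Schema validation happens at deserialization: invalid payloads are
    # rejected and logged before they reach the engine.
    sample_records: List[AssetRecord] = []
    for raw in raw_records:
        try:
            sample_records.append(AssetRecord.model_validate(raw))
        except ValidationError as e:
            logger.error(f"Schema validation failed for {raw.get('object_id')}: {e}")

    pipeline = RightsPipeline(batch_size=50, max_concurrency=5)
    try:
        results = await pipeline.process_batch(sample_records)
        for r in results:
            if isinstance(r, CopyrightStatus):
                logger.info(f"Final: {r.object_id} | {r.status} | {r.routing_destination}")
            else:
                logger.error(f"Unhandled pipeline error: {r!r}")
    finally:
        await pipeline.close()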

if __name__ == "__main__":
    asyncio.run(main())

Operational Requirements

Idempotency requires deterministic hashing of every input field that can change the outcome. Audit logs must capture every rule evaluation and routing decision. Concurrency limits prevent DAM API throttling during peak ingestion windows. Configuration files should externalize jurisdictional thresholds and routing endpoints. Because dispatch is asynchronous, I/O waits do not block the event loop, and independent collection batches can be distributed across workers to scale horizontally.
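
A sketch of externalized thresholds plus a structured audit line, using only the standard library. The JSON key names, the `RulesConfig` dataclass, the `rights.audit` logger name, and the `audit` helper are all hypothetical; the endpoint URLs reuse the placeholder host from the pipeline code above.

```python
import json
import logging
from dataclasses import dataclass

# Hypothetical config layout; in practice this would be read from a file.
CONFIG_JSON = """
{
  "us_copyright_term": 95,
  "eu_life_plus": 70,
  "endpoints": {
    "open_publish": "https://workflow.internal/api/v1/rights/open_publish",
    "rights_clearance": "https://workflow.internal/api/v1/rights/rights_clearance"
  }
}
"""


@dataclass(frozen=True)
class RulesConfig:
    us_copyright_term: int
    eu_life_plus: int
    endpoints: dict[str, str]

    @classmethod
    def from_json(cls, text: str) -> "RulesConfig":
        data = json.loads(text)
        return cls(
            us_copyright_term=int(data["us_copyright_term"]),
            eu_life_plus=int(data["eu_life_plus"]),
            endpoints=dict(data["endpoints"]),
        )


audit_logger = logging.getLogger("rights.audit")


def audit(object_id: str, rule: str, status: str, destination: str, digest: str) -> str:
    """Build one JSON line per rule evaluation, log it at INFO, and return it."""
    entry = json.dumps(
        {"object_id": object_id, "rule": rule, "status": status,
         "destination": destination, "payload_hash": digest},
        sort_keys=True,
    )
    audit_logger.info(entry)
    return entry


config = RulesConfig.from_json(CONFIG_JSON)
print(config.us_copyright_term, config.eu_life_plus)  # 95 70
```

Keeping each audit entry as one sorted-key JSON object makes the log easy to diff and replay against a later ruleset version.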

Conclusion

The engine’s correctness depends on two details that are easy to get wrong. First, the US public domain cutoff is not a fixed year but advances every January 1 (computed as current_year - 95 - 1). Second, the EU life+70 comparison uses a strict < rather than <=, because protection runs to the end of the seventieth calendar year after death; <= would claim works as public domain throughout their final year of protection. The fail-safe default, AMBIGUOUS routing to fallback_chain, ensures that incomplete temporal data never produces a false public domain assertion.
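
Both boundary rules can be checked in isolation by passing the evaluation year explicitly instead of reading the clock. This is a minimal sketch: `us_latest_pd_year` and `eu_is_public_domain` are hypothetical helpers that restate the engine's two comparisons.

```python
def us_latest_pd_year(as_of_year: int, term: int = 95) -> int:
    """Latest publication year in the US public domain on January 1 of as_of_year."""
    return as_of_year - term - 1


def eu_is_public_domain(death_year: int, as_of_year: int, life_plus: int = 70) -> bool:
    """Protection runs through December 31 of death_year + life_plus, hence strict <."""
    return death_year + life_plus < as_of_year


for year in (2024, 2025):
    print(year, us_latest_pd_year(year), eu_is_public_domain(1954, year))
# 2024 1928 False
# 2025 1929 True
```

For a creator who died in 1954, `1954 + 70 <= 2024` is already true, so a <= comparison would mark the work public domain during 2024, its last year of protection. The strict comparison flips only on January 1, 2025.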