# Workflows Documentation

This document describes the workflows in the `omeka2dsp` system, using Mermaid diagrams to illustrate the data flow and processing steps.

## Overview

The `omeka2dsp` system implements several interconnected workflows that together migrate and synchronize cultural heritage data between the Omeka and DSP platforms.

```mermaid
graph TB
subgraph "Primary Workflows"
A[Main Migration Workflow]
B[Data Extraction Workflow]
C[Data Transformation Workflow]
D[Synchronization Workflow]
E[File Upload Workflow]
end
subgraph "Supporting Workflows"
F[Configuration Setup]
G[Authentication]
H[Error Handling]
I[Logging]
end
A --> B
A --> C
A --> D
A --> E
F --> A
G --> A
H --> A
I --> A
click A href "#main-migration-workflow"
click B href "#data-extraction-workflow"
click C href "#data-transformation-workflow"
click D href "#synchronization-workflow"
click E href "#file-upload-workflow"
style A fill:#86bbd8
style B fill:#ffe880
style C fill:#ffe880
style D fill:#ffe880
style E fill:#ffe880
```
## Main Migration Workflow

This is the primary workflow executed when running `data_2_dasch.py`. It orchestrates all other workflows.

```mermaid
flowchart TD
Start([Start Script]) --> ParseArgs[Parse Command Line Arguments]
ParseArgs --> LoadConfig[Load Environment Configuration]
LoadConfig --> ValidateConfig{Configuration Valid?}
ValidateConfig -->|No| ConfigError[Exit with Configuration Error]
ValidateConfig -->|Yes| SetMode[Determine Processing Mode]
SetMode --> ModeChoice{Processing Mode?}
ModeChoice -->|all_data| FetchAll[Fetch All Items from Collection]
ModeChoice -->|sample_data| FetchSample[Fetch Random Sample]
ModeChoice -->|test_data| FetchTest[Fetch Test Data]
FetchAll --> Authenticate
FetchSample --> Authenticate
FetchTest --> Authenticate
Authenticate[Authenticate with DSP] --> AuthCheck{Authentication Successful?}
AuthCheck -->|No| AuthError[Exit with Auth Error]
AuthCheck -->|Yes| GetProject[Get Project Information]
GetProject --> GetLists[Fetch DSP Lists]
GetLists --> StartProcessing[Start Item Processing Loop]
StartProcessing --> MoreItems{More Items to Process?}
MoreItems -->|No| Complete[Migration Complete]
MoreItems -->|Yes| ProcessItem[Process Next Item]
ProcessItem --> ExtractID[Extract Item Identifier]
ExtractID --> CheckExists[Check if Resource Exists in DSP]
CheckExists --> ExistsChoice{Resource Exists?}
ExistsChoice -->|No| CreateFlow[Create New Resource]
ExistsChoice -->|Yes| SyncFlow[Synchronize Existing Resource]
CreateFlow --> TransformData[Transform Omeka Data to DSP Format]
TransformData --> CreateResource[Create Resource in DSP]
CreateResource --> ProcessMedia[Process Associated Media]
SyncFlow --> CompareData[Compare Omeka vs DSP Data]
CompareData --> HasChanges{Data Has Changed?}
HasChanges -->|No| ProcessMedia
HasChanges -->|Yes| UpdateResource[Update Resource in DSP]
UpdateResource --> ProcessMedia
ProcessMedia --> MediaComplete{All Media Processed?}
MediaComplete -->|No| ProcessNextMedia[Process Next Media Item]
MediaComplete -->|Yes| LogProgress[Log Processing Results]
ProcessNextMedia --> UploadMedia[Upload Media to DSP]
UploadMedia --> LinkMedia[Link Media to Parent Resource]
LinkMedia --> MediaComplete
LogProgress --> MoreItems
Complete --> GenerateReport[Generate Final Report]
GenerateReport --> End([End Script])
ConfigError --> End
AuthError --> End
style Start fill:#dbfe87
style End fill:#dbfe87
style ProcessItem fill:#ffe880
style CreateFlow fill:#86bbd8
style SyncFlow fill:#ffe880
style ProcessMedia fill:#3a1e3e,color:#fff
style Complete fill:#dbfe87
```
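
Conceptually, the diagram reduces to a create-or-synchronize loop over the selected items. The following skeleton is only an illustration of that control flow, not the actual code in `data_2_dasch.py`; `fetch_items` and `process_item` are placeholder names.

```python
# Illustrative skeleton of the orchestration loop -- placeholder helpers, not the real
# data_2_dasch.py implementation.

def fetch_items(mode: str) -> list[dict]:
    """Placeholder for the extraction workflow: return the Omeka items selected by the mode."""
    raise NotImplementedError

def process_item(item: dict, token: str) -> None:
    """Placeholder: create or synchronize one resource and its media in DSP."""
    raise NotImplementedError

def run_migration(mode: str, token: str) -> None:
    failures = []
    for item in fetch_items(mode):
        try:
            process_item(item, token)              # create new resource or sync existing one
        except Exception as exc:                   # skip the failing item, keep the batch running
            failures.append((item.get("o:id"), str(exc)))
    print(f"Migration complete, {len(failures)} item(s) failed")
```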

### Processing Mode Details

```mermaid
graph LR
subgraph "Processing Modes"
A[all_data] --> A1[Process entire collection]
B[sample_data] --> B1[Process random sample]
C[test_data] --> C1[Process predefined test items]
end
A1 --> Config1[Uses ITEM_SET_ID from config]
B1 --> Config2[Uses NUMBER_RANDOM_OBJECTS variable]
C1 --> Config3[Uses TEST_DATA array]
style A fill:#86bbd8
style B fill:#ffe880
style C fill:#f6ae2d
```
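
As a rough sketch, the mode selection could look like the function below. The environment variable name follows the diagram (`NUMBER_RANDOM_OBJECTS`), while the default sample size and the example identifiers standing in for `TEST_DATA` are purely hypothetical.

```python
import os
import random

def select_items(mode: str, all_items: list[dict]) -> list[dict]:
    """Pick the items to process for the given CLI processing mode (illustrative only)."""
    if mode == "all_data":
        return all_items                                         # entire collection (ITEM_SET_ID)
    if mode == "sample_data":
        n = int(os.environ.get("NUMBER_RANDOM_OBJECTS", "10"))   # sample size; default is assumed
        return random.sample(all_items, min(n, len(all_items)))
    if mode == "test_data":
        test_identifiers = {"ABB-001", "DOK-002"}                # hypothetical TEST_DATA entries
        return [i for i in all_items if i.get("identifier") in test_identifiers]
    raise ValueError(f"Unknown processing mode: {mode}")
```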

## Data Extraction Workflow

This workflow handles the extraction of data from the Omeka API; it is implemented primarily in `process_data_from_omeka.py`.

```mermaid
sequenceDiagram
participant Script as Main Script
participant Extractor as Data Extractor
participant OmekaAPI as Omeka API
participant Cache as Local Cache
Script->>Extractor: get_items_from_collection(collection_id)
loop Paginated Request
Extractor->>OmekaAPI: GET /items?item_set_id={id}&page={n}
OmekaAPI->>Extractor: Items page + pagination links
Extractor->>Cache: Store items temporarily
end
Extractor->>Script: Return all collected items
loop For each item
Script->>Extractor: get_media(item_id)
loop Paginated Media Request
Extractor->>OmekaAPI: GET /media?item_id={id}&page={n}
OmekaAPI->>Extractor: Media page + pagination links
end
Extractor->>Script: Return media for item
Script->>Extractor: extract_property(properties, prop_id)
Extractor->>Script: Return extracted property value
Script->>Extractor: extract_combined_values(properties)
Extractor->>Script: Return combined values array
end
```
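
The paginated item fetch can be sketched with `requests` as follows. The query parameters and the empty-page termination check are assumptions based on the Omeka S REST API and may not match `process_data_from_omeka.py` exactly.

```python
import requests

def get_items_from_collection(api_url: str, item_set_id: str) -> list[dict]:
    """Collect all items of an item set by following the paginated /items endpoint (sketch)."""
    items, page = [], 1
    while True:
        resp = requests.get(
            f"{api_url}/items",
            params={"item_set_id": item_set_id, "page": page, "per_page": 100},
            timeout=30,
        )
        resp.raise_for_status()
        batch = resp.json()
        if not batch:            # empty page -> no more results
            break
        items.extend(batch)
        page += 1
    return items
```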

### Property Extraction Details

```mermaid
flowchart TD
A[Raw Omeka Property Array] --> B[extract_property Function]
B --> C{Property ID Match?}
C -->|No| D[Try Next Property]
C -->|Yes| E{Request Type?}
E -->|as_uri| F[Return formatted URI link]
E -->|only_label| G[Return label only]
E -->|default| H[Return @value]
D --> I{More Properties?}
I -->|Yes| C
I -->|No| J[Return Empty String]
F --> K[Formatted Result]
G --> K
H --> K
J --> K
style A fill:#86bbd8
style K fill:#dbfe87
style B fill:#ffe880
```
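
A minimal sketch of this matching logic is shown below; the `mode` flag mirrors the `as_uri` / `only_label` / default branches in the diagram, but the real signature of `extract_property()` may differ.

```python
def extract_property(properties: list[dict], prop_id: int, mode: str = "default") -> str:
    """Return the first matching property value, or an empty string if nothing matches (sketch)."""
    for prop in properties:
        if prop.get("property_id") != prop_id:
            continue                                                      # try next property
        if mode == "as_uri":
            return f'{prop.get("o:label", "")}: {prop.get("@id", "")}'    # formatted URI link
        if mode == "only_label":
            return prop.get("o:label", "")                                # label only
        return prop.get("@value", "")                                     # default: literal value
    return ""                                                             # no match found
```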

## Data Transformation Workflow

This workflow converts Omeka data structures into DSP-compatible formats and is handled primarily by the `construct_payload()` function.

```mermaid
flowchart TD
A[Omeka Item Data] --> B[construct_payload Function]
B --> C[Initialize Base Payload Structure]
C --> D[Set Context & Type Information]
D --> E[Extract Basic Metadata]
E --> F[Process Dublin Core Properties]
F --> F1[dcterms:title → rdfs:label]
F --> F2[dcterms:identifier → identifier]
F --> F3[dcterms:description → description]
F --> F4[dcterms:creator → creator]
F --> F5[dcterms:date → date]
F1 --> G[Process Custom Properties]
F2 --> G
F3 --> G
F4 --> G
F5 --> G
G --> G1[dcterms:subject → subject]
G --> G2[dcterms:language → language]
G --> G3[dcterms:rights → rights]
G --> G4[dcterms:license → license]
G1 --> H[Map List Values]
G2 --> H
G3 --> H
G4 --> H
H --> I{Resource Type?}
I -->|Object| J[Add Object-Specific Fields]
I -->|Media| K[Add Media-Specific Fields]
J --> L[Add Collection References]
K --> M[Add File Information]
L --> N[Validate Payload Structure]
M --> N
N --> O{Validation Passed?}
O -->|No| P[Log Validation Errors]
O -->|Yes| Q[Return Complete Payload]
P --> R[Return Null/Error]
style A fill:#86bbd8
style Q fill:#dbfe87
style R fill:#f6ae2d
style B fill:#ffe880
```
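
The resulting payload follows the JSON-LD format expected by the DSP API for resource creation. The sketch below shows only the general shape: the resource class `sgb_OBJECT`, the property names, and the ontology IRI are placeholders rather than the actual project ontology used by `construct_payload()`.

```python
def construct_payload(item: dict, project_iri: str, onto: str = "sgb") -> dict:
    """Assemble a simplified DSP create-resource payload from extracted Omeka values (sketch)."""
    return {
        "@type": f"{onto}:sgb_OBJECT",                            # placeholder resource class
        "rdfs:label": item["title"],                              # dcterms:title -> rdfs:label
        "knora-api:attachedToProject": {"@id": project_iri},
        f"{onto}:identifier": {
            "@type": "knora-api:TextValue",
            "knora-api:valueAsString": item["identifier"],        # dcterms:identifier
        },
        f"{onto}:description": {
            "@type": "knora-api:TextValue",
            "knora-api:valueAsString": item.get("description", ""),
        },
        "@context": {
            "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
            "knora-api": "http://api.knora.org/ontology/knora-api/v2#",
            onto: "http://api.dasch.swiss/ontology/0000/sgb/v2#",  # placeholder ontology IRI
        },
    }
```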

### List Value Mapping

```mermaid
sequenceDiagram
participant T as Transformer
participant M as Mapper
participant L as DSP Lists
T->>M: extract_listvalueiri_from_value(value, list_label, lists)
M->>L: Find list by label
L->>M: Return list structure
loop For each list node
M->>L: Check if node label matches value
L->>M: Return match result
end
alt Match found
M->>T: Return node IRI
else No match found
M->>T: Return null
T->>T: Log unmapped value warning
end
```
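
A sketch of the lookup, assuming the list structure as returned by the DSP admin lists endpoint (`listinfo`, `children`, `labels`); the actual response shape consumed by `extract_listvalueiri_from_value()` may differ.

```python
def extract_listvalueiri_from_value(value: str, list_label: str, lists: list[dict]) -> str | None:
    """Return the IRI of the list node whose label matches `value` (assumed list shape)."""
    for lst in lists:
        if lst["listinfo"]["labels"][0]["value"] != list_label:
            continue                                        # not the list we are looking for
        for node in lst.get("children", []):
            for label in node.get("labels", []):
                if label["value"].strip().lower() == value.strip().lower():
                    return node["id"]                       # matching node IRI
    return None                                             # caller logs an "unmapped value" warning
```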

## Synchronization Workflow

This workflow handles incremental updates by comparing existing DSP data with the current Omeka data.

```mermaid
flowchart TD
A[Resource Exists in DSP] --> B[get_full_resource]
B --> C[Extract Current DSP Values]
C --> D[Extract Current Omeka Values]
D --> E[check_values Function]
E --> F[Compare Each Property]
F --> G{Property Type?}
G -->|Single Value| H[sync_value]
G -->|Array Value| I[sync_array_value]
H --> J[Compare Values]
J --> K{Values Different?}
K -->|No| L[No Action Needed]
K -->|Yes| M[Queue Update Operation]
I --> N[Compare Arrays]
N --> O[Calculate Additions & Deletions]
O --> P[Queue Create/Delete Operations]
L --> Q[Process Next Property]
M --> Q
P --> Q
Q --> R{More Properties?}
R -->|Yes| F
R -->|No| S[Execute Queued Operations]
S --> T[Apply Updates via DSP API]
T --> U[Log Synchronization Results]
style A fill:#86bbd8
style U fill:#dbfe87
style E fill:#ffe880
style S fill:#ffe880
```
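
A simplified sketch of the comparison step: single values that differ produce an update operation, while array values are reduced to create and delete operations. Function and key names are illustrative, not the real `check_values()` implementation.

```python
def check_values(dsp_data: dict, omeka_data: dict) -> list[dict]:
    """Compare DSP and Omeka values property by property and queue update operations (sketch)."""
    operations: list[dict] = []
    for prop in sorted(set(dsp_data) | set(omeka_data)):
        dsp_value, omeka_value = dsp_data.get(prop), omeka_data.get(prop)
        if isinstance(dsp_value, list) or isinstance(omeka_value, list):
            # array-valued property: reduce to additions and deletions via set difference
            dsp_set, omeka_set = set(dsp_value or []), set(omeka_value or [])
            operations += [{"action": "create", "property": prop, "value": v}
                           for v in sorted(omeka_set - dsp_set)]
            operations += [{"action": "delete", "property": prop, "value": v}
                           for v in sorted(dsp_set - omeka_set)]
        elif dsp_value != omeka_value:
            # single-valued property: queue an update only when the values differ
            operations.append({"action": "update", "property": prop, "value": omeka_value})
    return operations
```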

### Array Value Synchronization Details

```mermaid
flowchart LR
A[DSP Array: a,b,c] --> C[Comparison Logic]
B[Omeka Array: b,c,d] --> C
C --> D[Calculate Differences]
D --> E[To Create: d]
D --> F[To Delete: a]
D --> G[Unchanged: b,c]
E --> H[Create Operations]
F --> I[Delete Operations]
G --> J[No Action]
H --> K[Execute via DSP API]
I --> K
style C fill:#ffe880
style K fill:#dbfe87
```
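
The example from the diagram expressed as plain set arithmetic:

```python
dsp_values = ["a", "b", "c"]      # values currently stored in DSP
omeka_values = ["b", "c", "d"]    # values currently in Omeka

to_create = sorted(set(omeka_values) - set(dsp_values))   # ["d"]       -> create operations
to_delete = sorted(set(dsp_values) - set(omeka_values))   # ["a"]       -> delete operations
unchanged = sorted(set(dsp_values) & set(omeka_values))   # ["b", "c"]  -> no action
```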

## File Upload Workflow

This workflow manages the transfer of media files from Omeka to DSP file storage.

```mermaid
sequenceDiagram
participant Script as Main Script
participant Uploader as File Uploader
participant OmekaAPI as Omeka API
participant TempStorage as Temporary Storage
participant DSPAPI as DSP API
participant DSPStorage as DSP File Storage
Script->>Uploader: upload_file_from_url(file_url, token)
Uploader->>OmekaAPI: GET file_url
OmekaAPI->>Uploader: File stream
Uploader->>TempStorage: Write to temporary file
TempStorage->>Uploader: Temporary file path
alt ZIP file
Uploader->>Uploader: Create ZIP archive
end
Uploader->>DSPAPI: POST /v2/files (multipart upload)
DSPAPI->>DSPStorage: Store file
DSPStorage->>DSPAPI: File stored confirmation
DSPAPI->>Uploader: Internal filename
Uploader->>TempStorage: Clean up temporary files
Uploader->>Script: Return internal filename
Script->>Script: Update resource payload with filename
Script->>DSPAPI: Create/Update resource with file reference
```
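
A sketch of the download-and-upload step using `requests` and a temporary file. The upload endpoint is taken from the diagram (`/v2/files`), and the `internalFilename` response field is an assumption; the real `upload_file_from_url()` (including the ZIP handling) may differ.

```python
import os
import tempfile
import requests

def upload_file_from_url(file_url: str, dsp_host: str, token: str) -> str:
    """Download a file from Omeka and push it to DSP file storage (illustrative sketch)."""
    # Stream the file from Omeka into a temporary file
    with requests.get(file_url, stream=True, timeout=60) as resp:
        resp.raise_for_status()
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            for chunk in resp.iter_content(chunk_size=8192):
                tmp.write(chunk)
            tmp_path = tmp.name

    try:
        # Multipart upload to the DSP file endpoint named in the diagram above
        with open(tmp_path, "rb") as fh:
            upload = requests.post(
                f"{dsp_host}/v2/files",
                headers={"Authorization": f"Bearer {token}"},
                files={"file": (os.path.basename(file_url), fh)},
                timeout=300,
            )
        upload.raise_for_status()
        return upload.json()["internalFilename"]    # assumed response field
    finally:
        os.remove(tmp_path)                         # clean up the temporary file
```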

### Media Type Classification

```mermaid
graph TD
A[Media MIME Type] --> B[specify_mediaclass Function]
B --> C{MIME Type?}
C -->|image/*| D[sgb_MEDIA_IMAGE]
C -->|application/pdf| E[sgb_MEDIA_ARCHIV]
C -->|text/*| E
C -->|application/zip| E
C -->|Other| F[sgb_MEDIA_ARCHIV - Default]
D --> G[StillImageFileValue]
E --> H[ArchiveFileValue]
F --> H
G --> I[Configure Image-specific Fields]
H --> J[Configure Archive-specific Fields]
style A fill:#86bbd8
style B fill:#ffe880
style G fill:#86bbd8
style H fill:#3a1e3e,color:#fff
```
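
A sketch of this branching; the class names follow the diagram, and the return shape is illustrative.

```python
def specify_mediaclass(mime_type: str) -> tuple[str, str]:
    """Map a MIME type to (resource class, file value type) as shown in the diagram (sketch)."""
    if mime_type.startswith("image/"):
        return "sgb_MEDIA_IMAGE", "knora-api:StillImageFileValue"
    # PDFs, text files, ZIP archives and everything else fall back to the archive class
    return "sgb_MEDIA_ARCHIV", "knora-api:ArchiveFileValue"
```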

## Error Handling Workflow

Error handling and recovery mechanisms are applied throughout the system.

```mermaid
flowchart TD
A[Operation Attempted] --> B{Operation Successful?}
B -->|Yes| C[Continue Processing]
B -->|No| D[Capture Error Details]
D --> E[Log Error Information]
E --> F{Error Type?}
F -->|Authentication Error| G[Re-authenticate]
F -->|Rate Limit Error| H[Wait and Retry]
F -->|Network Error| I[Retry with Backoff]
F -->|Data Validation Error| J[Skip Item, Log Warning]
F -->|Critical System Error| K[Fail Fast]
G --> L{Re-auth Successful?}
L -->|Yes| M[Retry Original Operation]
L -->|No| K
H --> N[Sleep for Rate Limit Period]
N --> M
I --> O[Exponential Backoff Delay]
O --> P{Max Retries Reached?}
P -->|No| M
P -->|Yes| Q[Mark as Failed, Continue]
J --> Q
K --> R[Exit with Error Code]
M --> A
Q --> S[Continue with Next Item]
C --> S
S --> T[Processing Complete]
style A fill:#86bbd8
style T fill:#dbfe87
style K fill:#f6ae2d
style R fill:#f6ae2d
style E fill:#ffe880
```
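
The retry-with-backoff branch can be sketched as a small helper; the retry count and delays are illustrative and not the values used by the scripts.

```python
import time
import requests

RETRYABLE = (requests.exceptions.ConnectionError, requests.exceptions.Timeout)

def request_with_backoff(method: str, url: str, max_retries: int = 3, **kwargs) -> requests.Response:
    """Retry transient network and rate-limit failures with exponential backoff (illustrative)."""
    delay = 1.0
    for attempt in range(max_retries + 1):
        try:
            resp = requests.request(method, url, timeout=30, **kwargs)
            if resp.status_code != 429:              # 429 = rate limited, treated as retryable
                resp.raise_for_status()              # other HTTP errors propagate to the caller
                return resp
        except RETRYABLE:
            pass                                     # network error or timeout: retry below
        if attempt == max_retries:                   # max retries reached -> give up
            raise RuntimeError(f"{method} {url} still failing after {max_retries} retries")
        time.sleep(delay)                            # exponential backoff delay
        delay *= 2
```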

## API Utility Workflows

These supporting scripts prepare the environment and fetch configuration data.

### Project Information Workflow (api_get_project.py)

```mermaid
sequenceDiagram
participant Script as api_get_project.py
participant DSPAPI as DSP API
participant FileSystem as Local Storage
Script->>Script: Load PROJECT_SHORT_CODE from env
Script->>Script: Load API_HOST from env
Script->>DSPAPI: GET /admin/projects/shortcode/{shortcode}
alt Success
DSPAPI->>Script: Project data (JSON)
Script->>FileSystem: Save to ../data/project_data.json
Script->>Script: Log success message
else Failure
DSPAPI->>Script: Error response
Script->>Script: Log error details
end
```
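
Condensed into a sketch (endpoint, environment variables, and output path as in the diagram; everything else illustrative):

```python
import json
import os
import requests

api_host = os.environ["API_HOST"]                   # e.g. the DSP API base URL
shortcode = os.environ["PROJECT_SHORT_CODE"]

resp = requests.get(f"{api_host}/admin/projects/shortcode/{shortcode}", timeout=30)
if resp.ok:
    with open("../data/project_data.json", "w", encoding="utf-8") as f:
        json.dump(resp.json(), f, indent=2, ensure_ascii=False)
    print(f"Project data for {shortcode} saved")
else:
    print(f"Request failed: {resp.status_code} {resp.text}")
```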

### Lists Retrieval Workflow (api_get_lists.py)

```mermaid
sequenceDiagram
participant Script as api_get_lists.py
participant DSPAPI as DSP API
participant FileSystem as Local Storage
Script->>Script: Set project_iri (hardcoded)
Script->>Script: URL encode project IRI
Script->>DSPAPI: GET /admin/lists/?projectIri={encoded_iri}
alt Success
DSPAPI->>Script: Lists summary (JSON)
Script->>FileSystem: Save to ../data/data_lists.json
Script->>Script: Log success message
else Failure
DSPAPI->>Script: Error response
Script->>Script: Log error details
end
```
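
The notable detail here is that the project IRI must be percent-encoded before being passed as a query parameter. A brief sketch with a placeholder IRI and host:

```python
import json
import urllib.parse
import requests

api_host = "https://api.example.org"                     # placeholder DSP host
project_iri = "http://rdfh.ch/projects/XXXX"             # placeholder, hardcoded in the script
encoded_iri = urllib.parse.quote(project_iri, safe="")   # IRI must be percent-encoded

resp = requests.get(f"{api_host}/admin/lists/?projectIri={encoded_iri}", timeout=30)
resp.raise_for_status()
with open("../data/data_lists.json", "w", encoding="utf-8") as f:
    json.dump(resp.json(), f, indent=2, ensure_ascii=False)
```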

### Detailed Lists Workflow (api_get_lists_detailed.py)

```mermaid
flowchart TD
A[Start Script] --> B[Load data_lists.json]
B --> C{File Loaded?}
C -->|No| D[Exit with Error]
C -->|Yes| E[Extract List IDs]
E --> F[Initialize Empty Results Array]
F --> G[Process Next List ID]
G --> H[URL Encode List ID]
H --> I["GET /v2/lists/{encoded_id}"]
I --> J{Request Successful?}
J -->|Yes| K[Add to Results Array]
J -->|No| L[Log Error, Continue]
K --> M{More Lists?}
L --> M
M -->|Yes| G
M -->|No| N[Save All Results to data_lists_detail.json]
N --> O[Log Completion]
O --> P[End Script]
D --> P
style A fill:#dbfe87
style P fill:#dbfe87
style B fill:#ffe880
style I fill:#86bbd8
```
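
A sketch of the per-list loop; it assumes the summary file has the shape `{"lists": [{"id": ...}, ...]}`, which may need adjusting to the actual admin API response.

```python
import json
import urllib.parse
import requests

api_host = "https://api.example.org"                    # placeholder DSP host

with open("../data/data_lists.json", encoding="utf-8") as f:
    lists_summary = json.load(f)

detailed = []
for lst in lists_summary.get("lists", []):              # assumed response shape
    encoded_id = urllib.parse.quote(lst["id"], safe="")
    resp = requests.get(f"{api_host}/v2/lists/{encoded_id}", timeout=30)
    if resp.ok:
        detailed.append(resp.json())
    else:
        print(f"Could not fetch list {lst['id']}: {resp.status_code}")   # log error, continue

with open("../data/data_lists_detail.json", "w", encoding="utf-8") as f:
    json.dump(detailed, f, indent=2, ensure_ascii=False)
print(f"Saved {len(detailed)} detailed lists")
```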

## Workflow Integration

All workflows integrate through the main processing pipeline, providing a fault-tolerant migration system that handles scenarios ranging from initial bulk migration to ongoing incremental synchronization:

```mermaid
graph TB
subgraph "Initialization Phase"
A1[Configuration Loading]
A2[Authentication]
A3[Project & Lists Retrieval]
end
subgraph "Processing Phase"
B1[Data Extraction]
B2[Data Transformation]
B3[Synchronization Check]
B4[File Upload]
end
subgraph "Completion Phase"
C1[Result Logging]
C2[Cleanup]
C3[Report Generation]
end
A1 --> A2
A2 --> A3
A3 --> B1
B1 --> B2
B2 --> B3
B3 --> B4
B4 --> B1
B4 --> C1
C1 --> C2
C2 --> C3
style A1 fill:#86bbd8
style B1 fill:#ffe880
style C1 fill:#dbfe87
```