Sync Pipeline
Trigger
Sync is initiated by an HTTP call:

    POST /api/internal/sync
    X-API-KEY: <valid-key>

This is intended to be called by a Kubernetes CronJob (default: nightly at 02:00), but can also be called manually for testing or recovery.
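For manual testing or recovery, the call can be scripted. The following is a minimal Python sketch using only the standard library; it is not part of the service. The function names, the `base_url` value, and the timeout are assumptions for illustration; only the path, the method, and the `X-API-KEY` header come from the description above.

```python
import urllib.request


def build_sync_request(base_url: str, api_key: str) -> urllib.request.Request:
    """Build the POST request that triggers a sync (nothing is sent yet)."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/internal/sync",
        data=b"",  # empty body, so a Content-Length header is sent with the POST
        method="POST",
        headers={"X-API-KEY": api_key},
    )


def trigger_sync(base_url: str, api_key: str, timeout_s: float = 300.0) -> bytes:
    """Send the request and return the raw response body.

    urlopen raises urllib.error.HTTPError on 4xx/5xx responses, which plays
    the same role as curl's -f flag in the CronJob.
    """
    request = build_sync_request(base_url, api_key)
    with urllib.request.urlopen(request, timeout=timeout_s) as response:
        return response.read()
```

Calling `trigger_sync("https://nompd-api", api_key)` would run a full sync, so it should be pointed at a test environment first.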
Pipeline Steps
1. Fetch upstream data
UpstreamApiClient makes three parallel HTTP requests to the upstream FHIR APIs:
| Endpoint | Returns |
|---|---|
| `HreseptGuidelines/PlanDefinition` | Treatment group definitions |
| `HreseptGuidelines/ActivityDefinition` | Treatment alternatives |
| `HreseptReimbursement/RegulatedAuthorization` | Reimbursement authorizations |
All three requests include the configured X-API-KEY header and run concurrently via Task.WhenAll.
If any upstream request fails, the entire sync aborts with an exception.
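The all-or-nothing fetch can be sketched in Python with `asyncio.gather`, which is a rough analogue of `Task.WhenAll`. This is an illustration under assumptions, not the `UpstreamApiClient` code: the `fetch` callable is a hypothetical stand-in for the HTTP call, and only the three endpoint paths come from the table above.

```python
import asyncio
from typing import Awaitable, Callable

# Relative endpoint paths taken from the table above.
ENDPOINTS = (
    "HreseptGuidelines/PlanDefinition",
    "HreseptGuidelines/ActivityDefinition",
    "HreseptReimbursement/RegulatedAuthorization",
)

# Hypothetical signature: takes an endpoint path, returns the fetched resources.
Fetcher = Callable[[str], Awaitable[list]]


async def fetch_all(fetch: Fetcher) -> dict[str, list]:
    """Start all three requests at once and wait for them together.

    asyncio.gather re-raises the first exception it sees, so a single failed
    request makes the whole call fail and no partial result is returned.
    """
    results = await asyncio.gather(*(fetch(path) for path in ENDPOINTS))
    return dict(zip(ENDPOINTS, results))
```

One difference worth noting: `asyncio.gather` propagates the first failure immediately, whereas `Task.WhenAll` waits for all tasks to finish before surfacing the error. Either way the caller sees an exception instead of a partial data set.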
2. Increment version
VersionService.IncrementVersionAsync() atomically increments the global version counter in the SyncMetadata MongoDB collection using FindOneAndUpdate with $inc, and sets LastSyncAt to the current UTC time. The same version number is used for both raw and converted writes in this sync run.
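To make the shape of that update concrete, here is a small Python sketch that builds the `$inc`/`$set` update document and applies it to an in-memory stand-in for the `SyncMetadata` document. The class `InMemorySyncMetadata` and its lock are assumptions that only imitate the atomicity MongoDB provides server-side; returning the document as it looks after the update is also an assumption. The field names `CurrentVersion` and `LastSyncAt` come from the collections table.

```python
import threading
from datetime import datetime, timezone


def build_increment_update(now: datetime) -> dict:
    """Update document: bump the counter and stamp the sync time in one operation."""
    return {"$inc": {"CurrentVersion": 1}, "$set": {"LastSyncAt": now}}


class InMemorySyncMetadata:
    """Hypothetical stand-in for the single SyncMetadata document."""

    def __init__(self) -> None:
        self._doc = {"CurrentVersion": 0, "LastSyncAt": None}
        self._lock = threading.Lock()  # imitates the server-side atomicity

    def find_one_and_update(self, update: dict) -> dict:
        """Apply $inc and $set under the lock; return a copy of the updated document."""
        with self._lock:
            for field_name, delta in update.get("$inc", {}).items():
                self._doc[field_name] = self._doc.get(field_name, 0) + delta
            self._doc.update(update.get("$set", {}))
            return dict(self._doc)


def increment_version(store: InMemorySyncMetadata) -> int:
    """Return the new version number that this sync run will write everywhere."""
    updated = store.find_one_and_update(build_increment_update(datetime.now(timezone.utc)))
    return updated["CurrentVersion"]
```

Because the read and the write happen in one operation, two overlapping sync runs cannot be handed the same version number.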
3. Upsert raw FHIR data
The raw FHIR resources are persisted to MongoDB before conversion, so the system retains an authoritative record of what upstream returned. This enables re-conversion if mapping logic changes and provides full audit traceability.
| Source | Collection | BusinessKey |
|---|---|---|
| `PlanDefinition` | `PlanDefinitions` | `Identifier[0].Value` |
| `ActivityDefinition` | `ActivityDefinitions` | `Identifier[0].Value` |
| `RegulatedAuthorization` | `RegulatedAuthorizations` | `Identifier[0].Value` |
Each resource is wrapped in VersionedDocument<T> and goes through the same upsert + soft-delete logic as the output collections (see step 5). Resources without an identifier are silently skipped.
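The wrapping step might look roughly like the Python sketch below. It assumes resources arrive as plain dictionaries in FHIR JSON shape (lowercase `identifier` and `value`), and the helper names `business_key` and `wrap_all` are hypothetical. The envelope fields mirror the shared `VersionedDocument<T>` shape, with `Id` left out.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Optional


@dataclass
class VersionedDocument:
    """Envelope around one raw or converted item (Id omitted in this sketch)."""
    business_key: str
    data: dict[str, Any]
    version: int
    content_hash: str
    is_deleted: bool = False
    updated_at: Optional[datetime] = None


def business_key(resource: dict[str, Any]) -> Optional[str]:
    """Return identifier[0].value, or None when there is no usable identifier."""
    identifiers = resource.get("identifier") or []
    if not identifiers:
        return None
    return identifiers[0].get("value") or None


def wrap_all(resources: list[dict[str, Any]], version: int,
             hash_fn: Callable[[dict[str, Any]], str]) -> list[VersionedDocument]:
    """Wrap every resource that has a business key; skip the rest without raising."""
    now = datetime.now(timezone.utc)
    wrapped = []
    for resource in resources:
        key = business_key(resource)
        if key is None:
            continue  # no identifier: silently skipped, as described above
        wrapped.append(
            VersionedDocument(key, resource, version, hash_fn(resource), updated_at=now)
        )
    return wrapped
```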
4. Convert to output models
ConversionService transforms the FHIR data:
- `PlanDefinition` + `ActivityDefinition` → `TreatmentGroup`: Each `PlanDefinition` becomes one `TreatmentGroup`. `ActivityDefinition`s are resolved via a lookup dictionary keyed by identifier, linked through `PlanDefinitionAction.DefinitionCanonical`.
- `RegulatedAuthorization` → `ReimbursementGroup`: Each `RegulatedAuthorization` maps directly to one `ReimbursementGroup` with `legalBasis` (from the `reimbursementRegulation` extension) and `indications` (from `Basis.Coding`).
See conversion-mapping.md for field-level details.
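The lookup-dictionary join between plans and activities can be illustrated with a short Python sketch. Several things here are assumptions, not facts about `ConversionService`: resources are plain FHIR JSON dictionaries, `definitionCanonical` is taken to hold the activity's identifier value directly, unresolved references are dropped, and the output keys `id` and `alternatives` are placeholders for the real field mapping.

```python
from typing import Any, Optional


def first_identifier(resource: dict[str, Any]) -> Optional[str]:
    """Return identifier[0].value, or None if the resource has no identifier."""
    identifiers = resource.get("identifier") or []
    return identifiers[0].get("value") if identifiers else None


def to_treatment_groups(plan_definitions: list[dict[str, Any]],
                        activity_definitions: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Produce one (hypothetical) treatment group per plan definition."""
    # Lookup dictionary keyed by identifier, built once for all plans.
    by_identifier: dict[str, dict[str, Any]] = {}
    for activity in activity_definitions:
        key = first_identifier(activity)
        if key is not None:
            by_identifier[key] = activity

    groups = []
    for plan in plan_definitions:
        alternatives = []
        for action in plan.get("action", []):
            # ASSUMPTION: definitionCanonical equals the activity's identifier value.
            activity = by_identifier.get(action.get("definitionCanonical"))
            if activity is not None:  # ASSUMPTION: unresolved references are dropped
                alternatives.append(activity)
        groups.append({"id": first_identifier(plan), "alternatives": alternatives})
    return groups
```

Building the dictionary once keeps the join linear in the number of actions, instead of scanning every activity for every action.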
5. Upsert output and soft-delete
For each output collection (`TreatmentGroups`, `ReimbursementGroups`), and for the raw collections in step 3, the sync performs the following in order:
- Compute a content hash for each incoming item (SHA256 of its BSON representation).
- Upsert with change detection:
  - If no existing record: insert with new version.
  - If existing record has matching `ContentHash` and `IsDeleted = false`: skip (counts as `unchanged`, version is preserved).
  - Otherwise (content changed, or resurrecting from soft-deleted state): update with new version, hash, and `IsDeleted = false`.
- Soft-delete any existing records whose `BusinessKey` is not in the current upstream data set. These get `IsDeleted = true` and their `Version` updated to the current version, so diff consumers see the deletion.

    For each item in upstream:
        hash = ComputeContentHash(item)
        existing = find by BusinessKey
        if existing is null:
            INSERT (Version, ContentHash=hash) → added
        else if existing.ContentHash == hash AND not existing.IsDeleted:
            skip → unchanged
        else:
            UPDATE (Version, ContentHash=hash, IsDeleted=false) → updated

    For all records NOT in upstream AND IsDeleted=false:
        → UPDATE SET IsDeleted=true, Version, UpdatedAt → deleted

The change-detection ensures the version-based diff API is meaningful: clients calling ?sinceVersion=N only receive records that genuinely changed, not every record after every sync.
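The upsert, soft-delete, and diff behaviour can be tried out with the following self-contained Python sketch, which keeps a collection in a dictionary keyed by `BusinessKey`. Two points are assumptions: canonical JSON stands in for the BSON bytes the service hashes, and `changed_since` treats `sinceVersion` as exclusive (`Version > N`). `UpdatedAt` is left out for brevity.

```python
import hashlib
import json
from dataclasses import dataclass, field


@dataclass
class Record:
    data: dict
    version: int
    content_hash: str
    is_deleted: bool = False


@dataclass
class Stats:
    added: list = field(default_factory=list)
    updated: list = field(default_factory=list)
    unchanged: list = field(default_factory=list)
    deleted: list = field(default_factory=list)


def content_hash(item: dict) -> str:
    # ASSUMPTION: canonical JSON replaces the BSON representation used by the service.
    canonical = json.dumps(item, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def sync_collection(store: dict, upstream: dict, version: int) -> Stats:
    """Apply one sync run to `store` (BusinessKey -> Record) and report what changed."""
    stats = Stats()
    for key, item in upstream.items():
        digest = content_hash(item)
        existing = store.get(key)
        if existing is None:
            store[key] = Record(item, version, digest)
            stats.added.append(key)
        elif existing.content_hash == digest and not existing.is_deleted:
            stats.unchanged.append(key)  # record and its version are left untouched
        else:
            store[key] = Record(item, version, digest)  # changed or resurrected
            stats.updated.append(key)
    for key, record in store.items():
        if key not in upstream and not record.is_deleted:
            record.is_deleted = True
            record.version = version  # bump so the deletion shows up in diffs
            stats.deleted.append(key)
    return stats


def changed_since(store: dict, since_version: int) -> list:
    """Keys a ?sinceVersion=N client would receive, soft-deleted records included."""
    return sorted(k for k, r in store.items() if r.version > since_version)
```

Running the same upstream data through `sync_collection` twice reports everything as `unchanged` on the second pass and leaves every stored version as it was, which is what keeps the diff small.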
6. Persist sync history
A `SyncHistoryEntry` is inserted into the `SyncHistory` collection with full per-collection statistics, including the business keys of every added, updated, and deleted record. This gives a durable audit trail of every sync run.
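One possible shape for such an entry is sketched below in Python. The field names are assumptions for illustration, apart from the version and sync timestamp, which correspond to the indexed `Version` and `SyncedAt` fields; keeping only a count for unchanged records is also an assumption, based on business keys being listed only for added, updated, and deleted records.

```python
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass
class CollectionStats:
    """Per-collection outcome of one sync run (hypothetical field names)."""
    added: list[str] = field(default_factory=list)    # business keys
    updated: list[str] = field(default_factory=list)  # business keys
    deleted: list[str] = field(default_factory=list)  # business keys
    unchanged: int = 0  # ASSUMPTION: only a count is kept for unchanged records


@dataclass
class SyncHistoryEntry:
    version: int
    synced_at: datetime
    collections: dict[str, CollectionStats]


def build_history_entry(version: int, per_collection: dict[str, CollectionStats]) -> dict:
    """Return a plain dictionary ready to be stored as one history document."""
    entry = SyncHistoryEntry(version, datetime.now(timezone.utc), per_collection)
    return asdict(entry)  # recurses into the nested dataclasses
```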
7. Return result
The sync endpoint returns a SyncResult:

    {
      "version": 5,
      "added": 12,
      "updated": 3,
      "unchanged": 2676,
      "deleted": 1,
      "durationMs": 1847
    }

Error Handling
- If an upstream API returns a non-success status code, `HttpRequestException` is thrown and the sync aborts. No version increment or data changes occur.
- If MongoDB operations fail mid-sync, the version counter has already been incremented and partial writes may have occurred. Running the sync again reconciles the state: change detection leaves unchanged records untouched and brings changed ones up to date.
- Items without an identifier (no `BusinessKey`) are silently skipped, both for raw and converted collections.
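The first two failure modes follow directly from the order of the pipeline steps. The Python sketch below makes that order explicit with three injected callables; `run_sync` and its parameter names are hypothetical and stand in for the upstream client, the version service, and the MongoDB writes.

```python
from typing import Callable


def run_sync(fetch_upstream: Callable[[], dict],
             increment_version: Callable[[], int],
             write_all: Callable[[dict, int], dict]) -> dict:
    """Run the steps in the documented order: fetch, then version, then writes."""
    # An upstream failure raises here, before anything has been modified.
    upstream = fetch_upstream()
    # From this point on, a failure leaves the counter incremented.
    version = increment_version()
    # A failure here may leave partial writes; the next run reconciles them.
    stats = write_all(upstream, version)
    return {"version": version, **stats}
```

With a failing `fetch_upstream`, `increment_version` is never called, which is why an upstream outage does not consume a version number.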
MongoDB Collections Involved
| Collection | Purpose |
|---|---|
| `PlanDefinitions` | `VersionedDocument<PlanDefinition>` (raw FHIR), indexed on `BusinessKey` (unique) and `Version` |
| `ActivityDefinitions` | `VersionedDocument<ActivityDefinition>` (raw FHIR) |
| `RegulatedAuthorizations` | `VersionedDocument<RegulatedAuthorization>` (raw FHIR) |
| `TreatmentGroups` | `VersionedDocument<TreatmentGroup>` (converted output) |
| `ReimbursementGroups` | `VersionedDocument<ReimbursementGroup>` (converted output) |
| `SyncMetadata` | Single document with global version counter (`CurrentVersion`, `LastSyncAt`) |
| `SyncHistory` | One document per sync run with per-collection stats and business keys; indexed on `Version` (unique) and `SyncedAt` (descending) |
All VersionedDocument<T> collections share the same shape: Id, BusinessKey, Data, Version, IsDeleted, UpdatedAt, ContentHash.
Kubernetes CronJob

    apiVersion: batch/v1
    kind: CronJob
    metadata:
      name: nompd-sync
    spec:
      schedule: "0 2 * * *"  # nightly at 02:00
      jobTemplate:
        spec:
          template:
            spec:
              restartPolicy: OnFailure
              containers:
                - name: sync
                  image: curlimages/curl
                  # Each curl argument is its own list item.
                  # $(SYNC_API_KEY) is expanded by Kubernetes from the secret below.
                  command:
                    - curl
                    - -X
                    - POST
                    - -H
                    - "X-API-KEY: $(SYNC_API_KEY)"
                    - -f
                    - https://nompd-api/api/internal/sync
                  envFrom:
                    - secretRef:
                        name: nompd-sync-secrets