Skip to content

Correlation Algorithm Examples

This document provides practical, real-world examples of using the STAC-based correlation algorithms described in correlation_algorithms.md.

Example 1: Spain October 2024 Floods

Scenario

On October 27, 2024, severe flooding struck Spain, primarily affecting the Valencia region. Multiple data sources recorded this event: - GLIDE: FL-2024-000199-ESP - GDACS: Event ID 1102983 (multiple episodes) - EM-DAT: Event 2024-0796-ESP (potentially)

We want to correlate all events, hazards, and impacts related to this disaster.


Goal: Find all source events from different collections describing this flood.

Known Information: - Event occurred on 2024-10-27 - Country: Spain (ESP) - Hazard: Flood (MH0600, nat-hyd-flo-flo, FL)

CQL2-JSON Query:

{
  "collections": [
    "glide-events",
    "gdacs-events",
    "emdat-events",
    "reference-events"
  ],
  "filter-lang": "cql2-json",
  "filter": {
    "op": "and",
    "args": [
      {
        "op": "a_contains",
        "args": [
          {"property": "monty:country_codes"},
          "ESP"
        ]
      },
      {
        "op": "a_overlaps",
        "args": [
          {"property": "monty:hazard_codes"},
          ["MH0600", "nat-hyd-flo-flo", "FL"]
        ]
      },
      {
        "op": "t_intersects",
        "args": [
          {"property": "datetime"},
          {"interval": ["2024-10-27T00:00:00Z", "2024-10-28T23:59:59Z"]}
        ]
      },
      {
        "op": "in",
        "args": [
          "event",
          {"property": "roles"}
        ]
      }
    ]
  },
  "limit": 100
}

Python Example using pystac-client:

from pystac_client import Client
from datetime import datetime

# Connect to Montandon STAC API
client = Client.open("https://montandon-eoapi-stage.ifrc.org/stac")

# Define the filter
filter_dict = {
    "op": "and",
    "args": [
        {
            "op": "a_contains",
            "args": [
                {"property": "monty:country_codes"},
                "ESP"
            ]
        },
        {
            "op": "a_overlaps",
            "args": [
                {"property": "monty:hazard_codes"},
                ["MH0600", "nat-hyd-flo-flo", "FL"]
            ]
        },
        {
            "op": "t_intersects",
            "args": [
                {"property": "datetime"},
                {"interval": ["2024-10-27T00:00:00Z", "2024-10-28T23:59:59Z"]}
            ]
        },
        {
            "op": "in",
            "args": [
                "event",
                {"property": "roles"}
            ]
        }
    ]
}

# Search for events
search = client.search(
    collections=["glide-events", "gdacs-events", "emdat-events", "reference-events"],
    filter=filter_dict,
    filter_lang="cql2-json",
    limit=100
)

# Retrieve all matching items
events = list(search.items())

print(f"Found {len(events)} related events:")
for event in events:
    print(f"  - {event.id} from {event.collection_id}")
    print(f"    Correlation ID: {event.properties.get('monty:corr_id', 'N/A')}")
    print(f"    Date: {event.properties.get('datetime')}")

Expected Results: - glide-event-FL-2024-000199-ESP (GLIDE event) - gdacs-event-1102983-1 (GDACS episode 1) - gdacs-event-1102983-2 (GDACS episode 2) - Potentially EM-DAT and reference events

Result Analysis: All events should share similar correlation IDs (e.g., 20241027-ESP-FL-1-GCDB or 20241027-ESP-FL-2-GCDB depending on episode).


Step 2: Find All Hazards for the Event

Goal: Once we've identified the events, find all hazard assessments.

Using correlation ID (if known):

{
  "collections": ["glide-hazards", "gdacs-hazards", "usgs-hazards"],
  "filter-lang": "cql2-json",
  "filter": {
    "op": "and",
    "args": [
      {
        "op": "=",
        "args": [
          {"property": "monty:corr_id"},
          "20241027-ESP-FL-2-GCDB"
        ]
      },
      {
        "op": "in",
        "args": [
          "hazard",
          {"property": "roles"}
        ]
      }
    ]
  }
}

Dynamic correlation (without correlation ID):

{
  "collections": ["glide-hazards", "gdacs-hazards", "ifrcevent-hazards"],
  "filter-lang": "cql2-json",
  "filter": {
    "op": "and",
    "args": [
      {
        "op": "a_contains",
        "args": [
          {"property": "monty:country_codes"},
          "ESP"
        ]
      },
      {
        "op": "a_overlaps",
        "args": [
          {"property": "monty:hazard_codes"},
          ["MH0600", "MH0603", "nat-hyd-flo-flo", "nat-hyd-flo-fla", "FL", "FF"]
        ]
      },
      {
        "op": "t_intersects",
        "args": [
          {"property": "datetime"},
          {"interval": ["2024-10-27T00:00:00Z", "2024-10-31T23:59:59Z"]}
        ]
      },
      {
        "op": "in",
        "args": [
          "hazard",
          {"property": "roles"}
        ]
      }
    ]
  }
}

Python Example:

def find_hazards_for_event(client, country_code, hazard_codes, start_date, end_date):
    """
    Find all hazards matching the given criteria.

    Args:
        client: pystac_client.Client instance
        country_code: ISO 3166-1 alpha-3 country code
        hazard_codes: List of hazard codes to search for
        start_date: Start datetime as ISO string
        end_date: End datetime as ISO string

    Returns:
        List of pystac.Item objects
    """
    filter_dict = {
        "op": "and",
        "args": [
            {
                "op": "a_contains",
                "args": [
                    {"property": "monty:country_codes"},
                    country_code
                ]
            },
            {
                "op": "a_overlaps",
                "args": [
                    {"property": "monty:hazard_codes"},
                    hazard_codes
                ]
            },
            {
                "op": "t_intersects",
                "args": [
                    {"property": "datetime"},
                    {"interval": [start_date, end_date]}
                ]
            },
            {
                "op": "in",
                "args": [
                    "hazard",
                    {"property": "roles"}
                ]
            }
        ]
    }

    search = client.search(
        collections=["glide-hazards", "gdacs-hazards", "ifrcevent-hazards", "usgs-hazards"],
        filter=filter_dict,
        filter_lang="cql2-json",
        limit=100
    )

    return list(search.items())

# Usage
hazards = find_hazards_for_event(
    client,
    "ESP",
    ["MH0600", "MH0603", "nat-hyd-flo-flo", "FL"],
    "2024-10-27T00:00:00Z",
    "2024-10-31T23:59:59Z"
)

print(f"Found {len(hazards)} related hazards:")
for hazard in hazards:
    detail = hazard.properties.get("monty:hazard_detail", {})
    print(f"  - {hazard.id}")
    print(f"    Severity: {detail.get('severity_value')} {detail.get('severity_unit')}")

Expected Results: - Hazard assessments from GLIDE, GDACS, and potentially other sources - Each hazard item contains severity/magnitude information


Step 3: Find All Impacts

Goal: Find all impact estimates (casualties, damages, displacement) for the event.

Query (allowing for reporting delays):

{
  "collections": ["gdacs-impacts", "emdat-impacts", "idmc-gidd-impacts", "idmc-idu-impacts"],
  "filter-lang": "cql2-json",
  "filter": {
    "op": "and",
    "args": [
      {
        "op": "a_contains",
        "args": [
          {"property": "monty:country_codes"},
          "ESP"
        ]
      },
      {
        "op": "a_overlaps",
        "args": [
          {"property": "monty:hazard_codes"},
          ["MH0600", "FL", "nat-hyd-flo-flo"]
        ]
      },
      {
        "op": "t_after",
        "args": [
          {"property": "datetime"},
          "2024-10-27T00:00:00Z"
        ]
      },
      {
        "op": "t_before",
        "args": [
          {"property": "datetime"},
          "2024-11-15T00:00:00Z"
        ]
      },
      {
        "op": "in",
        "args": [
          "impact",
          {"property": "roles"}
        ]
      }
    ]
  }
}

Python Example with Impact Analysis:

def aggregate_impacts(impacts):
    """
    Aggregate impact data by type and category.

    Args:
        impacts: List of pystac.Item objects with impact data

    Returns:
        Dictionary with aggregated impact statistics
    """
    aggregated = {}

    for impact in impacts:
        detail = impact.properties.get("monty:impact_detail", {})
        category = detail.get("category", "unknown")
        impact_type = detail.get("type", "unknown")
        value = detail.get("value", 0)

        key = f"{category}_{impact_type}"
        if key not in aggregated:
            aggregated[key] = {
                "category": category,
                "type": impact_type,
                "total": 0,
                "sources": []
            }

        aggregated[key]["total"] += value
        aggregated[key]["sources"].append({
            "id": impact.id,
            "value": value,
            "source": impact.properties.get("source", "unknown"),
            "datetime": impact.properties.get("datetime")
        })

    return aggregated

# Find impacts
def find_impacts_for_event(client, country_code, hazard_codes, start_date, end_date):
    filter_dict = {
        "op": "and",
        "args": [
            {"op": "a_contains", "args": [{"property": "monty:country_codes"}, country_code]},
            {"op": "a_overlaps", "args": [{"property": "monty:hazard_codes"}, hazard_codes]},
            {"op": "t_after", "args": [{"property": "datetime"}, start_date]},
            {"op": "t_before", "args": [{"property": "datetime"}, end_date]},
            {"op": "in", "args": ["impact", {"property": "roles"}]}
        ]
    }

    search = client.search(
        collections=["gdacs-impacts", "emdat-impacts", "idmc-gidd-impacts", "idmc-idu-impacts"],
        filter=filter_dict,
        filter_lang="cql2-json",
        limit=500
    )

    return list(search.items())

impacts = find_impacts_for_event(
    client,
    "ESP",
    ["MH0600", "FL"],
    "2024-10-27T00:00:00Z",
    "2024-11-15T00:00:00Z"
)

# Aggregate
aggregated = aggregate_impacts(impacts)

print("Impact Summary:")
for key, data in aggregated.items():
    print(f"\n{data['category']} - {data['type']}:")
    print(f"  Total: {data['total']}")
    print(f"  Sources: {len(data['sources'])}")
    for source in data['sources']:
        print(f"    - {source['source']}: {source['value']} (reported {source['datetime']})")

Expected Results: - Death counts from multiple sources - Missing persons estimates - Displaced/evacuated populations - Economic losses - Infrastructure damage


Step 4: Correlation by Static ID

Goal: Use the correlation ID for deterministic linking.

Query:

def find_all_related_by_corr_id(client, corr_id):
    """
    Find all items (events, hazards, impacts) with a specific correlation ID.

    Args:
        client: pystac_client.Client instance
        corr_id: Correlation ID string (e.g., "20241027-ESP-FL-2-GCDB")

    Returns:
        Dictionary with items grouped by type
    """
    filter_dict = {
        "op": "=",
        "args": [
            {"property": "monty:corr_id"},
            corr_id
        ]
    }

    search = client.search(
        filter=filter_dict,
        filter_lang="cql2-json",
        limit=500
    )

    items = list(search.items())

    # Group by roles
    grouped = {
        "events": [],
        "hazards": [],
        "impacts": [],
        "responses": []
    }

    for item in items:
        roles = item.properties.get("roles", [])
        if "event" in roles:
            grouped["events"].append(item)
        if "hazard" in roles:
            grouped["hazards"].append(item)
        if "impact" in roles:
            grouped["impacts"].append(item)
        if "response" in roles:
            grouped["responses"].append(item)

    return grouped

# Usage
corr_id = "20241027-ESP-FL-2-GCDB"
related = find_all_related_by_corr_id(client, corr_id)

print(f"Items with correlation ID {corr_id}:")
print(f"  Events: {len(related['events'])}")
print(f"  Hazards: {len(related['hazards'])}")
print(f"  Impacts: {len(related['impacts'])}")
print(f"  Responses: {len(related['responses'])}")

Example 2: Multi-Hazard Event (Earthquake → Tsunami)

Scenario

A magnitude 7.5 earthquake strikes near Japan, triggering a tsunami that affects multiple countries.

Primary Hazard: Earthquake (GH0101)
Secondary Hazard: Tsunami (MH0705)
Countries: Japan (JPN), potentially others

Find Primary Event

# Find the earthquake event
earthquake_filter = {
    "op": "and",
    "args": [
        {
            "op": "a_contains",
            "args": [{"property": "monty:hazard_codes"}, "GH0101"]
        },
        {
            "op": "a_contains",
            "args": [{"property": "monty:country_codes"}, "JPN"]
        },
        {
            "op": "t_after",
            "args": [{"property": "datetime"}, "2024-01-01T00:00:00Z"]
        },
        {
            "op": "in",
            "args": ["event", {"property": "roles"}]
        }
    ]
}

Find Triggered Hazards

# Find tsunami hazards that occurred after the earthquake
tsunami_filter = {
    "op": "and",
    "args": [
        {
            "op": "a_contains",
            "args": [{"property": "monty:hazard_codes"}, "MH0705"]
        },
        {
            "op": "or",
            "args": [
                {"op": "a_contains", "args": [{"property": "monty:country_codes"}, "JPN"]},
                {"op": "a_contains", "args": [{"property": "monty:country_codes"}, "RUS"]},
                {"op": "a_contains", "args": [{"property": "monty:country_codes"}, "USA"]}
            ]
        },
        {
            "op": "t_after",
            "args": [{"property": "datetime"}, "2024-01-01T00:00:00Z"]
        },
        {
            "op": "t_before",
            "args": [{"property": "datetime"}, "2024-01-01T06:00:00Z"]  # Within 6 hours
        },
        {
            "op": "in",
            "args": ["hazard", {"property": "roles"}]
        }
    ]
}

Analyze Cascading Impacts

def find_cascading_impacts(client, primary_corr_id, secondary_hazard_codes, time_window_hours=48):
    """
    Find impacts from cascading hazards.
    """
    # First, find the primary event datetime
    primary_search = client.search(
        filter={
            "op": "and",
            "args": [
                {"op": "=", "args": [{"property": "monty:corr_id"}, primary_corr_id]},
                {"op": "in", "args": ["event", {"property": "roles"}]}
            ]
        },
        filter_lang="cql2-json",
        limit=1
    )

    primary_event = next(primary_search.items())
    primary_datetime = primary_event.properties["datetime"]

    # Calculate end time for search window
    from datetime import datetime, timedelta
    start = datetime.fromisoformat(primary_datetime.replace('Z', '+00:00'))
    end = start + timedelta(hours=time_window_hours)

    # Find impacts from secondary hazards
    impact_filter = {
        "op": "and",
        "args": [
            {
                "op": "a_overlaps",
                "args": [{"property": "monty:hazard_codes"}, secondary_hazard_codes]
            },
            {
                "op": "t_during",
                "args": [
                    {"property": "datetime"},
                    {"interval": [primary_datetime, end.isoformat()]}
                ]
            },
            {"op": "in", "args": ["impact", {"property": "roles"}]}
        ]
    }

    search = client.search(
        filter=impact_filter,
        filter_lang="cql2-json",
        limit=500
    )

    return list(search.items())

Example 3: Regional Multi-Country Event

Scenario

Flooding affects multiple countries in Southeast Asia during monsoon season.

Countries: Thailand (THA), Vietnam (VNM), Cambodia (KHM)
Hazard: Flooding (MH0600 series)
Time Period: July-September 2024

Find All Events in the Region

def find_regional_events(client, country_codes, hazard_codes, start_date, end_date):
    """
    Find events affecting multiple countries in a region.
    """
    filter_dict = {
        "op": "and",
        "args": [
            {
                "op": "a_overlaps",
                "args": [
                    {"property": "monty:country_codes"},
                    country_codes
                ]
            },
            {
                "op": "a_overlaps",
                "args": [
                    {"property": "monty:hazard_codes"},
                    hazard_codes
                ]
            },
            {
                "op": "t_during",
                "args": [
                    {"property": "datetime"},
                    {"interval": [start_date, end_date]}
                ]
            },
            {"op": "in", "args": ["event", {"property": "roles"}]}
        ]
    }

    search = client.search(
        filter=filter_dict,
        filter_lang="cql2-json",
        limit=500
    )

    return list(search.items())

# Usage
events = find_regional_events(
    client,
    ["THA", "VNM", "KHM"],
    ["MH0600", "MH0603", "MH0604", "FL"],
    "2024-07-01T00:00:00Z",
    "2024-09-30T23:59:59Z"
)

# Group by country
by_country = {}
for event in events:
    countries = event.properties.get("monty:country_codes", [])
    for country in countries:
        if country not in by_country:
            by_country[country] = []
        by_country[country].append(event)

for country, country_events in by_country.items():
    print(f"{country}: {len(country_events)} events")

Example 4: Complete Correlation Workflow

This comprehensive example demonstrates a complete correlation workflow from event discovery to impact analysis.

from pystac_client import Client
from datetime import datetime, timedelta
from typing import List, Dict, Any
import json

class MontyCorrelator:
    """
    Helper class for correlating disaster events, hazards, and impacts.
    """

    def __init__(self, stac_url: str = "https://montandon-eoapi-stage.ifrc.org/stac"):
        self.client = Client.open(stac_url)

    def find_events(
        self,
        country_codes: List[str],
        hazard_codes: List[str],
        start_date: str,
        end_date: str,
        collections: List[str] = None
    ) -> List[Any]:
        """Find events matching criteria."""
        if collections is None:
            collections = [
                "glide-events", "gdacs-events", "emdat-events",
                "reference-events", "ifrcevent-events"
            ]

        filter_dict = {
            "op": "and",
            "args": [
                {
                    "op": "a_overlaps",
                    "args": [{"property": "monty:country_codes"}, country_codes]
                },
                {
                    "op": "a_overlaps",
                    "args": [{"property": "monty:hazard_codes"}, hazard_codes]
                },
                {
                    "op": "t_intersects",
                    "args": [
                        {"property": "datetime"},
                        {"interval": [start_date, end_date]}
                    ]
                },
                {"op": "in", "args": ["event", {"property": "roles"}]}
            ]
        }

        search = self.client.search(
            collections=collections,
            filter=filter_dict,
            filter_lang="cql2-json",
            limit=100
        )

        return list(search.items())

    def find_hazards_for_correlation_id(self, corr_id: str) -> List[Any]:
        """Find all hazards for a specific correlation ID."""
        filter_dict = {
            "op": "and",
            "args": [
                {"op": "=", "args": [{"property": "monty:corr_id"}, corr_id]},
                {"op": "in", "args": ["hazard", {"property": "roles"}]}
            ]
        }

        search = self.client.search(
            filter=filter_dict,
            filter_lang="cql2-json",
            limit=100
        )

        return list(search.items())

    def find_impacts_for_correlation_id(
        self,
        corr_id: str,
        impact_categories: List[str] = None
    ) -> List[Any]:
        """Find impacts for a specific correlation ID."""
        filter_args = [
            {"op": "=", "args": [{"property": "monty:corr_id"}, corr_id]},
            {"op": "in", "args": ["impact", {"property": "roles"}]}
        ]

        filter_dict = {
            "op": "and",
            "args": filter_args
        }

        search = self.client.search(
            filter=filter_dict,
            filter_lang="cql2-json",
            limit=500
        )

        return list(search.items())

    def get_complete_event_data(self, corr_id: str) -> Dict[str, Any]:
        """
        Get complete data for an event including all related items.

        Returns:
            Dictionary with events, hazards, impacts, and responses
        """
        # Find all items with this correlation ID
        filter_dict = {
            "op": "=",
            "args": [{"property": "monty:corr_id"}, corr_id]
        }

        search = self.client.search(
            filter=filter_dict,
            filter_lang="cql2-json",
            limit=1000
        )

        items = list(search.items())

        # Group by type
        result = {
            "correlation_id": corr_id,
            "events": [],
            "hazards": [],
            "impacts": [],
            "responses": []
        }

        for item in items:
            roles = item.properties.get("roles", [])

            if "event" in roles:
                result["events"].append(self._extract_item_info(item))
            if "hazard" in roles:
                result["hazards"].append(self._extract_hazard_info(item))
            if "impact" in roles:
                result["impacts"].append(self._extract_impact_info(item))
            if "response" in roles:
                result["responses"].append(self._extract_item_info(item))

        return result

    def _extract_item_info(self, item: Any) -> Dict[str, Any]:
        """Extract basic information from an item."""
        return {
            "id": item.id,
            "collection": item.collection_id,
            "datetime": item.properties.get("datetime"),
            "countries": item.properties.get("monty:country_codes", []),
            "hazards": item.properties.get("monty:hazard_codes", []),
            "source": item.properties.get("source", "unknown")
        }

    def _extract_hazard_info(self, item: Any) -> Dict[str, Any]:
        """Extract hazard-specific information."""
        info = self._extract_item_info(item)
        detail = item.properties.get("monty:hazard_detail", {})
        info.update({
            "severity_value": detail.get("severity_value"),
            "severity_unit": detail.get("severity_unit"),
            "estimate_type": detail.get("estimate_type")
        })
        return info

    def _extract_impact_info(self, item: Any) -> Dict[str, Any]:
        """Extract impact-specific information."""
        info = self._extract_item_info(item)
        detail = item.properties.get("monty:impact_detail", {})
        info.update({
            "category": detail.get("category"),
            "type": detail.get("type"),
            "value": detail.get("value"),
            "unit": detail.get("unit")
        })
        return info

# Example usage
if __name__ == "__main__":
    correlator = MontyCorrelator()

    # Find Spain floods
    print("Finding Spain flood events...")
    events = correlator.find_events(
        country_codes=["ESP"],
        hazard_codes=["MH0600", "FL", "nat-hyd-flo-flo"],
        start_date="2024-10-27T00:00:00Z",
        end_date="2024-10-28T23:59:59Z"
    )

    print(f"Found {len(events)} events")

    # Get complete data for first event
    if events:
        corr_id = events[0].properties.get("monty:corr_id")
        if corr_id:
            print(f"\nGetting complete data for correlation ID: {corr_id}")
            complete_data = correlator.get_complete_event_data(corr_id)

            print(json.dumps(complete_data, indent=2))

Additional Resources