Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

Disaster Data Analytics

Recipe 5 — EM-DAT People-Impact Mining

IFRC

What you will learn:

  1. Build CQL2-JSON filters targeting monty:impact_detail.category and monty:impact_detail.type.

  2. Distinguish natural disasters from technological incidents using hazard-code prefixes.

  3. Stream and aggregate impact records with fallback to direct HTTP when pystac-client encounters API quirks.

Step 1 — Import Required Libraries


import csv
import requests
from typing import Optional, List, Dict, Any
from pystac_client import Client
from getpass import getpass
from collections import defaultdict
import warnings
import time

warnings.filterwarnings('ignore')

Step 2 — Configuration

| Parameter | Value |
| --- | --- |
| Collection | emdat-impacts |
| Year | 2020 |
| Impact category | people |
| Impact types | death |
| Output | emdat-impacts-2020-deaths.csv |
# Endpoint, collection, and time window queried by this recipe
STAC_API_URL = "https://montandon-eoapi-stage.ifrc.org/stac"
COLLECTION = "emdat-impacts"
YEAR = 2020
OUTPUT_CSV = "emdat-impacts-2020-deaths.csv"

# Value matched against monty:impact_detail.category
IMPACT_CATEGORY = "people"

# Impact types searched one by one; this recipe only mines deaths
PEOPLE_IMPACT_TYPES = ["death"]

# Echo the effective settings so the run is self-describing
print(
    "Configuration loaded:",
    f"  Year: {YEAR}",
    f"  Collection: {COLLECTION}",
    f"  Impact types: {len(PEOPLE_IMPACT_TYPES)}",
    f"  Output file: {OUTPUT_CSV}",
    sep="\n",
)
Configuration loaded:
  Year: 2020
  Collection: emdat-impacts
  Impact types: 1
  Output file: emdat-impacts-2020-deaths.csv

Step 3 — Authentication

def get_authenticated_client():
    """
    Connect to the Montandon STAC API with Bearer Token authentication.

    The token is read interactively with getpass so it is not echoed.
    Surrounding whitespace (common when a token is pasted) is stripped before
    the token is validated and placed in the Authorization header.

    Returns:
        A (client, auth_headers) tuple: the opened pystac-client Client and the
        header dict, so the same headers can be reused for direct HTTP calls.

    Raises:
        ValueError: if no token (or only whitespace) was entered.
        Exception: whatever Client.open raises is reported and re-raised.
    """
    # Strip once and use the stripped value everywhere, so the token that was
    # validated is exactly the token that is sent.
    api_token = (getpass("Enter your Montandon API Token: ") or "").strip()

    if not api_token:
        raise ValueError("API token is required to access the Montandon STAC API")

    auth_headers = {"Authorization": f"Bearer {api_token}"}

    # Only opening the client is guarded; the failure is reported and re-raised
    # so the caller still sees the original exception.
    try:
        client = Client.open(STAC_API_URL, headers=auth_headers)
    except Exception as e:
        print(f"Authentication failed: {e}")
        raise

    print(f"Connected to: {STAC_API_URL}")
    print(f"API Title: {client.title}")
    return client, auth_headers

# Connect to API
client, auth_headers = get_authenticated_client()
Connected to: https://montandon-eoapi-stage.ifrc.org/stac
API Title: stac-fastapi

Step 4 — CQL2-JSON Search Function

The search combines four CQL2 predicates: datetime year, impact category, impact type, and a minimum value threshold (> 0).

def search_with_queryables(
    client: Client,
    auth_headers: Dict[str, str],
    year: int,
    impact_type: str,
    collection: str = COLLECTION
) -> list:
    """
    Search one collection for people impacts of a single type in a given year.

    The CQL2-JSON filter ANDs four predicates:
    - datetime intersects the calendar year ``year``
    - monty:impact_detail.category equals IMPACT_CATEGORY
    - monty:impact_detail.type equals ``impact_type``
    - monty:impact_detail.value is greater than 0

    pystac-client is tried first (capped at 1000 items). If it raises, the
    same filter is POSTed once to ``{STAC_API_URL}/search`` with limit 1000 and
    each returned feature is wrapped in a lightweight object exposing ``id``,
    ``collection_id``, ``properties``, ``geometry``, ``bbox`` and ``assets``.

    Args:
        client: Opened pystac-client Client.
        auth_headers: Headers sent with the fallback HTTP request.
        year: Calendar year used for the datetime interval.
        impact_type: Value matched against monty:impact_detail.type.
        collection: Collection id to search.

    Returns:
        A list of items. An empty list is returned when both the
        pystac-client search and the HTTP fallback fail; the reason is printed.
    """
    from types import SimpleNamespace

    # Build CQL2 filter
    cql2_filter = {
        "op": "and",
        "args": [
            # Datetime for year
            {
                "op": "t_intersects",
                "args": [
                    {"property": "datetime"},
                    {"interval": [f"{year}-01-01T00:00:00Z", f"{year}-12-31T23:59:59Z"]}
                ]
            },
            # Impact category = people
            {
                "op": "=",
                "args": [
                    {"property": "monty:impact_detail.category"},
                    IMPACT_CATEGORY
                ]
            },
            # Impact type (variable)
            {
                "op": "=",
                "args": [
                    {"property": "monty:impact_detail.type"},
                    impact_type
                ]
            },
            # Impact value > 0
            {
                "op": ">",
                "args": [
                    {"property": "monty:impact_detail.value"},
                    0
                ]
            }
        ]
    }

    try:
        # Try pystac_client first
        search = client.search(
            collections=[collection],
            filter=cql2_filter,
            filter_lang="cql2-json",
            max_items=1000
        )
        return list(search.items())
    except Exception as e:
        # Deliberately broad: any client-side failure triggers the fallback,
        # but the reason is reported instead of being dropped.
        print(f"pystac-client search failed ({e}); falling back to direct HTTP request")

    # Fallback to direct HTTP POST request
    search_url = f"{STAC_API_URL}/search"
    search_payload = {
        "filter_lang": "cql2-json",
        "filter": cql2_filter,
        "collections": [collection],
        "limit": 1000
    }

    try:
        # A timeout keeps a stalled connection from hanging the notebook.
        response = requests.post(
            search_url,
            json=search_payload,
            headers=auth_headers,
            timeout=120
        )
    except requests.RequestException as e:
        print(f"Fallback search request failed: {e}")
        return []

    if response.status_code != 200:
        print(f"Fallback search returned HTTP {response.status_code}")
        return []

    try:
        search_results = response.json()
    except ValueError as e:
        print(f"Fallback search returned an unreadable body: {e}")
        return []

    # Mirror the attributes of a pystac Item that the rest of the recipe reads.
    return [
        SimpleNamespace(
            id=feature.get('id'),
            collection_id=feature.get('collection'),
            # A null "properties" would break later .get() calls; use {}.
            properties=feature.get('properties') or {},
            geometry=feature.get('geometry'),
            bbox=feature.get('bbox'),
            assets=feature.get('assets', {})
        )
        for feature in search_results.get('features', [])
    ]

print("Search function defined")
Search function defined

Step 5 — Natural-Disaster Filter

Montandon hazard codes follow the taxonomy nat-* (natural) vs tec-* (technological). The helper below returns True only when at least one natural code is present and no technological code appears.

def is_natural_disaster(hazard_codes):
    """
    Decide whether a set of hazard codes describes a purely natural disaster.

    Comparison is case-insensitive and non-string entries are ignored.
    Returns False for empty input or when any code starts with 'tec'.
    Otherwise returns True when at least one code starts with 'nat'.
    """
    if not hazard_codes:
        return False

    # Normalize once; anything that is not a string takes no part in the test.
    normalized = [entry.lower() for entry in hazard_codes if isinstance(entry, str)]

    # A single technological code disqualifies the whole event.
    # The 'tec' prefix also covers 'tec-'.
    if any(entry.startswith('tec') for entry in normalized):
        return False

    # Otherwise one natural code is enough ('nat' also covers 'nat-').
    return any(entry.startswith('nat') for entry in normalized)

print("Natural disaster filter function defined")
Natural disaster filter function defined

Step 6 — Extract Impact Records

Walks every STAC item returned by the search, applies the natural-disaster filter, and flattens the nested monty:impact_detail array into tabular records ready for DataFrame consumption.

def extract_impact_records(items: list, impact_type: str) -> list:
    """
    Flatten STAC items into one row per matching impact entry.

    Items that is_natural_disaster() rejects are skipped entirely. For the
    rest, monty:impact_detail (a single dict or a list of dicts) is scanned
    and every entry whose category equals IMPACT_CATEGORY and whose type
    equals `impact_type` yields one record dict.
    """
    rows = []

    for stac_item in items:
        properties = stac_item.properties

        # Item-level fields shared by every row this item produces
        identifier = stac_item.id
        source_collection = stac_item.collection_id
        when = properties.get('datetime') or properties.get('start_datetime', '')
        headline = properties.get('title', '')
        countries = properties.get('monty:country_codes', [])
        hazards = properties.get('monty:hazard_codes', [])

        # Only natural disasters are kept
        if not is_natural_disaster(hazards):
            continue

        details = properties.get('monty:impact_detail')
        if not details:
            continue

        # Accept a lone dict by wrapping it; anything else must be a list
        if isinstance(details, dict):
            details = [details]
        elif not isinstance(details, list):
            continue

        for entry in details:
            if not isinstance(entry, dict):
                continue

            category = entry.get('category', '')
            kind = entry.get('type', '')
            if category != IMPACT_CATEGORY or kind != impact_type:
                continue

            rows.append({
                'id': identifier,
                'collection': source_collection,
                'datetime': when,
                'title': headline,
                'country_codes': '; '.join(countries) if countries else '',
                'hazard_codes': '; '.join(hazards) if hazards else '',
                'impact_category': category,
                'impact_type': kind,
                'impact_value': entry.get('value', ''),
                'impact_unit': entry.get('unit', ''),
                'standardized_value': entry.get('standardized_value', ''),
                'estimate_type': entry.get('estimate_type', ''),
                'description': entry.get('description', '')
            })

    return rows

print("Extract function defined")
Extract function defined

Step 7 — Run the Analysis

Iterates over every impact type listed in PEOPLE_IMPACT_TYPES (only death in this recipe; further types such as injured or affected can be added to the list), performs a CQL2 search for each, filters to natural disasters, and accumulates results into a single CSV output.

# Print a banner describing the run before any request is made.
print("=" * 70)
print(f"EM-DAT {YEAR} PEOPLE IMPACT ANALYSIS")
print("NATURAL DISASTERS ONLY")
print("=" * 70)
print(f"\nConfiguration:")
print(f"  Collection: {COLLECTION}")
print(f"  Year: {YEAR}")
print(f"  Impact category: {IMPACT_CATEGORY}")
print(f"  Impact types: {len(PEOPLE_IMPACT_TYPES)} types")
print(f"    {', '.join(PEOPLE_IMPACT_TYPES)}")
print(f"  Filter: Natural disasters only (nat-*)")

print(f"\nOutput: {OUTPUT_CSV}")
print("=" * 70)

# Initialize CSV file with headers.
# The column names are the keys of the record dicts built by
# extract_impact_records, so DictWriter can write those dicts directly.
csv_headers = [
    "id", "collection", "datetime", "title", "country_codes", "hazard_codes",
    "impact_category", "impact_type", "impact_value", "impact_unit",
    "standardized_value", "estimate_type", "description"
]

# The file stays open for the whole search loop so rows are written as soon
# as each impact type has been processed.
with open(OUTPUT_CSV, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
    writer.writeheader()
    
    # Flush configuration
    FLUSH_INTERVAL = 50  # Flush every 50 records
    records_since_last_flush = 0
    
    # Run statistics. total_records, natural_disaster_records and
    # impact_summary are read again by the results-summary step; the other
    # counters are only accumulated here.
    total_records = 0
    total_searches = 0
    successful_types = []
    impact_summary = defaultdict(int)  # impact type -> summed impact value
    total_items_retrieved = 0
    natural_disaster_records = 0
    technological_filtered = 0
    
    print(f"\nSearching {len(PEOPLE_IMPACT_TYPES)} impact types (natural disasters only)...\n")
    
    start_time = time.time()
    
    # Search each impact type separately
    for impact_type in PEOPLE_IMPACT_TYPES:
        total_searches += 1
        
        # Search with queryables for this specific impact type
        items = search_with_queryables(
            client=client,
            auth_headers=auth_headers,
            year=YEAR,
            impact_type=impact_type,
            collection=COLLECTION
        )
        
        # Extract and write records (only natural disasters)
        if items:
            total_items_retrieved += len(items)
            records = extract_impact_records(items, impact_type)
            
            if records:
                for record in records:
                    writer.writerow(record)
                    records_since_last_flush += 1
                    
                    # Flush every FLUSH_INTERVAL records
                    if records_since_last_flush >= FLUSH_INTERVAL:
                        csvfile.flush()
                        records_since_last_flush = 0
                    
                    # Track statistics: only positive int/float values are
                    # summed, and int() truncates any fractional part.
                    value = record.get('impact_value', 0)
                    if value and isinstance(value, (int, float)) and value > 0:
                        impact_summary[impact_type] += int(value)
                
                # Both counters grow by the same amount, so they are always
                # equal in this recipe.
                natural_disaster_records += len(records)
                total_records += len(records)
                successful_types.append(impact_type)
                print(f"{impact_type}: {len(records)} records")
            else:
                # NOTE(review): reached only when no record at all was
                # extracted for this impact type. Items dropped from a
                # partially filtered result are not counted, and an empty
                # result may also mean no matching impact_detail entry
                # rather than a technological hazard code.
                technological_filtered += len(items)
        
        # Small delay to avoid overwhelming API
        time.sleep(0.1)
    
    # Final flush to ensure all data is written
    csvfile.flush()
    
    # Wall-clock duration of the search loop, reported by the summary step.
    elapsed_time = time.time() - start_time

print("\nSearch completed!")
print(f"Total records written to CSV: {total_records}")
======================================================================
EM-DAT 2020 PEOPLE IMPACT ANALYSIS
NATURAL DISASTERS ONLY
======================================================================

Configuration:
  Collection: emdat-impacts
  Year: 2020
  Impact category: people
  Impact types: 1 types
    death
  Filter: Natural disasters only (nat-*)

Output: emdat-impacts-2020-deaths.csv
======================================================================

Searching 1 impact types (natural disasters only)...

death: 286 records

Search completed!
Total records written to CSV: 286
death: 286 records

Search completed!
Total records written to CSV: 286

Step 8 — Results Summary

Displays a ranked table of natural-disaster people-impact totals with percentages, providing a quick overview of relative impact magnitude.

# Headline figures collected while the searches were running
print("\n" + "=" * 70)
print("SUMMARY - PEOPLE IMPACTS (NATURAL DISASTERS ONLY)")
print("=" * 70)

print(f"\nSearch completed in {elapsed_time:.2f} seconds")
print(f"Total records: {total_records}")
print(f"Natural disasters: {natural_disaster_records}")

# The per-type table is printed only when at least one value was summed
if impact_summary:
    print("\n" + "=" * 70)
    print(f"PEOPLE IMPACT TOTALS FOR {YEAR}")
    print("=" * 70)

    # Largest totals first; the grand total is the share denominator
    grand_total = sum(impact_summary.values())
    sorted_impacts = sorted(
        impact_summary.items(),
        key=lambda pair: pair[1],
        reverse=True,
    )

    for impact_type, count in sorted_impacts:
        if grand_total > 0:
            percentage = count / grand_total * 100
        else:
            percentage = 0
        print(f"{impact_type:20s}: {count:>15,} ({percentage:5.1f}%)")

    print("=" * 70)
    print(f"{'GRAND TOTAL':20s}: {grand_total:>15,}")
    print("=" * 70)

print(f"\nData saved to: {OUTPUT_CSV}")

======================================================================
SUMMARY - PEOPLE IMPACTS (NATURAL DISASTERS ONLY)
======================================================================

Search completed in 151.16 seconds
Total records: 286
Natural disasters: 286

======================================================================
PEOPLE IMPACT TOTALS FOR 2020
======================================================================
death               :          15,677 (100.0%)
======================================================================
GRAND TOTAL         :          15,677
======================================================================

Data saved to: emdat-impacts-2020-deaths.csv

Step 9 — Load & Preview Results

Reads the exported CSV back into a pandas DataFrame for interactive exploration and downstream analysis.


import pandas as pd

# Read the exported CSV back into a DataFrame for interactive exploration
df = pd.read_csv(OUTPUT_CSV)

# Row count first, then a blank line and the preview heading
print(f"Total records in CSV: {len(df)}", "\nFirst 10 records:", sep="\n")

# Trailing expression: the notebook renders it as the cell's result
df.head(10)
Total records in CSV: 286

First 10 records:
Loading...
References
  1. Guha-Sapir, D. (2024). EM-DAT: The Emergency Events Database. Centre for Research on the Epidemiology of Disasters (CRED). https://www.emdat.be
  2. IFRC. (2024). Monty STAC Extension Specification. https://ifrcgo.org/monty-stac-extension/