What you will learn:
Build CQL2-JSON filters targeting
monty:impact_detail.category and monty:impact_detail.type.
Distinguish natural disasters from technological incidents using hazard-code prefixes.
Stream and aggregate impact records with fallback to direct HTTP when
pystac-client encounters API quirks.
Step 1 — Import Required Libraries¶
import csv
import requests
from typing import Optional, List, Dict, Any
from pystac_client import Client
from getpass import getpass
from collections import defaultdict
import warnings
import time
warnings.filterwarnings('ignore')
Step 2 — Configuration¶
| Parameter | Value |
|---|---|
| Collection | emdat-impacts |
| Year | 2020 |
| Impact category | people |
| Impact types | death |
| Output | emdat-impacts-2020-deaths.csv |
# Montandon STAC endpoint and the scope of this analysis
STAC_API_URL = "https://montandon-eoapi-stage.ifrc.org/stac"
COLLECTION = "emdat-impacts"
YEAR = 2020
OUTPUT_CSV = "emdat-impacts-2020-deaths.csv"

# Only impacts on people are queried
IMPACT_CATEGORY = "people"

# People-impact types to search; this run is limited to deaths
PEOPLE_IMPACT_TYPES = ["death"]

# Echo the effective settings, one value per output line
print(
    "Configuration loaded:",
    f" Year: {YEAR}",
    f" Collection: {COLLECTION}",
    f" Impact types: {len(PEOPLE_IMPACT_TYPES)}",
    sep="\n",
)
print(f" Output file: {OUTPUT_CSV}")
Configuration loaded:
Year: 2020
Collection: emdat-impacts
Impact types: 1
Output file: emdat-impacts-2020-deaths.csv
Step 3 — Authentication¶
def get_authenticated_client():
    """Connect to the Montandon STAC API using Bearer-token authentication.

    Prompts for the API token with ``getpass`` (so it is not echoed), then
    opens ``STAC_API_URL`` with that token in the ``Authorization`` header.

    Returns:
        A ``(client, auth_headers)`` tuple: the opened client and the header
        dict carrying the Bearer token, so the same credentials can be
        reused for direct HTTP requests.

    Raises:
        ValueError: If the entered token is empty or only whitespace.
        Exception: Any error raised while opening the client is printed and
            re-raised unchanged.
    """
    # Strip once and use the stripped value everywhere: surrounding
    # whitespace from a pasted token would pass the emptiness check but
    # would otherwise end up inside the Authorization header.
    api_token = (getpass("Enter your Montandon API Token: ") or "").strip()
    if not api_token:
        raise ValueError("API token is required to access the Montandon STAC API")

    auth_headers = {"Authorization": f"Bearer {api_token}"}
    try:
        client = Client.open(STAC_API_URL, headers=auth_headers)
        print(f"Connected to: {STAC_API_URL}")
        print(f"API Title: {client.title}")
        return client, auth_headers
    except Exception as e:
        # Report the failure in the notebook output, then let it propagate.
        print(f"Authentication failed: {e}")
        raise
# Connect to API
client, auth_headers = get_authenticated_client()
Connected to: https://montandon-eoapi-stage.ifrc.org/stac
API Title: stac-fastapi
Step 4 — CQL2-JSON Search Function¶
The search combines four CQL2 predicates: datetime year, impact category,
impact type, and a minimum value threshold (> 0).
def search_with_queryables(
    client: Client,
    auth_headers: Dict[str, str],
    year: int,
    impact_type: str,
    collection: str = COLLECTION
) -> list:
    """
    Search one collection for people impacts of a given type within a year.

    The CQL2-JSON filter ANDs four predicates:
    - datetime intersects the year (Jan 1 00:00:00Z to Dec 31 23:59:59Z)
    - monty:impact_detail.category equals IMPACT_CATEGORY
    - monty:impact_detail.type equals impact_type
    - monty:impact_detail.value is greater than 0

    The search is attempted through ``client.search`` first. If that raises,
    the same filter is POSTed directly to ``{STAC_API_URL}/search`` with
    ``auth_headers``. Both paths are capped at 1000 items, and the HTTP
    fallback reads only the single response it receives (no paging).

    Args:
        client: Opened STAC client used for the primary search.
        auth_headers: HTTP headers (with the Bearer token) for the fallback.
        year: Calendar year to search.
        impact_type: Impact type to match (e.g. "death").
        collection: Collection id to search.

    Returns:
        A list of items. On the primary path these are the objects yielded
        by ``search.items()``; on the fallback path they are lightweight
        objects exposing ``id``, ``collection_id``, ``properties``,
        ``geometry``, ``bbox`` and ``assets``. An empty list is returned
        (and the reason printed) when the fallback also fails.
    """
    from types import SimpleNamespace

    # Build CQL2 filter
    cql2_filter = {
        "op": "and",
        "args": [
            # Datetime for year
            {
                "op": "t_intersects",
                "args": [
                    {"property": "datetime"},
                    {"interval": [f"{year}-01-01T00:00:00Z", f"{year}-12-31T23:59:59Z"]}
                ]
            },
            # Impact category = people
            {
                "op": "=",
                "args": [
                    {"property": "monty:impact_detail.category"},
                    IMPACT_CATEGORY
                ]
            },
            # Impact type (variable)
            {
                "op": "=",
                "args": [
                    {"property": "monty:impact_detail.type"},
                    impact_type
                ]
            },
            # Impact value > 0
            {
                "op": ">",
                "args": [
                    {"property": "monty:impact_detail.value"},
                    0
                ]
            }
        ]
    }

    try:
        # Try pystac_client first
        search = client.search(
            collections=[collection],
            filter=cql2_filter,
            filter_lang="cql2-json",
            max_items=1000
        )
        return list(search.items())
    except Exception as primary_error:
        # Deliberately broad: any client-side failure triggers the HTTP
        # fallback, but the reason is reported instead of being discarded.
        print(f"pystac-client search failed ({primary_error}); retrying with direct HTTP POST")

    # Fallback to direct HTTP POST request
    search_url = f"{STAC_API_URL}/search"
    search_payload = {
        "filter_lang": "cql2-json",
        "filter": cql2_filter,
        "collections": [collection],
        "limit": 1000
    }

    try:
        # (connect, read) timeout so a stalled server cannot hang the notebook.
        response = requests.post(
            search_url,
            json=search_payload,
            headers=auth_headers,
            timeout=(10, 300),
        )
    except requests.RequestException as http_error:
        print(f"Fallback HTTP search failed: {http_error}")
        return []

    if response.status_code != 200:
        print(f"Fallback HTTP search returned status {response.status_code}")
        return []

    try:
        search_results = response.json()
    except ValueError as parse_error:
        print(f"Fallback HTTP search returned invalid JSON: {parse_error}")
        return []

    # Tolerate unexpected payload shapes rather than raising.
    features = search_results.get('features') if isinstance(search_results, dict) else None
    if not isinstance(features, list):
        return []

    # Wrap each GeoJSON feature so it offers the attributes the rest of the
    # notebook reads from pystac items.
    return [
        SimpleNamespace(
            id=feature.get('id'),
            collection_id=feature.get('collection'),
            properties=feature.get('properties', {}),
            geometry=feature.get('geometry'),
            bbox=feature.get('bbox'),
            assets=feature.get('assets', {}),
        )
        for feature in features
        if isinstance(feature, dict)
    ]
print("Search function defined")
Search function defined
Step 5 — Natural-Disaster Filter¶
Montandon hazard codes follow the taxonomy nat-* (natural) vs tec-*
(technological). The helper below returns True only when at
least one natural code is present and no technological code appears.
def is_natural_disaster(hazard_codes):
    """
    Decide whether a set of hazard codes describes a natural disaster.

    Matching is case-insensitive and looks only at string entries:
    - any code beginning with 'tec' makes the result False
    - otherwise the result is True when some code begins with 'nat'
    - empty or missing input gives False
    """
    if not hazard_codes:
        return False

    # Non-string entries are ignored entirely.
    lowered = [entry.lower() for entry in hazard_codes if isinstance(entry, str)]

    # A single technological code vetoes the whole event.
    if any(text.startswith('tec') for text in lowered):
        return False

    return any(text.startswith('nat') for text in lowered)
print("Natural disaster filter function defined")
Natural disaster filter function defined
Step 6 — Extract Impact Records¶
Walks every STAC item returned by the search, applies the natural-disaster
filter, and flattens the nested monty:impact_detail array into
tabular records ready for DataFrame consumption.
def extract_impact_records(items: list, impact_type: str) -> list:
    """
    Flatten STAC items into one row per matching impact entry.

    Items whose hazard codes do not pass is_natural_disaster are skipped.
    An item's 'monty:impact_detail' may be a single dict or a list of dicts;
    only entries whose category equals IMPACT_CATEGORY and whose type equals
    impact_type produce a row.
    """
    rows = []
    for stac_item in items:
        properties = stac_item.properties

        # Fields shared by every row produced from this item
        identifier = stac_item.id
        source_collection = stac_item.collection_id
        when = properties.get('datetime') or properties.get('start_datetime', '')
        heading = properties.get('title', '')
        countries = properties.get('monty:country_codes', [])
        hazards = properties.get('monty:hazard_codes', [])

        # Natural disasters only
        if not is_natural_disaster(hazards):
            continue

        details = properties.get('monty:impact_detail')
        if not details:
            continue

        # Normalise a lone dict to a one-element list; ignore other shapes
        if isinstance(details, dict):
            details = [details]
        if not isinstance(details, list):
            continue

        for entry in details:
            if not isinstance(entry, dict):
                continue

            category = entry.get('category', '')
            kind = entry.get('type', '')
            if category != IMPACT_CATEGORY or kind != impact_type:
                continue

            rows.append({
                'id': identifier,
                'collection': source_collection,
                'datetime': when,
                'title': heading,
                'country_codes': '; '.join(countries) if countries else '',
                'hazard_codes': '; '.join(hazards) if hazards else '',
                'impact_category': category,
                'impact_type': kind,
                'impact_value': entry.get('value', ''),
                'impact_unit': entry.get('unit', ''),
                'standardized_value': entry.get('standardized_value', ''),
                'estimate_type': entry.get('estimate_type', ''),
                'description': entry.get('description', ''),
            })
    return rows
print("Extract function defined")
Extract function defined
Step 7 — Run the Analysis¶
Iterates over every people-impact type listed in PEOPLE_IMPACT_TYPES (only death in this configuration; other types such as injured or affected can be added), performs a CQL2 search for each, filters to natural disasters, and accumulates results into a single CSV output.
# Banner: echo the run parameters before any request is made.
print("=" * 70)
print(f"EM-DAT {YEAR} PEOPLE IMPACT ANALYSIS")
print("NATURAL DISASTERS ONLY")
print("=" * 70)
print(f"\nConfiguration:")
print(f" Collection: {COLLECTION}")
print(f" Year: {YEAR}")
print(f" Impact category: {IMPACT_CATEGORY}")
print(f" Impact types: {len(PEOPLE_IMPACT_TYPES)} types")
print(f" {', '.join(PEOPLE_IMPACT_TYPES)}")
print(f" Filter: Natural disasters only (nat-*)")
print(f"\nOutput: {OUTPUT_CSV}")
print("=" * 70)
# Initialize CSV file with headers
# (column names match the keys of the dicts built by extract_impact_records)
csv_headers = [
    "id", "collection", "datetime", "title", "country_codes", "hazard_codes",
    "impact_category", "impact_type", "impact_value", "impact_unit",
    "standardized_value", "estimate_type", "description"
]
# Mode 'w' truncates any existing file of the same name.
with open(OUTPUT_CSV, 'w', newline='', encoding='utf-8') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
    writer.writeheader()
    # Flush configuration
    FLUSH_INTERVAL = 50  # Flush every 50 records
    records_since_last_flush = 0
    # Run-level counters. impact_summary maps impact type -> summed value.
    # NOTE(review): total_searches, successful_types, total_items_retrieved
    # and technological_filtered are updated below but are not printed by
    # the summary code visible in this notebook.
    total_records = 0
    total_searches = 0
    successful_types = []
    impact_summary = defaultdict(int)
    total_items_retrieved = 0
    natural_disaster_records = 0
    technological_filtered = 0
    print(f"\nSearching {len(PEOPLE_IMPACT_TYPES)} impact types (natural disasters only)...\n")
    start_time = time.time()
    # Search each impact type separately
    for impact_type in PEOPLE_IMPACT_TYPES:
        total_searches += 1
        # Search with queryables for this specific impact type
        items = search_with_queryables(
            client=client,
            auth_headers=auth_headers,
            year=YEAR,
            impact_type=impact_type,
            collection=COLLECTION
        )
        # Extract and write records (only natural disasters)
        if items:
            total_items_retrieved += len(items)
            records = extract_impact_records(items, impact_type)
            if records:
                for record in records:
                    writer.writerow(record)
                    records_since_last_flush += 1
                    # Flush every FLUSH_INTERVAL records
                    if records_since_last_flush >= FLUSH_INTERVAL:
                        csvfile.flush()
                        records_since_last_flush = 0
                    # Track statistics
                    # Only positive numeric values are summed; anything else
                    # (missing, empty string, non-number) is skipped.
                    value = record.get('impact_value', 0)
                    if value and isinstance(value, (int, float)) and value > 0:
                        # int() truncates any fractional part before adding.
                        impact_summary[impact_type] += int(value)
                natural_disaster_records += len(records)
                total_records += len(records)
                successful_types.append(impact_type)
                print(f"{impact_type}: {len(records)} records")
            else:
                # NOTE(review): this counts items only when no record at all
                # survived extraction; items dropped alongside surviving
                # records are not counted here. Confirm that is intended.
                technological_filtered += len(items)
        # Small delay to avoid overwhelming API
        time.sleep(0.1)
    # Final flush to ensure all data is written
    csvfile.flush()
# Wall-clock duration of the search/write loop, reported by the summary cell.
elapsed_time = time.time() - start_time
print("\nSearch completed!")
print(f"Total records written to CSV: {total_records}")
======================================================================
EM-DAT 2020 PEOPLE IMPACT ANALYSIS
NATURAL DISASTERS ONLY
======================================================================
Configuration:
Collection: emdat-impacts
Year: 2020
Impact category: people
Impact types: 1 types
death
Filter: Natural disasters only (nat-*)
Output: emdat-impacts-2020-deaths.csv
======================================================================
Searching 1 impact types (natural disasters only)...
death: 286 records
Search completed!
Total records written to CSV: 286
death: 286 records
Search completed!
Total records written to CSV: 286
Step 8 — Results Summary¶
Displays a ranked table of natural-disaster people-impact totals with percentages, providing a quick overview of relative impact magnitude.
# Display results
# Reads elapsed_time, total_records, natural_disaster_records and
# impact_summary, all set by the analysis cell above.
print(f"\n{'=' * 70}")
print("SUMMARY - PEOPLE IMPACTS (NATURAL DISASTERS ONLY)")
print("=" * 70)
print(f"\nSearch completed in {elapsed_time:.2f} seconds")
print(f"Total records: {total_records}")
print(f"Natural disasters: {natural_disaster_records}")
# The totals table is printed only when at least one value was summed.
if impact_summary:
    print(f"\n{'=' * 70}")
    print(f"PEOPLE IMPACT TOTALS FOR {YEAR}")
    print("=" * 70)
    # Largest total first
    sorted_impacts = sorted(impact_summary.items(), key=lambda x: x[1], reverse=True)
    grand_total = sum(impact_summary.values())
    # NOTE: this loop rebinds the notebook-level name impact_type.
    for impact_type, count in sorted_impacts:
        # Share of the grand total; guarded against division by zero
        percentage = (count / grand_total * 100) if grand_total > 0 else 0
        print(f"{impact_type:20s}: {count:>15,} ({percentage:5.1f}%)")
    print("=" * 70)
    print(f"{'GRAND TOTAL':20s}: {grand_total:>15,}")
    print("=" * 70)
print(f"\nData saved to: {OUTPUT_CSV}")
======================================================================
SUMMARY - PEOPLE IMPACTS (NATURAL DISASTERS ONLY)
======================================================================
Search completed in 151.16 seconds
Total records: 286
Natural disasters: 286
======================================================================
PEOPLE IMPACT TOTALS FOR 2020
======================================================================
death : 15,677 (100.0%)
======================================================================
GRAND TOTAL : 15,677
======================================================================
Data saved to: emdat-impacts-2020-deaths.csv
Step 9 — Load & Preview Results¶
Reads the exported CSV back into a pandas DataFrame for interactive exploration and downstream analysis.
import pandas as pd

# Read the exported CSV back in for interactive inspection
df = pd.read_csv(OUTPUT_CSV)
print("Total records in CSV:", len(df))
print("\nFirst 10 records:")
df.head(10)
Total records in CSV: 286
First 10 records:
- Guha-Sapir, D. (2024). EM-DAT: The Emergency Events Database. Centre for Research on the Epidemiology of Disasters (CRED). https://www.emdat.be
- IFRC. (2024). Monty STAC Extension Specification. https://ifrcgo.org/monty-stac-extension/