What you will learn:
Define target countries using ISO 3166-1 alpha-3 codes and filter with a_contains.
Search multiple impact collections with CQL2-JSON compound filters.
Build interactive Plotly bar charts and line graphs for humanitarian metrics.
Distinguish cold-wave hazard codes across GLIDE, EM-DAT, and UNDRR-ISC 2025.
Environment Setup¶
On Binder, all dependencies are pre-installed: click Launch Binder at the top of the page, no setup needed.
In any other hosted notebook, run this in the first code cell before anything else:
!pip install -q pystac-client pandas matplotlib seaborn plotly
import os; from getpass import getpass
if 'MONTANDON_API_TOKEN' not in os.environ:
    os.environ['MONTANDON_API_TOKEN'] = getpass('API token: ')
To run locally:
pip install -e .  # from repo root
export MONTANDON_API_TOKEN='your_token_here'
jupyter lab
See Getting Started for token instructions.
# Install required packages (uncomment if needed)
# !pip install pystac-client pandas matplotlib seaborn plotly -q

# Import libraries
import os
from getpass import getpass
import pandas as pd
import numpy as np
from pystac_client import Client
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from typing import List, Dict, Any, Optional, Tuple
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
import gc
warnings.filterwarnings('ignore')
# Plotting style
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (14, 7)
Step 2 — Target Countries & Hazard Codes¶
16 countries spanning Nordic, Baltic, Eastern European, Balkan, and Central Asian regions, each identified by its ISO 3166-1 alpha-3 code.
| System | Codes | Description |
|---|---|---|
| GLIDE | CW, OT | Cold Wave, Other (blizzard/winter) |
| EM-DAT | nat-met-ext-col, nat-met-sto-bli, nat-met-ext-sev | Cold wave, Blizzard, Severe winter |
| UNDRR-ISC 2025 | MH0403, MH0405, MH0406, MH0502, MH0503, MH0504, MH0505 | Blizzard, Snow, Snow storm, Cold wave, Dzud, Freeze, Frost |
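The three systems use visibly different code shapes, so a quick check can tell which classification a given code belongs to. The sketch below is illustrative only: the `classify_hazard_code` helper and its regular expressions are assumptions inferred from the codes listed in this notebook, not an official specification of GLIDE, EM-DAT, or UNDRR-ISC.

```python
import re

# Hypothetical patterns, inferred only from the codes used in this notebook.
_PATTERNS = {
    "GLIDE": re.compile(r"[A-Z]{2}"),                  # e.g. "CW"
    "EM-DAT": re.compile(r"[a-z]{3}(-[a-z]{3})+"),     # e.g. "nat-met-ext-col"
    "UNDRR-ISC 2025": re.compile(r"[A-Z]{2}\d{4}"),    # e.g. "MH0502"
}


def classify_hazard_code(code: str) -> str:
    """Return the name of the first system whose pattern matches the whole code."""
    for system, pattern in _PATTERNS.items():
        if pattern.fullmatch(code):
            return system
    return "UNKNOWN"


sample_codes = ["CW", "nat-met-ext-col", "MH0502", "xyz"]
classified = {code: classify_hazard_code(code) for code in sample_codes}
print(classified)
```

A check like this can help catch typos in a hand-written code list before it is used as a filter.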
# Define target countries with ISO3 codes
TARGET_COUNTRIES = {
# Nordic Countries
"NOR": "Norway",
"SWE": "Sweden",
"FIN": "Finland",
# Baltic States
"LTU": "Lithuania",
# Eastern Europe
"RUS": "Russia",
"POL": "Poland",
"UKR": "Ukraine",
"ROU": "Romania",
"HUN": "Hungary",
"MDA": "Moldova",
# Balkans
"SRB": "Serbia",
"BIH": "Bosnia and Herzegovina",
"ALB": "Albania",
# Central Asia
"KAZ": "Kazakhstan",
"TJK": "Tajikistan",
"TKM": "Turkmenistan"
}
# List of ISO3 codes for filtering
COUNTRY_CODES = list(TARGET_COUNTRIES.keys())
print(f"Target Countries: {len(COUNTRY_CODES)}")
Target Countries: 16
# Define hazard codes for snow storms and cold waves
HAZARD_CODES = [
# GLIDE codes
"CW", # Cold Wave (GLIDE)
"OT", # Other - includes blizzard, severe winter (GLIDE)
# EM-DAT codes
"nat-met-ext-col", # Cold wave (EM-DAT)
"nat-met-sto-bli", # Blizzard/Winter storm (EM-DAT)
"nat-met-ext-sev", # Severe winter conditions (EM-DAT)
# UNDRR-ISC 2025 codes
"MH0502", # Cold Wave
"MH0403", # Blizzard
"MH0405", # Snow
"MH0406", # Snow Storm
"MH0503", # Dzud (severe winter - Mongolia/Central Asia)
"MH0504", # Freeze
"MH0505", # Frost
]
# Define impact collections to search
IMPACT_COLLECTIONS = [
"desinventar-impacts",
"emdat-impacts",
"ifrcevent-impacts"
]
# Date range: 2000-2020
START_DATE = "2000-01-01"
END_DATE = "2020-12-31"
DATETIME_RANGE = f"{START_DATE}/{END_DATE}"
print(f"Hazard Codes: {len(HAZARD_CODES)}")
print(f"Collections: {IMPACT_COLLECTIONS}")
print(f"Date Range: {START_DATE} to {END_DATE}")
Hazard Codes: 12
Collections: ['desinventar-impacts', 'emdat-impacts', 'ifrcevent-impacts']
Date Range: 2000-01-01 to 2020-12-31
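The search in Step 3 queries one calendar year at a time. As a minimal sketch of that idea, the configured range can be split into per-year datetime intervals in the `start/end` form used above. The `yearly_intervals` helper is hypothetical and assumes the range starts on 1 January and ends on 31 December.

```python
from typing import List


def yearly_intervals(start_date: str, end_date: str) -> List[str]:
    """Split a 'YYYY-MM-DD' range into one 'YYYY-01-01/YYYY-12-31' interval per year.

    Assumes the range is aligned to calendar years; partial years are widened.
    """
    start_year = int(start_date[:4])
    end_year = int(end_date[:4])
    return [f"{year}-01-01/{year}-12-31" for year in range(start_year, end_year + 1)]


intervals = yearly_intervals("2000-01-01", "2020-12-31")
print(len(intervals), intervals[0], intervals[-1])
```

Querying year by year keeps each response small, at the cost of more requests.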
Step 3 — Connect & Fetch Impact Data¶
# ============================================================================
# AUTHENTICATION & CONNECTION TO MONTANDON STAC API
# ============================================================================
# Montandon STAC API URL (CORRECT URL with /stac suffix)
STAC_API_URL = "https://montandon-eoapi-stage.ifrc.org/stac"
# First try to get token from environment variable
api_token = os.getenv('MONTANDON_API_TOKEN')
# If not set, prompt user to enter token
if api_token is None:
    print("=" * 70)
    print("AUTHENTICATION REQUIRED")
    print("=" * 70)
    print("\nThe Montandon STAC API requires a Bearer Token for authentication.")
    print("\nHow to get your token:")
    print(" 1. Visit: https://goadmin-stage.ifrc.org/")
    print(" 2. Log in with your IFRC credentials")
    print(" 3. Generate an API token from your account settings")
    print("\nAlternatively, set the MONTANDON_API_TOKEN environment variable:")
    print(" PowerShell: $env:MONTANDON_API_TOKEN = 'your_token_here'")
    print(" Bash: export MONTANDON_API_TOKEN='your_token_here'")
    print("\n" + "=" * 70)
    api_token = getpass("Enter your Montandon API Token: ")
# Create authentication headers for pystac_client
AUTH_HEADERS = {
"Authorization": f"Bearer {api_token}"
}
# Connect to the STAC API using pystac_client
try:
    catalog = Client.open(STAC_API_URL, headers=AUTH_HEADERS)
    print(f"Connected to: {catalog.title}")
    # List available impact collections
    collections = list(catalog.get_collections())
    impact_collections_available = [c.id for c in collections if '-impacts' in c.id]
    print(f"Available Impact Collections: {len(impact_collections_available)}")
except Exception as e:
    print(f"Connection failed: {e}")
    catalog = None
======================================================================
AUTHENTICATION REQUIRED
======================================================================
The Montandon STAC API requires a Bearer Token for authentication.
How to get your token:
1. Visit: https://goadmin-stage.ifrc.org/
2. Log in with your IFRC credentials
3. Generate an API token from your account settings
Alternatively, set the MONTANDON_API_TOKEN environment variable:
PowerShell: $env:MONTANDON_API_TOKEN = 'your_token_here'
Bash: export MONTANDON_API_TOKEN='your_token_here'
======================================================================
Connected to: stac-fastapi
Available Impact Collections: 9
# MEMORY-OPTIMIZED: Search and save to CSV incrementally
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import csv
import gc
def search_and_save_incrementally(
    collections: List[str],
    country_codes: List[str],
    hazard_codes: List[str],
    start_year: int,
    end_year: int,
    output_file: str,
    batch_size: int = 20,
    max_items_per_year: int = 2000,
    max_workers: int = 3
) -> int:
    """
    MEMORY-OPTIMIZED: Search and write to CSV incrementally.

    MEMORY OPTIMIZATIONS:
    1. Writes to CSV every batch_size records (default: 20)
    2. Does NOT keep all items in memory
    3. Parallel search with immediate CSV writing
    4. Thread-safe file writing
    5. Processes items one-by-one without list accumulation
    6. Frequent garbage collection
    7. NO .copy() operations - direct dict creation

    Parameters:
    -----------
    collections : list
        Collection IDs to search
    country_codes : list
        ISO3 country codes to filter
    hazard_codes : list
        Hazard classification codes to filter
    start_year : int
        Starting year (e.g., 2000)
    end_year : int
        Ending year (e.g., 2025)
    output_file : str
        CSV file path to write results
    batch_size : int
        Write to CSV every N records (default: 20)
    max_items_per_year : int
        Maximum items per year per collection
    max_workers : int
        Number of parallel workers (default: 3)

    Returns:
    --------
    int: Total number of records written
    """
    if not catalog:
        print("No catalog connection available. Please check authentication.")
        return 0

    # Thread-safe CSV writing
    write_lock = threading.Lock()
    total_records = 0
    buffer = []

    # CSV headers
    csv_headers = [
        'id', 'collection', 'datetime', 'title', 'country_codes', 'hazard_codes',
        'impact_category', 'impact_type', 'impact_value', 'impact_unit', 'estimate_type'
    ]

    # Build CQL2 filter for country codes
    country_filters = [
        {"op": "a_contains", "args": [{"property": "monty:country_codes"}, code]}
        for code in country_codes
    ]
    filter_body = {"op": "or", "args": country_filters}

    print(f"Memory-optimized search: {len(collections)} collections, {max_workers} parallel workers")
    print(f"Saving to CSV every {batch_size} records\n")

    # Initialize CSV file; everything below writes through this open handle
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
        writer.writeheader()

        def buffer_record(record):
            """Append one record to the buffer; flush to CSV when the batch is full."""
            nonlocal total_records
            with write_lock:
                buffer.append(record)
                if len(buffer) >= batch_size:
                    writer.writerows(buffer)
                    csvfile.flush()
                    total_records += len(buffer)
                    buffer.clear()
                    if total_records % 100 == 0:
                        print(f" Saved {total_records} records...")

        def extract_and_buffer_item(item):
            """Extract impact records from a single STAC item."""
            props = item.properties
            # Get base info once - extract values to avoid repeated property access
            item_id = item.id
            collection_id = item.collection_id
            datetime_val = props.get('datetime')
            title_val = props.get('title', '')
            country_codes_str = ', '.join(props.get('monty:country_codes', []))
            hazard_codes_str = ', '.join(props.get('monty:hazard_codes', []))
            impact_detail = props.get('monty:impact_detail')

            if impact_detail:
                if isinstance(impact_detail, dict):
                    impact_detail = [impact_detail]
                if isinstance(impact_detail, list):
                    for impact in impact_detail:
                        if isinstance(impact, dict):
                            # Direct dict creation - NO .copy()
                            buffer_record({
                                'id': item_id,
                                'collection': collection_id,
                                'datetime': datetime_val,
                                'title': title_val,
                                'country_codes': country_codes_str,
                                'hazard_codes': hazard_codes_str,
                                'impact_category': impact.get('category', 'UNKNOWN'),
                                'impact_type': impact.get('type', 'UNKNOWN'),
                                'impact_value': impact.get('value', 0),
                                'impact_unit': impact.get('unit', ''),
                                'estimate_type': impact.get('estimate_type', 'PRIMARY')
                            })
                    return

            # NO_DATA case - item has no usable impact detail
            buffer_record({
                'id': item_id,
                'collection': collection_id,
                'datetime': datetime_val,
                'title': title_val,
                'country_codes': country_codes_str,
                'hazard_codes': hazard_codes_str,
                'impact_category': 'NO_DATA',
                'impact_type': 'NO_DATA',
                'impact_value': None,
                'impact_unit': '',
                'estimate_type': ''
            })

        # Helper function to search one collection for one year
        def search_single(collection: str, year: int) -> tuple:
            """Search single collection for single year (run in parallel)."""
            datetime_range = f"{year}-01-01/{year}-12-31"
            items_processed = 0
            try:
                # First attempt: country filter on the server, hazard filter on the client
                search = catalog.search(
                    collections=[collection],
                    max_items=max_items_per_year,
                    datetime=datetime_range,
                    filter=filter_body
                )
                # CRITICAL: Process items one-by-one, don't collect in list
                for item in search.items():
                    if any(code in item.properties.get('monty:hazard_codes', [])
                           for code in hazard_codes):
                        extract_and_buffer_item(item)
                        items_processed += 1
                        # Force garbage collection every 50 items
                        if items_processed % 50 == 0:
                            gc.collect()
                return (year, collection, items_processed)
            except Exception:
                # Fallback: no server-side filter, filter countries and hazards on the client
                try:
                    search = catalog.search(
                        collections=[collection],
                        max_items=max_items_per_year,
                        datetime=datetime_range
                    )
                    for item in search.items():
                        if (any(c in item.properties.get('monty:country_codes', [])
                                for c in country_codes) and
                                any(h in item.properties.get('monty:hazard_codes', [])
                                    for h in hazard_codes)):
                            extract_and_buffer_item(item)
                            items_processed += 1
                            # Force garbage collection every 50 items
                            if items_processed % 50 == 0:
                                gc.collect()
                    return (year, collection, items_processed)
                except Exception:
                    # Both attempts failed: report zero items for this collection and year
                    return (year, collection, 0)

        # PARALLEL SEARCH: Process all collections for each year
        for year in range(start_year, end_year + 1):
            year_total = 0
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {
                    executor.submit(search_single, coll, year): coll
                    for coll in collections
                }
                for future in as_completed(futures):
                    year_val, coll, count = future.result()
                    year_total += count
            print(f"Year {year}: {year_total} items")
            # Force garbage collection after each year
            gc.collect()

        # Write any remaining records in buffer
        if buffer:
            writer.writerows(buffer)
            csvfile.flush()
            total_records += len(buffer)
            buffer.clear()

    # Final cleanup
    gc.collect()
    return total_records
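The function above sends only the country filter to the server and checks hazard codes on the client. A compound CQL2-JSON filter could, in principle, express both conditions in one request. The sketch below only builds the filter body. `build_compound_filter` is a hypothetical helper, and whether the API evaluates `a_contains` on `monty:hazard_codes` the same way as on `monty:country_codes` is an assumption that has not been verified here.

```python
import json
from typing import Any, Dict, List


def _any_of(prop: str, values: List[str]) -> Dict[str, Any]:
    """Build 'property contains any of values' from a_contains clauses joined by 'or'."""
    clauses = [
        {"op": "a_contains", "args": [{"property": prop}, value]}
        for value in values
    ]
    # A single clause needs no 'or' wrapper.
    return clauses[0] if len(clauses) == 1 else {"op": "or", "args": clauses}


def build_compound_filter(country_codes: List[str], hazard_codes: List[str]) -> Dict[str, Any]:
    """Combine the country and hazard conditions with a top-level 'and'."""
    return {
        "op": "and",
        "args": [
            _any_of("monty:country_codes", country_codes),
            _any_of("monty:hazard_codes", hazard_codes),
        ],
    }


compound_filter = build_compound_filter(["NOR", "SWE"], ["CW"])
print(json.dumps(compound_filter, indent=2))
```

If the server accepts it, the resulting dictionary could be passed through the same `filter` argument used in `search_single`. Keeping the client-side hazard check as a safety net would still be reasonable.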
Step 4 — Process & Aggregate Impact Records¶
Build a unified DataFrame from DesInventar, EM-DAT, and IFRC collections, extracting impact type, value, and country metadata.
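To make the extraction step concrete, here is a minimal sketch of how one item's `monty:impact_detail` can be flattened into row dictionaries. It mirrors the logic of `extract_and_buffer_item` but runs on a plain dictionary, so no API connection is needed. The `flatten_impacts` helper and the sample values are made up for illustration.

```python
from typing import Any, Dict, List


def flatten_impacts(item_id: str, properties: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return one row per impact entry; an empty list if there is no impact detail."""
    detail = properties.get("monty:impact_detail")
    if isinstance(detail, dict):
        detail = [detail]  # a single impact may arrive as a bare dict
    if not isinstance(detail, list):
        return []
    rows = []
    for impact in detail:
        if isinstance(impact, dict):
            rows.append({
                "id": item_id,
                "country_codes": ", ".join(properties.get("monty:country_codes", [])),
                "hazard_codes": ", ".join(properties.get("monty:hazard_codes", [])),
                "impact_type": impact.get("type", "UNKNOWN"),
                "impact_value": impact.get("value", 0),
                "impact_unit": impact.get("unit", ""),
            })
    return rows


# Made-up sample properties, for illustration only.
sample_properties = {
    "monty:country_codes": ["POL"],
    "monty:hazard_codes": ["nat-met-ext-col"],
    "monty:impact_detail": {"type": "death", "value": 3, "unit": "count"},
}
rows = flatten_impacts("example-item", sample_properties)
print(rows)
```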
import time
START_YEAR = 2000
END_YEAR = 2020  # REDUCED from 2025 to 2020 for faster processing
OUTPUT_CSV = 'cold_wave_snow_storm_impacts_europe_central_asia.csv'
BATCH_SIZE = 10 # REDUCED from 20 to 10 for less memory
print(f"Searching from {START_YEAR} to {END_YEAR}...")
print(f"Target: {len(COUNTRY_CODES)} countries, {len(HAZARD_CODES)} hazard codes")
print(f"Collections: {IMPACT_COLLECTIONS}")
print(f"Output: {OUTPUT_CSV}")
start_time = time.time()
# MEMORY-OPTIMIZED: Search and save incrementally
total_records = search_and_save_incrementally(
collections=IMPACT_COLLECTIONS,
country_codes=COUNTRY_CODES,
hazard_codes=HAZARD_CODES,
start_year=START_YEAR,
end_year=END_YEAR,
output_file=OUTPUT_CSV,
batch_size=BATCH_SIZE,
max_items_per_year=1000, # REDUCED from 2000 to 1000
max_workers=2 # REDUCED from 6 to 2 for less memory
)
elapsed_time = time.time() - start_time
print(f"\n{'='*60}")
print(f"✓ Search and Save Complete!")
print(f" Total records saved: {total_records}")
print(f" Years searched: {START_YEAR}-{END_YEAR}")
print(f" Search time: {elapsed_time:.2f} seconds")
print(f" Output file: {OUTPUT_CSV}")
print(f"{'='*60}")
# Force cleanup
gc.collect()
Searching from 2000 to 2020...
Target: 16 countries, 12 hazard codes
Collections: ['desinventar-impacts', 'emdat-impacts', 'ifrcevent-impacts']
Output: cold_wave_snow_storm_impacts_europe_central_asia.csv
Memory-optimized search: 3 collections, 2 parallel workers
Saving to CSV every 10 records
Year 2000: 16 items
Year 2001: 27 items
Year 2002: 10 items
Year 2003: 1 items
Year 2004: 1 items
Year 2005: 8 items
Year 2006: 12 items
Year 2007: 1 items
Year 2008: 8 items
Year 2009: 13 items
Saved 100 records...
Year 2010: 10 items
Year 2011: 2 items
Year 2012: 44 items
Year 2013: 7 items
Year 2014: 4 items
Year 2015: 1 items
Year 2016: 2 items
Year 2017: 3 items
Year 2018: 4 items
Year 2019: 3 items
Year 2020: 0 items
============================================================
✓ Search and Save Complete!
Total records saved: 177
Years searched: 2000-2020
Search time: 504.06 seconds
Output file: cold_wave_snow_storm_impacts_europe_central_asia.csv
Memory used: Minimal (batch writes every 10 records)
============================================================
0
if os.path.exists(OUTPUT_CSV):
    # Quick count from CSV
    import pandas as pd
    temp_df = pd.read_csv(OUTPUT_CSV)
    print(f"\n📊 Data Collection Summary:")
    print(f" Total records saved: {len(temp_df)}")
    print(f" Date range: {START_YEAR}-{END_YEAR}")
    print(f" Countries: {len(COUNTRY_CODES)}")
    print(f" Hazard codes: {len(HAZARD_CODES)}")
    print(f" CSV file: {OUTPUT_CSV}")
    # Count by collection
    if 'collection' in temp_df.columns:
        collections_count = temp_df['collection'].value_counts()
        print(f"\n Records by collection:")
        for coll, count in collections_count.items():
            print(f" - {coll}: {count}")
    del temp_df  # Free memory
    gc.collect()
else:
    print(" No data found. CSV file not created.")
📊 Data Collection Summary:
Total records saved: 177
Date range: 2000-2020
Countries: 16
Hazard codes: 12
CSV file: cold_wave_snow_storm_impacts_europe_central_asia.csv
Records by collection:
- emdat-impacts: 163
- desinventar-impacts: 14
Step 5 — Data Summary & Quality Check¶
# Data was already saved during search - now load and process it
print(f"Data was saved incrementally during search process.")
print(f"CSV file: {OUTPUT_CSV}")
print(f"Memory optimization: Data was written in batches of {BATCH_SIZE} records")
print(f"\nNo additional extraction needed - proceeding to load and analyze...")
Data was saved incrementally during search process.
CSV file: cold_wave_snow_storm_impacts_europe_central_asia.csv
Memory optimization: Data was written in batches of 10 records
No additional extraction needed - proceeding to load and analyze...
if os.path.exists(OUTPUT_CSV):
    print(f"Loading data from {OUTPUT_CSV}...")
    print(f"(Using memory-efficient chunked loading)\n")
    # Read in chunks to avoid memory spike
    chunk_size = 100
    chunks = []
    for chunk in pd.read_csv(OUTPUT_CSV, chunksize=chunk_size):
        # Process datetime immediately
        chunk['datetime'] = pd.to_datetime(chunk['datetime'], errors='coerce')
        chunk['year'] = chunk['datetime'].dt.year
        chunk['month'] = chunk['datetime'].dt.month
        # Extract first country code
        chunk['primary_country'] = chunk['country_codes'].apply(
            lambda x: x.split(',')[0].strip() if pd.notna(x) and x else 'UNKNOWN'
        )
        # Add country name, falling back to the raw code for non-target countries
        chunk['country_name'] = chunk['primary_country'].map(TARGET_COUNTRIES)
        chunk['country_name'] = chunk['country_name'].fillna(chunk['primary_country'])
        chunks.append(chunk)
        # Clear memory every 5 chunks
        if len(chunks) % 5 == 0:
            gc.collect()
    # Combine all chunks
    impacts_df = pd.concat(chunks, ignore_index=True)
    del chunks
    gc.collect()
    print(f"✓ Loaded {len(impacts_df)} records from CSV")
    print(f"\nYear range: {impacts_df['year'].min()} - {impacts_df['year'].max()}")
    print(f"\nCountries with data:")
    print(impacts_df['country_name'].value_counts())
else:
    print(f"CSV file not found: {OUTPUT_CSV}")
    impacts_df = pd.DataFrame()
Loading data from cold_wave_snow_storm_impacts_europe_central_asia.csv...
(Using memory-efficient chunked loading)
✓ Loaded 177 records from CSV
Year range: 2000 - 2019
Countries with data:
country_name
Russia 36
Albania 24
Poland 22
Romania 20
Ukraine 16
Serbia 13
Hungary 12
Moldova 9
Bosnia and Herzegovina 7
Kazakhstan 7
Tajikistan 6
Lithuania 5
Name: count, dtype: int64
# Optimize memory usage after loading
if len(impacts_df) > 0:
    print("\nOptimizing data types for memory efficiency...")
    # Convert to categorical for memory savings
    impacts_df['collection'] = impacts_df['collection'].astype('category')
    impacts_df['impact_category'] = impacts_df['impact_category'].astype('category')
    impacts_df['impact_type'] = impacts_df['impact_type'].astype('category')
    impacts_df['impact_unit'] = impacts_df['impact_unit'].astype('category')
    impacts_df['estimate_type'] = impacts_df['estimate_type'].astype('category')
    impacts_df['primary_country'] = impacts_df['primary_country'].astype('category')
    impacts_df['country_name'] = impacts_df['country_name'].astype('category')
    # Convert numeric types to smaller dtypes
    impacts_df['year'] = impacts_df['year'].astype('int16')
    impacts_df['month'] = impacts_df['month'].astype('int8')
    impacts_df['impact_value'] = pd.to_numeric(impacts_df['impact_value'], errors='coerce').astype('float32')
    # Drop columns not needed for analysis to save memory
    impacts_df = impacts_df.drop(columns=['id', 'title'], errors='ignore')
    gc.collect()
    memory_mb = impacts_df.memory_usage(deep=True).sum() / 1024**2
Optimizing data types for memory efficiency...
Step 6 — Collection & Impact-Type Breakdowns¶
# Summary by collection
if len(impacts_df) > 0:
    print("Impact Records by Collection:")
    collection_summary = impacts_df.groupby('collection').agg({
        'impact_value': 'sum',
        'country_name': 'nunique',
        'year': ['min', 'max']
    }).round(0)
    collection_summary.columns = ['Total Impact Value', 'Countries', 'From Year', 'To Year']
    display(collection_summary)
else:
    print("No data to summarize")
Impact Records by Collection:
# Analyze impact types
if len(impacts_df) > 0 and 'impact_type' in impacts_df.columns:
    print("Impact Types Distribution:")
    impact_type_summary = impacts_df.groupby('impact_type').agg({
        'impact_value': ['count', 'sum', 'max']
    }).round(2)
    impact_type_summary.columns = ['Count', 'Total Value', 'Max Value']
    impact_type_summary = impact_type_summary.sort_values('Count', ascending=False)
    display(impact_type_summary.head(15))
Impact Types Distribution:
Step 7 — Deaths & Casualties Analysis¶
# Filter for death-related impacts
death_types = ['DEATHS', 'DEATH', 'deaths', 'death']
if len(impacts_df) > 0:
    # Case-insensitive match against the death-related impact types
    deaths_df = impacts_df[impacts_df['impact_type'].str.upper().isin([t.upper() for t in death_types])]
    if len(deaths_df) > 0:
        print(f"Death Records Found: {len(deaths_df)}")
        # Deaths by country
        deaths_by_country = deaths_df.groupby('country_name')['impact_value'].sum().sort_values(ascending=False)
        print("\nTotal Deaths by Country (Cold Wave/Snow Storm 2000-2020):")
        display(deaths_by_country)
        # Deaths by year
        deaths_by_year = deaths_df.groupby('year')['impact_value'].sum().sort_index()
        print("\nDeaths by Year:")
        display(deaths_by_year)
    else:
        print("No death records found in filtered data")
else:
    print("No data available")
Death Records Found: 87
Total Deaths by Country (Cold Wave/Snow Storm 2000-2020):
Deaths by Year:
# Visualize deaths by country
if len(impacts_df) > 0 and 'deaths_by_country' in dir():
    if len(deaths_by_country) > 0:
        fig = px.bar(
            x=deaths_by_country.index,
            y=deaths_by_country.values,
            title='Deaths from Cold Waves and Snow Storms (2000-2020)',
            labels={'x': 'Country', 'y': 'Total Deaths'},
            color=deaths_by_country.values,
            color_continuous_scale='Reds'
        )
        fig.update_layout(
            xaxis_tickangle=-45,
            height=500
        )
        fig.show()
# Visualize deaths trend over years
if len(impacts_df) > 0 and 'deaths_by_year' in dir():
    if len(deaths_by_year) > 0:
        fig = px.line(
            x=deaths_by_year.index,
            y=deaths_by_year.values,
            title='Deaths from Cold Waves and Snow Storms Over Time',
            labels={'x': 'Year', 'y': 'Deaths'},
            markers=True
        )
        fig.update_layout(height=400)
        fig.show()
Step 8 — Affected Population Dashboard¶
# Filter for affected population
affected_types = ['AFFECTED_TOTAL', 'TOTAL_AFFECTED', 'DIRECTLY_AFFECTED', 'INDIRECTLY_AFFECTED',
'affected_total', 'affected', 'AFFECTED']
if len(impacts_df) > 0:
    affected_df = impacts_df[impacts_df['impact_type'].str.upper().isin([t.upper() for t in affected_types])]
    if len(affected_df) > 0:
        print(f"Affected Population Records: {len(affected_df)}")
        # Affected by country
        affected_by_country = affected_df.groupby('country_name')['impact_value'].sum().sort_values(ascending=False)
        print("\nTotal Affected Population by Country:")
        display(affected_by_country)
    else:
        print("No affected population records found")
Affected Population Records: 56
Total Affected Population by Country:
# Visualize affected population
if 'affected_by_country' in dir() and len(affected_by_country) > 0:
    fig = px.bar(
        x=affected_by_country.index,
        y=affected_by_country.values,
        title='Population Affected by Cold Waves and Snow Storms (2000-2020)',
        labels={'x': 'Country', 'y': 'Total Affected'},
        color=affected_by_country.values,
        color_continuous_scale='Blues'
    )
    fig.update_layout(
        xaxis_tickangle=-45,
        height=500
    )
    fig.show()
Step 9 — Economic Losses Analysis¶
# Filter for economic losses
economic_types = ['LOSS_COST', 'COST', 'LOSS', 'DAMAGED', 'DESTROYED', 'DAMAGES']
if len(impacts_df) > 0:
    economic_df = impacts_df[impacts_df['impact_type'].str.upper().isin([t.upper() for t in economic_types])]
    if len(economic_df) > 0:
        print(f"Economic Impact Records: {len(economic_df)}")
        # Filter for USD values
        usd_losses = economic_df[economic_df['impact_unit'].str.contains('USD|dollar', case=False, na=False)]
        if len(usd_losses) > 0:
            losses_by_country = usd_losses.groupby('country_name')['impact_value'].sum().sort_values(ascending=False)
            print("\nEconomic Losses by Country (USD):")
            display(losses_by_country)
        else:
            print("\nNo USD-denominated losses found. All economic impacts:")
            display(economic_df[['country_name', 'impact_type', 'impact_value', 'impact_unit']].head(20))
    else:
        print("No economic impact records found")
Economic Impact Records: 6
No USD-denominated losses found. All economic impacts:
Step 10 — Temporal Analysis: Events by Month¶
Cold waves and snow storms peak in the Northern-Hemisphere winter (November–February). The bar chart below confirms this seasonality.
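One way to put a number on this seasonality is the share of records that fall in November to February. The sketch below uses only the standard library. The `winter_share` helper and the sample month list are made up for illustration and are not values from the dataset.

```python
from collections import Counter
from typing import Iterable

WINTER_MONTHS = (11, 12, 1, 2)  # November to February


def winter_share(months: Iterable[int]) -> float:
    """Fraction of records whose month number falls in WINTER_MONTHS."""
    counts = Counter(months)
    total = sum(counts.values())
    if total == 0:
        return 0.0
    winter = sum(counts[m] for m in WINTER_MONTHS)
    return winter / total


# Made-up month numbers, for illustration only.
example_months = [1, 1, 2, 12, 7, 11, 3, 1]
share = winter_share(example_months)
print(f"Winter share: {share:.0%}")
```

With the loaded data, the same function could be applied to `impacts_df['month'].tolist()`.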
# Analyze events by month (cold waves typically winter months)
if len(impacts_df) > 0 and 'month' in impacts_df.columns:
    monthly_events = impacts_df.groupby('month').size()
    month_names = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                   'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
    fig = px.bar(
        x=month_names,
        y=[monthly_events.get(i+1, 0) for i in range(12)],
        title='Cold Wave and Snow Storm Events by Month',
        labels={'x': 'Month', 'y': 'Number of Events'},
        color=[monthly_events.get(i+1, 0) for i in range(12)],
        color_continuous_scale='RdBu_r'
    )
    fig.update_layout(height=400)
    fig.show()
Step 11 — Summary & Pivot Table¶
# Create summary statistics
if len(impacts_df) > 0:
    print("="*60)
    print("SUMMARY: Cold Wave & Snow Storm Impacts (2000-2020)")
    print(" Europe and Central Asia Target Countries")
    print("="*60)
    print(f"\nData Sources:")
    for col in impacts_df['collection'].unique():
        count = len(impacts_df[impacts_df['collection'] == col])
        print(f" - {col}: {count} impact records")
    print(f"\nCountries with Impact Data: {impacts_df['country_name'].nunique()}")
    print(f"\nTime Period: {impacts_df['year'].min()} - {impacts_df['year'].max()}")
    # Deaths summary
    if 'deaths_df' in dir() and len(deaths_df) > 0:
        total_deaths = deaths_df['impact_value'].sum()
        print(f"\nTotal Deaths: {total_deaths:,.0f}")
    # Affected summary
    if 'affected_df' in dir() and len(affected_df) > 0:
        total_affected = affected_df['impact_value'].sum()
        print(f"\nTotal Affected: {total_affected:,.0f}")
    print("\n" + "="*60)
else:
    print("No data available for summary")
============================================================
SUMMARY: Cold Wave & Snow Storm Impacts (2000-2020)
Europe and Central Asia Target Countries
============================================================
Data Sources:
- desinventar-impacts: 14 impact records
- emdat-impacts: 163 impact records
Countries with Impact Data: 12
Time Period: 2000 - 2019
Total Deaths: 6,618
Total Affected: 11,399,376
============================================================
# Create a comprehensive impact summary table
if len(impacts_df) > 0:
    # Pivot table by country and impact type
    summary_pivot = impacts_df.pivot_table(
        index='country_name',
        columns='impact_type',
        values='impact_value',
        aggfunc='sum',
        fill_value=0
    )
    # Select key impact types if they exist
    key_types = ['DEATHS', 'TOTAL_AFFECTED', 'AFFECTED_TOTAL', 'INJURED', 'MISSING',
                 'DESTROYED', 'DAMAGED', 'EVACUATED']
    available_types = [t for t in key_types if t in summary_pivot.columns]
    if available_types:
        summary_filtered = summary_pivot[available_types]
        print("Impact Summary by Country and Type:")
        display(summary_filtered)
    else:
        print("Full Impact Summary:")
        display(summary_pivot)
Full Impact Summary:
# Display sample of the data
if len(impacts_df) > 0:
    print("Sample Data (first 20 records):")
    display(impacts_df[['datetime', 'country_name', 'hazard_codes', 'impact_type',
                        'impact_value', 'impact_unit', 'collection']].head(20))
Sample Data (first 20 records):
Conclusion & Key Findings¶
DesInventar — Detailed local disaster impact records Guha-Sapir (2024)
EM-DAT — Global disaster database with comprehensive impact metrics Guha-Sapir (2024)
IFRC DREF — Red Cross disaster response operations
The analysis covers deaths and casualties, affected population, economic losses, and infrastructure damage across 16 countries in Europe and Central Asia (2000–2020).
| System | Code | Meaning |
|---|---|---|
| GLIDE | CW | Cold Wave |
| EM-DAT | nat-met-ext-col | Cold wave |
| EM-DAT | nat-met-sto-bli | Blizzard / winter storm |
| UNDRR-ISC | MH0502 | Cold Wave |
| UNDRR-ISC | MH0406 | Snow Storm |
- Guha-Sapir, D. (2024). EM-DAT: The Emergency Events Database. Centre for Research on the Epidemiology of Disasters (CRED). https://www.emdat.be