Disaster Data Analytics

Recipe 8 — Cold-Wave Impact Dashboard

IFRC

What you will learn:

  1. Define target countries using ISO 3166-1 alpha-3 codes and filter with a_contains.

  2. Search multiple impact collections with CQL2-JSON compound filters.

  3. Build interactive Plotly bar charts and line graphs for humanitarian metrics.

  4. Distinguish cold-wave hazard codes across GLIDE, EM-DAT, and UNDRR-ISC 2025.

Environment Setup

Binder
Google Colab
Local

All dependencies are pre-installed. Click Launch Binder at the top of the page — no setup needed.

# Install required packages (uncomment if needed)
# !pip install pystac-client pandas matplotlib seaborn plotly -q
# Import libraries
import os
from getpass import getpass
import pandas as pd
import numpy as np
from pystac_client import Client
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from typing import List, Dict, Any, Optional, Tuple
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
import gc

warnings.filterwarnings('ignore')

# Plotting style
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (14, 7)

Step 2 — Target Countries & Hazard Codes

Countries
Hazard Codes

16 countries spanning Nordic, Baltic, Eastern European, Balkan, and Central Asian regions, each identified by its ISO 3166-1 alpha-3 code.

# Define target countries with ISO3 codes
TARGET_COUNTRIES = {
    # Nordic Countries
    "NOR": "Norway",
    "SWE": "Sweden", 
    "FIN": "Finland",
    
    # Baltic States
    "LTU": "Lithuania",
    
    # Eastern Europe
    "RUS": "Russia",
    "POL": "Poland",
    "UKR": "Ukraine",
    "ROU": "Romania",
    "HUN": "Hungary",
    "MDA": "Moldova",
    
    # Balkans
    "SRB": "Serbia",
    "BIH": "Bosnia and Herzegovina",
    "ALB": "Albania",
    
    
    # Central Asia
    "KAZ": "Kazakhstan",
    "TJK": "Tajikistan",
    "TKM": "Turkmenistan"
}

# List of ISO3 codes for filtering
COUNTRY_CODES = list(TARGET_COUNTRIES.keys())

print(f"Target Countries: {len(COUNTRY_CODES)}")
Target Countries: 16
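A quick format check on the dictionary keys can catch typos before they silently produce empty search results. The sketch below is a minimal illustration: the helper name `check_iso3_keys` and the sample dictionary are hypothetical, and the check only tests the shape of a code (three uppercase ASCII letters), not whether it is actually assigned in ISO 3166-1.

```python
import re

# Shape of an ISO 3166-1 alpha-3 code: exactly three uppercase ASCII letters.
ISO3_PATTERN = re.compile(r"[A-Z]{3}")

def check_iso3_keys(countries: dict) -> list:
    """Return the keys that do not have the shape of an alpha-3 code."""
    return [code for code in countries if not ISO3_PATTERN.fullmatch(code)]

# Hypothetical sample with two deliberately malformed keys.
sample = {"NOR": "Norway", "SWE": "Sweden", "Fin": "Finland", "LT": "Lithuania"}
malformed = check_iso3_keys(sample)
print(f"Malformed keys: {malformed}")
```

Running the same helper on `TARGET_COUNTRIES` should return an empty list if every key is well formed.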
# Define hazard codes for snow storms and cold waves
HAZARD_CODES = [
    # GLIDE codes
    "CW",           # Cold Wave (GLIDE)
    "OT",           # Other - includes blizzard, severe winter (GLIDE)
    
    # EM-DAT codes
    "nat-met-ext-col",   # Cold wave (EM-DAT)
    "nat-met-sto-bli",   # Blizzard/Winter storm (EM-DAT)
    "nat-met-ext-sev",   # Severe winter conditions (EM-DAT)
    
    # UNDRR-ISC 2025 codes
    "MH0502",       # Cold Wave
    "MH0403",       # Blizzard
    "MH0405",       # Snow
    "MH0406",       # Snow Storm
    "MH0503",       # Dzud (severe winter - Mongolia/Central Asia)
    "MH0504",       # Freeze
    "MH0505",       # Frost
]

# Define impact collections to search
IMPACT_COLLECTIONS = [
    "desinventar-impacts",
    "emdat-impacts",
    "ifrcevent-impacts"
]

# Date range: 2000-01-01 through 2020-12-31
START_DATE = "2000-01-01"
END_DATE = "2020-12-31"
DATETIME_RANGE = f"{START_DATE}/{END_DATE}"

print(f"Hazard Codes: {len(HAZARD_CODES)}")
print(f"Collections: {IMPACT_COLLECTIONS}")
print(f"Date Range: {START_DATE} to {END_DATE}")
Hazard Codes: 12
Collections: ['desinventar-impacts', 'emdat-impacts', 'ifrcevent-impacts']
Date Range: 2000-01-01 to 2020-12-31
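The three taxonomies in `HAZARD_CODES` can be told apart by the shape of their codes. The sketch below infers one pattern per scheme from the twelve codes listed in this recipe only; the patterns and the helper name `classify_hazard_code` are assumptions for illustration, not an official specification of GLIDE, EM-DAT, or UNDRR-ISC 2025.

```python
import re

# Patterns inferred from the codes in this recipe (an assumption, not a spec).
SCHEME_PATTERNS = {
    "GLIDE": re.compile(r"[A-Z]{2}"),                    # e.g. "CW"
    "EM-DAT": re.compile(r"[a-z]{3}(-[a-z]{3}){1,3}"),   # e.g. "nat-met-ext-col"
    "UNDRR-ISC 2025": re.compile(r"[A-Z]{2}\d{4}"),      # e.g. "MH0502"
}

def classify_hazard_code(code: str) -> str:
    """Return the name of the first scheme whose pattern matches the whole code."""
    for scheme, pattern in SCHEME_PATTERNS.items():
        if pattern.fullmatch(code):
            return scheme
    return "UNKNOWN"

# Group a few of the recipe's codes by scheme.
codes = ["CW", "nat-met-ext-col", "MH0502", "MH0503"]
by_scheme = {}
for code in codes:
    by_scheme.setdefault(classify_hazard_code(code), []).append(code)
print(by_scheme)
```

Grouping codes this way makes it easier to see which scheme a given collection actually uses when a search returns fewer items than expected.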

Step 3 — Connect & Fetch Impact Data

# ============================================================================
# AUTHENTICATION & CONNECTION TO MONTANDON STAC API
# ============================================================================

# Montandon STAC API URL (CORRECT URL with /stac suffix)
STAC_API_URL = "https://montandon-eoapi-stage.ifrc.org/stac"

# First try to get token from environment variable
api_token = os.getenv('MONTANDON_API_TOKEN')

# If not set, prompt user to enter token
if api_token is None:
    print("=" * 70)
    print("AUTHENTICATION REQUIRED")
    print("=" * 70)
    print("\nThe Montandon STAC API requires a Bearer Token for authentication.")
    print("\nHow to get your token:")
    print("  1. Visit: https://goadmin-stage.ifrc.org/")
    print("  2. Log in with your IFRC credentials")
    print("  3. Generate an API token from your account settings")
    print("\nAlternatively, set the MONTANDON_API_TOKEN environment variable:")
    print("  PowerShell: $env:MONTANDON_API_TOKEN = 'your_token_here'")
    print("  Bash: export MONTANDON_API_TOKEN='your_token_here'")
    print("\n" + "=" * 70)
    api_token = getpass("Enter your Montandon API Token: ")

# Create authentication headers for pystac_client
AUTH_HEADERS = {
    "Authorization": f"Bearer {api_token}"
}

# Connect to the STAC API using pystac_client
try:
    catalog = Client.open(STAC_API_URL, headers=AUTH_HEADERS)
    print(f"Connected to: {catalog.title}")
    
    # List available impact collections
    collections = list(catalog.get_collections())
    impact_collections_available = [c.id for c in collections if '-impacts' in c.id]
    print(f"Available Impact Collections: {len(impact_collections_available)}")
except Exception as e:
    print(f"Connection failed: {e}")
    catalog = None
======================================================================
AUTHENTICATION REQUIRED
======================================================================

The Montandon STAC API requires a Bearer Token for authentication.

How to get your token:
  1. Visit: https://goadmin-stage.ifrc.org/
  2. Log in with your IFRC credentials
  3. Generate an API token from your account settings

Alternatively, set the MONTANDON_API_TOKEN environment variable:
  PowerShell: $env:MONTANDON_API_TOKEN = 'your_token_here'
  Bash: export MONTANDON_API_TOKEN='your_token_here'

======================================================================
Connected to: stac-fastapi
Available Impact Collections: 9
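The token lookup above only checks for `None`, so an environment variable that is set but empty would produce a `Bearer ` header with no token. One possible way to tighten this is sketched below; the helper names `resolve_token` and `bearer_headers` are hypothetical, and the prompt is passed in as a callable so the logic can be exercised without user input.

```python
from typing import Callable, Dict, Mapping

def resolve_token(env: Mapping[str, str], prompt: Callable[[], str],
                  var_name: str = "MONTANDON_API_TOKEN") -> str:
    """Prefer the environment variable; fall back to the prompt; reject empty tokens."""
    token = (env.get(var_name) or "").strip()
    if not token:
        token = prompt().strip()
    if not token:
        raise ValueError(f"No API token supplied via {var_name} or prompt")
    return token

def bearer_headers(token: str) -> Dict[str, str]:
    """Build the Authorization header in the same form the recipe uses."""
    return {"Authorization": f"Bearer {token}"}

# Demonstration with a stand-in environment and a prompt that is never called.
# In a notebook one might pass os.environ and lambda: getpass("Token: ") instead.
headers = bearer_headers(
    resolve_token({"MONTANDON_API_TOKEN": "example-token"}, prompt=lambda: "")
)
print(headers)
```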
# MEMORY-OPTIMIZED: Search and save to CSV incrementally
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import csv
import gc

def search_and_save_incrementally(
    collections: List[str],
    country_codes: List[str],
    hazard_codes: List[str],
    start_year: int,
    end_year: int,
    output_file: str,
    batch_size: int = 20,
    max_items_per_year: int = 2000,
    max_workers: int = 3
) -> int:
    """
    MEMORY-OPTIMIZED: Search and write to CSV incrementally.
    
    MEMORY OPTIMIZATIONS:
    1. Writes to CSV every batch_size records (default: 20)
    2. Does NOT keep all items in memory
    3. Parallel search with immediate CSV writing
    4. Thread-safe file writing
    5. Processes items one-by-one without list accumulation
    6. Frequent garbage collection
    7. NO .copy() operations - direct dict creation
    
    Parameters:
    -----------
    collections : list
        Collection IDs to search
    country_codes : list
        ISO3 country codes to filter
    hazard_codes : list
        Hazard classification codes to filter
    start_year : int
        Starting year (e.g., 2000)
    end_year : int
        Ending year (e.g., 2025)
    output_file : str
        CSV file path to write results
    batch_size : int
        Write to CSV every N records (default: 20)
    max_items_per_year : int
        Maximum items per year per collection
    max_workers : int
        Number of parallel workers (default: 3)
    
    Returns:
    --------
    int: Total number of records written
    """
    if not catalog:
        print("No catalog connection available. Please check authentication.")
        return 0
    
    # Thread-safe CSV writing
    write_lock = threading.Lock()
    total_records = 0
    buffer = []
    
    # CSV headers
    csv_headers = [
        'id', 'collection', 'datetime', 'title', 'country_codes', 'hazard_codes',
        'impact_category', 'impact_type', 'impact_value', 'impact_unit', 'estimate_type'
    ]
    
    # Build CQL2 filter for country codes
    country_filters = [
        {"op": "a_contains", "args": [{"property": "monty:country_codes"}, code]}
        for code in country_codes
    ]
    filter_body = {"op": "or", "args": country_filters}
    
    print(f"Memory-optimized search: {len(collections)} collections, {max_workers} parallel workers")
    print(f"Saving to CSV every {batch_size} records\n")
    
    # Initialize CSV file
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=csv_headers)
        writer.writeheader()
        
        def extract_and_buffer_item(item):
            """Extract impact records from a single STAC item """
            nonlocal total_records, buffer
            
            props = item.properties
            
            # Get base info once - extract values to avoid repeated property access
            item_id = item.id
            collection_id = item.collection_id
            datetime_val = props.get('datetime')
            title_val = props.get('title', '')
            country_codes_str = ', '.join(props.get('monty:country_codes', []))
            hazard_codes_str = ', '.join(props.get('monty:hazard_codes', []))
            
            impact_detail = props.get('monty:impact_detail')
            
            if impact_detail:
                if isinstance(impact_detail, dict):
                    impact_detail = [impact_detail]
                
                if isinstance(impact_detail, list):
                    # OPTIMIZED: Create records directly without .copy()
                    for impact in impact_detail:
                        if isinstance(impact, dict):
                            # Direct dict creation - NO .copy()
                            record = {
                                'id': item_id,
                                'collection': collection_id,
                                'datetime': datetime_val,
                                'title': title_val,
                                'country_codes': country_codes_str,
                                'hazard_codes': hazard_codes_str,
                                'impact_category': impact.get('category', 'UNKNOWN'),
                                'impact_type': impact.get('type', 'UNKNOWN'),
                                'impact_value': impact.get('value', 0),
                                'impact_unit': impact.get('unit', ''),
                                'estimate_type': impact.get('estimate_type', 'PRIMARY')
                            }
                            
                            # Add directly to buffer
                            with write_lock:
                                buffer.append(record)
                                
                                if len(buffer) >= batch_size:
                                    writer.writerows(buffer)
                                    csvfile.flush()
                                    total_records += len(buffer)
                                    buffer.clear()
                                    
                                    if total_records % 100 == 0:
                                        print(f"  Saved {total_records} records...")
                    return
            
            # NO_DATA case - direct dict creation
            record = {
                'id': item_id,
                'collection': collection_id,
                'datetime': datetime_val,
                'title': title_val,
                'country_codes': country_codes_str,
                'hazard_codes': hazard_codes_str,
                'impact_category': 'NO_DATA',
                'impact_type': 'NO_DATA',
                'impact_value': None,
                'impact_unit': '',
                'estimate_type': ''
            }
            
            with write_lock:
                buffer.append(record)
                
                if len(buffer) >= batch_size:
                    writer.writerows(buffer)
                    csvfile.flush()
                    total_records += len(buffer)
                    buffer.clear()
                    
                    if total_records % 100 == 0:
                        print(f"  Saved {total_records} records...")
        
        # Helper function to search one collection for one year
        def search_single(collection: str, year: int) -> tuple:
            """Search single collection for single year (run in parallel)"""
            year_start = f"{year}-01-01"
            year_end = f"{year}-12-31"
            datetime_range = f"{year_start}/{year_end}"
            
            items_processed = 0
            
            try:
                search = catalog.search(
                    collections=[collection],
                    max_items=max_items_per_year,
                    datetime=datetime_range,
                    filter=filter_body
                )
                
                # CRITICAL: Process items one-by-one, don't collect in list
                for item in search.items():
                    if any(code in item.properties.get('monty:hazard_codes', []) 
                           for code in hazard_codes):
                        extract_and_buffer_item(item)
                        items_processed += 1
                        
                        # Force garbage collection every 50 items
                        if items_processed % 50 == 0:
                            gc.collect()
                
                return (year, collection, items_processed)
                
            except Exception:
                # Filtered search failed; retry without the CQL2 filter and match client-side
                try:
                    search = catalog.search(
                        collections=[collection],
                        max_items=max_items_per_year,
                        datetime=datetime_range
                    )
                    
                    # CRITICAL: Process items one-by-one
                    for item in search.items():
                        if (any(c in item.properties.get('monty:country_codes', []) 
                                for c in country_codes) and
                            any(h in item.properties.get('monty:hazard_codes', [])
                                for h in hazard_codes)):
                            extract_and_buffer_item(item)
                            items_processed += 1
                            
                            # Force garbage collection every 50 items
                            if items_processed % 50 == 0:
                                gc.collect()
                    
                    return (year, collection, items_processed)
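                except Exception as exc:
                    # Report the failure instead of silently counting zero items
                    print(f"  Search failed for {collection} {year}: {exc}")
                    return (year, collection, 0)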
        
        # PARALLEL SEARCH: Process all collections for each year
        for year in range(start_year, end_year + 1):
            year_total = 0
            
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {
                    executor.submit(search_single, coll, year): coll 
                    for coll in collections
                }
                
                for future in as_completed(futures):
                    year_val, coll, count = future.result()
                    year_total += count
            
            print(f"Year {year}: {year_total} items")
            
            # Force garbage collection after each year
            gc.collect()
        
        # Write any remaining records in buffer
        if buffer:
            writer.writerows(buffer)
            csvfile.flush()
            total_records += len(buffer)
            buffer.clear()
    
    # Final cleanup
    gc.collect()
    
    return total_records
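The function above filters countries on the server with `a_contains` and hazard codes on the client. A compound CQL2-JSON filter could in principle push both conditions to the server. The sketch below only builds the filter dictionary; the helper names `any_of` and `country_and_hazard_filter` are hypothetical, and it is an assumption that the server accepts `a_contains` on `monty:hazard_codes` the same way it does on `monty:country_codes`.

```python
from typing import Any, Dict, List

def any_of(prop: str, values: List[str]) -> Dict[str, Any]:
    """OR together one a_contains clause per value for an array-valued property."""
    clauses = [
        {"op": "a_contains", "args": [{"property": prop}, value]}
        for value in values
    ]
    # A single clause needs no surrounding "or".
    return clauses[0] if len(clauses) == 1 else {"op": "or", "args": clauses}

def country_and_hazard_filter(country_codes: List[str],
                              hazard_codes: List[str]) -> Dict[str, Any]:
    """Require at least one matching country AND at least one matching hazard code."""
    if not country_codes or not hazard_codes:
        raise ValueError("Both code lists must be non-empty")
    return {
        "op": "and",
        "args": [
            any_of("monty:country_codes", country_codes),
            any_of("monty:hazard_codes", hazard_codes),
        ],
    }

compound = country_and_hazard_filter(["NOR", "SWE"], ["CW"])
print(compound)
```

The resulting dictionary could be passed as `filter=compound` to `catalog.search(...)`, optionally with `filter_lang="cql2-json"`. Keeping the client-side check as a fallback remains sensible in case the server rejects the filter.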

Step 4 — Process & Aggregate Impact Records

Build a unified DataFrame from DesInventar, EM-DAT, and IFRC collections, extracting impact type, value, and country metadata.
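The flattening step can be sketched in isolation on a plain properties dictionary. This is a simplified illustration, not the recipe's exact function: the generator name `iter_impact_rows` and the sample properties are hypothetical, only a subset of columns is kept, and a missing value is left as `None` rather than defaulting to 0 so that "not reported" stays distinguishable from "zero".

```python
from typing import Any, Dict, Iterator

def iter_impact_rows(item_id: str, props: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield one flat row per entry in monty:impact_detail, or one NO_DATA row."""
    base = {
        "id": item_id,
        "datetime": props.get("datetime"),
        "country_codes": ", ".join(props.get("monty:country_codes") or []),
    }
    detail = props.get("monty:impact_detail")
    if isinstance(detail, dict):
        detail = [detail]  # a single impact object is treated as a one-element list
    impacts = [d for d in detail if isinstance(d, dict)] if isinstance(detail, list) else []
    if not impacts:
        yield {**base, "impact_type": "NO_DATA", "impact_value": None}
        return
    for impact in impacts:
        yield {
            **base,
            "impact_type": impact.get("type", "UNKNOWN"),
            "impact_value": impact.get("value"),
        }

# Hypothetical item properties for demonstration.
sample_props = {
    "datetime": "2012-02-01T00:00:00Z",
    "monty:country_codes": ["UKR"],
    "monty:impact_detail": {"type": "death", "value": 5},
}
rows = list(iter_impact_rows("sample-item", sample_props))
print(rows)
```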

import time 

START_YEAR = 2000
END_YEAR = 2020  # Matches END_DATE; kept short of 2025 for faster processing
OUTPUT_CSV = 'cold_wave_snow_storm_impacts_europe_central_asia.csv'
BATCH_SIZE = 10  # REDUCED from 20 to 10 for less memory

print(f"Searching from {START_YEAR} to {END_YEAR}...")
print(f"Target: {len(COUNTRY_CODES)} countries, {len(HAZARD_CODES)} hazard codes")
print(f"Collections: {IMPACT_COLLECTIONS}")
print(f"Output: {OUTPUT_CSV}")


start_time = time.time()

# MEMORY-OPTIMIZED: Search and save incrementally
total_records = search_and_save_incrementally(
    collections=IMPACT_COLLECTIONS,
    country_codes=COUNTRY_CODES,
    hazard_codes=HAZARD_CODES,
    start_year=START_YEAR,
    end_year=END_YEAR,
    output_file=OUTPUT_CSV,
    batch_size=BATCH_SIZE,
    max_items_per_year=1000,  # REDUCED from 2000 to 1000
    max_workers=2  # REDUCED from 6 to 2 for less memory
)

elapsed_time = time.time() - start_time

print(f"\n{'='*60}")
print(f"✓ Search and Save Complete!")
print(f"  Total records saved: {total_records}")
print(f"  Years searched: {START_YEAR}-{END_YEAR}")
print(f"  Search time: {elapsed_time:.2f} seconds")
print(f"  Output file: {OUTPUT_CSV}")
print(f"{'='*60}")

# Force cleanup
gc.collect()
Searching from 2000 to 2020...
Target: 16 countries, 12 hazard codes
Collections: ['desinventar-impacts', 'emdat-impacts', 'ifrcevent-impacts']
Output: cold_wave_snow_storm_impacts_europe_central_asia.csv
Memory-optimized search: 3 collections, 2 parallel workers
Saving to CSV every 10 records

Year 2000: 16 items
Year 2001: 27 items
Year 2002: 10 items
Year 2003: 1 items
Year 2004: 1 items
Year 2005: 8 items
Year 2006: 12 items
Year 2007: 1 items
Year 2008: 8 items
Year 2009: 13 items
  Saved 100 records...
Year 2010: 10 items
Year 2011: 2 items
Year 2012: 44 items
Year 2013: 7 items
Year 2014: 4 items
Year 2015: 1 items
Year 2016: 2 items
Year 2017: 3 items
Year 2018: 4 items
Year 2019: 3 items
Year 2020: 0 items

============================================================
✓ Search and Save Complete!
  Total records saved: 177
  Years searched: 2000-2020
  Search time: 504.06 seconds
  Output file: cold_wave_snow_storm_impacts_europe_central_asia.csv
  Memory used: Minimal (batch writes every 10 records)
============================================================
0


if os.path.exists(OUTPUT_CSV):
    # Quick count from CSV
    import pandas as pd
    temp_df = pd.read_csv(OUTPUT_CSV)
    
    print(f"\n📊 Data Collection Summary:")
    print(f"   Total records saved: {len(temp_df)}")
    print(f"   Date range: {START_YEAR}-{END_YEAR}")
    print(f"   Countries: {len(COUNTRY_CODES)}")
    print(f"   Hazard codes: {len(HAZARD_CODES)}")
    print(f"   CSV file: {OUTPUT_CSV}")
    
    # Count by collection
    if 'collection' in temp_df.columns:
        collections_count = temp_df['collection'].value_counts()
        print(f"\n   Records by collection:")
        for coll, count in collections_count.items():
            print(f"     - {coll}: {count}")
    
    del temp_df  # Free memory
    gc.collect()
else:
    print("  No data found. CSV file not created.")

📊 Data Collection Summary:
   Total records saved: 177
   Date range: 2000-2020
   Countries: 16
   Hazard codes: 12
   CSV file: cold_wave_snow_storm_impacts_europe_central_asia.csv

   Records by collection:
     - emdat-impacts: 163
     - desinventar-impacts: 14

Step 5 — Data Summary & Quality Check

# Data was already saved during search - now load and process it
print(f"Data was saved incrementally during search process.")
print(f"CSV file: {OUTPUT_CSV}")
print(f"Memory optimization: Data was written in batches of {BATCH_SIZE} records")
print(f"\nNo additional extraction needed - proceeding to load and analyze...")
Data was saved incrementally during search process.
CSV file: cold_wave_snow_storm_impacts_europe_central_asia.csv
Memory optimization: Data was written in batches of 10 records

No additional extraction needed - proceeding to load and analyze...
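The batched, lock-protected writing described above can be isolated into a small class. The sketch below is one possible arrangement rather than the recipe's own code: the class name `BatchedCsvWriter` is hypothetical, and it writes to an in-memory stream so the behavior is easy to inspect.

```python
import csv
import io
import threading

class BatchedCsvWriter:
    """Buffer rows and write them to a CSV stream once batch_size rows accumulate."""

    def __init__(self, stream, fieldnames, batch_size=10):
        self._stream = stream
        self._writer = csv.DictWriter(stream, fieldnames=fieldnames)
        self._writer.writeheader()
        self._batch_size = batch_size
        self._buffer = []
        self._lock = threading.Lock()  # guards the buffer and the stream
        self.total_written = 0

    def add(self, record):
        """Append one row; flush when the buffer reaches batch_size."""
        with self._lock:
            self._buffer.append(record)
            if len(self._buffer) >= self._batch_size:
                self._flush_locked()

    def close(self):
        """Write whatever is left in the buffer."""
        with self._lock:
            self._flush_locked()

    def _flush_locked(self):
        # Caller must already hold the lock.
        if self._buffer:
            self._writer.writerows(self._buffer)
            self._stream.flush()
            self.total_written += len(self._buffer)
            self._buffer.clear()

out = io.StringIO()
writer = BatchedCsvWriter(out, ["id", "impact_type"], batch_size=2)
for i in range(5):
    writer.add({"id": f"item-{i}", "impact_type": "death"})
written_before_close = writer.total_written  # only full batches so far
writer.close()
print(written_before_close, writer.total_written)
```

Keeping the final flush in `close()` matters: with a batch size that does not divide the record count, the last partial batch would otherwise never reach the file.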


if os.path.exists(OUTPUT_CSV):
    print(f"Loading data from {OUTPUT_CSV}...")
    print(f"(Using memory-efficient chunked loading)\n")
    
    # Read in chunks to avoid memory spike
    chunk_size = 100
    chunks = []
    
    for chunk in pd.read_csv(OUTPUT_CSV, chunksize=chunk_size):
        # Process datetime immediately
        chunk['datetime'] = pd.to_datetime(chunk['datetime'], errors='coerce')
        chunk['year'] = chunk['datetime'].dt.year
        chunk['month'] = chunk['datetime'].dt.month
        
        # Extract first country code
        chunk['primary_country'] = chunk['country_codes'].apply(
            lambda x: x.split(',')[0].strip() if pd.notna(x) and x else 'UNKNOWN'
        )
        
        # Add country name
        chunk['country_name'] = chunk['primary_country'].map(TARGET_COUNTRIES)
        chunk['country_name'] = chunk['country_name'].fillna(chunk['primary_country'])
        
        chunks.append(chunk)
        
        # Clear memory every 5 chunks
        if len(chunks) % 5 == 0:
            gc.collect()
    
    # Combine all chunks
    impacts_df = pd.concat(chunks, ignore_index=True)
    del chunks
    gc.collect()
    
    print(f"✓ Loaded {len(impacts_df)} records from CSV")
    print(f"\nYear range: {impacts_df['year'].min()} - {impacts_df['year'].max()}")
    print(f"\nCountries with data:")
    print(impacts_df['country_name'].value_counts())
else:
    print(f"CSV file not found: {OUTPUT_CSV}")
    impacts_df = pd.DataFrame()
Loading data from cold_wave_snow_storm_impacts_europe_central_asia.csv...
(Using memory-efficient chunked loading)

✓ Loaded 177 records from CSV

Year range: 2000 - 2019

Countries with data:
country_name
Russia                    36
Albania                   24
Poland                    22
Romania                   20
Ukraine                   16
Serbia                    13
Hungary                   12
Moldova                    9
Bosnia and Herzegovina     7
Kazakhstan                 7
Tajikistan                 6
Lithuania                  5
Name: count, dtype: int64
# Optimize memory usage after loading
if len(impacts_df) > 0:
    print("\nOptimizing data types for memory efficiency...")
    
    # Convert to categorical for memory savings
    impacts_df['collection'] = impacts_df['collection'].astype('category')
    impacts_df['impact_category'] = impacts_df['impact_category'].astype('category')
    impacts_df['impact_type'] = impacts_df['impact_type'].astype('category')
    impacts_df['impact_unit'] = impacts_df['impact_unit'].astype('category')
    impacts_df['estimate_type'] = impacts_df['estimate_type'].astype('category')
    impacts_df['primary_country'] = impacts_df['primary_country'].astype('category')
    impacts_df['country_name'] = impacts_df['country_name'].astype('category')
    
    # Convert numeric types to smaller dtypes
    impacts_df['year'] = impacts_df['year'].astype('int16')
    impacts_df['month'] = impacts_df['month'].astype('int8')
    impacts_df['impact_value'] = pd.to_numeric(impacts_df['impact_value'], errors='coerce').astype('float32')
    
    # Drop columns not needed for analysis to save memory
    impacts_df = impacts_df.drop(columns=['id', 'title'], errors='ignore')
    
    gc.collect()
    
    memory_mb = impacts_df.memory_usage(deep=True).sum() / 1024**2
    

Optimizing data types for memory efficiency...
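Because the dates are parsed with `errors='coerce'`, an unparseable value becomes `NaT` and its year and month become missing. Casting such a column to a plain NumPy integer type like `int16` would then fail. A minimal sketch of one alternative, using pandas' nullable integer dtypes, is shown below; the sample data is hypothetical.

```python
import pandas as pd

# Hypothetical sample with one unparseable date.
raw = pd.DataFrame({
    "datetime": ["2012-02-01T00:00:00Z", "not-a-date", "2009-12-15T00:00:00Z"]
})

parsed = pd.to_datetime(raw["datetime"], errors="coerce", utc=True)

# Nullable integer dtypes (capital "I") keep missing values as <NA>
# instead of raising on the cast.
raw["year"] = parsed.dt.year.astype("Int16")
raw["month"] = parsed.dt.month.astype("Int8")

print(raw)
print(f"Rows with missing year: {raw['year'].isna().sum()}")
```

Rows with a missing year can then be dropped or reported explicitly before grouping by year or month.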

Step 6 — Collection & Impact-Type Breakdowns

# Summary by collection
if len(impacts_df) > 0:
    print("Impact Records by Collection:")
    collection_summary = impacts_df.groupby('collection').agg({
        'impact_value': 'sum',
        'country_name': 'nunique',
        'year': ['min', 'max']
    }).round(0)
    collection_summary.columns = ['Total Impact Value', 'Countries', 'From Year', 'To Year']
    display(collection_summary)
else:
    print("No data to summarize")
Impact Records by Collection:
# Analyze impact types
if len(impacts_df) > 0 and 'impact_type' in impacts_df.columns:
    print("Impact Types Distribution:")
    impact_type_summary = impacts_df.groupby('impact_type').agg({
        'impact_value': ['count', 'sum', 'max']
    }).round(2)
    impact_type_summary.columns = ['Count', 'Total Value', 'Max Value']
    impact_type_summary = impact_type_summary.sort_values('Count', ascending=False)
    display(impact_type_summary.head(15))
Impact Types Distribution:

Step 7 — Deaths & Casualties Analysis

# Filter for death-related impacts
death_types = ['DEATHS', 'DEATH', 'deaths', 'death']

if len(impacts_df) > 0:
    deaths_df = impacts_df[impacts_df['impact_type'].str.upper().isin([t.upper() for t in death_types])]
    
    if len(deaths_df) > 0:
        print(f"Death Records Found: {len(deaths_df)}")
        
        # Deaths by country
        deaths_by_country = deaths_df.groupby('country_name')['impact_value'].sum().sort_values(ascending=False)
        print("\nTotal Deaths by Country (Cold Wave/Snow Storm 2000-2020):")
        display(deaths_by_country)
        
        # Deaths by year
        deaths_by_year = deaths_df.groupby('year')['impact_value'].sum().sort_index()
        print("\nDeaths by Year:")
        display(deaths_by_year)
    else:
        print("No death records found in filtered data")
else:
    print("No data available")
Death Records Found: 87

Total Deaths by Country (Cold Wave/Snow Storm 2000-2020):

Deaths by Year:
# Visualize deaths by country
if len(impacts_df) > 0 and 'deaths_by_country' in dir():
    if len(deaths_by_country) > 0:
        fig = px.bar(
            x=deaths_by_country.index,
            y=deaths_by_country.values,
            title='Deaths from Cold Waves and Snow Storms (2000-2020)',
            labels={'x': 'Country', 'y': 'Total Deaths'},
            color=deaths_by_country.values,
            color_continuous_scale='Reds'
        )
        fig.update_layout(
            xaxis_tickangle=-45,
            height=500
        )
        fig.show()
# Visualize deaths trend over years
if len(impacts_df) > 0 and 'deaths_by_year' in dir():
    if len(deaths_by_year) > 0:
        fig = px.line(
            x=deaths_by_year.index,
            y=deaths_by_year.values,
            title='Deaths from Cold Waves and Snow Storms Over Time',
            labels={'x': 'Year', 'y': 'Deaths'},
            markers=True
        )
        fig.update_layout(height=400)
        fig.show()
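The death filter upper-cases the column before matching, so the lowercase entries in `death_types` are redundant. A minimal sketch of the same case-insensitive match against a small set is shown below; the sample frame and the name `DEATH_TYPES` are hypothetical.

```python
import pandas as pd

# Only the upper-case forms are needed once the column is normalized.
DEATH_TYPES = {"DEATH", "DEATHS"}

# Hypothetical sample with mixed-case labels and one missing label.
df = pd.DataFrame({
    "impact_type": ["death", "Deaths", "injured", None],
    "impact_value": [3.0, 2.0, 10.0, 1.0],
})

# Normalize case once, then test membership; missing labels never match.
mask = df["impact_type"].str.upper().isin(DEATH_TYPES)
deaths = df[mask]
total_deaths = deaths["impact_value"].sum()
print(f"{len(deaths)} death records, total {total_deaths:.0f}")
```

The same pattern applies to the affected-population and economic-loss filters later in the recipe.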

Step 8 — Affected Population Dashboard

# Filter for affected population
affected_types = ['AFFECTED_TOTAL', 'TOTAL_AFFECTED', 'DIRECTLY_AFFECTED', 'INDIRECTLY_AFFECTED', 
                  'affected_total', 'affected', 'AFFECTED']

if len(impacts_df) > 0:
    affected_df = impacts_df[impacts_df['impact_type'].str.upper().isin([t.upper() for t in affected_types])]
    
    if len(affected_df) > 0:
        print(f"Affected Population Records: {len(affected_df)}")
        
        # Affected by country
        affected_by_country = affected_df.groupby('country_name')['impact_value'].sum().sort_values(ascending=False)
        print("\nTotal Affected Population by Country:")
        display(affected_by_country)
    else:
        print("No affected population records found")
Affected Population Records: 56

Total Affected Population by Country:
# Visualize affected population
if 'affected_by_country' in dir() and len(affected_by_country) > 0:
    fig = px.bar(
        x=affected_by_country.index,
        y=affected_by_country.values,
        title='Population Affected by Cold Waves and Snow Storms (2000-2020)',
        labels={'x': 'Country', 'y': 'Total Affected'},
        color=affected_by_country.values,
        color_continuous_scale='Blues'
    )
    fig.update_layout(
        xaxis_tickangle=-45,
        height=500
    )
    fig.show()

Step 9 — Economic Losses Analysis

# Filter for economic losses
economic_types = ['LOSS_COST', 'COST', 'LOSS', 'DAMAGED', 'DESTROYED', 'DAMAGES']

if len(impacts_df) > 0:
    economic_df = impacts_df[impacts_df['impact_type'].str.upper().isin([t.upper() for t in economic_types])]
    
    if len(economic_df) > 0:
        print(f"Economic Impact Records: {len(economic_df)}")
        
        # Filter for USD values
        usd_losses = economic_df[economic_df['impact_unit'].str.contains('USD|dollar', case=False, na=False)]
        
        if len(usd_losses) > 0:
            losses_by_country = usd_losses.groupby('country_name')['impact_value'].sum().sort_values(ascending=False)
            print("\nEconomic Losses by Country (USD):")
            display(losses_by_country)
        else:
            print("\nNo USD-denominated losses found. All economic impacts:")
            display(economic_df[['country_name', 'impact_type', 'impact_value', 'impact_unit']].head(20))
    else:
        print("No economic impact records found")
Economic Impact Records: 6

No USD-denominated losses found. All economic impacts:

Step 10 — Temporal Analysis: Events by Month

Cold waves and snow storms peak in the Northern-Hemisphere winter (November–February). The bar chart below confirms this seasonality.

# Analyze events by month (cold waves typically winter months)
if len(impacts_df) > 0 and 'month' in impacts_df.columns:
    monthly_events = impacts_df.groupby('month').size()
    
    month_names = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 
                   'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
    
    fig = px.bar(
        x=month_names,
        y=[monthly_events.get(i+1, 0) for i in range(12)],
        title='Cold Wave and Snow Storm Events by Month',
        labels={'x': 'Month', 'y': 'Number of Events'},
        color=[monthly_events.get(i+1, 0) for i in range(12)],
        color_continuous_scale='RdBu_r'
    )
    fig.update_layout(height=400)
    fig.show()
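The seasonality claim can also be checked numerically rather than by eye. The sketch below computes the share of records that fall in November through February; the helper name `winter_share` and the sample months are hypothetical. Note that the chart above counts impact records per month, so an event with several impact rows is counted several times.

```python
from collections import Counter

# Northern Hemisphere winter months as used in this recipe's text.
WINTER_MONTHS = {11, 12, 1, 2}

def winter_share(months) -> float:
    """Fraction of non-missing month values that fall in WINTER_MONTHS."""
    counts = Counter(m for m in months if m is not None)
    total = sum(counts.values())
    if total == 0:
        return 0.0
    return sum(counts[m] for m in WINTER_MONTHS) / total

# Hypothetical month values; None stands for a record with no parseable date.
sample_months = [1, 1, 2, 12, 7, 11, 3, None]
share = winter_share(sample_months)
print(f"Winter share: {share:.1%}")
```

With the real data, the month values could be supplied as `impacts_df['month'].dropna().astype(int).tolist()`.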

Step 11 — Summary & Pivot Table

# Create summary statistics
if len(impacts_df) > 0:
    print("="*60)
    print("SUMMARY: Cold Wave & Snow Storm Impacts (2000-2020)")
    print("   Europe and Central Asia Target Countries")
    print("="*60)
    
    print(f"\nData Sources:")
    for col in impacts_df['collection'].unique():
        count = len(impacts_df[impacts_df['collection'] == col])
        print(f"   - {col}: {count} impact records")
    
    print(f"\nCountries with Impact Data: {impacts_df['country_name'].nunique()}")
    
    print(f"\nTime Period: {impacts_df['year'].min()} - {impacts_df['year'].max()}")
    
    # Deaths summary
    if 'deaths_df' in dir() and len(deaths_df) > 0:
        total_deaths = deaths_df['impact_value'].sum()
        print(f"\nTotal Deaths: {total_deaths:,.0f}")
    
    # Affected summary
    if 'affected_df' in dir() and len(affected_df) > 0:
        total_affected = affected_df['impact_value'].sum()
        print(f"\nTotal Affected: {total_affected:,.0f}")
    
    print("\n" + "="*60)
else:
    print("No data available for summary")
============================================================
SUMMARY: Cold Wave & Snow Storm Impacts (2000-2020)
   Europe and Central Asia Target Countries
============================================================

Data Sources:
   - desinventar-impacts: 14 impact records
   - emdat-impacts: 163 impact records

Countries with Impact Data: 12

Time Period: 2000 - 2019

Total Deaths: 6,618

Total Affected: 11,399,376

============================================================
# Create a comprehensive impact summary table
if len(impacts_df) > 0:
    # Pivot table by country and impact type
    summary_pivot = impacts_df.pivot_table(
        index='country_name',
        columns='impact_type',
        values='impact_value',
        aggfunc='sum',
        fill_value=0
    )
    
    # Select key impact types if they exist
    key_types = ['DEATHS', 'TOTAL_AFFECTED', 'AFFECTED_TOTAL', 'INJURED', 'MISSING', 
                 'DESTROYED', 'DAMAGED', 'EVACUATED']
    available_types = [t for t in key_types if t in summary_pivot.columns]
    
    if available_types:
        summary_filtered = summary_pivot[available_types]
        print("Impact Summary by Country and Type:")
        display(summary_filtered)
    else:
        print("Full Impact Summary:")
        display(summary_pivot)
Full Impact Summary:
# Display sample of the data
if len(impacts_df) > 0:
    print("Sample Data (first 20 records):")
    display(impacts_df[['datetime', 'country_name', 'hazard_codes', 'impact_type', 
                        'impact_value', 'impact_unit', 'collection']].head(20))
Sample Data (first 20 records):

Conclusion & Key Findings

Data Sources
Impact Types
Hazard Codes
  • DesInventar: detailed local disaster impact records (Guha-Sapir, 2024)

  • EM-DAT: global disaster database with comprehensive impact metrics (Guha-Sapir, 2024)

  • IFRC DREF: Red Cross disaster response operations


References
  1. Guha-Sapir, D. (2024). EM-DAT: The Emergency Events Database. Centre for Research on the Epidemiology of Disasters (CRED). https://www.emdat.be