# scripts/fda_query.py

#!/usr/bin/env python3
"""
FDA API Query Helper

Comprehensive utility for querying FDA databases through openFDA API.
Includes error handling, rate limiting, caching, and common query patterns.

Usage:
    from fda_query import FDAQuery

    fda = FDAQuery(api_key="YOUR_API_KEY")
    results = fda.query_drug_events(drug_name="aspirin", limit=100)
"""

import requests
import time
import json
import hashlib
from pathlib import Path
from datetime import datetime, timedelta
from collections import deque, Counter
from typing import Dict, List, Optional, Any


class RateLimiter:
    """Sliding-window limiter that keeps calls under a per-minute budget.

    The timestamps of recent calls are kept in ``self.requests`` (oldest
    first). Timestamps come from ``time.monotonic()`` so that adjustments to
    the system clock cannot stretch or shrink the 60-second window.
    """

    def __init__(self, max_per_minute: int = 240):
        """
        Create a limiter.

        Args:
            max_per_minute: Maximum number of calls allowed in any 60-second
                window. Must be at least 1.

        Raises:
            ValueError: If max_per_minute is smaller than 1.
        """
        # A non-positive budget can never be satisfied; reject it up front
        # instead of failing later on an empty deque.
        if max_per_minute < 1:
            raise ValueError("max_per_minute must be at least 1")
        self.max_per_minute = max_per_minute
        self.requests = deque()

    def wait_if_needed(self):
        """Block until another call fits in the window, then record it."""
        now = time.monotonic()

        # Remove requests older than 1 minute
        while self.requests and now - self.requests[0] > 60:
            self.requests.popleft()

        # Check if at limit
        if len(self.requests) >= self.max_per_minute:
            # Sleep until the oldest recorded call leaves the window
            # (plus a small safety margin).
            sleep_time = 60 - (now - self.requests[0]) + 0.1
            if sleep_time > 0:
                print(f"Rate limit approaching. Waiting {sleep_time:.1f} seconds...")
                time.sleep(sleep_time)
            # The oldest call has aged out by now; free its slot.
            self.requests.popleft()

        # Record the call with a fresh reading so time spent sleeping counts.
        self.requests.append(time.monotonic())


class FDACache:
    """Simple file-based cache for FDA API responses.

    Each response is stored as ``<md5 of url + params>.json`` inside
    ``cache_dir``. An entry is considered fresh while the file's modification
    time is less than ``ttl`` seconds old.
    """

    def __init__(self, cache_dir: str = "fda_cache", ttl: int = 3600):
        """
        Create the cache directory if needed.

        Args:
            cache_dir: Directory holding the cache files (created together
                with any missing parent directories).
            ttl: Time-to-live of an entry in seconds.
        """
        self.cache_dir = Path(cache_dir)
        # parents=True so a nested location such as "var/cache/fda" works.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = ttl

    def _get_cache_key(self, url: str, params: Dict) -> str:
        """Generate cache key from URL and params."""
        # sort_keys makes the key independent of dict insertion order.
        cache_string = f"{url}_{json.dumps(params, sort_keys=True)}"
        return hashlib.md5(cache_string.encode()).hexdigest()

    def _cache_path(self, url: str, params: Dict) -> Path:
        """Return the file path used to store the entry for this request."""
        return self.cache_dir / f"{self._get_cache_key(url, params)}.json"

    def get(self, url: str, params: Dict) -> Optional[Dict]:
        """
        Get cached response if available and not expired.

        Returns:
            The cached data, or None when the entry is missing, expired or
            unreadable (for example a truncated or corrupt file).
        """
        cache_file = self._cache_path(url, params)
        try:
            age = time.time() - cache_file.stat().st_mtime
            if age >= self.ttl:
                return None
            with open(cache_file, "r", encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):
            # Missing file, a file removed between stat() and open(), or
            # invalid JSON: treat all of them as a cache miss so the caller
            # simply refetches.
            return None

    def set(self, url: str, params: Dict, data: Dict):
        """Cache response data."""
        with open(self._cache_path(url, params), "w", encoding="utf-8") as f:
            json.dump(data, f)


class FDAQuery:
    """Main class for querying FDA databases.

    Every request goes through ``_make_request``, which adds the optional API
    key, consults the on-disk cache and applies the rate limiter. API and
    transport failures are reported as a dictionary containing an ``"error"``
    key instead of being raised.

    Search expressions are passed to ``requests`` as query parameters and are
    URL-encoded there. Terms are therefore separated with plain spaces
    (``"a AND b"``); a literal ``+`` would be transmitted as ``%2B`` rather
    than as a space.
    """

    BASE_URL = "https://api.fda.gov"
    # Largest page size sent to the API; bigger limits are clamped to this.
    MAX_LIMIT = 1000
    # How many times a 429 (rate limited) response is retried before giving up.
    MAX_RATE_LIMIT_RETRIES = 1

    def __init__(self, api_key: Optional[str] = None, use_cache: bool = True,
                 cache_ttl: int = 3600, rate_limit: int = 240):
        """
        Initialize FDA query client.

        Args:
            api_key: FDA API key (optional but recommended)
            use_cache: Whether to use response caching
            cache_ttl: Cache time-to-live in seconds
            rate_limit: Requests per minute limit
        """
        self.api_key = api_key
        self.rate_limiter = RateLimiter(max_per_minute=rate_limit)
        self.cache = FDACache(ttl=cache_ttl) if use_cache else None

    def _build_url(self, category: str, endpoint: str) -> str:
        """Build full API endpoint URL."""
        return f"{self.BASE_URL}/{category}/{endpoint}.json"

    def _make_request(self, url: str, params: Dict, use_cache: bool = True) -> Dict:
        """
        Make API request with error handling, rate limiting, and caching.

        Args:
            url: Full API endpoint URL
            params: Query parameters (not modified)
            use_cache: Whether to use cache for this request

        Returns:
            API response as dictionary. On failure the dictionary contains an
            "error" key; a 404 additionally carries an empty "results" list.
        """
        # Work on a copy so the caller's dictionary is never mutated.
        request_params = dict(params)
        if self.api_key:
            request_params["api_key"] = self.api_key

        caching = bool(use_cache and self.cache)
        if caching:
            cached = self.cache.get(url, request_params)
            # Compare with None so an empty but valid payload is still a hit.
            if cached is not None:
                return cached

        retries_left = self.MAX_RATE_LIMIT_RETRIES
        while True:
            self.rate_limiter.wait_if_needed()

            try:
                response = requests.get(url, params=request_params, timeout=30)
            except requests.exceptions.RequestException as e:
                return {"error": f"Request error: {e}"}

            status = response.status_code

            if status == 429 and retries_left > 0:
                # Rate limit exceeded: wait and retry, but only a bounded
                # number of times so a persistent 429 cannot loop forever.
                retries_left -= 1
                print("Rate limit exceeded. Waiting 60 seconds...")
                time.sleep(60)
                continue
            if status == 404:
                return {"error": "No results found", "results": []}
            if status == 400:
                return {"error": f"Invalid query: {response.text}"}

            try:
                response.raise_for_status()
                data = response.json()
            except requests.exceptions.HTTPError as e:
                return {"error": f"HTTP error {status}: {e}"}
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers a body that is not valid JSON.
                return {"error": f"Request error: {e}"}

            # Cache successful response
            if caching:
                self.cache.set(url, request_params, data)
            return data

    def query(self, category: str, endpoint: str, search: Optional[str] = None,
              limit: int = 100, skip: int = 0, count: Optional[str] = None,
              sort: Optional[str] = None) -> Dict:
        """
        Generic query method for any FDA endpoint.

        Args:
            category: API category (drug, device, food, animalandveterinary, other)
            endpoint: Specific endpoint (event, label, enforcement, etc.)
            search: Search query string; separate terms with spaces
                (e.g. "a:1 AND b:2"), the request layer URL-encodes them
            limit: Maximum results to return (1-1000); larger values are clamped
            skip: Number of results to skip (for pagination)
            count: Field to count/aggregate by
            sort: Field to sort by (e.g., "receivedate:desc")

        Returns:
            API response dictionary
        """
        url = self._build_url(category, endpoint)
        params = {}

        # Falsy values (None, "", 0) are left out of the request entirely.
        if search:
            params["search"] = search
        if limit:
            params["limit"] = min(limit, self.MAX_LIMIT)
        if skip:
            params["skip"] = skip
        if count:
            params["count"] = count
        if sort:
            params["sort"] = sort

        return self._make_request(url, params)

    def query_all(self, category: str, endpoint: str, search: str,
                  max_results: int = 5000, batch_size: int = 100) -> List[Dict]:
        """
        Query and retrieve all results with automatic pagination.

        Pagination stops at the first error response, empty page or short
        page; whatever was collected up to that point is returned.

        Args:
            category: API category
            endpoint: Specific endpoint
            search: Search query string
            max_results: Maximum total results to retrieve
            batch_size: Results per request (values above 1000 are clamped)

        Returns:
            List of all result records

        Raises:
            ValueError: If batch_size is smaller than 1.
        """
        if batch_size < 1:
            # A zero page size would never advance `skip` and loop forever.
            raise ValueError("batch_size must be at least 1")
        # query() clamps the limit; clamp here too so `skip` advances by the
        # number of records actually requested.
        batch_size = min(batch_size, self.MAX_LIMIT)

        all_results = []
        skip = 0

        while len(all_results) < max_results:
            # Never request more than is still needed.
            page_size = min(batch_size, max_results - len(all_results))
            data = self.query(
                category=category,
                endpoint=endpoint,
                search=search,
                limit=page_size,
                skip=skip
            )

            if "error" in data or "results" not in data:
                break

            results = data["results"]
            if not results:
                break

            all_results.extend(results)

            # A short page means the result set is exhausted.
            if len(results) < page_size:
                break

            skip += page_size

        return all_results[:max_results]

    # Drug-specific methods

    def query_drug_events(self, drug_name: str, limit: int = 100) -> Dict:
        """Query drug adverse events whose medicinal product contains drug_name."""
        search = f"patient.drug.medicinalproduct:*{drug_name}*"
        return self.query("drug", "event", search=search, limit=limit)

    def query_drug_label(self, drug_name: str, brand: bool = True) -> Dict:
        """
        Query drug labeling information.

        Args:
            drug_name: Name to look up
            brand: Search the brand name field when True, otherwise the
                generic name field

        Returns:
            API response dictionary (at most one label)
        """
        field = "openfda.brand_name" if brand else "openfda.generic_name"
        search = f"{field}:{drug_name}"
        return self.query("drug", "label", search=search, limit=1)

    def query_drug_ndc(self, ndc: Optional[str] = None,
                       manufacturer: Optional[str] = None) -> Dict:
        """
        Query National Drug Code directory.

        Args:
            ndc: Product NDC to match; takes precedence over manufacturer
            manufacturer: Labeler name fragment to match

        Raises:
            ValueError: If neither ndc nor manufacturer is given.
        """
        if ndc:
            search = f"product_ndc:{ndc}"
        elif manufacturer:
            search = f"labeler_name:*{manufacturer}*"
        else:
            raise ValueError("Must provide either ndc or manufacturer")

        return self.query("drug", "ndc", search=search, limit=100)

    def query_drug_recalls(self, drug_name: Optional[str] = None,
                          classification: Optional[str] = None) -> Dict:
        """
        Query drug recalls, newest report first.

        Args:
            drug_name: Fragment of the product description to match
            classification: Recall class suffix (e.g. "I", "II", "III")

        Returns:
            API response dictionary; with no filters the search is omitted.
        """
        search_parts = []
        if drug_name:
            search_parts.append(f"product_description:*{drug_name}*")
        if classification:
            # Quoted so "Class II" is matched as one phrase.
            search_parts.append(f'classification:"Class {classification}"')

        search = " AND ".join(search_parts) if search_parts else None
        return self.query("drug", "enforcement", search=search, limit=100,
                         sort="report_date:desc")

    # Device-specific methods

    def query_device_events(self, device_name: str, limit: int = 100) -> Dict:
        """Query device adverse events whose brand name contains device_name."""
        search = f"device.brand_name:*{device_name}*"
        return self.query("device", "event", search=search, limit=limit)

    def query_device_510k(self, applicant: Optional[str] = None,
                          device_name: Optional[str] = None) -> Dict:
        """
        Query 510(k) clearances.

        Args:
            applicant: Applicant name fragment; takes precedence over device_name
            device_name: Device name fragment

        Raises:
            ValueError: If neither applicant nor device_name is given.
        """
        if applicant:
            search = f"applicant:*{applicant}*"
        elif device_name:
            search = f"device_name:*{device_name}*"
        else:
            raise ValueError("Must provide either applicant or device_name")

        return self.query("device", "510k", search=search, limit=100)

    def query_device_classification(self, product_code: str) -> Dict:
        """Query device classification by product code."""
        search = f"product_code:{product_code}"
        return self.query("device", "classification", search=search, limit=1)

    # Food-specific methods

    def query_food_events(self, product_name: Optional[str] = None,
                         industry: Optional[str] = None) -> Dict:
        """
        Query food adverse events.

        Args:
            product_name: Brand name fragment; takes precedence over industry
            industry: Industry name fragment

        Returns:
            API response dictionary; with no filters, any record that has a
            report number is matched.
        """
        if product_name:
            search = f"products.name_brand:*{product_name}*"
        elif industry:
            search = f"products.industry_name:*{industry}*"
        else:
            search = "_exists_:report_number"

        return self.query("food", "event", search=search, limit=100)

    def query_food_recalls(self, product: Optional[str] = None,
                          reason: Optional[str] = None,
                          classification: Optional[str] = None) -> Dict:
        """
        Query food recalls, newest report first.

        Args:
            product: Fragment of the product description to match
            reason: Fragment of the recall reason to match
            classification: Recall class suffix (e.g. "I", "II", "III")

        Returns:
            API response dictionary; with no filters, any record that has a
            recall number is matched.
        """
        search_parts = []
        if product:
            search_parts.append(f"product_description:*{product}*")
        if reason:
            search_parts.append(f"reason_for_recall:*{reason}*")
        if classification:
            # Quoted so "Class II" is matched as one phrase.
            search_parts.append(f'classification:"Class {classification}"')

        search = " AND ".join(search_parts) if search_parts else "_exists_:recall_number"
        return self.query("food", "enforcement", search=search, limit=100,
                         sort="report_date:desc")

    # Animal & Veterinary methods

    def query_animal_events(self, species: Optional[str] = None,
                           drug_name: Optional[str] = None) -> Dict:
        """
        Query animal drug adverse events.

        Args:
            species: Species fragment to match
            drug_name: Drug brand name fragment to match

        Returns:
            API response dictionary; with no filters, any record that has a
            unique AER id number is matched.
        """
        search_parts = []
        if species:
            search_parts.append(f"animal.species:*{species}*")
        if drug_name:
            search_parts.append(f"drug.brand_name:*{drug_name}*")

        search = " AND ".join(search_parts) if search_parts else "_exists_:unique_aer_id_number"
        return self.query("animalandveterinary", "event", search=search, limit=100)

    # Substance methods

    def query_substance_by_unii(self, unii: str) -> Dict:
        """Query substance by UNII code."""
        search = f"approvalID:{unii}"
        return self.query("other", "substance", search=search, limit=1)

    def query_substance_by_name(self, name: str) -> Dict:
        """Query substance by name."""
        search = f"names.name:*{name}*"
        return self.query("other", "substance", search=search, limit=10)

    # Analysis methods

    def count_by_field(self, category: str, endpoint: str,
                      search: str, field: str, exact: bool = True) -> Dict:
        """
        Count and aggregate results by a specific field.

        Args:
            category: API category
            endpoint: Specific endpoint
            search: Search query
            field: Field to count by
            exact: Use exact phrase matching (appends ".exact" to the field
                unless it is already present)

        Returns:
            Count results
        """
        count_field = f"{field}.exact" if exact and not field.endswith(".exact") else field
        return self.query(category, endpoint, search=search, count=count_field)

    def get_date_range_data(self, category: str, endpoint: str,
                           date_field: str, days_back: int = 30,
                           additional_search: Optional[str] = None) -> List[Dict]:
        """
        Get data for a specific date range.

        The range runs from days_back days before the current local time up
        to today, formatted as YYYYMMDD.

        Args:
            category: API category
            endpoint: Specific endpoint
            date_field: Date field name
            days_back: Number of days to look back
            additional_search: Additional search criteria, ANDed with the
                date range

        Returns:
            List of results
        """
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days_back)

        date_range = f"[{start_date.strftime('%Y%m%d')} TO {end_date.strftime('%Y%m%d')}]"
        search = f"{date_field}:{date_range}"

        if additional_search:
            search = f"{search} AND {additional_search}"

        return self.query_all(category, endpoint, search=search)


def main():
    """Run three sample openFDA queries and print a short summary of each.

    The API key is read from the ``FDA_API_KEY`` environment variable; the
    client is created without a key when the variable is not set.
    """
    import os

    client = FDAQuery(api_key=os.environ.get("FDA_API_KEY"))

    # --- Sample 1: adverse events for one drug ---
    print("Querying aspirin adverse events...")
    aspirin_events = client.query_drug_events("aspirin", limit=10)
    if "results" in aspirin_events:
        event_total = len(aspirin_events["results"])
        print(f"Found {event_total} events")

    # --- Sample 2: reaction term counts ---
    print("\nCounting reactions...")
    reaction_counts = client.count_by_field(
        "drug",
        "event",
        search="patient.drug.medicinalproduct:aspirin",
        field="patient.reaction.reactionmeddrapt",
    )
    if "results" in reaction_counts:
        # Only the first five buckets are shown
        leading = reaction_counts["results"][:5]
        for bucket in leading:
            term, tally = bucket["term"], bucket["count"]
            print(f"  {term}: {tally}")

    # --- Sample 3: label text for a brand-name drug ---
    print("\nGetting drug label...")
    lipitor_label = client.query_drug_label("Lipitor", brand=True)
    if "results" not in lipitor_label or len(lipitor_label["results"]) == 0:
        return

    first_hit = lipitor_label["results"][0]
    if "indications_and_usage" in first_hit:
        # Print only the first 200 characters of the first section entry
        excerpt = first_hit["indications_and_usage"][0][:200]
        print(f"  Indications: {excerpt}...")

# Run the example queries only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
# ← Back to fda-database