# scripts/clustering_analysis.py

"""
Clustering analysis example with multiple algorithms, evaluation, and visualization.
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.mixture import GaussianMixture
from sklearn.metrics import (
    silhouette_score, calinski_harabasz_score, davies_bouldin_score
)
import warnings
warnings.filterwarnings('ignore')


def preprocess_for_clustering(X, scale=True, pca_components=None):
    """
    Preprocess data for clustering.

    Parameters:
    -----------
    X : array-like
        Feature matrix
    scale : bool
        Whether to standardize features
    pca_components : int or None
        Number of PCA components (None to skip PCA)

    Returns:
    --------
    array
        Preprocessed data
    """
    # Work on a float copy so the caller's data is never mutated
    X_processed = np.array(X, dtype=float)

    if scale:
        scaler = StandardScaler()
        X_processed = scaler.fit_transform(X_processed)

    if pca_components is not None:
        pca = PCA(n_components=pca_components)
        X_processed = pca.fit_transform(X_processed)
        print(f"PCA: Explained variance ratio = {pca.explained_variance_ratio_.sum():.3f}")

    return X_processed


def find_optimal_k_kmeans(X, k_range=range(2, 11)):
    """
    Find optimal K for K-Means using elbow method and silhouette score.

    Parameters:
    -----------
    X : array-like
        Feature matrix (should be scaled)
    k_range : range
        Range of K values to test

    Returns:
    --------
    dict
        Dictionary with inertia and silhouette scores for each K
    """
    inertias = []
    silhouette_scores = []

    for k in k_range:
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = kmeans.fit_predict(X)

        inertias.append(kmeans.inertia_)
        silhouette_scores.append(silhouette_score(X, labels))

    # Plot results
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

    # Elbow plot
    ax1.plot(k_range, inertias, 'bo-')
    ax1.set_xlabel('Number of clusters (K)')
    ax1.set_ylabel('Inertia')
    ax1.set_title('Elbow Method')
    ax1.grid(True)

    # Silhouette plot
    ax2.plot(k_range, silhouette_scores, 'ro-')
    ax2.set_xlabel('Number of clusters (K)')
    ax2.set_ylabel('Silhouette Score')
    ax2.set_title('Silhouette Analysis')
    ax2.grid(True)

    plt.tight_layout()
    plt.savefig('clustering_optimization.png', dpi=300, bbox_inches='tight')
    print("Saved: clustering_optimization.png")
    plt.close()

    # Find best K based on silhouette score
    best_k = k_range[np.argmax(silhouette_scores)]
    print(f"\nRecommended K based on silhouette score: {best_k}")

    return {
        'k_values': list(k_range),
        'inertias': inertias,
        'silhouette_scores': silhouette_scores,
        'best_k': best_k
    }
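

# A common heuristic for choosing DBSCAN's eps is the k-distance plot: sort
# each point's distance to its k-th nearest neighbor and look for the "knee"
# of the curve. This helper is a minimal sketch of that heuristic (it is not
# called by the workflow below); reading off the knee is left to inspection.
def plot_k_distance(X, k=5):
    """
    Plot sorted k-th-nearest-neighbor distances to guide DBSCAN's eps choice.

    Parameters:
    -----------
    X : array-like
        Feature matrix (should be scaled)
    k : int
        Neighbor rank to use (often set to DBSCAN's min_samples)
    """
    from sklearn.neighbors import NearestNeighbors

    nn = NearestNeighbors(n_neighbors=k + 1)  # +1: each point is its own nearest neighbor
    nn.fit(X)
    distances, _ = nn.kneighbors(X)
    k_distances = np.sort(distances[:, -1])  # distance to the k-th true neighbor

    plt.figure(figsize=(6, 4))
    plt.plot(k_distances)
    plt.xlabel('Points sorted by k-distance')
    plt.ylabel(f'Distance to {k}-th nearest neighbor')
    plt.title('k-Distance Plot (eps heuristic for DBSCAN)')
    plt.grid(True)
    plt.tight_layout()
    plt.savefig('k_distance_plot.png', dpi=300, bbox_inches='tight')
    print("Saved: k_distance_plot.png")
    plt.close()

    return k_distances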


def compare_clustering_algorithms(X, n_clusters=3):
    """
    Compare different clustering algorithms.

    Parameters:
    -----------
    X : array-like
        Feature matrix (should be scaled)
    n_clusters : int
        Number of clusters

    Returns:
    --------
    dict
        Dictionary with results for each algorithm
    """
    print("="*60)
    print(f"Comparing Clustering Algorithms (n_clusters={n_clusters})")
    print("="*60)

    algorithms = {
        'K-Means': KMeans(n_clusters=n_clusters, random_state=42, n_init=10),
        'Agglomerative': AgglomerativeClustering(n_clusters=n_clusters, linkage='ward'),
        'Gaussian Mixture': GaussianMixture(n_components=n_clusters, random_state=42)
    }

    # DBSCAN infers the number of clusters from density, so it takes no
    # n_clusters argument; we run it separately. Its eps can be chosen with
    # the k-distance heuristic (see plot_k_distance above).
    dbscan = DBSCAN(eps=0.5, min_samples=5)
    dbscan_labels = dbscan.fit_predict(X)

    results = {}

    for name, algorithm in algorithms.items():
        labels = algorithm.fit_predict(X)

        # Calculate metrics
        silhouette = silhouette_score(X, labels)
        calinski = calinski_harabasz_score(X, labels)
        davies = davies_bouldin_score(X, labels)

        results[name] = {
            'labels': labels,
            'n_clusters': n_clusters,
            'silhouette': silhouette,
            'calinski_harabasz': calinski,
            'davies_bouldin': davies
        }

        print(f"\n{name}:")
        print(f"  Silhouette Score:       {silhouette:.4f} (higher is better)")
        print(f"  Calinski-Harabasz:      {calinski:.4f} (higher is better)")
        print(f"  Davies-Bouldin:         {davies:.4f} (lower is better)")

    # DBSCAN results
    n_clusters_dbscan = len(set(dbscan_labels)) - (1 if -1 in dbscan_labels else 0)
    n_noise = list(dbscan_labels).count(-1)

    if n_clusters_dbscan > 1:
        # Only calculate metrics if we have multiple clusters
        mask = dbscan_labels != -1  # Exclude noise
        if mask.sum() > 0:
            silhouette = silhouette_score(X[mask], dbscan_labels[mask])
            calinski = calinski_harabasz_score(X[mask], dbscan_labels[mask])
            davies = davies_bouldin_score(X[mask], dbscan_labels[mask])

            results['DBSCAN'] = {
                'labels': dbscan_labels,
                'n_clusters': n_clusters_dbscan,
                'n_noise': n_noise,
                'silhouette': silhouette,
                'calinski_harabasz': calinski,
                'davies_bouldin': davies
            }

            print(f"\nDBSCAN:")
            print(f"  Clusters found:         {n_clusters_dbscan}")
            print(f"  Noise points:           {n_noise}")
            print(f"  Silhouette Score:       {silhouette:.4f} (higher is better)")
            print(f"  Calinski-Harabasz:      {calinski:.4f} (higher is better)")
            print(f"  Davies-Bouldin:         {davies:.4f} (lower is better)")
    else:
        print(f"\nDBSCAN:")
        print(f"  Clusters found:         {n_clusters_dbscan}")
        print(f"  Noise points:           {n_noise}")
        print("  Note: Insufficient clusters for metric calculation")

    return results
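

# When ground-truth labels are available, external metrics complement the
# internal metrics reported above. A minimal sketch (hypothetical helper,
# not called by the main workflow; see the usage example under __main__):
def score_against_truth(results, true_labels):
    """
    Score each algorithm's cluster labels against known ground truth.

    Parameters:
    -----------
    results : dict
        Output of compare_clustering_algorithms
    true_labels : array-like
        Ground-truth labels
    """
    from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score

    for name, result in results.items():
        ari = adjusted_rand_score(true_labels, result['labels'])
        nmi = normalized_mutual_info_score(true_labels, result['labels'])
        print(f"{name}: ARI={ari:.4f}, NMI={nmi:.4f} (both: higher is better)")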


def visualize_clusters(X, results, true_labels=None):
    """
    Visualize clustering results using PCA for 2D projection.

    Parameters:
    -----------
    X : array-like
        Feature matrix
    results : dict
        Dictionary with clustering results
    true_labels : array-like or None
        True labels (if available) for comparison
    """
    # Reduce to 2D using PCA
    pca = PCA(n_components=2)
    X_2d = pca.fit_transform(X)

    # Determine number of subplots
    n_plots = len(results)
    if true_labels is not None:
        n_plots += 1

    n_cols = min(3, n_plots)
    n_rows = (n_plots + n_cols - 1) // n_cols

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(5*n_cols, 4*n_rows))
    if n_plots == 1:
        axes = np.array([axes])
    axes = axes.flatten()

    plot_idx = 0

    # Plot true labels if available
    if true_labels is not None:
        ax = axes[plot_idx]
        scatter = ax.scatter(X_2d[:, 0], X_2d[:, 1], c=true_labels, cmap='viridis', alpha=0.6)
        ax.set_title('True Labels')
        ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
        ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
        plt.colorbar(scatter, ax=ax)
        plot_idx += 1

    # Plot clustering results
    for name, result in results.items():
        ax = axes[plot_idx]
        labels = result['labels']

        scatter = ax.scatter(X_2d[:, 0], X_2d[:, 1], c=labels, cmap='viridis', alpha=0.6)

        # Highlight noise points for DBSCAN
        if name == 'DBSCAN' and -1 in labels:
            noise_mask = labels == -1
            ax.scatter(X_2d[noise_mask, 0], X_2d[noise_mask, 1],
                      c='red', marker='x', s=100, label='Noise', alpha=0.8)
            ax.legend()

        title = f"{name} (K={result['n_clusters']})"
        if 'silhouette' in result:
            title += f"\nSilhouette: {result['silhouette']:.3f}"
        ax.set_title(title)
        ax.set_xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%})')
        ax.set_ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%})')
        plt.colorbar(scatter, ax=ax)

        plot_idx += 1

    # Hide unused subplots
    for idx in range(plot_idx, len(axes)):
        axes[idx].axis('off')

    plt.tight_layout()
    plt.savefig('clustering_results.png', dpi=300, bbox_inches='tight')
    print("\nSaved: clustering_results.png")
    plt.close()
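

# AgglomerativeClustering above fixes n_clusters up front; a dendrogram shows
# the full merge hierarchy instead. A minimal sketch using SciPy directly
# (hypothetical helper, not called by the main workflow):
def plot_dendrogram(X, method='ward'):
    """
    Plot a (truncated) dendrogram for hierarchical clustering.

    Parameters:
    -----------
    X : array-like
        Feature matrix (should be scaled)
    method : str
        Linkage method passed to SciPy (e.g. 'ward', 'average')
    """
    from scipy.cluster.hierarchy import dendrogram, linkage

    Z = linkage(X, method=method)
    plt.figure(figsize=(10, 5))
    dendrogram(Z, truncate_mode='lastp', p=30)  # show only the last 30 merges
    plt.xlabel('Cluster (or sample index)')
    plt.ylabel('Merge distance')
    plt.title(f'Hierarchical Clustering Dendrogram ({method} linkage)')
    plt.tight_layout()
    plt.savefig('dendrogram.png', dpi=300, bbox_inches='tight')
    print("Saved: dendrogram.png")
    plt.close()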


def complete_clustering_analysis(X, true_labels=None, scale=True,
                                 find_k=True, k_range=range(2, 11), n_clusters=3):
    """
    Complete clustering analysis workflow.

    Parameters:
    -----------
    X : array-like
        Feature matrix
    true_labels : array-like or None
        True labels (for comparison only, not used in clustering)
    scale : bool
        Whether to scale features
    find_k : bool
        Whether to search for optimal K
    k_range : range
        Range of K values to test
    n_clusters : int
        Number of clusters to use in comparison

    Returns:
    --------
    dict
        Dictionary with all analysis results
    """
    print("="*60)
    print("Clustering Analysis")
    print("="*60)
    print(f"Data shape: {X.shape}")

    # Preprocess data
    X_processed = preprocess_for_clustering(X, scale=scale)

    # Find optimal K if requested
    optimization_results = None
    if find_k:
        print("\n" + "="*60)
        print("Finding Optimal Number of Clusters")
        print("="*60)
        optimization_results = find_optimal_k_kmeans(X_processed, k_range=k_range)

        # Use recommended K
        if optimization_results:
            n_clusters = optimization_results['best_k']

    # Compare clustering algorithms
    comparison_results = compare_clustering_algorithms(X_processed, n_clusters=n_clusters)

    # Visualize results
    print("\n" + "="*60)
    print("Visualizing Results")
    print("="*60)
    visualize_clusters(X_processed, comparison_results, true_labels=true_labels)

    return {
        'X_processed': X_processed,
        'optimization': optimization_results,
        'comparison': comparison_results
    }


# Example usage
if __name__ == "__main__":
    from sklearn.datasets import load_iris, make_blobs

    print("="*60)
    print("Example 1: Iris Dataset")
    print("="*60)

    # Load Iris dataset
    iris = load_iris()
    X_iris = iris.data
    y_iris = iris.target

    results_iris = complete_clustering_analysis(
        X_iris,
        true_labels=y_iris,
        scale=True,
        find_k=True,
        k_range=range(2, 8),
        n_clusters=3
    )
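
    # With ground truth available, the hypothetical score_against_truth helper
    # above can score the clusterings externally (assumes the 'comparison'
    # layout returned by complete_clustering_analysis):
    score_against_truth(results_iris['comparison'], y_iris)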

    print("\n" + "="*60)
    print("Example 2: Synthetic Dataset with Noise")
    print("="*60)

    # Create synthetic dataset
    X_synth, y_synth = make_blobs(
        n_samples=500, n_features=2, centers=4,
        cluster_std=0.5, random_state=42
    )

    # Add noise points (seeded generator for reproducibility)
    rng = np.random.default_rng(42)
    noise = rng.standard_normal((50, 2)) * 3
    X_synth = np.vstack([X_synth, noise])
    y_synth_with_noise = np.concatenate([y_synth, np.full(50, -1)])

    results_synth = complete_clustering_analysis(
        X_synth,
        true_labels=y_synth_with_noise,
        scale=True,
        find_k=True,
        k_range=range(2, 8),
        n_clusters=4
    )

    print("\n" + "="*60)
    print("Analysis Complete!")
    print("="*60)