100 lines
3.3 KiB
Python
100 lines
3.3 KiB
Python
import math
|
|
from typing import Dict, List, Tuple
|
|
|
|
import numpy as np
|
|
from pandas import DataFrame
|
|
from sklearn import cluster
|
|
from sklearn.metrics import silhouette_samples, silhouette_score
|
|
|
|
|
|
def run_agglomerative(
    df: DataFrame, num_clusters: int | None = 2
) -> cluster.AgglomerativeClustering:
    """Fit an agglomerative clustering model on ``df`` and return it.

    Args:
        df: Samples to cluster; passed unchanged to ``fit``.
        num_clusters: Number of clusters to find. When ``None``, the full
            merge tree is built instead (``distance_threshold=0``), because
            scikit-learn requires exactly one of ``n_clusters`` and
            ``distance_threshold`` to be set.

    Returns:
        The fitted estimator. It is created with ``compute_distances=True``
        so that merge distances are available on the fitted model (they are
        read by ``get_linkage_matrix``).
    """
    agglomerative = cluster.AgglomerativeClustering(
        n_clusters=num_clusters,
        # ``n_clusters=None`` is only accepted together with a distance
        # threshold; 0 keeps every merge so the whole tree is computed.
        distance_threshold=0 if num_clusters is None else None,
        compute_distances=True,
    )
    return agglomerative.fit(df)
|
|
|
|
|
|
def get_linkage_matrix(model: cluster.AgglomerativeClustering) -> np.ndarray:
    """Assemble a linkage-style matrix from a fitted agglomerative model.

    Each row describes one merge: the child node indices from
    ``model.children_``, the merge distance from ``model.distances_`` and the
    number of original samples contained in the merged node.

    Args:
        model: Fitted estimator exposing ``children_``, ``distances_`` and
            ``labels_``.

    Returns:
        A float array with one row per merge.
    """
    leaf_total = len(model.labels_)
    merges = model.children_  # type: ignore
    subtree_sizes = np.zeros(merges.shape[0])

    for row, children in enumerate(merges):
        # Indices below ``leaf_total`` are original samples (size 1); larger
        # indices refer to an earlier merge whose size is already recorded.
        subtree_sizes[row] = sum(
            1 if node < leaf_total else subtree_sizes[node - leaf_total]
            for node in children
        )

    stacked = np.column_stack([merges, model.distances_, subtree_sizes])
    return stacked.astype(float)
|
|
|
|
|
|
def print_cluster_result(
    df: DataFrame, clusters_num: int, labels: np.ndarray, separator: str = ", "
):
    """Print the index labels of ``df`` grouped by assigned cluster.

    For every cluster id in ``range(clusters_num)`` a header with the
    1-based cluster number and member count is printed, followed by the
    members' index labels joined with ``separator``, an empty line and a
    dashed divider.

    Args:
        df: Frame whose index labels identify the clustered rows.
        clusters_num: Number of cluster ids to report.
        labels: Cluster id per row, positionally aligned with ``df``.
        separator: String placed between member labels.
    """
    for cluster_id in range(clusters_num):
        member_positions = np.where(labels == cluster_id)[0]
        print(f"Cluster {cluster_id + 1} ({len(member_positions)}):")

        members_line = separator.join(
            str(df.index[position]) for position in member_positions
        )
        # One call emits the member list, the blank line and the divider.
        print(members_line, "", "--------", sep="\n")
|
|
|
|
|
|
def run_kmeans(
    df: DataFrame, num_clusters: int, random_state: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Cluster ``df`` with KMeans and return the assignments and centers.

    Args:
        df: Samples to cluster.
        num_clusters: Value passed as ``n_clusters``.
        random_state: Seed passed to the estimator.

    Returns:
        ``(labels, cluster_centers)`` — the result of ``fit_predict`` and the
        fitted estimator's ``cluster_centers_``.
    """
    estimator = cluster.KMeans(
        n_clusters=num_clusters,
        random_state=random_state,
    )
    assignments = estimator.fit_predict(df)
    centers = estimator.cluster_centers_
    return assignments, centers
|
|
|
|
|
|
def fit_kmeans(
    reduced_data: np.ndarray, num_clusters: int, random_state: int
) -> cluster.KMeans:
    """Create a KMeans estimator, fit it on ``reduced_data`` and return it.

    Args:
        reduced_data: Samples to fit on.
        num_clusters: Value passed as ``n_clusters``.
        random_state: Seed passed to the estimator.

    Returns:
        The fitted estimator.
    """
    estimator = cluster.KMeans(
        n_clusters=num_clusters,
        random_state=random_state,
    )
    estimator.fit(reduced_data)
    return estimator
|
|
|
|
|
|
def _get_kmeans_range(
|
|
df: DataFrame | np.ndarray, random_state: int
|
|
) -> Tuple[List, range]:
|
|
max_clusters = int(math.sqrt(len(df)))
|
|
clusters_range = range(2, max_clusters + 1)
|
|
kmeans_per_k = [
|
|
cluster.KMeans(n_clusters=k, random_state=random_state).fit(df)
|
|
for k in clusters_range
|
|
]
|
|
return kmeans_per_k, clusters_range
|
|
|
|
|
|
def get_clusters_inertia(df: DataFrame, random_state: int) -> Tuple[List, range]:
    """Return the KMeans inertia for each candidate cluster count.

    Args:
        df: Samples to cluster.
        random_state: Seed forwarded to ``_get_kmeans_range``.

    Returns:
        ``(inertias, candidate_counts)`` with one inertia per fitted model,
        in the order produced by ``_get_kmeans_range``.
    """
    fitted_models, candidate_ks = _get_kmeans_range(df, random_state)
    inertias = [fitted.inertia_ for fitted in fitted_models]
    return inertias, candidate_ks
|
|
|
|
|
|
def get_clusters_silhouette_scores(
    df: DataFrame, random_state: int
) -> Tuple[List, range]:
    """Return the silhouette score for each candidate cluster count.

    Args:
        df: Samples to cluster and to score against.
        random_state: Seed forwarded to ``_get_kmeans_range``.

    Returns:
        ``(scores, candidate_counts)`` where each score is the
        ``silhouette_score`` of one fitted model's labels, converted to a
        plain ``float``.
    """
    fitted_models, candidate_ks = _get_kmeans_range(df, random_state)

    scores = [
        float(silhouette_score(df, fitted.labels_)) for fitted in fitted_models
    ]
    return scores, candidate_ks
|
|
|
|
|
|
def get_clusters_silhouettes(df: np.ndarray, random_state: int) -> Dict:
    """Collect silhouette diagnostics for every candidate cluster count.

    Args:
        df: Samples to cluster and to score against.
        random_state: Seed forwarded to ``_get_kmeans_range``.

    Returns:
        A dict keyed by each model's ``n_clusters`` whose values are
        ``(mean_silhouette, per_sample_silhouettes, fitted_model)`` tuples.
    """
    kmeans_per_k, _ = _get_kmeans_range(df, random_state)

    clusters_silhouettes: Dict = {}
    for model in kmeans_per_k:
        sample_silhouette_values = silhouette_samples(df, model.labels_)
        # The overall silhouette score is the mean of the per-sample values,
        # so derive it here instead of calling ``silhouette_score``, which
        # would compute all pairwise distances a second time.
        silhouette_value = float(np.mean(sample_silhouette_values))
        clusters_silhouettes[model.n_clusters] = (
            silhouette_value,
            sample_silhouette_values,
            model,
        )
    return clusters_silhouettes