import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D, Input, GlobalAveragePooling2D
from tensorflow.keras.applications import ResNet50, VGG16, InceptionV3
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import time
import subprocess
import sys
# OpenCV is used by the image-preprocessing functions below; main() installs and
# imports it on demand if this top-level import fails.
try:
    import cv2
except ImportError:
    cv2 = None
import warnings
warnings.filterwarnings('ignore')
# Optional imports for the interactive UI; all of these (including google.colab)
# must import cleanly, so NOTEBOOK_ENV is effectively True only on Google Colab
try:
import ipywidgets as widgets
from IPython.display import display, clear_output, HTML
from google.colab import files
NOTEBOOK_ENV = True
except ImportError:
NOTEBOOK_ENV = False
print("TensorFlow version:", tf.__version__)
# Define constants
IMG_SIZE = 224 # Standard image size for preprocessing
BATCH_SIZE = 32
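# Note: 224x224 matches the default input size of ResNet50 and VGG16; InceptionV3
# normally uses 299x299 but accepts other sizes when include_top=False.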
# Set up Kaggle credentials (users will need to provide their own)
def setup_kaggle():
try:
        # Create the Kaggle config directory if it doesn't exist
        # (the /root/.kaggle path assumes a root environment such as Google Colab;
        # use ~/.kaggle when running locally)
        os.makedirs('/root/.kaggle', exist_ok=True)
# Check if kaggle.json exists
if not os.path.exists('/root/.kaggle/kaggle.json'):
print("Please manually place your kaggle.json file in /root/.kaggle/ directory")
if NOTEBOOK_ENV:
print("Please upload your kaggle.json file")
uploaded = files.upload()
# Move the uploaded file to the right location
with open('/root/.kaggle/kaggle.json', 'wb') as f:
f.write(uploaded[list(uploaded.keys())[0]])
        # Restrict file permissions as recommended by the Kaggle API (octal 600)
        os.chmod('/root/.kaggle/kaggle.json', 0o600)
print("Kaggle API setup complete!")
# Install Kaggle if not already installed
try:
import kaggle
except ImportError:
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "kaggle"])
except Exception as e:
print(f"Error setting up Kaggle: {e}")
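# For reference, the kaggle.json expected by setup_kaggle() is the API token
# generated from your Kaggle account settings page; it is a small JSON file of
# the form {"username": "<your-kaggle-username>", "key": "<your-api-key>"}.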
# Function to download datasets from Kaggle
def download_datasets():
# Create data directory
os.makedirs('datasets', exist_ok=True)
datasets = {
'alzheimer': 'lukechugh/best-alzheimer-mri-dataset-99-accuracy',
'brain_tumor': 'masoudnickparvar/brain-tumor-mri-dataset',
'parkinsons': 'turkertuncer/brain-disorders-four-categories' # Using this as a substitute
}
for name, dataset_path in datasets.items():
target_dir = f'datasets/{name}'
if not os.path.exists(target_dir):
print(f"Downloading {name} dataset...")
try:
# Use kaggle API via subprocess instead of ! command
subprocess.run(["kaggle", "datasets", "download", "-d", dataset_path, "-p", target_dir],
check=True)
# Find and unzip downloaded files
for zip_file in os.listdir(target_dir):
if zip_file.endswith('.zip'):
zip_path = os.path.join(target_dir, zip_file)
subprocess.run(["unzip", "-q", zip_path, "-d", target_dir], check=True)
os.remove(zip_path)
print(f"{name} dataset downloaded and extracted.")
except Exception as e:
print(f"Error downloading {name} dataset: {e}")
else:
print(f"{name} dataset already exists.")
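# After extraction, the preprocess functions below expect folder layouts like the
# following (inferred from the category lists they use; the actual archives may
# differ slightly):
#   datasets/alzheimer/<NonDemented|VeryMildDemented|MildDemented|ModerateDemented>/*.jpg
#   datasets/brain_tumor/Training/<glioma|meningioma|notumor|pituitary>/*.jpg
#   datasets/parkinsons/<parkinsons|normal>/*.jpg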
# Functions for preprocessing datasets
def preprocess_alzheimer_dataset():
base_dir = 'datasets/alzheimer'
categories = ['NonDemented', 'VeryMildDemented', 'MildDemented', 'ModerateDemented']
X, y = [], []
for idx, category in enumerate(categories):
path = os.path.join(base_dir, category)
if os.path.exists(path):
for img_file in os.listdir(path):
img_path = os.path.join(path, img_file)
try:
img = cv2.imread(img_path)
img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
img = img / 255.0 # Normalize
X.append(img)
y.append(idx)
except Exception as e:
print(f"Error processing {img_path}: {e}")
return np.array(X), np.array(y)
def preprocess_brain_tumor_dataset():
base_dir = 'datasets/brain_tumor'
categories = ['glioma', 'meningioma', 'notumor', 'pituitary']
X, y = [], []
for idx, category in enumerate(categories):
path = os.path.join(base_dir, 'Training', category)
if os.path.exists(path):
for img_file in os.listdir(path):
img_path = os.path.join(path, img_file)
try:
img = cv2.imread(img_path)
img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
img = img / 255.0 # Normalize
X.append(img)
y.append(idx)
except Exception as e:
print(f"Error processing {img_path}: {e}")
return np.array(X), np.array(y)
def preprocess_parkinsons_dataset():
base_dir = 'datasets/parkinsons'
categories = ['parkinsons', 'normal']
X, y = [], []
for idx, category in enumerate(categories):
path = os.path.join(base_dir, category)
if os.path.exists(path):
for img_file in os.listdir(path):
img_path = os.path.join(path, img_file)
try:
img = cv2.imread(img_path)
img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
img = img / 255.0 # Normalize
X.append(img)
y.append(idx)
except Exception as e:
print(f"Error processing {img_path}: {e}")
return np.array(X), np.array(y)
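# The three preprocess_* functions above repeat the same load/resize/normalise
# loop; a minimal generic helper they could delegate to is sketched below
# (hypothetical name, not called anywhere else in this script):
def load_image_folder(base_dir, categories):
    """Load images from <base_dir>/<category> folders into (X, y) arrays (sketch)."""
    X, y = [], []
    for idx, category in enumerate(categories):
        path = os.path.join(base_dir, category)
        if not os.path.exists(path):
            continue
        for img_file in os.listdir(path):
            img_path = os.path.join(path, img_file)
            try:
                img = cv2.imread(img_path)
                img = cv2.resize(img, (IMG_SIZE, IMG_SIZE)) / 255.0  # normalise to [0, 1]
                X.append(img)
                y.append(idx)
            except Exception as e:
                print(f"Error processing {img_path}: {e}")
    return np.array(X), np.array(y)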
# Define CNN models
def create_alexnet_model(input_shape, num_classes):
    # AlexNet-style CNN trained from scratch (no pretrained weights), adapted to
    # the IMG_SIZE x IMG_SIZE inputs used throughout this script
    model = Sequential([
Conv2D(96, 11, strides=4, padding='same', activation='relu', input_shape=input_shape),
MaxPooling2D(3, strides=2),
Conv2D(256, 5, padding='same', activation='relu'),
MaxPooling2D(3, strides=2),
Conv2D(384, 3, padding='same', activation='relu'),
Conv2D(384, 3, padding='same', activation='relu'),
Conv2D(256, 3, padding='same', activation='relu'),
MaxPooling2D(3, strides=2),
Flatten(),
Dense(4096, activation='relu'),
Dropout(0.5),
Dense(4096, activation='relu'),
Dropout(0.5),
Dense(num_classes, activation='softmax')
])
model.compile(optimizer=Adam(),
loss='categorical_crossentropy',
metrics=['accuracy'])
return model
def create_resnet50_model(input_shape, num_classes):
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)
# Freeze the base model layers
for layer in base_model.layers:
layer.trainable = False
# Add classification head
x = GlobalAveragePooling2D()(base_model.output)
x = Dense(512, activation='relu')(x)
x = Dropout(0.5)(x)
predictions = Dense(num_classes, activation='softmax')(x)
model = Model(inputs=base_model.input, outputs=predictions)
model.compile(optimizer=Adam(learning_rate=0.0001),
loss='categorical_crossentropy',
metrics=['accuracy'])
return model
def create_vgg16_model(input_shape, num_classes):
base_model = VGG16(weights='imagenet', include_top=False, input_shape=input_shape)
# Freeze the base model layers
for layer in base_model.layers:
layer.trainable = False
# Add classification head
x = GlobalAveragePooling2D()(base_model.output)
x = Dense(512, activation='relu')(x)
x = Dropout(0.5)(x)
predictions = Dense(num_classes, activation='softmax')(x)
model = Model(inputs=base_model.input, outputs=predictions)
model.compile(optimizer=Adam(learning_rate=0.0001),
loss='categorical_crossentropy',
metrics=['accuracy'])
return model
def create_inception_model(input_shape, num_classes):
base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=input_shape)
# Freeze the base model layers
for layer in base_model.layers:
layer.trainable = False
# Add classification head
x = GlobalAveragePooling2D()(base_model.output)
x = Dense(512, activation='relu')(x)
x = Dropout(0.5)(x)
predictions = Dense(num_classes, activation='softmax')(x)
model = Model(inputs=base_model.input, outputs=predictions)
model.compile(optimizer=Adam(learning_rate=0.0001),
loss='categorical_crossentropy',
metrics=['accuracy'])
return model
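# The three transfer-learning builders above differ only in the backbone network;
# a hypothetical generic factory (a sketch, not used elsewhere in this script)
# could replace them:
def create_transfer_model(backbone_cls, input_shape, num_classes):
    base_model = backbone_cls(weights='imagenet', include_top=False, input_shape=input_shape)
    # Freeze the pretrained layers
    for layer in base_model.layers:
        layer.trainable = False
    # Attach the same classification head used above
    x = GlobalAveragePooling2D()(base_model.output)
    x = Dense(512, activation='relu')(x)
    x = Dropout(0.5)(x)
    predictions = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=base_model.input, outputs=predictions)
    model.compile(optimizer=Adam(learning_rate=0.0001),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model
# Example: create_transfer_model(ResNet50, (IMG_SIZE, IMG_SIZE, 3), num_classes=4)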
# Function to train models
def train_model(model, X_train, y_train, X_val, y_val, epochs=10):
start_time = time.time()
# Convert labels to categorical
y_train_cat = to_categorical(y_train)
y_val_cat = to_categorical(y_val)
# Data augmentation
datagen = ImageDataGenerator(
rotation_range=15,
width_shift_range=0.1,
height_shift_range=0.1,
horizontal_flip=True,
vertical_flip=False,
zoom_range=0.1
)
# Train the model
history = model.fit(
datagen.flow(X_train, y_train_cat, batch_size=BATCH_SIZE),
epochs=epochs,
validation_data=(X_val, y_val_cat),
verbose=1
)
# Calculate training time
training_time = time.time() - start_time
return model, history, training_time
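# A minimal (commented-out) sketch of how train_model is driven, assuming X and y
# come from one of the preprocess_* functions above:
#   X, y = preprocess_brain_tumor_dataset()
#   X_tr, X_val, y_tr, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
#   cnn = create_alexnet_model(X_tr[0].shape, len(np.unique(y)))
#   cnn, history, seconds = train_model(cnn, X_tr, y_tr, X_val, y_val, epochs=5)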
# Function to evaluate models
def evaluate_model(model, X_test, y_test):
start_time = time.time()
# Convert labels to categorical
y_test_cat = to_categorical(y_test)
# Predict
y_pred_prob = model.predict(X_test)
y_pred = np.argmax(y_pred_prob, axis=1)
y_test_class = np.argmax(y_test_cat, axis=1)
# Calculate metrics
accuracy = accuracy_score(y_test_class, y_pred)
precision = precision_score(y_test_class, y_pred, average='weighted')
recall = recall_score(y_test_class, y_pred, average='weighted')
f1 = f1_score(y_test_class, y_pred, average='weighted')
# Calculate evaluation time
eval_time = time.time() - start_time
return {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1_score': f1,
'eval_time': eval_time
}
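# seaborn and confusion_matrix are imported above but never used; a minimal sketch
# of a confusion-matrix heatmap that could accompany evaluate_model (hypothetical
# helper, not called elsewhere in this script):
def plot_confusion_matrix(y_true, y_pred, class_names):
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(6, 5))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted class')
    plt.ylabel('True class')
    plt.title('Confusion matrix')
    plt.show()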
# Function to save trained models
def save_trained_models(models, dataset_name):
os.makedirs('saved_models', exist_ok=True)
for model_name, model in models.items():
model.save(f'saved_models/{dataset_name}_{model_name}_model.h5')
print(f"Saved {model_name} model for {dataset_name}")
# Function to load or train models for each dataset
def get_models(dataset_name, X, y, force_train=False):
models = {
'alexnet': None,
'resnet50': None,
'vgg16': None,
'inception': None
}
model_results = {}
training_time = None
    # Split the dataset 70/15/15 into train, validation, and test sets
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
num_classes = len(np.unique(y))
input_shape = X[0].shape
os.makedirs('saved_models', exist_ok=True)
for model_name in models.keys():
model_path = f'saved_models/{dataset_name}_{model_name}_model.h5'
if os.path.exists(model_path) and not force_train:
print(f"Loading pre-trained {model_name} model for {dataset_name}...")
models[model_name] = load_model(model_path)
else:
print(f"Training {model_name} model for {dataset_name}...")
if model_name == 'alexnet':
model = create_alexnet_model(input_shape, num_classes)
elif model_name == 'resnet50':
model = create_resnet50_model(input_shape, num_classes)
elif model_name == 'vgg16':
model = create_vgg16_model(input_shape, num_classes)
elif model_name == 'inception':
model = create_inception_model(input_shape, num_classes)
# Train the model (use fewer epochs for demonstration)
model, history, training_time = train_model(model, X_train, y_train, X_val, y_val, epochs=5)
models[model_name] = model
# Save the model
model.save(model_path)
print(f"Saved {model_name} model for {dataset_name}")
# Evaluate the model
eval_results = evaluate_model(models[model_name], X_test, y_test)
eval_results['training_time'] = training_time
model_results[model_name] = eval_results
return models, model_results
# Function to preprocess a user-uploaded image
def preprocess_user_image(img):
img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
img = img / 255.0 # Normalize
return np.expand_dims(img, axis=0) # Add batch dimension
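# Example (hypothetical path): preprocess_user_image(cv2.imread('scan.jpg'))
# returns an array of shape (1, IMG_SIZE, IMG_SIZE, 3).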
# Function to classify a user image with all models
def classify_image(img, models, dataset_info):
preprocessed_img = preprocess_user_image(img)
results = {}
for dataset_name, dataset_models in models.items():
results[dataset_name] = {}
for model_name, model in dataset_models.items():
start_time = time.time()
prediction = model.predict(preprocessed_img)
inference_time = time.time() - start_time
# Get the predicted class and probability
predicted_class_idx = np.argmax(prediction[0])
probability = prediction[0][predicted_class_idx]
# Get the class name
class_name = dataset_info[dataset_name]['classes'][predicted_class_idx]
results[dataset_name][model_name] = {
'class_name': class_name,
'probability': float(probability),
'inference_time': inference_time
}
return results
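# The returned dictionary is nested by dataset and then by model, e.g. (the values
# below are purely illustrative):
#   {'brain_tumor': {'resnet50': {'class_name': 'Glioma',
#                                 'probability': 0.93,
#                                 'inference_time': 0.08}, ...}, ...}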
# Create a simple UI using ipywidgets (only for notebook environments)
def create_ui(models, dataset_info):
if not NOTEBOOK_ENV:
print("UI creation is only available in notebook environments like Jupyter or Google Colab.")
return
# Import required modules for UI
from io import BytesIO
from PIL import Image
import cv2
# Create upload widget
upload_widget = widgets.FileUpload(
accept='image/*',
multiple=False,
description='Upload Brain MRI Image'
)
# Create dataset selection widget
dataset_widget = widgets.SelectMultiple(
options=list(dataset_info.keys()),
value=[list(dataset_info.keys())[0]],
description='Select Datasets',
disabled=False
)
# Create button to trigger analysis
analyze_button = widgets.Button(
description='Analyze Image',
disabled=False,
button_style='primary',
tooltip='Click to analyze the uploaded image'
)
# Create output widget to display results
output = widgets.Output()
# Create progress widget
progress = widgets.IntProgress(
value=0,
min=0,
max=10,
description='Processing:',
bar_style='info',
orientation='horizontal'
)
# Function to handle button click
def on_analyze_button_clicked(b):
with output:
clear_output()
if not upload_widget.value:
print("Please upload an image first.")
return
selected_datasets = dataset_widget.value
if not selected_datasets:
print("Please select at least one dataset.")
return
            # Read the uploaded image (ipywidgets 7.x dict API; on ipywidgets 8.x
            # the value is a tuple and this would be upload_widget.value[0]['content'])
            uploaded_file = list(upload_widget.value.values())[0]
            content = uploaded_file['content']
            # Convert to a numpy array; force RGB so grayscale/RGBA uploads also work
            img = np.array(Image.open(BytesIO(content)).convert('RGB'))
            # Display the uploaded image (PIL already returns RGB, so no BGR->RGB
            # conversion is needed here)
            plt.figure(figsize=(4, 4))
            plt.imshow(img)
            plt.title("Uploaded Image")
            plt.axis('off')
            plt.show()
# Set progress
progress.value = 2
# Filter models based on selected datasets
selected_models = {ds: models[ds] for ds in selected_datasets}
print("Analyzing image...")
progress.value = 5
            # Classify the image (convert RGB back to BGR to match the cv2-loaded
            # images the models were trained on)
            results = classify_image(cv2.cvtColor(img, cv2.COLOR_RGB2BGR), selected_models, dataset_info)
progress.value = 8
# Display results in a table and visualize
for dataset_name in selected_datasets:
print(f"\n--- {dataset_name.upper()} DATASET RESULTS ---")
# Create table for the current dataset
data = []
for model_name, result in results[dataset_name].items():
data.append({
'Model': model_name,
'Predicted Class': result['class_name'],
'Probability': f"{result['probability']:.4f}",
'Inference Time (s)': f"{result['inference_time']:.4f}"
})
df = pd.DataFrame(data)
display(df)
# Create bar chart for probabilities
plt.figure(figsize=(10, 6))
bars = plt.bar(
df['Model'],
df['Probability'].astype(float),
color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']
)
# Add value labels on top of bars
for bar in bars:
height = bar.get_height()
plt.text(
bar.get_x() + bar.get_width()/2.,
height + 0.01,
f"{height:.2f}",
ha='center', va='bottom', rotation=0
)
plt.title(f'Probability by Model for {dataset_name.capitalize()} Dataset')
plt.ylabel('Probability')
plt.ylim(0, 1.1)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()
# Create bar chart for inference times
plt.figure(figsize=(10, 6))
bars = plt.bar(
df['Model'],
df['Inference Time (s)'].astype(float),
color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']
)
# Add value labels on top of bars
for bar in bars:
height = bar.get_height()
plt.text(
bar.get_x() + bar.get_width()/2.,
height + 0.001,
f"{height:.3f}",
ha='center', va='bottom', rotation=0
)
plt.title(f'Inference Time by Model for {dataset_name.capitalize()} Dataset')
plt.ylabel('Time (seconds)')
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()
progress.value = 10
print("Analysis complete!")
# Attach the click handler
analyze_button.on_click(on_analyze_button_clicked)
# Display the UI
display(widgets.VBox([
widgets.HTML('<h2>Comparative Analysis of ML Models on Brain Disorder Image Datasets</h2>'),
widgets.HBox([upload_widget, dataset_widget]),
analyze_button,
progress,
output
]))
# CLI version for non-notebook environments
def run_cli_analysis(models, dataset_info):
import cv2
print("\nCLI Interface for Brain Disorder Image Analysis")
print("================================================")
# Ask for image path
image_path = input("\nEnter the path to the brain MRI image: ")
if not os.path.exists(image_path):
print(f"Error: Image not found at {image_path}")
return
# Load the image
try:
img = cv2.imread(image_path)
if img is None:
print(f"Error: Could not read image at {image_path}")
return
except Exception as e:
print(f"Error loading image: {e}")
return
# Ask which datasets to use
print("\nAvailable datasets:")
for i, dataset in enumerate(dataset_info.keys()):
print(f"{i+1}. {dataset}")
dataset_choices = input("\nEnter dataset numbers to use (comma-separated, or 'all'): ")
if dataset_choices.lower() == 'all':
selected_datasets = list(dataset_info.keys())
else:
try:
indices = [int(x.strip()) - 1 for x in dataset_choices.split(',')]
selected_datasets = [list(dataset_info.keys())[i] for i in indices]
except (ValueError, IndexError):
print("Invalid selection. Using all datasets.")
selected_datasets = list(dataset_info.keys())
print(f"\nAnalyzing image using {', '.join(selected_datasets)} datasets...")
# Filter models based on selected datasets
selected_models = {ds: models[ds] for ds in selected_datasets}
# Classify the image
results = classify_image(img, selected_models, dataset_info)
# Display results
print("\nAnalysis Results:")
print("================")
for dataset_name in selected_datasets:
print(f"\n--- {dataset_name.upper()} DATASET RESULTS ---")
# Create table for the current dataset
print(f"{'Model':<12} {'Predicted Class':<20} {'Probability':<12} {'Inference Time (s)':<18}")
print("-" * 65)
for model_name, result in results[dataset_name].items():
print(f"{model_name:<12} {result['class_name']:<20} {result['probability']:.4f} {result['inference_time']:.4f}")
print("\nAnalysis complete!")
# Ask if user wants to save the results
save_choice = input("\nDo you want to save these results to a CSV file? (y/n): ")
if save_choice.lower() == 'y':
# Create a DataFrame with all results
all_data = []
for dataset_name in selected_datasets:
for model_name, result in results[dataset_name].items():
all_data.append({
'Dataset': dataset_name,
'Model': model_name,
'Predicted Class': result['class_name'],
'Probability': result['probability'],
'Inference Time (s)': result['inference_time']
})
df = pd.DataFrame(all_data)
# Save to CSV
csv_filename = f"brain_disorder_analysis_results_{int(time.time())}.csv"
df.to_csv(csv_filename, index=False)
print(f"Results saved to {csv_filename}")
# Main function to run the entire analysis
def main():
print("Setting up the environment...")
    # OpenCV is needed by the module-level preprocessing functions, so bind it
    # into the module namespace here (installing it first if necessary)
    global cv2
    try:
        import cv2
    except ImportError:
        print("OpenCV is not installed. Installing now...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "opencv-python"])
        import cv2
# Setup Kaggle
setup_kaggle()
# Download datasets
download_datasets()
# Define dataset information
dataset_info = {
'alzheimer': {
'classes': ['NonDemented', 'VeryMildDemented', 'MildDemented', 'ModerateDemented'],
'preprocess_func': preprocess_alzheimer_dataset
},
'brain_tumor': {
'classes': ['Glioma', 'Meningioma', 'No Tumor', 'Pituitary'],
'preprocess_func': preprocess_brain_tumor_dataset
},
'parkinsons': {
'classes': ['Parkinsons', 'Normal'],
'preprocess_func': preprocess_parkinsons_dataset
}
}
# Load and prepare datasets
print("Loading and preparing datasets...")
datasets = {}
for name, info in dataset_info.items():
X, y = info['preprocess_func']()
datasets[name] = (X, y)
        print(f"Loaded {name} dataset: {X.shape[0]} images, {len(info['classes'])} classes")
# Train or load models for each dataset
print("Training or loading models...")
all_models = {}
all_results = {}
for name, (X, y) in datasets.items():
print(f"\nProcessing {name} dataset...")
models, results = get_models(name, X, y)
all_models[name] = models
all_results[name] = results
# Display comparative results
print("\nComparative Analysis of Models:")
for dataset_name, results in all_results.items():
print(f"\n--- {dataset_name.upper()} DATASET ---")
# Create table
data = []
for model_name, metrics in results.items():
data.append({
'Model': model_name,
'Accuracy': f"{metrics['accuracy']:.4f}",
'Precision': f"{metrics['precision']:.4f}",
'Recall': f"{metrics['recall']:.4f}",
'F1 Score': f"{metrics['f1_score']:.4f}",
'Eval Time (s)': f"{metrics['eval_time']:.4f}"
})
df = pd.DataFrame(data)
print(df)
# If in a notebook environment, show plots
if NOTEBOOK_ENV:
from IPython.display import display
display(df)
# Create accuracy bar chart
plt.figure(figsize=(12, 6))
bars = plt.bar(
df['Model'],
df['Accuracy'].astype(float),
color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']
)
# Add value labels on top of bars
for bar in bars:
height = bar.get_height()
plt.text(
bar.get_x() + bar.get_width()/2.,
height + 0.01,
f"{height:.2f}",
ha='center', va='bottom', rotation=0
)
plt.title(f'Accuracy by Model for {dataset_name.capitalize()} Dataset')
plt.ylabel('Accuracy')
plt.ylim(0, 1.1)
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()
# Create evaluation time bar chart
plt.figure(figsize=(12, 6))
bars = plt.bar(
df['Model'],
df['Eval Time (s)'].astype(float),
color=['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']
)
# Add value labels on top of bars
for bar in bars:
height = bar.get_height()
plt.text(
bar.get_x() + bar.get_width()/2.,
height + 0.01,
f"{height:.2f}",
ha='center', va='bottom', rotation=0
)
plt.title(f'Evaluation Time by Model for {dataset_name.capitalize()} Dataset')
plt.ylabel('Time (seconds)')
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()
# Launch interface based on environment
if NOTEBOOK_ENV:
print("\nLaunching the UI for image analysis...")
create_ui(all_models, dataset_info)
else:
print("\nRunning in CLI mode...")
run_cli_analysis(all_models, dataset_info)
if __name__ == "__main__":
main()