Speed Test
"""
Internet Speed Test - Enhanced Version with Server Selection
========================================================
Speed test with continuous monitoring, system tray, desktop overlay and server selection.
Requirements:
pip install pystray pillow keyboard certifi
To create the executable:
pyinstaller --onefile --console Python_gui.py
"""
import tkinter as tk
from tkinter import ttk, messagebox, Scale
import threading
from datetime import datetime
import time
import urllib.request
import urllib.error
import socket
import json
import ssl
import sys
import os
import random
import http.client
import io
import base64
import configparser
import certifi
import logging
# Root logger configuration: every record is written to a debug log file that
# is truncated on each start (mode='w'). Records are also echoed to stdout,
# unless sys.stdout is None, in which case a no-op handler takes its place.
log_file = "speedtest_debug.log"
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.DEBUG,
    handlers=[
        logging.FileHandler(log_file, mode='w', encoding='utf-8'),
        logging.NullHandler() if sys.stdout is None else logging.StreamHandler(sys.stdout),
    ],
)
# Enhanced logging for debugging
def log(message, level="INFO"):
    """Print *message* to the console and record it through ``logging``.

    Args:
        message: Text to record.
        level: ``"ERROR"`` and ``"WARNING"`` map to the matching logging
            levels. ``"SUCCESS"`` is recorded at INFO level with a ``"✓ "``
            prefix. Any other value is recorded at INFO level unchanged.
    """
    timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    log_message = f"[{timestamp}] [{level}] {message}"

    # Console output is best-effort: a missing or broken stdout must never
    # prevent the message from reaching the logging handlers below. Only
    # ordinary errors are suppressed so Ctrl+C / SystemExit still propagate.
    try:
        print(log_message)
        if sys.stdout is not None:
            sys.stdout.flush()
    except Exception:
        pass

    # Always hand the message to the logging module as well.
    if level == "ERROR":
        logging.error(message)
    elif level == "WARNING":
        logging.warning(message)
    elif level == "SUCCESS":
        logging.info(f"✓ {message}")
    else:
        logging.info(message)
# Startup banner followed by a snapshot of the runtime environment, so every
# debug log begins with the details needed to reproduce a problem.
log("=" * 60)
log("SpeedTest Professional - Starting")
log("=" * 60)
log(f"Python version: {sys.version}")
# A truthy sys.frozen is reported as 'Executable', otherwise 'Script'
# (presumably the attribute set by PyInstaller builds; see module docstring).
log(f"Running as: {'Executable' if getattr(sys, 'frozen', False) else 'Script'}")
log(f"Current directory: {os.getcwd()}")
# Check if running as administrator on Windows
def is_admin():
    """Return True when the process has Windows administrator privileges.

    Returns:
        bool: The result of ``shell32.IsUserAnAdmin()`` as a real boolean.
        ``False`` is returned when the check cannot be performed, for example
        when ``ctypes.windll`` is not available.
    """
    try:
        import ctypes
        # IsUserAnAdmin() returns an integer; normalise it to a bool.
        return bool(ctypes.windll.shell32.IsUserAnAdmin())
    except Exception:
        return False
# On Windows, record whether the process is elevated and, if it is not, warn
# that connection errors may be caused by missing privileges.
if sys.platform == 'win32':
    admin_status = is_admin()
    log(
        f"Administrator privileges: {'YES' if admin_status else 'NO'}",
        "SUCCESS" if admin_status else "WARNING",
    )
    if not admin_status:
        log("NOTE: WinError 10013 may occur without admin privileges", "WARNING")
        log("If you get connection errors, try running as administrator", "WARNING")
# Optional dependency: system tray support (pystray + Pillow).
# TRAY_AVAILABLE tells the rest of the module whether these imports worked.
try:
    import pystray
    from PIL import Image, ImageDraw
except ImportError as tray_import_error:
    TRAY_AVAILABLE = False
    log(f"pystray not available: {tray_import_error}", "WARNING")
    print("Install with: pip install pystray pillow")
else:
    TRAY_AVAILABLE = True
    log("pystray and PIL loaded successfully", "SUCCESS")

# Optional dependency: global hotkey support (keyboard).
# KEYBOARD_AVAILABLE tells the rest of the module whether the import worked.
try:
    import keyboard
except ImportError as keyboard_import_error:
    KEYBOARD_AVAILABLE = False
    log(f"keyboard not available: {keyboard_import_error}", "WARNING")
    print("Install with: pip install keyboard")
else:
    KEYBOARD_AVAILABLE = True
    log("keyboard library loaded successfully", "SUCCESS")
# SSL Configuration for PyInstaller compatibility
def setup_ssl_context():
    """Build the SSL context used for all HTTPS requests in this module.

    Three strategies are tried in order:

    1. a default context that trusts the certifi CA bundle;
    2. a plain default context;
    3. no context at all, after switching the process-wide default HTTPS
       context factory to the unverified one.

    Returns:
        An ``ssl.SSLContext`` for strategies 1 and 2, or ``None`` for
        strategy 3.
    """
    # Strategy 1: certifi bundle.
    try:
        import certifi
        context = ssl.create_default_context(cafile=certifi.where())
        log(f"Using certifi certificates: {certifi.where()}", "SUCCESS")
        return context
    except Exception as certifi_error:
        log(f"certifi not available: {certifi_error}", "WARNING")

    # Strategy 2: default context.
    try:
        context = ssl.create_default_context()
        log("Using default SSL context", "SUCCESS")
        return context
    except Exception as default_error:
        log(f"Error creating SSL context: {default_error}", "ERROR")

    # Strategy 3 (last resort, not recommended for production).
    # NOTE(review): this replaces ssl's default HTTPS context factory for the
    # whole process, so certificate verification is disabled from here on.
    ssl._create_default_https_context = ssl._create_unverified_context
    log("SSL verification disabled (unverified context)", "WARNING")
    return None


# Setup SSL
SSL_CONTEXT = setup_ssl_context()
class ConfigManager:
    """INI-backed settings store.

    Settings are kept in ``speedtest_config.ini`` (a path relative to the
    current working directory) and the file is rewritten on every
    :meth:`set_setting` call.
    """

    def __init__(self):
        """Create the parser and load (or create) the configuration file."""
        self.config = configparser.ConfigParser()
        self.config_file = "speedtest_config.ini"
        log(f"Config file path: {os.path.abspath(self.config_file)}")
        self.load_config()

    def load_config(self):
        """Read the config file, falling back to defaults if it is missing or unreadable."""
        if not os.path.exists(self.config_file):
            log("Config file not found, creating default", "INFO")
            self.create_default_config()
            return

        try:
            self.config.read(self.config_file)
            log("Configuration loaded successfully", "SUCCESS")
        except Exception as read_error:
            log(f"Error loading config: {read_error}", "ERROR")
            self.create_default_config()

    def create_default_config(self):
        """Replace the SETTINGS section with default values and write it to disk."""
        defaults = {
            'overlay_enabled': 'True',
            'continuous_monitor': 'False',
            'hotkey': 'Ctrl+Q',
            'overlay_x': '',
            'overlay_y': '',
            'start_as_icon': 'False',
            'selected_server': 'Auto (Cloudflare)',
            'monitor_interval': '3'
        }
        self.config['SETTINGS'] = defaults
        self.save_config()
        log("Default configuration created", "SUCCESS")

    def save_config(self):
        """Write the current configuration to disk; failures are logged, not raised."""
        try:
            with open(self.config_file, 'w') as handle:
                self.config.write(handle)
            log("Configuration saved successfully", "SUCCESS")
        except Exception as write_error:
            log(f"Error saving config: {write_error}", "ERROR")

    def get_setting(self, section, key, default=None):
        """Return the stored string for ``section.key``.

        ``default`` is returned when the section or the option does not exist.
        """
        try:
            stored = self.config.get(section, key)
        except (configparser.NoSectionError, configparser.NoOptionError):
            log(f"Config get: {section}.{key} = {default} (default)")
            return default

        log(f"Config get: {section}.{key} = {stored}")
        return stored

    def set_setting(self, section, key, value):
        """Store ``str(value)`` under ``section.key`` and save the file immediately."""
        section_missing = not self.config.has_section(section)
        if section_missing:
            self.config.add_section(section)
        self.config.set(section, key, str(value))
        self.save_config()
        log(f"Config set: {section}.{key} = {value}")
class DesktopOverlay:
    """Borderless, always-on-top window showing the most recent speed reading.

    The window is built lazily by :meth:`create`. Its position is restored
    from, and saved to, the ``SETTINGS`` section of the supplied config
    manager.
    """

    def __init__(self, config_manager):
        """Remember the config manager; no window exists until create() runs.

        Args:
            config_manager: Object providing ``get_setting`` / ``set_setting``.
        """
        self.overlay = None
        self.speed_label = None
        self.type_label = None
        # Declared here so every widget attribute exists before create() runs.
        self.direction_label = None
        self.status_label = None
        self.is_visible = False
        self.monitoring_active = False
        self.config_manager = config_manager
        log("DesktopOverlay initialized")

    def _forget_widgets(self):
        """Drop all widget references and mark the overlay as not visible."""
        self.overlay = None
        self.speed_label = None
        self.type_label = None
        self.direction_label = None
        self.status_label = None
        self.is_visible = False

    def _read_transparency(self):
        """Return the configured window alpha, or 0.7 if the value is unusable."""
        raw = self.config_manager.get_setting('SETTINGS', 'overlay_transparency', '0.7')
        try:
            return float(raw)
        except (TypeError, ValueError):
            log(f"Invalid overlay_transparency {raw!r}, using 0.7", "WARNING")
            return 0.7

    def _read_position(self, default_x, default_y):
        """Return the saved (x, y) position, or the defaults if none is usable."""
        saved_x = self.config_manager.get_setting('SETTINGS', 'overlay_x')
        saved_y = self.config_manager.get_setting('SETTINGS', 'overlay_y')
        if saved_x and saved_y:
            try:
                return int(saved_x), int(saved_y)
            except (TypeError, ValueError):
                log(f"Invalid saved overlay position {saved_x!r},{saved_y!r}, using default", "WARNING")
        return default_x, default_y

    def create(self):
        """Build the overlay window; does nothing if it already exists.

        Any error while building is logged. In that case the partially built
        window is destroyed and all references are cleared, so a later
        show()/create() starts from a clean state.
        """
        if self.overlay:
            log("Overlay already exists")
            return
        log("Creating desktop overlay...")
        try:
            self.overlay = tk.Toplevel()
            self.overlay.title("Speed Monitor")
            # Overlay window configuration: no title bar, always on top.
            self.overlay.overrideredirect(True)
            self.overlay.attributes('-topmost', True)
            transparency = self._read_transparency()
            self.overlay.attributes('-alpha', transparency)
            log(f"Overlay transparency: {transparency}")
            # Get screen dimensions
            screen_width = self.overlay.winfo_screenwidth()
            screen_height = self.overlay.winfo_screenheight()
            log(f"Screen size: {screen_width}x{screen_height}")
            # Overlay size and position (default: top-right corner)
            overlay_width = 140
            overlay_height = 55
            x_position, y_position = self._read_position(
                default_x=screen_width - overlay_width - 5,
                default_y=5,
            )
            self.overlay.geometry(f"{overlay_width}x{overlay_height}+{x_position}+{y_position}")
            log(f"Overlay position: {x_position},{y_position}")
            # Main frame
            main_frame = tk.Frame(
                self.overlay,
                bg="#000000",
                highlightbackground="#00ff00",
                highlightthickness=1
            )
            main_frame.pack(fill="both", expand=True)
            # Test type label
            self.type_label = tk.Label(
                main_frame,
                text="SPEED",
                font=("Arial", 7, "bold"),
                bg="#000000",
                fg="#00ff00"
            )
            self.type_label.pack(pady=(1, 0))
            # Speed label
            self.speed_label = tk.Label(
                main_frame,
                text="0.0 Mbps",
                font=("Arial", 14, "bold"),
                bg="#000000",
                fg="#00ffff"
            )
            self.speed_label.pack()
            # Direction label
            self.direction_label = tk.Label(
                main_frame,
                text="",
                font=("Arial", 6),
                bg="#000000",
                fg="#888888"
            )
            self.direction_label.pack()
            # Status label
            self.status_label = tk.Label(
                main_frame,
                text="",
                font=("Arial", 5),
                bg="#000000",
                fg="#ffff00"
            )
            self.status_label.pack()
            # Make draggable
            self.make_draggable()
            # Right-click hides the overlay. The binding is placed on the
            # toplevel (like the drag bindings) so that it also fires when the
            # pointer is over one of the labels, not only over the bare frame.
            self.overlay.bind("<Button-3>", lambda e: self.hide())
            self.is_visible = True
            log("Desktop overlay created successfully", "SUCCESS")
        except Exception as e:
            log(f"Error creating overlay: {e}", "ERROR")
            # Do not leave a half-built window behind: show() would expose it
            # and update() would then fail on the missing labels.
            partial_window = self.overlay
            self._forget_widgets()
            if partial_window is not None:
                try:
                    partial_window.destroy()
                except Exception as cleanup_error:
                    log(f"Error discarding partial overlay: {cleanup_error}", "WARNING")

    def make_draggable(self):
        """Bind left-button handlers that move the window and save its position."""
        # Pointer offset recorded when the drag starts.
        grab = {"x": 0, "y": 0}

        def start_drag(event):
            grab["x"] = event.x
            grab["y"] = event.y

        def drag(event):
            x = self.overlay.winfo_x() + (event.x - grab["x"])
            y = self.overlay.winfo_y() + (event.y - grab["y"])
            self.overlay.geometry(f"+{x}+{y}")

        def save_position(event):
            x = self.overlay.winfo_x()
            y = self.overlay.winfo_y()
            self.config_manager.set_setting('SETTINGS', 'overlay_x', x)
            self.config_manager.set_setting('SETTINGS', 'overlay_y', y)
            log(f"Overlay position saved: {x},{y}")

        self.overlay.bind("<Button-1>", start_drag)
        self.overlay.bind("<B1-Motion>", drag)
        self.overlay.bind("<ButtonRelease-1>", save_position)

    def update(self, speed, test_type="", monitoring=False):
        """Show a new reading.

        Args:
            speed: Speed in Mbps; rendered with one decimal place.
            test_type: ``"download"`` or ``"upload"`` selects the matching
                colours and arrow; anything else shows the neutral "SPEED"
                header. Ignored while ``monitoring`` is true.
            monitoring: When true, the header shows "MONITOR" and the status
                line shows "● Active".

        Does nothing when the window does not exist or is hidden.
        """
        if not self.overlay or not self.is_visible:
            return
        self.speed_label.config(text=f"{speed:.1f} Mbps")
        if monitoring:
            self.type_label.config(text="MONITOR", fg="#ffff00")
            self.status_label.config(text="● Active")
            self.monitoring_active = True
            return

        self.status_label.config(text="")
        self.monitoring_active = False
        if test_type == "download":
            self.type_label.config(text="DOWNLOAD", fg="#00ffff")
            self.speed_label.config(fg="#00ffff")
            self.direction_label.config(text="⬇")
        elif test_type == "upload":
            self.type_label.config(text="UPLOAD", fg="#ff00ff")
            self.speed_label.config(fg="#ff00ff")
            self.direction_label.config(text="⬆")
        else:
            self.type_label.config(text="SPEED", fg="#00ff00")
            self.direction_label.config(text="")

    def hide(self):
        """Withdraw the window (it can be brought back with show())."""
        if self.overlay:
            self.overlay.withdraw()
            self.is_visible = False
            log("Overlay hidden")

    def show(self):
        """Re-display the window, creating it first if it does not exist."""
        if self.overlay:
            self.overlay.deiconify()
            self.is_visible = True
            log("Overlay shown")
        else:
            self.create()

    def destroy(self):
        """Destroy the window and clear every reference to its widgets."""
        if self.overlay:
            self.overlay.destroy()
            # Also reset is_visible and the label references so no stale
            # state survives the window.
            self._forget_widgets()
            log("Overlay destroyed")
class ContinuousMonitor:
    """Background sampler that repeatedly measures download speed.

    A daemon thread downloads a test payload, converts the timing into Mbps,
    averages the last ``buffer_size`` interval results and passes the result
    to the callbacks given to :meth:`start_monitoring`.

    Each monitoring thread gets its own stop event. The ``monitoring`` flag
    alone is not enough: if an old thread is still busy when monitoring is
    restarted, it would see the flag turn true again and keep running next to
    the new thread.
    """

    def __init__(self, server_callback=None):
        """Create an idle monitor.

        Args:
            server_callback: Optional callable returning the list of base
                download URLs; it is consulted on every loop iteration.
        """
        self.monitoring = False
        self.monitor_thread = None
        self.server_callback = server_callback
        self.interval = 3
        self.speed_buffer = []
        self.buffer_size = 3
        self.bandwidth_level = 3  # Default level (valid range 1-6)
        self.last_speed = 0.0  # Track last measured speed
        self._stop_event = None  # Stop signal owned by the current thread
        log("ContinuousMonitor initialized")

    def set_interval(self, interval):
        """Set the monitoring interval in seconds, clamped to 0.5-10.0."""
        self.interval = max(0.5, min(10.0, interval))
        log(f"Monitor interval set to: {self.interval}s")

    def set_bandwidth_level(self, level):
        """Set the bandwidth usage level, clamped to 1-6."""
        self.bandwidth_level = max(1, min(6, level))
        log(f"Bandwidth level set to: {self.bandwidth_level}")

    def get_last_speed(self):
        """Return the last smoothed speed in Mbps (0.0 before any reading)."""
        return self.last_speed

    def get_download_size(self):
        """Return the per-sample download size in bytes for the current level.

        Levels outside the table fall back to 15,000,000 bytes.
        """
        size_map = {
            1: 2000000,   # 2MB
            2: 5000000,   # 5MB
            3: 10000000,  # 10MB (level used by default)
            4: 15000000,  # 15MB
            5: 25000000,  # 25MB
            6: 50000000   # 50MB
        }
        return size_map.get(self.bandwidth_level, 15000000)

    def start_monitoring(self, callback, overlay_callback=None):
        """Start the monitoring thread.

        Args:
            callback: Called with the smoothed speed (Mbps) after each
                interval; may be falsy to disable.
            overlay_callback: Called as ``overlay_callback(speed, "", True)``
                after each interval; may be falsy to disable.
        """
        log("Starting continuous monitoring...")
        # Retire any previous loop that has not finished yet so that it can
        # never run alongside the new one.
        previous_event = getattr(self, "_stop_event", None)
        if previous_event is not None:
            previous_event.set()
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.monitoring = True
        self.speed_buffer = []
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(callback, overlay_callback, stop_event),
            daemon=True
        )
        self.monitor_thread.start()
        log("Monitoring thread started", "SUCCESS")

    def stop_monitoring(self):
        """Signal the monitoring thread to stop and wait up to 3 seconds for it."""
        log("Stopping continuous monitoring...")
        self.monitoring = False
        current_event = getattr(self, "_stop_event", None)
        if current_event is not None:
            current_event.set()
        self.speed_buffer = []
        # Wait for monitoring thread to finish
        if self.monitor_thread and self.monitor_thread.is_alive():
            log("Waiting for monitoring thread to finish...")
            self.monitor_thread.join(timeout=3.0)  # Wait max 3 seconds
            if self.monitor_thread.is_alive():
                log("Warning: Monitoring thread did not stop in time", "WARNING")
            else:
                log("Monitoring thread stopped successfully", "SUCCESS")
        log("Monitoring stopped", "SUCCESS")

    def _calculate_weighted_average(self, speeds):
        """Return the plain arithmetic mean of *speeds* (0 for an empty list).

        Despite the name, no weighting is applied.
        """
        if not speeds:
            return 0
        return sum(speeds) / len(speeds)

    def _keep_running(self, stop_event):
        """Return True while this particular loop is still allowed to run."""
        if stop_event is not None and stop_event.is_set():
            return False
        return bool(self.monitoring)

    def _interruptible_sleep(self, seconds, stop_event):
        """Sleep up to *seconds*, waking every 0.1 s to honour a stop request."""
        deadline = time.perf_counter() + seconds
        while self._keep_running(stop_event):
            remaining = deadline - time.perf_counter()
            if remaining <= 0:
                break
            time.sleep(min(0.1, remaining))

    def _measure_sample(self, url, download_size, stop_event=None):
        """Download from *url* once and return the measured speed in Mbps.

        Returns ``None`` when the measurement is unusable (timed phase of
        0.1 s or less, implausible speed of 10000 Mbps or more, or a stop
        request during the transfer). Network errors propagate to the caller.
        """
        cache_buster = f"&_={int(time.time() * 1000)}_{random.randint(1000, 9999)}"
        if '?' in url:
            request_url = url + cache_buster
        else:
            request_url = url + '?' + cache_buster[1:]
        request = urllib.request.Request(
            request_url,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': '*/*',
                'Accept-Encoding': 'identity',
                'Cache-Control': 'no-cache, no-store, must-revalidate',
                'Pragma': 'no-cache',
                'Connection': 'close',
            }
        )
        chunk_size = 131072  # 128KB chunks
        warmup_bytes = chunk_size * 2  # Untimed bytes read before measuring
        bytes_downloaded = 0
        with urllib.request.urlopen(request, timeout=15, context=SSL_CONTEXT) as response:
            # Warmup phase: these bytes are read but excluded from the timing.
            warmup_downloaded = 0
            while warmup_downloaded < warmup_bytes:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                warmup_downloaded += len(chunk)
            # Timed phase.
            start_time = time.perf_counter()
            while True:
                if not self._keep_running(stop_event):
                    # Abandon the transfer so a stop request takes effect
                    # without waiting for the whole payload.
                    return None
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                bytes_downloaded += len(chunk)
                # Stop when we reach the configured download size
                if bytes_downloaded >= download_size:
                    break
            duration = time.perf_counter() - start_time
        if duration > 0.1:
            speed_mbps = (bytes_downloaded * 8) / (duration * 1_000_000)
            if speed_mbps < 10000:
                return speed_mbps
        return None

    def _monitor_loop(self, callback, overlay_callback, stop_event=None):
        """Thread body: sample, smooth, publish, sleep; repeat until stopped.

        Args:
            callback: See :meth:`start_monitoring`.
            overlay_callback: See :meth:`start_monitoring`.
            stop_event: Per-thread stop signal; when ``None`` only the
                ``monitoring`` flag is consulted.
        """
        log("Monitor loop started")
        url_index = 0
        consecutive_errors = 0
        while self._keep_running(stop_event):
            try:
                loop_start_time = time.perf_counter()
                # Get URLs dynamically on each iteration (in case server changes)
                if self.server_callback:
                    base_urls = self.server_callback()
                else:
                    base_urls = [
                        'https://speed.cloudflare.com/__down?bytes=',
                    ]
                if not base_urls:
                    # Report a clear error instead of a modulo-by-zero below.
                    raise ValueError("no monitor URLs available")
                download_size = self.get_download_size()
                # Short intervals use one sample for quicker updates,
                # otherwise two samples are averaged.
                samples_per_interval = 1 if self.interval < 2.0 else 2
                # URLs ending in 'bytes=' take the size as a parameter.
                test_urls = [url + str(download_size) if url.endswith('bytes=') else url for url in base_urls]
                url = test_urls[url_index % len(test_urls)]
                url_index += 1
                log(f"Monitoring test #{url_index} using: {url[:50]}... (Size: {download_size/1_000_000:.1f}MB)")

                interval_speeds = []
                for sample in range(samples_per_interval):
                    if not self._keep_running(stop_event):
                        break
                    try:
                        speed_mbps = self._measure_sample(url, download_size, stop_event)
                        if speed_mbps is not None:
                            interval_speeds.append(speed_mbps)
                            log(f"Sample {sample+1} speed: {speed_mbps:.2f} Mbps")
                        # Pause briefly between consecutive samples.
                        if sample < samples_per_interval - 1:
                            time.sleep(0.3)
                    except Exception as e:
                        log(f"Sample error: {e}", "ERROR")
                        continue

                # With more than two samples, drop those far from the median.
                if len(interval_speeds) > 2:
                    median = sorted(interval_speeds)[len(interval_speeds) // 2]
                    interval_speeds = [
                        s for s in interval_speeds
                        if median * 0.5 <= s <= median * 2
                    ]

                # A retired loop must not publish a stale reading.
                if interval_speeds and self._keep_running(stop_event):
                    avg_speed = sum(interval_speeds) / len(interval_speeds)
                    self.speed_buffer.append(avg_speed)
                    if len(self.speed_buffer) > self.buffer_size:
                        self.speed_buffer.pop(0)
                    smoothed_speed = self._calculate_weighted_average(self.speed_buffer)
                    self.last_speed = smoothed_speed  # Save for get_last_speed()
                    log(f"Average speed: {smoothed_speed:.2f} Mbps", "SUCCESS")
                    if callback:
                        callback(smoothed_speed)
                    if overlay_callback:
                        overlay_callback(smoothed_speed, "", True)
                    consecutive_errors = 0

                # Sleep for whatever is left of the interval (at least 0.1 s).
                loop_duration = time.perf_counter() - loop_start_time
                self._interruptible_sleep(max(0.1, self.interval - loop_duration), stop_event)
            except Exception as e:
                log(f"Monitor loop error: {e}", "ERROR")
                consecutive_errors += 1
                # Back off for 3 s after repeated failures, otherwise 1 s.
                backoff = 3.0 if consecutive_errors > 3 else 1.0
                self._interruptible_sleep(backoff, stop_event)
        log("Monitor loop ended")
class SystemTrayIcon:
    """Owns the pystray icon and forwards its menu actions to the application."""

    def __init__(self, app):
        """Keep a reference to the application; the icon is built by create()."""
        self.app = app
        self.icon = None
        log("SystemTrayIcon initialized")

    def create_image(self):
        """Return the 64x64 tray image: a cyan bar with the text "ST" on black."""
        canvas = Image.new('RGB', (64, 64), color='black')
        pen = ImageDraw.Draw(canvas)
        pen.rectangle([10, 20, 54, 44], fill='#00ffff')
        pen.text((20, 25), "ST", fill='black')
        return canvas

    def create(self):
        """Build the tray icon and run it on a daemon thread.

        Returns:
            True on success; False when tray support is unavailable or
            creating the icon fails.
        """
        if not TRAY_AVAILABLE:
            log("System tray not available", "WARNING")
            return False

        try:
            entries = (
                pystray.MenuItem("Show", self.show_window),
                pystray.MenuItem("Monitor ON/OFF", self.toggle_monitor),
                pystray.MenuItem("Overlay ON/OFF", self.toggle_overlay),
                pystray.Menu.SEPARATOR,
                pystray.MenuItem("Exit", self.quit_app),
            )
            menu = pystray.Menu(*entries)
            self.icon = pystray.Icon(
                "SpeedTest",
                self.create_image(),
                "SpeedTest Pro",
                menu
            )
            runner = threading.Thread(target=self.icon.run, daemon=True)
            runner.start()
            log("System tray icon created", "SUCCESS")
            return True
        except Exception as tray_error:
            log(f"Error creating system tray: {tray_error}", "ERROR")
            return False

    # The menu handlers below schedule the real work through root.after()
    # instead of calling the application methods directly.

    def show_window(self, icon, item):
        """Menu handler: restore the main window."""
        self.app.root.after(0, self.app.restore_from_tray)

    def toggle_monitor(self, icon, item):
        """Menu handler: toggle continuous monitoring."""
        self.app.root.after(0, self.app.toggle_continuous_monitor)

    def toggle_overlay(self, icon, item):
        """Menu handler: toggle the desktop overlay."""
        self.app.root.after(0, self.app.toggle_overlay_hotkey)

    def quit_app(self, icon, item):
        """Menu handler: stop the tray icon, then quit the application."""
        self.icon.stop()
        self.app.root.after(0, self.app.quit_application)

    def stop(self):
        """Stop the tray icon if one was created."""
        if not self.icon:
            return
        self.icon.stop()
        log("System tray stopped")
class ProfessionalSpeedTest:
    """Speed test engine: server catalogue, TCP-connect ping and download test."""

    def __init__(self):
        """Build the server catalogue and select the default server.

        Each server entry has:
            ``download``: list of base URLs (a URL ending in ``bytes=`` takes
                the payload size as a parameter, others are used as-is);
            ``sizes``: candidate payload sizes in bytes;
            ``ping``: list of ``(host, port)`` pairs used for the ping test.
        """
        log("Initializing ProfessionalSpeedTest...")
        self.servers = {
            'Auto (Cloudflare)': {
                'download': [
                    'https://speed.cloudflare.com/__down?bytes=',
                ],
                'sizes': [1000000, 10000000, 25000000, 50000000],
                'ping': [('1.1.1.1', 53)]
            },
            'Europe (Cloudflare)': {
                'download': [
                    'https://speed.cloudflare.com/__down?bytes=',
                ],
                'sizes': [5000000, 10000000, 25000000],
                'ping': [('1.1.1.1', 53)]
            },
            'USA (Linode Dallas)': {
                'download': [
                    'https://speedtest.dallas.linode.com/100MB-dallas.bin',
                ],
                'sizes': [3000000, 10000000, 25000000],
                'ping': [('8.8.8.8', 53)]
            },
            'Asia (Singapore OVH)': {
                'download': [
                    'https://sgp.proof.ovh.net/files/100Mb.dat',
                ],
                'sizes': [3000000, 8000000, 20000000],
                'ping': [('8.8.8.8', 53)]
            },
            'Australia (Sydney OVH)': {
                'download': [
                    'https://syd.proof.ovh.net/files/100Mb.dat',
                ],
                'sizes': [3000000, 8000000, 20000000],
                'ping': [('8.8.8.8', 53)]
            }
        }
        self.current_server = 'Auto (Cloudflare)'
        log(f"SpeedTest initialized with server: {self.current_server}", "SUCCESS")

    def set_server(self, server_name):
        """Select *server_name*; unknown names fall back to 'Auto (Cloudflare)'."""
        if server_name in self.servers:
            self.current_server = server_name
            log(f"Server changed to: {server_name}")
        else:
            log(f"Server '{server_name}' not found, using Auto (Cloudflare)", "WARNING")
            self.current_server = 'Auto (Cloudflare)'

    def get_current_server_urls(self):
        """Return the base download URLs (without size) of the current server."""
        server = self.servers[self.current_server]
        base_urls = server['download']
        log(f"Monitor URLs for {self.current_server}: {base_urls}")
        return base_urls

    def test_ping_advanced(self):
        """Measure TCP connect latency to the current server's ping hosts.

        Returns:
            The average connect time in milliseconds over the hosts that
            accepted the connection, or 0 when none did.
        """
        log("Starting ping test...")
        ping_hosts = self.servers[self.current_server]['ping']
        pings = []
        for host, port in ping_hosts:
            try:
                log(f"Pinging {host}:{port}...")
                # The context manager closes the socket even when
                # connect_ex() raises.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(2)
                    start = time.perf_counter()
                    result = sock.connect_ex((host, port))
                    # Take the timestamp before the socket is closed so that
                    # close() is not counted as latency.
                    end = time.perf_counter()
                if result == 0:
                    ping_ms = (end - start) * 1000
                    pings.append(ping_ms)
                    log(f"Ping to {host}: {ping_ms:.1f}ms", "SUCCESS")
                else:
                    # A non-zero code is not necessarily "connection refused"
                    # (it may be a timeout, for example), so report the code.
                    log(f"Ping to {host} failed (error code {result})", "WARNING")
            except Exception as e:
                log(f"Ping error for {host}: {e}", "ERROR")
                continue
        if pings:
            avg_ping = sum(pings) / len(pings)
            log(f"Average ping: {avg_ping:.1f}ms", "SUCCESS")
        else:
            avg_ping = 0
            log("Ping test failed: no host could be reached", "WARNING")
        return avg_ping

    def test_download_optimized(self, progress_callback=None, speed_callback=None):
        """Download a test payload from the current server and measure the speed.

        At most ``test_size`` bytes are read (the third entry of the server's
        ``sizes`` list, or 25,000,000 when there are fewer than three). For
        URLs ending in ``bytes=`` the size is appended to the URL; other URLs
        are fetched as-is and the transfer is abandoned once enough bytes have
        been read.

        Args:
            progress_callback: Optional; called as
                ``progress_callback(percent, 0)`` after every chunk, with
                ``percent`` never exceeding 100.
            speed_callback: Optional; called with the running average speed
                in Mbps after every chunk.

        Returns:
            The average speed in Mbps, or 0 on any error.
        """
        log("Starting download test...")
        try:
            server = self.servers[self.current_server]
            base_url = server['download'][0]
            test_size = server['sizes'][2] if len(server['sizes']) > 2 else 25000000
            if base_url.endswith('bytes='):
                url = base_url + str(test_size)
            else:
                # Static test file: honour the selected server (same rule as
                # the continuous monitor) and rely on the read cap below
                # instead of silently testing a different host.
                url = base_url
            log(f"Download URL: {url}")
            log(f"Download size: {test_size / 1_000_000:.1f} MB")
            request = urllib.request.Request(
                url,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                    'Cache-Control': 'no-cache, no-store, must-revalidate',
                    'Pragma': 'no-cache'
                }
            )
            start_time = time.perf_counter()
            downloaded = 0
            chunk_size = 65536
            log("Opening connection...")
            with urllib.request.urlopen(request, timeout=30, context=SSL_CONTEXT) as response:
                log(f"Connection opened, status: {response.status}")
                while downloaded < test_size:
                    chunk = response.read(min(chunk_size, test_size - downloaded))
                    if not chunk:
                        break
                    downloaded += len(chunk)
                    if progress_callback:
                        progress_callback(min(100.0, (downloaded / test_size) * 100), 0)
                    elapsed = time.perf_counter() - start_time
                    if speed_callback and elapsed > 0:
                        speed_callback((downloaded * 8) / (elapsed * 1_000_000))
            total_time = time.perf_counter() - start_time
            if downloaded == 0:
                log("Download error: no data received", "ERROR")
                return 0
            if total_time > 0:
                final_speed = (downloaded * 8) / (total_time * 1_000_000)
                log(f"Download completed: {final_speed:.2f} Mbps", "SUCCESS")
                return final_speed
            log("Download error: zero duration", "ERROR")
            return 0
        except Exception as e:
            log(f"Download error: {e}", "ERROR")
            import traceback
            log(traceback.format_exc(), "ERROR")
            return 0
class ImprovedUploadTest:
    """Upload speed test that sends random data in a single HTTP request."""

    def test_upload_comprehensive(self, progress_callback=None, speed_callback=None):
        """Send 5 MB of random bytes and return the measured speed in Mbps.

        The timed span covers opening the connection, sending the request body
        and reading the whole response.

        Args:
            progress_callback: Accepted but not used by this implementation.
            speed_callback: Optional; called once with the final speed.

        Returns:
            The upload speed in Mbps, or 0 on any error.
        """
        log("Starting upload test...")
        try:
            url = 'https://speed.cloudflare.com/__up'
            payload = os.urandom(5000000)  # 5MB
            payload_bytes = len(payload)
            log(f"Upload URL: {url}")
            log(f"Upload size: {payload_bytes / 1_000_000:.1f} MB")

            request_headers = {
                'Content-Type': 'application/octet-stream',
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            }
            request = urllib.request.Request(url, data=payload, headers=request_headers)

            started = time.perf_counter()
            log("Opening connection...")
            with urllib.request.urlopen(request, timeout=30, context=SSL_CONTEXT) as response:
                log(f"Connection opened, status: {response.status}")
                response.read()
            upload_time = time.perf_counter() - started

            if not upload_time > 0:
                log("Upload error: zero duration", "ERROR")
                return 0

            speed_mbps = (payload_bytes * 8) / (upload_time * 1_000_000)
            log(f"Upload completed: {speed_mbps:.2f} Mbps", "SUCCESS")
            if speed_callback:
                speed_callback(speed_mbps)
            return speed_mbps
        except Exception as upload_error:
            log(f"Upload error: {upload_error}", "ERROR")
            import traceback
            log(traceback.format_exc(), "ERROR")
            return 0
class SpeedTestApp:
def __init__(self, root):
    """Build the application around the given Tk root window.

    Loads the persisted settings, creates the engines, overlay, monitor and
    tray helper, builds the UI, starts hotkey polling and finally schedules
    the start-up actions selected in the configuration.

    Args:
        root: The Tk root window that hosts the application.
    """
    log("Initializing SpeedTestApp...")
    self.root = root
    self.root.title("SpeedTest Pro")
    # Initialize configuration manager FIRST (before anything else)
    self.config_manager = ConfigManager()

    def numeric_setting(key, default, cast):
        """Read SETTINGS.<key> as a number, using *default* if it is malformed."""
        raw = self.config_manager.get_setting('SETTINGS', key, default)
        try:
            return cast(raw)
        except (TypeError, ValueError):
            # A hand-edited or corrupted config must not prevent start-up.
            log(f"Invalid value {raw!r} for setting '{key}', using default {default}", "WARNING")
            return cast(default)

    def flag_setting(key, default):
        """Read SETTINGS.<key> as a boolean stored as the text 'True'/'False'."""
        return self.config_manager.get_setting('SETTINGS', key, default) == 'True'

    # Load saved window size or use defaults
    saved_width = numeric_setting('window_width', '550', int)
    saved_height = numeric_setting('window_height', '580', int)
    self.root.geometry(f"{saved_width}x{saved_height}")
    self.root.resizable(True, True)  # Allow resizing
    # Set minimum size
    self.root.minsize(500, 500)
    # Theme colors
    self.bg_color = "#0f0f0f"
    self.fg_color = "#ffffff"
    self.accent_color = "#00d4ff"
    self.success_color = "#00ff88"
    self.warning_color = "#ffaa00"
    self.error_color = "#ff4444"
    self.upload_color = "#ff00ff"
    self.download_color = "#00ffff"
    self.root.configure(bg=self.bg_color)
    # Variables
    self.download_speed = tk.StringVar(value="-- Mbps")
    self.upload_speed = tk.StringVar(value="-- Mbps")
    self.ping_value = tk.StringVar(value="-- ms")
    self.current_speed = tk.StringVar(value="-- Mbps")
    self.peak_speed = tk.StringVar(value="-- Mbps")
    self.test_phase = tk.StringVar(value="")
    # Control variables
    self.test_running = False
    self.peak_speed_value = 0
    self.speed_history = []
    self.current_test_type = "idle"
    # Hotkey polling system
    self.hotkey_polling_active = True
    self.hotkey_polling_thread = None
    self.last_hotkey_execution = 0
    # Load settings
    overlay_enabled = flag_setting('overlay_enabled', 'True')
    continuous_monitor = flag_setting('continuous_monitor', 'False')
    self.hotkey = self.config_manager.get_setting('SETTINGS', 'hotkey', 'Ctrl+Q')
    start_as_icon = flag_setting('start_as_icon', 'False')
    saved_server = self.config_manager.get_setting('SETTINGS', 'selected_server', 'Auto (Cloudflare)')
    monitor_interval = numeric_setting('monitor_interval', '3', float)
    self.continuous_monitor_var = tk.BooleanVar(value=continuous_monitor)
    self.overlay_enabled = tk.BooleanVar(value=overlay_enabled)
    self.start_as_icon_var = tk.BooleanVar(value=start_as_icon)
    self.selected_server = tk.StringVar(value=saved_server)
    self.monitor_interval = tk.DoubleVar(value=monitor_interval)
    # Initialize components
    log("Initializing speed test engine...")
    self.speed_engine = ProfessionalSpeedTest()
    self.speed_engine.set_server(saved_server)
    log("Initializing upload engine...")
    self.upload_engine = ImprovedUploadTest()
    log("Initializing desktop overlay...")
    self.desktop_overlay = DesktopOverlay(self.config_manager)
    log("Initializing continuous monitor...")
    self.continuous_monitor = ContinuousMonitor(server_callback=self.get_monitor_urls)
    self.continuous_monitor.set_interval(monitor_interval)
    # Set bandwidth level from config
    saved_bandwidth = numeric_setting('bandwidth_usage', '3', int)
    self.continuous_monitor.set_bandwidth_level(saved_bandwidth)
    log("Initializing system tray...")
    self.tray_icon = SystemTrayIcon(self)
    self.in_tray = False
    log("Setting up UI...")
    self.setup_ui()
    log("Starting hotkey polling system...")
    self.start_hotkey_polling()
    # Track window size changes (the timestamp is set before the handler is
    # bound so the handler can always read it).
    self.last_window_save = 0
    self.root.bind('<Configure>', self.on_window_resize)
    self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
    # CRITICAL: Apply settings in correct order for continuous monitor.
    # The monitor must start BEFORE the overlay is enabled so that the
    # callbacks are set.
    if self.continuous_monitor_var.get():
        log("Starting continuous monitor on startup...")
        # Monitor starts after 200 ms ...
        self.root.after(200, self.start_continuous_monitor_on_startup)
        # ... and the overlay (if enabled) follows at 1200 ms.
        if self.overlay_enabled.get():
            log("Will enable overlay after monitor starts...")
            self.root.after(1200, self.enable_overlay_after_monitor_startup)
    else:
        # No continuous monitor - just enable overlay normally
        if self.overlay_enabled.get():
            log("Enabling overlay on startup...")
            self.toggle_overlay()
    if self.start_as_icon_var.get():
        log("Minimizing to tray on startup...")
        self.root.after(100, self.minimize_to_tray)
    log("SpeedTestApp initialization complete", "SUCCESS")
def get_monitor_urls(self):
    """Return the current server's download URLs, as reported by the speed engine."""
    engine = self.speed_engine
    return engine.get_current_server_urls()
def start_continuous_monitor_on_startup(self):
    """Start-up hook: launch the continuous monitor if it is enabled and idle."""
    log("Auto-starting continuous monitor...")
    if not self.continuous_monitor_var.get() or self.test_running:
        return
    # is_startup=True: the overlay is handled by a separate start-up hook.
    self.start_continuous_monitor(is_startup=True)
def enable_overlay_after_monitor_startup(self):
    """Start-up hook: show the overlay once the continuous monitor is running.

    After showing the overlay in its "monitoring" state, the monitor is polled
    every 500 ms (at most 10 times) until it reports a speed above zero, which
    is then pushed to the overlay.
    """
    log("Enabling overlay after monitor startup...")
    if not self.overlay_enabled.get():
        return

    # Create the overlay on first use, otherwise just bring it back.
    if self.desktop_overlay.overlay:
        self.desktop_overlay.show()
    else:
        log("Creating overlay for active monitor...")
        self.desktop_overlay.create()

    # Force initial state
    self.desktop_overlay.update(0.0, "Monitoring...", True)

    monitor_running = self.test_running and self.continuous_monitor_var.get()
    if not monitor_running:
        log("Warning: Monitor not running when overlay enabled", "WARNING")
        return

    log("Monitor confirmed running - forcing overlay update...")

    def poll_for_first_reading(attempt=0):
        if attempt >= 10:  # Give up after 10 polls (about 5 seconds)
            return
        reading = self.continuous_monitor.get_last_speed()
        if reading > 0:
            log(f"Forcing overlay update with speed: {reading:.2f} Mbps")
            self.desktop_overlay.update(reading, "", True)
            return
        # No speed yet, check again in 500ms
        self.root.after(500, poll_for_first_reading, attempt + 1)

    # Start checking after 500ms
    self.root.after(500, poll_for_first_reading, 0)
def on_server_change(self, *args):
    """Apply and persist a new server selection; restart the monitor if active."""
    selected = self.selected_server.get()
    log(f"Server changed to: {selected}")
    self.speed_engine.set_server(selected)
    self.config_manager.set_setting('SETTINGS', 'selected_server', selected)
    self.server_status_label.config(text=f"Server: {selected}", fg=self.success_color)

    monitor_active = self.continuous_monitor_var.get() and self.test_running
    if not monitor_active:
        return

    log("Restarting monitor with new server...")
    self.stop_continuous_monitor()
    # Delay the restart to give the old monitoring thread time to stop.
    self.root.after(500, self.start_continuous_monitor)
def on_interval_change(self, *args):
    """Apply a new monitor interval, persist it and refresh the status label."""
    seconds = self.monitor_interval.get()
    self.continuous_monitor.set_interval(seconds)
    self.config_manager.set_setting('SETTINGS', 'monitor_interval', seconds)
    status_text = f"Update: {seconds:.1f}s"
    self.interval_status_label.config(text=status_text, fg=self.success_color)
def on_window_resize(self, event):
    """Persist the main window size after a resize, at most once every 2 seconds.

    A size is written immediately when more than 2 seconds have passed since
    the last write. Otherwise a single deferred write is scheduled for when
    that window has elapsed, so the *final* size of a resize drag is always
    saved instead of being dropped by the rate limit.

    Args:
        event: The Tk ``<Configure>`` event; events for widgets other than
            the root window are ignored.
    """
    # Only react to the root window being reconfigured.
    if event.widget != self.root:
        return

    # Any newer event supersedes a previously scheduled deferred save.
    pending_job = getattr(self, '_resize_save_job', None)
    if pending_job is not None:
        self.root.after_cancel(pending_job)
        self._resize_save_job = None

    def persist_size():
        self._resize_save_job = None
        width = self.root.winfo_width()
        height = self.root.winfo_height()
        # Only save if dimensions are reasonable
        if width >= 500 and height >= 500:
            self.config_manager.set_setting('SETTINGS', 'window_width', width)
            self.config_manager.set_setting('SETTINGS', 'window_height', height)
            self.last_window_save = time.time()
            log(f"Window size saved: {width}x{height}")

    elapsed = time.time() - self.last_window_save
    if elapsed > 2.0:
        persist_size()
    else:
        # Clamp the delay so a clock adjustment cannot postpone the save
        # indefinitely.
        delay_ms = min(2000, max(1, int((2.0 - elapsed) * 1000)))
        self._resize_save_job = self.root.after(delay_ms, persist_size)
def start_hotkey_polling(self):
    """Start a daemon thread that polls the keyboard for the overlay hotkey.

    The thread checks the hotkey every 50 ms while ``hotkey_polling_active``
    is true and schedules the overlay toggle, with a 500 ms cooldown between
    triggers.
    """
    def poll_hotkeys():
        log("Hotkey polling thread started")
        while self.hotkey_polling_active:
            try:
                pressed = self.is_hotkey_pressed(self.hotkey)
                if pressed:
                    now = time.time()
                    # Prevent multiple executions (500ms cooldown)
                    cooled_down = now - self.last_hotkey_execution > 0.5
                    if cooled_down:
                        log(f"Hotkey detected: {self.hotkey}")
                        self.root.after(0, self._toggle_overlay_ui)
                        self.last_hotkey_execution = now
                # 50ms polling interval keeps CPU usage low
                time.sleep(0.05)
            except Exception as poll_error:
                log(f"Error in hotkey polling: {poll_error}", "ERROR")
                time.sleep(0.1)
        log("Hotkey polling thread stopped")

    worker = threading.Thread(target=poll_hotkeys, daemon=True)
    self.hotkey_polling_thread = worker
    worker.start()
    log("Hotkey polling system started", "SUCCESS")
def is_hotkey_pressed(self, hotkey_str):
    """Return True when every key of a ``'+'``-separated combination is held down.

    Returns False when the keyboard backend is unavailable or when checking
    any key raises (for example an unknown key name).
    """
    if not KEYBOARD_AVAILABLE:
        return False

    ctrl_names = ('ctrl', 'left ctrl', 'right ctrl')
    win_names = ('windows', 'left windows', 'right windows')
    # Modifier aliases -> the key names that count as "this modifier is down".
    modifier_names = {
        'ctrl': ctrl_names,
        'control': ctrl_names,
        'alt': ('alt', 'left alt', 'right alt'),
        'shift': ('shift', 'left shift', 'right shift'),
        'win': win_names,
        'windows': win_names,
        'cmd': win_names,
    }

    try:
        for token in hotkey_str.lower().replace(' ', '').split('+'):
            # Non-modifier tokens are checked under their own name.
            candidates = modifier_names.get(token, (token,))
            if not any(keyboard.is_pressed(name) for name in candidates):
                return False
        return True
    except Exception:
        # Invalid key names are treated as "not pressed".
        return False
def setup_global_hotkeys(self):
    """Deprecated: hotkeys are polled now; this only logs that fact."""
    notice = "Using polling-based hotkey system instead of keyboard hooks"
    log(notice, "INFO")
def setup_hotkeys(self):
    """Deprecated no-op; hotkeys are handled by the polling system."""
    return None
def toggle_overlay_hotkey(self):
    """Schedule an overlay toggle through ``root.after`` with zero delay."""
    toggle = self._toggle_overlay_ui
    self.root.after(0, toggle)
def _toggle_overlay_ui(self):
    """Invert the overlay checkbox variable, then apply and persist the new state."""
    flipped = not self.overlay_enabled.get()
    self.overlay_enabled.set(flipped)
    self.toggle_overlay_and_save()
def on_bandwidth_change(self, *args):
    """Apply, persist and display the bandwidth level chosen on the slider.

    Any positional arguments supplied by the widget callback are ignored;
    the level is read from ``self.bandwidth_usage``.
    """
    level = self.bandwidth_usage.get()

    # Hand the new level to the monitor, then persist it.
    self.continuous_monitor.set_bandwidth_level(level)
    self.config_manager.set_setting('SETTINGS', 'bandwidth_usage', level)

    # Level -> (short caption, label colour), levels 1 through 6.
    captions = ("Min", "Low", "Med", "High", "Max", "Ext")
    colours = ("#00ff00", "#44ff00", "#88ff00", "#ffff00", "#ff8800", "#ff0000")
    presets = dict(enumerate(zip(captions, colours), start=1))

    caption, colour = presets[level]
    self.bandwidth_status_label.config(text=f"BW: {caption}", fg=colour)
    log(f"Bandwidth impact set to: {caption} ({level})")
def on_closing(self):
    """Persist the current settings, stop background work and close the window.

    The window size is saved on a best-effort basis. Everything after that
    runs inside ``try``/``finally`` so the root window is always destroyed,
    even when one of the teardown steps raises; otherwise a failing step
    would leave a window that can no longer be closed.
    """
    log("Application closing...")

    # Save current window size (best effort; a failure must not block exit).
    try:
        width = self.root.winfo_width()
        height = self.root.winfo_height()
        if width >= 500 and height >= 500:
            self.config_manager.set_setting('SETTINGS', 'window_width', width)
            self.config_manager.set_setting('SETTINGS', 'window_height', height)
            log(f"Final window size saved: {width}x{height}")
    except Exception as exc:
        log(f"Could not save final window size: {exc}", "WARNING")

    try:
        final_settings = {
            'overlay_enabled': self.overlay_enabled.get(),
            'continuous_monitor': self.continuous_monitor_var.get(),
            'hotkey': self.hotkey,
            'start_as_icon': self.start_as_icon_var.get(),
            'selected_server': self.selected_server.get(),
            'monitor_interval': self.monitor_interval.get(),
            'bandwidth_usage': self.bandwidth_usage.get(),
        }
        for key, value in final_settings.items():
            self.config_manager.set_setting('SETTINGS', key, value)

        # Clearing the flag lets the hotkey polling loop exit.
        self.hotkey_polling_active = False
        self.continuous_monitor.stop_monitoring()
        self.desktop_overlay.destroy()

        tray = getattr(self, 'tray_icon', None)
        if tray is not None:
            tray.stop()
        log("Application closed", "SUCCESS")
    finally:
        # Always tear the window down, whatever happened above.
        self.root.destroy()
def setup_ui(self):
    """Build every widget of the main window, top to bottom.

    Each visual section is created by its own private helper. The helpers
    pack their frames directly into ``self.root``, so the call order below
    is the on-screen order and must not be changed.
    """
    self._build_header()
    self._build_server_interval_row()
    self._build_bandwidth_row()
    self._build_toggle_row()
    self._build_realtime_panel()
    self._build_results_panel()
    self._build_progress_area()
    self._build_action_buttons()
    self._build_footer()

def _read_saved_bandwidth_level(self, default=4):
    """Return the persisted bandwidth level, clamped to the slider range 1..6.

    An unparseable stored value yields ``default`` instead of raising, so a
    damaged config file cannot prevent the window from being built.
    """
    raw = self.config_manager.get_setting('SETTINGS', 'bandwidth_usage', str(default))
    try:
        level = int(raw)
    except (TypeError, ValueError):
        return default
    return min(max(level, 1), 6)

def _build_header(self):
    """Create the title banner."""
    header_frame = tk.Frame(self.root, bg=self.bg_color)
    header_frame.pack(pady=5)
    tk.Label(
        header_frame,
        text="⚡ SpeedTest",
        font=("Arial", 16, "bold"),
        bg=self.bg_color,
        fg=self.fg_color,
    ).pack()

def _build_server_interval_row(self):
    """Create the first control row: server drop-down and interval slider."""
    row = tk.Frame(self.root, bg=self.bg_color)
    row.pack(pady=2, padx=10, fill="x")

    # Server selection
    tk.Label(
        row,
        text="Server:",
        font=("Arial", 8),
        bg=self.bg_color,
        fg=self.fg_color,
    ).pack(side="left", padx=(0, 2))
    server_dropdown = ttk.Combobox(
        row,
        textvariable=self.selected_server,
        values=[
            'Auto (Cloudflare)',
            'Europe (Cloudflare)',
            'USA (Linode Dallas)',
            'Asia (Singapore OVH)',
            'Australia (Sydney OVH)',
        ],
        state="readonly",
        width=15,
        font=("Arial", 8),
    )
    server_dropdown.pack(side="left", padx=2)
    server_dropdown.bind('<<ComboboxSelected>>', self.on_server_change)

    # Monitor interval
    tk.Label(
        row,
        text="Interval:",
        font=("Arial", 8),
        bg=self.bg_color,
        fg=self.fg_color,
    ).pack(side="left", padx=(10, 2))
    interval_scale = Scale(
        row,
        from_=0.25,
        to=10,
        resolution=0.25,
        orient="horizontal",
        variable=self.monitor_interval,
        command=self.on_interval_change,
        bg=self.bg_color,
        fg=self.fg_color,
        highlightthickness=0,
        length=80,
        showvalue=0,
    )
    interval_scale.pack(side="left", padx=2)
    self.interval_status_label = tk.Label(
        row,
        text=f"{self.monitor_interval.get():.1f}s",
        font=("Arial", 8),
        bg=self.bg_color,
        fg=self.success_color,
        width=5,
    )
    self.interval_status_label.pack(side="left", padx=2)

def _build_bandwidth_row(self):
    """Create the second control row: bandwidth slider, status label and info icon."""
    row = tk.Frame(self.root, bg=self.bg_color)
    row.pack(pady=2, padx=10, fill="x")

    tk.Label(
        row,
        text="Bandwidth:",
        font=("Arial", 8),
        bg=self.bg_color,
        fg=self.fg_color,
    ).pack(side="left", padx=(0, 2))

    # Saved level, or the default 4 ("High" in the caption table below).
    saved_bandwidth = self._read_saved_bandwidth_level()
    self.bandwidth_usage = tk.IntVar(value=saved_bandwidth)
    bandwidth_scale = Scale(
        row,
        from_=1,
        to=6,
        resolution=1,
        orient="horizontal",
        variable=self.bandwidth_usage,
        command=self.on_bandwidth_change,
        bg=self.bg_color,
        fg=self.fg_color,
        highlightthickness=0,
        length=80,
        showvalue=0,
    )
    bandwidth_scale.pack(side="left", padx=2)

    # Level -> (short caption, label colour).
    bandwidth_labels = {
        1: ("Min", "#00ff00"),
        2: ("Low", "#44ff00"),
        3: ("Med", "#88ff00"),
        4: ("High", "#ffff00"),
        5: ("Max", "#ff8800"),
        6: ("Ext", "#ff0000"),
    }
    caption, colour = bandwidth_labels[saved_bandwidth]
    self.bandwidth_status_label = tk.Label(
        row,
        text=f"BW: {caption}",
        font=("Arial", 8),
        bg=self.bg_color,
        fg=colour,
    )
    self.bandwidth_status_label.pack(side="left", padx=2)

    # Clickable info icon (the glyph was mis-encoded in the original literal).
    info_label = tk.Label(
        row,
        text="ℹ",
        font=("Arial", 8, "bold"),
        bg=self.bg_color,
        fg="#666666",
        cursor="hand2",
    )
    info_label.pack(side="left", padx=2)

    def show_bandwidth_info(event):
        """Explain the six bandwidth levels in a message box."""
        info_text = (
            "Bandwidth Impact Settings:\n\n"
            "• Min (1): Ultra-fast updates (2MB)\n"
            "• Low (2): Minimal slowdown (5MB)\n"
            "• Med (3): Light impact (10MB)\n"
            "• High (4): Standard (15MB)\n"
            "• Max (5): Professional (25MB)\n"
            "• Ext (6): Maximum accuracy (50MB)"
        )
        messagebox.showinfo("Bandwidth Info", info_text)

    info_label.bind("<Button-1>", show_bandwidth_info)

def _build_toggle_row(self):
    """Create the third control row: option checkboxes and the minimize button."""
    row = tk.Frame(self.root, bg=self.bg_color)
    row.pack(pady=5, padx=10, fill="x")

    self.overlay_check = tk.Checkbutton(
        row,
        text="Overlay",
        variable=self.overlay_enabled,
        command=self.toggle_overlay_and_save,
        font=("Arial", 8),
        bg=self.bg_color,
        fg="#888888",
        selectcolor=self.bg_color,
        activebackground=self.bg_color,
    )
    self.overlay_check.pack(side="left", padx=5)

    self.continuous_check = tk.Checkbutton(
        row,
        text="Continuous Monitor",
        variable=self.continuous_monitor_var,
        command=self.toggle_continuous_monitor_and_save,
        font=("Arial", 8, "bold"),
        bg=self.bg_color,
        fg=self.warning_color,
        selectcolor=self.bg_color,
        activebackground=self.bg_color,
    )
    self.continuous_check.pack(side="left", padx=5)

    self.start_as_icon_check = tk.Checkbutton(
        row,
        text="Start in Tray",
        variable=self.start_as_icon_var,
        command=self.toggle_start_as_icon,
        font=("Arial", 8),
        bg=self.bg_color,
        fg="#888888",
        selectcolor=self.bg_color,
        activebackground=self.bg_color,
    )
    self.start_as_icon_check.pack(side="left", padx=5)

    # Tray button
    self.tray_button = tk.Button(
        row,
        text="📥 Minimize",
        command=self.minimize_to_tray,
        font=("Arial", 8),
        bg="#444444",
        fg=self.fg_color,
        activebackground="#555555",
        activeforeground=self.fg_color,
        padx=8,
        pady=2,
        relief="flat",
        cursor="hand2",
        bd=0,
    )
    self.tray_button.pack(side="left", padx=5)

def _build_realtime_panel(self):
    """Create the live-speed panel: phase text, current speed, peak and monitor status."""
    panel_bg = "#1a1a1a"

    realtime_frame = tk.Frame(self.root, bg=self.bg_color)
    realtime_frame.pack(pady=5, padx=10, fill="x")

    self.realtime_container = tk.Frame(realtime_frame, bg=panel_bg, relief="ridge", bd=1)
    self.realtime_container.pack(fill="x")
    self.realtime_container.configure(highlightbackground=self.accent_color, highlightthickness=1)

    realtime_inner = tk.Frame(self.realtime_container, bg=panel_bg)
    realtime_inner.pack(pady=5, padx=10, fill="x")

    self.phase_label = tk.Label(
        realtime_inner,
        textvariable=self.test_phase,
        font=("Arial", 9),
        bg=panel_bg,
        fg=self.warning_color,
    )
    self.phase_label.pack()

    tk.Label(
        realtime_inner,
        text="Current Speed",
        font=("Arial", 9),
        bg=panel_bg,
        fg="#888888",
    ).pack()

    self.current_speed_label = tk.Label(
        realtime_inner,
        textvariable=self.current_speed,
        font=("Arial", 24, "bold"),
        bg=panel_bg,
        fg=self.accent_color,
    )
    self.current_speed_label.pack()

    peak_frame = tk.Frame(realtime_inner, bg=panel_bg)
    peak_frame.pack(pady=(2, 0))
    tk.Label(
        peak_frame,
        text="Peak:",
        font=("Arial", 8),
        bg=panel_bg,
        fg="#666666",
    ).pack(side="left", padx=2)
    tk.Label(
        peak_frame,
        textvariable=self.peak_speed,
        font=("Arial", 8, "bold"),
        bg=panel_bg,
        fg=self.success_color,
    ).pack(side="left")

    self.monitor_status_label = tk.Label(
        realtime_inner,
        text="",
        font=("Arial", 7, "italic"),
        bg=panel_bg,
        fg="#ffff00",
    )
    self.monitor_status_label.pack(pady=(2, 0))

def _build_results_panel(self):
    """Create the Download / Upload / Ping result tiles in a one-row grid."""
    panel_bg = "#1a1a1a"

    results_frame = tk.Frame(self.root, bg=self.bg_color)
    results_frame.pack(pady=5, padx=10, fill="both", expand=True)

    # Compact metrics laid out in a grid.
    metrics_frame = tk.Frame(results_frame, bg=self.bg_color)
    metrics_frame.pack(fill="both", expand=True)

    tiles = (
        ("⬇ Download", self.download_speed, self.download_color),
        ("⬆ Upload", self.upload_speed, self.upload_color),
        ("📡 Ping", self.ping_value, "#00ff00"),
    )
    for column, (title, variable, colour) in enumerate(tiles):
        tile = tk.Frame(metrics_frame, bg=panel_bg, highlightthickness=1)
        tile.configure(highlightbackground=colour)
        tile.grid(row=0, column=column, sticky="nsew", padx=2, pady=2)
        tile.columnconfigure(0, weight=1)
        tk.Label(
            tile,
            text=title,
            font=("Arial", 9),
            bg=panel_bg,
            fg=self.fg_color,
        ).pack(pady=(3, 0))
        tk.Label(
            tile,
            textvariable=variable,
            font=("Arial", 11, "bold"),
            bg=panel_bg,
            fg=colour,
        ).pack(pady=(0, 3))
        # Equal weights make the three tiles share the width evenly.
        metrics_frame.columnconfigure(column, weight=1)
    metrics_frame.rowconfigure(0, weight=1)

def _build_progress_area(self):
    """Create the progress label plus the (initially unpacked) bar and percentage."""
    self.progress_frame = tk.Frame(self.root, bg=self.bg_color)
    self.progress_frame.pack(pady=5, padx=10, fill="x")

    self.progress_label = tk.Label(
        self.progress_frame,
        text="Ready",
        font=("Arial", 9),
        bg=self.bg_color,
        fg=self.fg_color,
    )
    self.progress_label.pack()

    # The bar and the percentage label are created but not packed here.
    self.progress_bar = ttk.Progressbar(
        self.progress_frame,
        mode="determinate",
        length=500,
        maximum=100,
    )
    self.progress_percent = tk.Label(
        self.progress_frame,
        text="",
        font=("Arial", 7),
        bg=self.bg_color,
        fg="#888888",
    )

    style = ttk.Style()
    style.theme_use('clam')
    style.configure(
        "TProgressbar",
        background=self.accent_color,
        troughcolor="#1a1a1a",
        bordercolor=self.bg_color,
        lightcolor=self.accent_color,
        darkcolor=self.accent_color,
    )

def _build_action_buttons(self):
    """Create the TEST / STOP / SETTINGS buttons."""
    button_frame = tk.Frame(self.root, bg=self.bg_color)
    button_frame.pack(pady=8, padx=10)

    # NOTE(review): the leading glyph of the TEST caption looks mis-encoded,
    # but the intended emoji cannot be recovered from the text; it is kept
    # unchanged here — confirm the intended character.
    self.test_button = tk.Button(
        button_frame,
        text="๐ TEST",
        command=self.run_speed_test,
        font=("Arial", 10, "bold"),
        bg="#00aa00",
        fg=self.fg_color,
        activebackground="#00ff00",
        activeforeground=self.bg_color,
        padx=15,
        pady=5,
        relief="flat",
        cursor="hand2",
        bd=0,
    )
    self.test_button.pack(side="left", padx=2)

    self.stop_button = tk.Button(
        button_frame,
        text="⏹ STOP",
        command=self.stop_test,
        font=("Arial", 10, "bold"),
        bg="#aa0000",
        fg=self.fg_color,
        activebackground="#ff0000",
        activeforeground=self.fg_color,
        padx=12,
        pady=5,
        relief="flat",
        cursor="hand2",
        bd=0,
        state="disabled",
    )
    self.stop_button.pack(side="left", padx=2)

    self.settings_button = tk.Button(
        button_frame,
        text="⚙ SETTINGS",
        command=self.open_settings_dialog,
        font=("Arial", 10, "bold"),
        bg="#444444",
        fg=self.fg_color,
        activebackground="#555555",
        activeforeground=self.fg_color,
        padx=12,
        pady=5,
        relief="flat",
        cursor="hand2",
        bd=0,
    )
    self.settings_button.pack(side="left", padx=2)

def _build_footer(self):
    """Create the footer: last-test timestamp, hotkey text and hotkey availability."""
    footer_frame = tk.Frame(self.root, bg=self.bg_color)
    footer_frame.pack(pady=3, padx=10, fill="x")

    self.last_test_label = tk.Label(
        footer_frame,
        text="",
        font=("Arial", 8),
        bg=self.bg_color,
        fg="#666666",
    )
    self.last_test_label.pack()

    tk.Label(
        footer_frame,
        text=f"Hotkey: {self.hotkey}",
        font=("Arial", 7),
        bg=self.bg_color,
        fg="#ffffff",
    ).pack()

    hotkey_status = "● Hotkeys active" if KEYBOARD_AVAILABLE else "⚠ Hotkeys not available"
    tk.Label(
        footer_frame,
        text=hotkey_status,
        font=("Arial", 7),
        bg=self.bg_color,
        fg="#00ff00" if KEYBOARD_AVAILABLE else "#ff0000",
    ).pack()
def create_metric_frame(self, parent, label, variable, icon, color, tooltip=""):
    """Build and return a bordered frame showing ``icon label`` left and a value right.

    The value label is bound to ``variable``. ``tooltip`` is accepted but
    not used. The returned frame is not packed or gridded here.
    """
    panel_bg = "#1a1a1a"

    outer = tk.Frame(parent, bg=panel_bg, relief="flat", bd=0)
    outer.configure(highlightbackground=color, highlightthickness=1)

    content = tk.Frame(outer, bg=panel_bg)
    content.pack(pady=5, padx=10, fill="x")

    # Caption (icon + label) on the left.
    caption_holder = tk.Frame(content, bg=panel_bg)
    caption_holder.pack(side="left")
    tk.Label(
        caption_holder,
        text=f"{icon} {label}",
        font=("Arial", 10),
        bg=panel_bg,
        fg=self.fg_color,
    ).pack(side="left")

    # Live value on the right, drawn in the metric's colour.
    tk.Label(
        content,
        textvariable=variable,
        font=("Arial", 12, "bold"),
        bg=panel_bg,
        fg=color,
    ).pack(side="right")

    return outer
def open_settings_dialog(self):
    """Open the compact, modal settings dialog.

    The dialog edits the overlay hotkey, the overlay transparency and the
    three startup checkboxes. Its size is persisted (debounced) while it
    is resized. Every way of closing it (Save, Cancel, window close
    button) goes through one function that first removes the
    application-wide mouse-wheel binding, so no binding is left pointing
    at a destroyed canvas.
    """
    log("Opening settings dialog...")
    dialog = tk.Toplevel(self.root)
    dialog.title("Settings")

    def saved_dimension(key, fallback):
        """Read a persisted dialog dimension; fall back when it is not an integer."""
        raw = self.config_manager.get_setting('SETTINGS', key, str(fallback))
        try:
            return int(raw)
        except (TypeError, ValueError):
            return fallback

    # Load saved dialog size or use defaults
    saved_dialog_width = saved_dimension('settings_dialog_width', 450)
    saved_dialog_height = saved_dimension('settings_dialog_height', 400)
    dialog.geometry(f"{saved_dialog_width}x{saved_dialog_height}")
    dialog.transient(self.root)
    dialog.grab_set()
    dialog.configure(bg=self.bg_color)

    # Make dialog resizable
    dialog.resizable(True, True)
    dialog.minsize(400, 350)

    # Track dialog resize; a one-element list lets the closure update the timestamp.
    last_dialog_save = [0]

    def on_dialog_resize(event):
        """Persist the dialog size, at most once every 2 seconds."""
        if event.widget != dialog:
            return
        current_time = time.time()
        if current_time - last_dialog_save[0] > 2.0:
            width = dialog.winfo_width()
            height = dialog.winfo_height()
            if width >= 400 and height >= 350:
                self.config_manager.set_setting('SETTINGS', 'settings_dialog_width', width)
                self.config_manager.set_setting('SETTINGS', 'settings_dialog_height', height)
                last_dialog_save[0] = current_time

    dialog.bind('<Configure>', on_dialog_resize)

    # Canvas with scrollbar
    canvas = tk.Canvas(dialog, bg=self.bg_color, highlightthickness=0)
    scrollbar = tk.Scrollbar(dialog, orient="vertical", command=canvas.yview)
    scrollable_frame = tk.Frame(canvas, bg=self.bg_color)
    scrollable_frame.bind(
        "<Configure>",
        lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
    )
    canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
    canvas.configure(yscrollcommand=scrollbar.set)
    canvas.pack(side="left", fill="both", expand=True, padx=3, pady=3)
    scrollbar.pack(side="right", fill="y")

    def close_dialog():
        """Remove the global mouse-wheel binding, then destroy the dialog."""
        canvas.unbind_all("<MouseWheel>")
        dialog.destroy()

    # Main content frame
    main_frame = tk.Frame(scrollable_frame, bg=self.bg_color, padx=10, pady=10)
    main_frame.pack(fill="both", expand=True)

    # Title
    tk.Label(
        main_frame,
        text="⚙ Settings",
        font=("Arial", 14, "bold"),
        bg=self.bg_color,
        fg=self.fg_color,
    ).pack(pady=(0, 10))

    # Hotkey settings
    hotkey_frame = tk.LabelFrame(
        main_frame,
        text="Hotkey",
        font=("Arial", 9, "bold"),
        bg=self.bg_color,
        fg=self.accent_color,
        padx=10,
        pady=8,
    )
    hotkey_frame.pack(fill="x", pady=(0, 8))
    hotkey_var = tk.StringVar(value=self.hotkey)
    hotkey_entry = tk.Entry(
        hotkey_frame,
        textvariable=hotkey_var,
        font=("Arial", 9),
        width=15,
        bg="#1a1a1a",
        fg=self.fg_color,
        insertbackground=self.fg_color,
    )
    hotkey_entry.pack(side="left", padx=(0, 5))

    def record_hotkey():
        """Capture the next key combination into the hotkey entry."""
        if not KEYBOARD_AVAILABLE:
            messagebox.showerror(
                "Keyboard Not Available",
                "Install: pip install keyboard"
            )
            return
        hotkey_entry.delete(0, tk.END)
        hotkey_entry.insert(0, "Press hotkey...")
        hotkey_entry.update()
        try:
            hotkey = keyboard.read_hotkey(suppress=False)
            hotkey_var.set(hotkey)
        except Exception:
            # Recording failed: restore the currently configured hotkey.
            hotkey_var.set(self.hotkey)

    # NOTE(review): the leading glyph of the Record caption looks mis-encoded,
    # but the intended emoji cannot be recovered; kept unchanged — confirm.
    record_btn = tk.Button(
        hotkey_frame,
        text="๐ Record",
        command=record_hotkey,
        font=("Arial", 8),
        bg="#444444",
        fg=self.fg_color,
        padx=8,
        pady=3,
        relief="flat",
    )
    record_btn.pack(side="left")

    # Overlay settings
    overlay_frame = tk.LabelFrame(
        main_frame,
        text="Overlay",
        font=("Arial", 9, "bold"),
        bg=self.bg_color,
        fg=self.accent_color,
        padx=10,
        pady=8,
    )
    overlay_frame.pack(fill="x", pady=(0, 8))
    transparency_var = tk.DoubleVar(
        value=float(self.config_manager.get_setting('SETTINGS', 'overlay_transparency', '0.7'))
    )
    tk.Label(
        overlay_frame,
        text="Transparency:",
        font=("Arial", 8),
        bg=self.bg_color,
        fg=self.fg_color,
    ).pack(side="left", padx=(0, 5))
    transparency_scale = Scale(
        overlay_frame,
        from_=0.3,
        to=1.0,
        resolution=0.1,
        orient="horizontal",
        variable=transparency_var,
        bg=self.bg_color,
        fg=self.fg_color,
        highlightthickness=0,
        length=120,
    )
    transparency_scale.pack(side="left")

    # Startup settings; the checkboxes are bound directly to the app's own variables.
    startup_frame = tk.LabelFrame(
        main_frame,
        text="Startup",
        font=("Arial", 9, "bold"),
        bg=self.bg_color,
        fg=self.accent_color,
        padx=10,
        pady=8,
    )
    startup_frame.pack(fill="x", pady=(0, 8))
    startup_options = (
        ("Start in tray", self.start_as_icon_var),
        ("Auto-start monitor", self.continuous_monitor_var),
        ("Auto-show overlay", self.overlay_enabled),
    )
    for caption, variable in startup_options:
        tk.Checkbutton(
            startup_frame,
            text=caption,
            variable=variable,
            font=("Arial", 8),
            bg=self.bg_color,
            fg=self.fg_color,
        ).pack(anchor="w", pady=2)

    # Buttons
    button_frame = tk.Frame(main_frame, bg=self.bg_color)
    button_frame.pack(pady=(10, 0))

    def save_settings():
        """Persist the dialog's values, apply the transparency and close the dialog."""
        try:
            new_hotkey = hotkey_var.get().strip()
            if new_hotkey and new_hotkey != self.hotkey:
                self.hotkey = new_hotkey
                self.config_manager.set_setting('SETTINGS', 'hotkey', new_hotkey)
            new_transparency = transparency_var.get()
            self.config_manager.set_setting('SETTINGS', 'overlay_transparency', new_transparency)
            # Apply immediately when the overlay window already exists.
            if self.desktop_overlay.overlay:
                self.desktop_overlay.overlay.attributes('-alpha', new_transparency)
            self.config_manager.set_setting('SETTINGS', 'start_as_icon', self.start_as_icon_var.get())
            self.config_manager.set_setting('SETTINGS', 'continuous_monitor', self.continuous_monitor_var.get())
            self.config_manager.set_setting('SETTINGS', 'overlay_enabled', self.overlay_enabled.get())
            messagebox.showinfo("Settings Saved", "Settings saved!")
            close_dialog()
        except Exception as e:
            messagebox.showerror("Error", f"Failed: {e}")

    tk.Button(
        button_frame,
        text="💾 Save",
        command=save_settings,
        font=("Arial", 9, "bold"),
        bg="#00aa00",
        fg=self.fg_color,
        padx=12,
        pady=4,
        relief="flat",
    ).pack(side="left", padx=3)
    tk.Button(
        button_frame,
        text="❌ Cancel",
        command=close_dialog,
        font=("Arial", 9, "bold"),
        bg="#aa0000",
        fg=self.fg_color,
        padx=12,
        pady=4,
        relief="flat",
    ).pack(side="left", padx=3)

    # Center dialog
    dialog.update_idletasks()
    x = (dialog.winfo_screenwidth() // 2) - (dialog.winfo_width() // 2)
    y = (dialog.winfo_screenheight() // 2) - (dialog.winfo_height() // 2)
    dialog.geometry(f"+{x}+{y}")

    # Enable mousewheel
    def on_mousewheel(event):
        """Scroll the settings canvas by the wheel delta (one unit per 120)."""
        canvas.yview_scroll(int(-1*(event.delta/120)), "units")

    # bind_all is application-wide, so close_dialog must undo it on every exit path.
    canvas.bind_all("<MouseWheel>", on_mousewheel)
    dialog.protocol("WM_DELETE_WINDOW", close_dialog)
def toggle_start_as_icon(self):
    """Persist the current state of the "start in tray" checkbox."""
    start_in_tray = self.start_as_icon_var.get()
    self.config_manager.set_setting('SETTINGS', 'start_as_icon', start_in_tray)
def toggle_overlay_and_save(self):
    """Apply the overlay checkbox state, then persist it."""
    self.toggle_overlay()
    enabled_now = self.overlay_enabled.get()
    self.config_manager.set_setting('SETTINGS', 'overlay_enabled', enabled_now)
def toggle_continuous_monitor_and_save(self):
    """Apply the continuous-monitor checkbox state, then persist it."""
    self.toggle_continuous_monitor()
    monitoring_now = self.continuous_monitor_var.get()
    self.config_manager.set_setting('SETTINGS', 'continuous_monitor', monitoring_now)
def toggle_overlay(self):
    """Show, create or hide the desktop overlay to match the overlay checkbox.

    When the overlay becomes visible while the continuous monitor is
    running, it is refreshed at once: with the last measured speed if the
    overlay already existed and that speed is positive, otherwise with the
    "Monitoring..." placeholder.
    """
    overlay = self.desktop_overlay

    if not self.overlay_enabled.get():
        overlay.hide()
        return

    already_exists = bool(overlay.overlay)
    if already_exists:
        overlay.show()
    else:
        overlay.create()

    if not (self.continuous_monitor_var.get() and self.test_running):
        return

    # A freshly created overlay always starts with the placeholder text.
    last_speed = self.continuous_monitor.get_last_speed() if already_exists else 0
    if last_speed > 0:
        overlay.update(last_speed, "", True)
    else:
        overlay.update(0.0, "Monitoring...", True)
def toggle_continuous_monitor(self):
    """Start or stop continuous monitoring according to its checkbox variable."""
    if not self.continuous_monitor_var.get():
        self.stop_continuous_monitor()
        return
    self.start_continuous_monitor()
def start_continuous_monitor(self, is_startup=False):
    """Switch the UI into monitor mode and start the continuous monitor.

    Refuses to start (and unticks the checkbox) while ``self.test_running``
    is set. Unless ``is_startup`` is true, an enabled overlay is created if
    needed and primed with the "Monitoring..." placeholder.
    """
    log("Starting continuous monitor...")

    if self.test_running:
        messagebox.showwarning("Test in progress", "Wait for test completion before starting monitor.")
        self.continuous_monitor_var.set(False)
        return

    wants_overlay = not is_startup and self.overlay_enabled.get()
    if wants_overlay:
        if not self.desktop_overlay.overlay:
            log("Creating overlay for continuous monitor...")
            self.desktop_overlay.create()
        self.desktop_overlay.update(0.0, "Monitoring...", True)

    # Put the controls into "monitor running" state.
    self.test_button.config(state="disabled", text="MONITOR ACTIVE")
    self.stop_button.config(state="normal")
    self.test_phase.set("๐ Monitor active")
    self.monitor_status_label.config(text="● Monitor running")
    self.current_test_type = "monitor"

    def _on_speed_sample(speed):
        """Mirror a monitor sample into the speed fields, the peak and the history."""
        shown = f"{speed:.1f} Mbps"
        self.current_speed.set(shown)
        self.download_speed.set(shown)
        if speed > self.peak_speed_value:
            self.peak_speed_value = speed
            self.peak_speed.set(shown)
        history = self.speed_history
        history.append(speed)
        # Keep only the 30 most recent samples.
        if len(history) > 30:
            history.pop(0)

    def _on_overlay_sample(speed, text, monitoring):
        """Forward a sample to the overlay only when it is enabled and exists."""
        if self.overlay_enabled.get() and self.desktop_overlay.overlay:
            self.desktop_overlay.update(speed, text, monitoring)

    self.continuous_monitor.start_monitoring(
        callback=_on_speed_sample,
        overlay_callback=_on_overlay_sample,
    )
    self.test_running = True
    log("Continuous monitor started", "SUCCESS")
def stop_continuous_monitor(self):
    """Stop the continuous monitor and return the UI to its idle state."""
    log("Stopping continuous monitor...")
    self.continuous_monitor.stop_monitoring()

    # Re-enable testing and clear the monitor indicators.
    self.test_button.config(state="normal", text="๐ TEST")
    self.stop_button.config(state="disabled")
    self.test_phase.set("")
    self.monitor_status_label.config(text="")
    self.test_running = False

    overlay = self.desktop_overlay
    if overlay.overlay:
        overlay.update(0, "", False)
    log("Continuous monitor stopped", "SUCCESS")
def minimize_to_tray(self):
    """Hide the main window and keep the application reachable from the system tray.

    Does nothing except show a message when tray support is unavailable,
    when no tray helper object exists, or when the tray icon cannot be
    created.
    """
    log("Minimizing to system tray...")
    if not TRAY_AVAILABLE:
        messagebox.showwarning(
            "System Tray not available",
            "Install pystray and pillow:\npip install pystray pillow"
        )
        return

    # The original tested hasattr(self, 'tray_icon') but then dereferenced
    # self.tray_icon anyway, raising AttributeError when it was missing.
    tray = getattr(self, 'tray_icon', None)
    if tray is None:
        log("Tray icon helper is not initialised", "ERROR")
        messagebox.showerror("Error", "Cannot create system tray icon")
        return

    # Create the icon on demand; create() reports failure with a falsy result.
    if not tray.icon and not tray.create():
        messagebox.showerror("Error", "Cannot create system tray icon")
        return

    self.root.withdraw()
    self.in_tray = True
    log("Minimized to system tray", "SUCCESS")
def restore_from_tray(self):
    """Bring the main window back from the tray and clear the in-tray flag."""
    log("Restoring from system tray...")
    window = self.root
    window.deiconify()
    window.lift()
    self.in_tray = False
def quit_application(self):
    """Quit the application by delegating to ``on_closing``."""
    log("Quitting application...")
    return self.on_closing()
def update_realtime_speed(self, speed):
    """Show a new instantaneous speed reading in the UI and the overlay.

    Readings above 10000 Mbps are discarded without touching any state.
    Otherwise the current-speed text, the peak, the overlay (when enabled),
    the accent colour for the active test type and the 30-sample history
    are updated.
    """
    max_plausible_mbps = 10000
    if speed > max_plausible_mbps:
        # The original computed a fallback value here and then returned
        # without using it; the reading is simply dropped.
        return

    self.current_speed.set(f"{speed:.1f} Mbps")
    if speed > self.peak_speed_value:
        self.peak_speed_value = speed
        self.peak_speed.set(f"{speed:.1f} Mbps")

    if self.overlay_enabled.get():
        if not self.desktop_overlay.overlay:
            self.desktop_overlay.create()
        is_monitoring = self.continuous_monitor_var.get() and self.current_test_type == "monitor"
        # In monitor mode the overlay gets no phase text.
        self.desktop_overlay.update(speed, self.current_test_type if not is_monitoring else "", is_monitoring)

    # Accent colour follows the active test type; other types leave it unchanged.
    if self.current_test_type == "download":
        accent = self.download_color
    elif self.current_test_type == "upload":
        accent = self.upload_color
    elif self.current_test_type == "monitor":
        accent = "#ffff00"
    else:
        accent = None
    if accent is not None:
        self.current_speed_label.config(fg=accent)
        self.realtime_container.configure(highlightbackground=accent)

    self.speed_history.append(speed)
    if len(self.speed_history) > 30:
        self.speed_history.pop(0)
def update_progress(self, percent, speed=0, message=""):
    """Refresh the progress widgets and, for a positive speed, the live speed display.

    An empty ``message`` leaves the progress text unchanged and a negative
    ``percent`` leaves the bar unchanged. Pending idle tasks of the root
    window are always processed at the end.
    """
    if message:
        self.progress_label.config(text=message)

    bar_needs_update = percent >= 0
    if bar_needs_update:
        self.progress_bar['value'] = percent
        self.progress_percent.config(text=f"{percent:.0f}%")

    has_reading = speed > 0
    if has_reading:
        self.update_realtime_speed(speed)

    self.root.update_idletasks()
def stop_test(self):
    """Stop whatever is running: the continuous monitor or a one-shot test."""
    log("Stopping test...")

    monitor_active = self.continuous_monitor_var.get()
    if monitor_active:
        # Untick the checkbox first, then shut the monitor down.
        self.continuous_monitor_var.set(False)
        self.stop_continuous_monitor()
    else:
        # Clearing the flag signals a running one-shot test to stop.
        self.test_running = False
        self.stop_button.config(state="disabled")
        self.test_phase.set("Test interrupted")
        if not self.continuous_monitor_var.get():
            self.desktop_overlay.hide()

    log("Test stopped")
def run_speed_test(self):
    """Prepare the UI and launch the full speed test on a daemon thread.

    Returns without starting anything when a test is already running. If
    the continuous monitor is enabled it is switched off, the user is
    informed, and the method returns without starting a test.
    """
    banner = "=" * 60
    log(banner)
    log("Starting full speed test...")
    log(banner)

    if self.test_running:
        return

    if self.continuous_monitor_var.get():
        self.continuous_monitor_var.set(False)
        self.stop_continuous_monitor()
        messagebox.showinfo("Monitor disabled", "Continuous monitor disabled to run full test.")
        return

    # Lock the controls and reveal the progress widgets.
    self.test_running = True
    self.test_button.config(state="disabled", text="⏳ TEST IN PROGRESS...")
    self.stop_button.config(state="normal")
    self.progress_bar.pack(pady=3)
    self.progress_bar['value'] = 0
    self.progress_percent.pack()

    # Reset every displayed result.
    for result_var in (self.download_speed, self.upload_speed, self.ping_value):
        result_var.set("Calculating...")
    self.current_speed.set("0.0 Mbps")
    self.peak_speed.set("0.0 Mbps")
    self.peak_speed_value = 0
    self.speed_history = []
    self.test_phase.set("Initializing...")

    if self.overlay_enabled.get():
        overlay = self.desktop_overlay
        if overlay.overlay:
            overlay.show()
        else:
            overlay.create()

    worker = threading.Thread(target=self.perform_complete_test, daemon=True)
    worker.start()
def perform_complete_test(self):
    """Run the full test sequence: connectivity check, ping, download, upload.

    The user can interrupt between phases by clearing ``self.test_running``.
    Errors are logged and, unless the test was interrupted, shown in a
    message box. The controls are restored in the ``finally`` clause
    whichever way the run ends.
    """
    log("Test thread started")

    def interrupted():
        """Log and report whether the running flag has been cleared."""
        if self.test_running:
            return False
        log("Test interrupted by user")
        return True

    def live_speed(mbps):
        """Forward a live reading to the UI while the test is still running."""
        return self.update_realtime_speed(mbps) if self.test_running else None

    def progress_reporter(base_percent, caption):
        """Build a callback that maps a phase's percentage onto 40 points of the bar."""
        def report(percent, speed):
            if self.test_running:
                self.update_progress(base_percent + (percent * 0.4), speed, f"{caption}: {speed:.1f} Mbps")
        return report

    try:
        # ---- Phase 1: connection check -------------------------------
        log("Phase 1: Connection check")
        self.test_phase.set("๐ Checking connection...")
        self.current_test_type = "idle"
        self.update_progress(2, 0, "Checking internet connection...")

        # Several endpoints are tried in turn; the first success is enough.
        probe_urls = [
            "https://www.google.com",
            "https://www.cloudflare.com",
            "https://1.1.1.1",
        ]
        for url in probe_urls:
            try:
                log(f"Testing connection to: {url}")
                probe = urllib.request.Request(
                    url,
                    headers={'User-Agent': 'Mozilla/5.0'}
                )
                with urllib.request.urlopen(probe, timeout=5, context=SSL_CONTEXT) as reply:
                    log(f"Connection OK: {url} (status: {reply.status})", "SUCCESS")
                break
            except urllib.error.URLError as err:
                # WinError 10013 is handled separately from ordinary failures.
                detail = str(err)
                blocked = "10013" in detail or "access permissions" in detail.lower()
                if not blocked:
                    log(f"Connection failed for {url}: {err}", "WARNING")
                    continue
                log(f"WinError 10013 detected: {url}", "ERROR")
                log("This is a PERMISSIONS error, not a connection error!", "ERROR")
                log("The application is being blocked by Windows Firewall/Antivirus", "ERROR")
                raise Exception(
                    "PERMISSIONS ERROR (WinError 10013)\n\n"
                    "Windows is blocking network access.\n\n"
                    "SOLUTIONS:\n"
                    "1. Run as Administrator (Right-click → 'Run as administrator')\n"
                    "2. Add to Windows Defender exclusions\n"
                    "3. Add to Firewall exceptions\n"
                    "4. Disable antivirus temporarily\n\n"
                    "See FIX_WINERROR_10013.bat for detailed instructions."
                )
            except Exception as err:
                log(f"Connection failed for {url}: {err}", "WARNING")
        else:
            # No probe succeeded.
            raise Exception("No internet connection detected. Please check your network.")

        if interrupted():
            return

        # ---- Phase 2: ping -------------------------------------------
        log("Phase 2: Ping test")
        self.test_phase.set("๐ก Latency test...")
        self.update_progress(5, 0, "Testing latency...")
        ping = self.speed_engine.test_ping_advanced()
        self.ping_value.set(f"{ping:.1f} ms" if ping > 0 else "N/A")

        if interrupted():
            return

        # ---- Phase 3: download ---------------------------------------
        log("Phase 3: Download test")
        self.test_phase.set("⬇ Download Test...")
        self.current_test_type = "download"
        self.update_progress(10, 0, "Download test in progress...")
        download = self.speed_engine.test_download_optimized(
            progress_callback=progress_reporter(10, "Download"),
            speed_callback=live_speed,
        )
        if download > 0:
            self.download_speed.set(f"{download:.2f} Mbps")
            log(f"Download speed: {download:.2f} Mbps", "SUCCESS")
        else:
            self.download_speed.set("Error")
            log("Download test failed", "ERROR")

        if interrupted():
            return

        # ---- Phase 4: upload -----------------------------------------
        log("Phase 4: Upload test")
        self.test_phase.set("⬆ Upload Test...")
        self.current_test_type = "upload"
        self.update_progress(55, 0, "Upload test in progress...")
        upload = self.upload_engine.test_upload_comprehensive(
            progress_callback=progress_reporter(55, "Upload"),
            speed_callback=live_speed,
        )
        if upload > 0:
            self.upload_speed.set(f"{upload:.2f} Mbps")
            log(f"Upload speed: {upload:.2f} Mbps", "SUCCESS")
        else:
            self.upload_speed.set("Error")
            log("Upload test failed", "ERROR")

        # ---- Completion ----------------------------------------------
        self.test_phase.set("✅ Test completed!")
        self.current_test_type = "idle"
        self.update_progress(100, 0, "Test completed successfully!")
        if not self.continuous_monitor_var.get():
            # Hide the overlay three seconds after completion.
            self.root.after(3000, lambda: self.desktop_overlay.hide())

        finished_at = datetime.now().strftime("%H:%M:%S - %d/%m/%Y")
        self.last_test_label.config(text=f"✓ Last test: {finished_at}")

        separator = "=" * 60
        log(separator)
        log("Test completed successfully!", "SUCCESS")
        log(f"Results - Download: {download:.2f} Mbps, Upload: {upload:.2f} Mbps, Ping: {ping:.1f} ms")
        log(separator)

        time.sleep(1)
        self.show_results(download, upload, ping)
    except Exception as exc:
        log(f"TEST ERROR: {exc}", "ERROR")
        import traceback
        log(traceback.format_exc(), "ERROR")
        # Nothing is shown when the user interrupted the test.
        if self.test_running:
            self.test_phase.set("❌ Error")
            self.update_progress(0, 0, f"Error: {str(exc)}")
            messagebox.showerror("Error", f"Error during test:\n{str(exc)}\n\nCheck console for details.")
    finally:
        # Restore the idle UI whichever way the run ended.
        self.progress_bar.pack_forget()
        self.progress_percent.pack_forget()
        self.test_button.config(state="normal", text="๐ TEST")
        self.stop_button.config(state="disabled")
        self.test_running = False
        self.root.after(3000, lambda: self.test_phase.set(""))
        if not self.speed_history:
            self.current_speed.set("-- Mbps")
        log("Test thread ended")
def show_results(self, download, upload, ping):
    """Show the final test results in an information dialog.

    Args:
        download: Measured download speed in Mbps; a non-positive value
            is shown as "Error".
        upload: Measured upload speed in Mbps; a non-positive value is
            shown as "Error".
        ping: Measured latency in ms; a non-positive value is shown as
            "N/A".
    """
    summary = self._format_results_summary(download, upload, ping)
    messagebox.showinfo("Test Results", summary)

def _format_results_summary(self, download, upload, ping):
    """Return the multi-line results text displayed by ``show_results``.

    Failed measurements use the same wording as the main window ("Error",
    "N/A") instead of being rendered as "0.00 Mbps" / "0.0 ms". The emoji
    replace mis-encoded glyphs in the original literal.
    """
    download_text = f"{download:.2f} Mbps" if download > 0 else "Error"
    upload_text = f"{upload:.2f} Mbps" if upload > 0 else "Error"
    ping_text = f"{ping:.1f} ms" if ping > 0 else "N/A"
    server_info = f"Server: {self.selected_server.get()}"

    lines = [
        "",
        "═══════════════════════════════════",
        "SPEED TEST RESULTS",
        "═══════════════════════════════════",
        f"📥 Download: {download_text}",
        f"📤 Upload: {upload_text}",
        f"📡 Ping: {ping_text}",
        f"⚡ Peak: {self.peak_speed_value:.2f} Mbps",
        server_info,
        "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
        "💡 Tip: Enable continuous monitor",
        "for constant network speed",
        "monitoring!",
        "",
    ]
    return "\n".join(lines)
def main():
    """Create the Tk root, build the application, centre the window and run the main loop."""
    log("Starting main application...")
    root = tk.Tk()

    # Keep the window hidden while it is built so it does not flash at a default position.
    root.withdraw()
    root.update_idletasks()

    # Initialize app (this loads saved window size). The reference is held
    # for the lifetime of the main loop.
    app = SpeedTestApp(root)

    # Now center with the loaded dimensions
    root.update_idletasks()
    width = root.winfo_width()
    height = root.winfo_height()
    x = (root.winfo_screenwidth() // 2) - (width // 2)
    y = (root.winfo_screenheight() // 2) - (height // 2)
    root.geometry(f"+{x}+{y}")
    root.deiconify()

    # Icon (optional: a failure to load it must not stop the application)
    try:
        root.iconbitmap(default='speed.ico')
        log("Icon loaded")
    except Exception as exc:
        log(f"Icon not found: {exc}", "WARNING")

    log("Entering main loop...")
    root.mainloop()
    log("Application terminated")


if __name__ == "__main__":
    main()
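# Blog page footer picked up with the code, not part of the program:
# "Commenti" / "Posta un commento" (Comments / Post a comment).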