"""
Metadata Fixer
======================================
Features:
- 97+ file extensions across 10+ categories
- Custom user-defined extensions
- Real drag & drop on Windows (windnd) — thread-safe via queue
- Auto-watch: auto-processes new files in watched folders
- Minimize to system tray / Start as tray icon
- All settings saved to config file next to the executable/script
- Comprehensive PE header parsing and file property metadata
Dependencies:
pip install Pillow piexif mutagen
Optional:
pip install pystray windnd tkinterdnd2
"""
import os
import sys
import json
import struct
import random
import string
import hashlib
import shutil
import threading
import time
import re
import queue
import stat
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from datetime import datetime, timedelta, timezone
from pathlib import Path
# --- Optional dependency imports ---
try:
from PIL import Image, ImageDraw
from PIL.ExifTags import TAGS
HAS_PILLOW = True
except ImportError:
HAS_PILLOW = False
try:
import piexif
HAS_PIEXIF = True
except ImportError:
HAS_PIEXIF = False
try:
import mutagen
from mutagen.mp4 import MP4
HAS_MUTAGEN = True
except ImportError:
HAS_MUTAGEN = False
try:
import pystray
HAS_PYSTRAY = True
except ImportError:
HAS_PYSTRAY = False
HAS_WINDND = False
HAS_TKDND2 = False
try:
import windnd
HAS_WINDND = True
except ImportError:
pass
if not HAS_WINDND:
try:
import tkinterdnd2
HAS_TKDND2 = True
except ImportError:
pass
# ============================================================
# CONFIG FILE
# ============================================================
def get_config_path():
    """Return the path of the JSON settings file.

    The file is placed beside the executable when running as a frozen
    build (``sys.frozen`` is set) and beside this script otherwise.
    """
    running_frozen = getattr(sys, 'frozen', False)
    anchor = sys.executable if running_frozen else os.path.abspath(__file__)
    return os.path.join(os.path.dirname(anchor), "metadata_tool_config.json")
def load_config():
    """Load the persisted settings from the config file.

    Returns:
        dict: The parsed settings. An empty dict is returned when the file
        is missing, cannot be read, is not valid JSON, or does not contain
        a JSON object at its root.
    """
    config_path = get_config_path()
    if not os.path.isfile(config_path):
        return {}
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except Exception:
        # Best effort: an unreadable or corrupt file falls back to defaults.
        return {}
    # The fallback value is a dict, so keep the return type consistent even
    # if the file holds some other JSON value (list, string, null, ...).
    return loaded if isinstance(loaded, dict) else {}
def save_config(data):
    """Persist ``data`` as JSON to the config file (best effort).

    The JSON is first written to a sibling ``.tmp`` file and then moved over
    the real config with ``os.replace``, so a failure while serialising or
    writing never leaves a truncated config file behind.

    Args:
        data: JSON-serialisable settings object.

    Returns:
        None. Any error is swallowed on purpose; saving settings must never
        take the application down.
    """
    tmp_path = None
    try:
        config_path = get_config_path()
        tmp_path = config_path + ".tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        os.replace(tmp_path, config_path)
    except Exception:
        # Remove the partial temp file, if one was created.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
# ============================================================
# CONSTANTS
# ============================================================
# Extension sets, one per file category. Every entry is lowercase and
# includes the leading dot.
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".tiff", ".tif", ".webp", ".bmp", ".gif", ".ico", ".svg"}
AUDIO_EXTENSIONS = {".mp3", ".flac", ".ogg", ".m4a", ".wma", ".aac", ".wav", ".opus", ".ape", ".aiff"}
VIDEO_EXTENSIONS = {".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", ".m4v", ".mpg", ".mpeg"}
PDF_EXTENSIONS = {".pdf"}
OFFICE_EXTENSIONS = {".docx", ".xlsx", ".pptx", ".odt", ".ods", ".odp", ".doc", ".xls", ".ppt"}
EXECUTABLE_EXTENSIONS = {".exe", ".dll", ".msi", ".sys", ".scr", ".com"}
SHORTCUT_EXTENSIONS = {".lnk", ".url", ".desktop", ".webloc"}
TEXT_CODE_EXTENSIONS = {
    ".txt", ".md", ".csv", ".json", ".xml", ".yaml", ".yml", ".ini", ".cfg",
    ".log", ".html", ".htm", ".css", ".js", ".ts", ".py", ".java", ".c",
    ".cpp", ".h", ".hpp", ".cs", ".rb", ".php", ".go", ".rs", ".swift",
    ".kt", ".sh", ".bat", ".ps1", ".sql", ".r", ".lua", ".toml",
}
FONT_EXTENSIONS = {".ttf", ".otf", ".woff", ".woff2", ".eot"}
ARCHIVE_EXTENSIONS = {".zip", ".rar", ".7z", ".tar", ".gz", ".bz2", ".xz"}
# Values accepted for the `action` argument of the processing functions.
ACTION_REMOVE = "remove"
ACTION_OBFUSCATE = "obfuscate"
ACTION_RANDOMIZE = "randomize"
ACTION_REMOVE_OBFUSCATE = "remove_obfuscate"
# Named hex colours. NOTE(review): presumably the GUI theme palette —
# confirm against the UI code.
COLORS = {
    "bg": "#1a1b26", "surface": "#24283b", "surface2": "#2f3349",
    "border": "#3b4261", "text": "#c0caf5", "text_dim": "#565f89",
    "accent": "#7aa2f7", "accent_hover": "#89b4fa", "danger": "#f7768e",
    "warning": "#e0af68", "success": "#9ece6a", "purple": "#bb9af7",
    "cyan": "#7dcfff", "orange": "#ff9e64",
    "dropzone": "#1e2a45", "dropzone_hover": "#283d5e", "dropzone_border": "#4a6fa5",
}
# NOTE(review): presumably the auto-watch polling period in seconds — confirm
# against the watcher code.
WATCH_INTERVAL = 3
# PE Machine type lookup: COFF header `Machine` value -> display name.
PE_MACHINE_TYPES = {
    0x0: "Unknown", 0x14c: "x86 (i386)", 0x166: "MIPS R4000",
    0x1a2: "Hitachi SH3", 0x1a6: "Hitachi SH4", 0x1c0: "ARM",
    0x1c4: "ARM Thumb-2", 0x8664: "AMD64 (x86-64)", 0xaa64: "ARM64",
    0x5032: "RISC-V 32", 0x5064: "RISC-V 64",
}
# ============================================================
# FILE PROPERTY HELPERS
# ============================================================
def get_file_properties(filepath):
    """Get filesystem-level metadata common to all files.

    Args:
        filepath: Path of the file to inspect.

    Returns:
        dict: ``FileSize``, ``Created``, ``Modified``, ``Accessed`` and
        ``Permissions`` entries, plus ``WinAttributes`` on Windows when the
        file is hidden, system or read-only. If the file cannot be stat'ed
        the dict holds an ``error`` entry with the exception text instead.
    """
    props = {}
    time_format = "%Y-%m-%d %H:%M:%S"
    try:
        st = os.stat(filepath)
        props["FileSize"] = f"{st.st_size:,} bytes ({st.st_size / 1024:.1f} KB)"
        # st_ctime is the creation time only on Windows; on Unix it is the
        # inode-change time. Prefer the real birth time where the platform
        # exposes it and fall back to st_ctime otherwise.
        created_ts = st.st_birthtime if hasattr(st, "st_birthtime") else st.st_ctime
        props["Created"] = datetime.fromtimestamp(created_ts).strftime(time_format)
        props["Modified"] = datetime.fromtimestamp(st.st_mtime).strftime(time_format)
        props["Accessed"] = datetime.fromtimestamp(st.st_atime).strftime(time_format)
        # File permissions (owner bits only)
        mode = st.st_mode
        owner_bits = (
            (stat.S_IRUSR, "readable"),
            (stat.S_IWUSR, "writable"),
            (stat.S_IXUSR, "executable"),
        )
        attrs = [label for bit, label in owner_bits if mode & bit]
        props["Permissions"] = ", ".join(attrs) if attrs else "none"
        # Windows hidden/system attributes
        if sys.platform == "win32":
            try:
                import ctypes
                FILE_ATTRIBUTE_READONLY = 0x1
                FILE_ATTRIBUTE_HIDDEN = 0x2
                FILE_ATTRIBUTE_SYSTEM = 0x4
                attrs_win = ctypes.windll.kernel32.GetFileAttributesW(filepath)
                # -1 is INVALID_FILE_ATTRIBUTES as seen through the default
                # signed-int return type.
                if attrs_win != -1:
                    win_flags = []
                    if attrs_win & FILE_ATTRIBUTE_HIDDEN:
                        win_flags.append("Hidden")
                    if attrs_win & FILE_ATTRIBUTE_SYSTEM:
                        win_flags.append("System")
                    if attrs_win & FILE_ATTRIBUTE_READONLY:
                        win_flags.append("ReadOnly")
                    if win_flags:
                        props["WinAttributes"] = ", ".join(win_flags)
            except Exception:
                # Attribute lookup is optional extra information.
                pass
    except Exception as e:
        props["error"] = str(e)
    return props
def get_file_checksum(filepath, algo="md5"):
    """Compute a hex digest of the entire file.

    Args:
        filepath: Path of the file to hash.
        algo: Name of a ``hashlib`` algorithm; defaults to ``"md5"``.

    Returns:
        str | None: The hexadecimal digest, or ``None`` when the file cannot
        be read or the algorithm is not available.
    """
    try:
        digest = hashlib.new(algo)
        with open(filepath, "rb") as f:
            # Stream in 64 KiB chunks so large files are never held in memory.
            for chunk in iter(lambda: f.read(65536), b""):
                digest.update(chunk)
        return digest.hexdigest()
    except Exception:
        return None
def modify_file_timestamps(filepath, action):
    """
    Modify filesystem timestamps (Created, Modified, Accessed).

    For 'remove'/'remove_obfuscate': set all to 1980-01-01.
    For any other action ('obfuscate'/'randomize'): set all to one random
    moment between 2000-01-01 and 2025-12-31.

    On Windows the three times are written with kernel32 SetFileTime on a
    handle opened through the extended-length path prefix; if that cannot be
    done, os.utime is used instead (it cannot change the creation time).
    Elsewhere os.utime is used directly.

    Args:
        filepath: Path of the file to modify.
        action: One of the ACTION_* constants.

    Returns:
        bool: True when the timestamps were written, False on any error.
    """
    try:
        if action in (ACTION_REMOVE, ACTION_REMOVE_OBFUSCATE):
            target_ts = datetime(1980, 1, 1).timestamp()
        else:
            start = datetime(2000, 1, 1).timestamp()
            end = datetime(2025, 12, 31).timestamp()
            target_ts = random.uniform(start, end)
        if sys.platform == "win32":
            try:
                import ctypes
                from ctypes import wintypes

                # Private WinDLL instance so the prototypes declared here do
                # not leak into the shared ctypes.windll.kernel32 object.
                kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
                # Without an explicit restype ctypes returns a C int: handles
                # are truncated on 64-bit and INVALID_HANDLE_VALUE shows up
                # as -1, which is easy to mistake for a valid handle.
                kernel32.CreateFileW.restype = wintypes.HANDLE
                kernel32.CreateFileW.argtypes = [
                    wintypes.LPCWSTR, wintypes.DWORD, wintypes.DWORD,
                    wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD,
                    wintypes.HANDLE,
                ]
                kernel32.SetFileTime.restype = wintypes.BOOL
                kernel32.SetFileTime.argtypes = [
                    wintypes.HANDLE,
                    ctypes.POINTER(wintypes.FILETIME),
                    ctypes.POINTER(wintypes.FILETIME),
                    ctypes.POINTER(wintypes.FILETIME),
                ]
                kernel32.CloseHandle.restype = wintypes.BOOL
                kernel32.CloseHandle.argtypes = [wintypes.HANDLE]

                # FILETIME counts 100 ns ticks since 1601-01-01.
                EPOCH_DIFF = 116444736000000000
                ft_val = int(target_ts * 10000000) + EPOCH_DIFF
                ft = wintypes.FILETIME(ft_val & 0xFFFFFFFF, ft_val >> 32)
                FILE_WRITE_ATTRIBUTES = 0x100
                FILE_SHARE_RW = 0x1 | 0x2 | 0x4  # READ | WRITE | DELETE
                OPEN_EXISTING = 3
                FILE_FLAG_BACKUP_SEMANTICS = 0x02000000  # allows directories
                INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value

                # Use extended path prefix; UNC paths need the UNC form.
                if filepath.startswith("\\\\?\\"):
                    win_path = filepath
                else:
                    abs_path = os.path.abspath(filepath)
                    if abs_path.startswith("\\\\") and not abs_path.startswith("\\\\.\\"):
                        win_path = "\\\\?\\UNC\\" + abs_path[2:]
                    else:
                        win_path = "\\\\?\\" + abs_path

                handle = kernel32.CreateFileW(
                    win_path, FILE_WRITE_ATTRIBUTES,
                    FILE_SHARE_RW,
                    None, OPEN_EXISTING,
                    FILE_FLAG_BACKUP_SEMANTICS, None
                )
                if handle is not None and handle != INVALID_HANDLE_VALUE:
                    try:
                        applied = bool(kernel32.SetFileTime(
                            handle,
                            ctypes.byref(ft),
                            ctypes.byref(ft),
                            ctypes.byref(ft),
                        ))
                    finally:
                        kernel32.CloseHandle(handle)
                    if applied:
                        return True
            except Exception:
                # Fall through to the portable fallback below.
                pass
        # Portable path, and fallback when the Win32 route did not succeed.
        os.utime(filepath, (target_ts, target_ts))
        return True
    except Exception:
        return False
def _restore_timestamps_win32(filepath, ctime_ts, atime_ts, mtime_ts):
    """
    Restore all three timestamps (creation, access, modification).

    On Windows this uses SetFileTime on a handle opened with
    FILE_WRITE_ATTRIBUTES through the extended-length path prefix. On other
    platforms, or when the Win32 route fails, only access and modification
    times are restored via os.utime (the creation time cannot be set there).

    Args:
        filepath: Path of the file to update.
        ctime_ts: Creation time as a POSIX timestamp (used on Windows only).
        atime_ts: Access time as a POSIX timestamp.
        mtime_ts: Modification time as a POSIX timestamp.

    Returns:
        None. All errors are swallowed; restoring timestamps is best effort.
    """
    if sys.platform == "win32":
        try:
            import ctypes
            from ctypes import wintypes

            # Private WinDLL instance so the prototypes declared here do not
            # leak into the shared ctypes.windll.kernel32 object.
            kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
            # Without an explicit restype ctypes returns a C int: handles are
            # truncated on 64-bit and INVALID_HANDLE_VALUE shows up as -1,
            # which is easy to mistake for a valid handle.
            kernel32.CreateFileW.restype = wintypes.HANDLE
            kernel32.CreateFileW.argtypes = [
                wintypes.LPCWSTR, wintypes.DWORD, wintypes.DWORD,
                wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD,
                wintypes.HANDLE,
            ]
            kernel32.SetFileTime.restype = wintypes.BOOL
            kernel32.SetFileTime.argtypes = [
                wintypes.HANDLE,
                ctypes.POINTER(wintypes.FILETIME),
                ctypes.POINTER(wintypes.FILETIME),
                ctypes.POINTER(wintypes.FILETIME),
            ]
            kernel32.CloseHandle.restype = wintypes.BOOL
            kernel32.CloseHandle.argtypes = [wintypes.HANDLE]

            # FILETIME counts 100 ns ticks since 1601-01-01.
            EPOCH_DIFF = 116444736000000000
            FILE_WRITE_ATTRIBUTES = 0x100
            FILE_SHARE_RW = 0x1 | 0x2 | 0x4  # READ | WRITE | DELETE
            OPEN_EXISTING = 3
            FILE_FLAG_BACKUP_SEMANTICS = 0x02000000  # allows directories
            INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value

            def _ts_to_ft(ts):
                val = int(ts * 10000000) + EPOCH_DIFF
                return wintypes.FILETIME(val & 0xFFFFFFFF, val >> 32)

            ft_c = _ts_to_ft(ctime_ts)
            ft_a = _ts_to_ft(atime_ts)
            ft_m = _ts_to_ft(mtime_ts)

            # Use extended path prefix; UNC paths need the UNC form.
            if filepath.startswith("\\\\?\\"):
                win_path = filepath
            else:
                abs_path = os.path.abspath(filepath)
                if abs_path.startswith("\\\\") and not abs_path.startswith("\\\\.\\"):
                    win_path = "\\\\?\\UNC\\" + abs_path[2:]
                else:
                    win_path = "\\\\?\\" + abs_path

            handle = kernel32.CreateFileW(
                win_path, FILE_WRITE_ATTRIBUTES,
                FILE_SHARE_RW,
                None, OPEN_EXISTING,
                FILE_FLAG_BACKUP_SEMANTICS, None
            )
            if handle is not None and handle != INVALID_HANDLE_VALUE:
                try:
                    applied = bool(kernel32.SetFileTime(
                        handle,
                        ctypes.byref(ft_c),
                        ctypes.byref(ft_a),
                        ctypes.byref(ft_m),
                    ))
                finally:
                    kernel32.CloseHandle(handle)
                if applied:
                    return
        except Exception:
            # Fall through to the portable fallback below.
            pass
    # Portable path, and fallback when the Win32 route did not succeed.
    try:
        os.utime(filepath, (atime_ts, mtime_ts))
    except Exception:
        pass
# ============================================================
# METADATA PROCESSING ENGINE
# ============================================================
class MetadataProcessor:
@staticmethod
def random_string(length=12):
    """Return ``length`` random characters drawn from ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)
# ---- IMAGE ----
@staticmethod
def get_image_metadata(filepath):
    """Collect filesystem, checksum and embedded metadata for an image.

    Pillow is used when available. If Pillow is missing or cannot open the
    file, the first 4 KiB are inspected directly: the format is guessed from
    magic bytes and, for PNG, the chunks found in that window are listed.

    Args:
        filepath: Path of the image file.

    Returns:
        dict: Metadata entries keyed by display name. An ``error`` entry is
        added when the binary fallback itself fails.
    """
    metadata = get_file_properties(filepath)
    cs = get_file_checksum(filepath)
    if cs:
        metadata["Checksum_MD5"] = cs
    ext = Path(filepath).suffix.lower()
    pillow_ok = False
    if HAS_PILLOW:
        try:
            # Context manager releases the file handle; an open handle would
            # otherwise block later rewrites of the same file on Windows.
            with Image.open(filepath) as img:
                metadata["ImageSize"] = f"{img.width}x{img.height}"
                metadata["Mode"] = img.mode
                metadata["Format"] = str(img.format)
                # _getexif only exists on some plugins (JPEG, PNG, WebP...).
                # Its absence or failure must not discard what Pillow
                # already decoded.
                read_exif = getattr(img, "_getexif", None)
                try:
                    exif_data = read_exif() if callable(read_exif) else None
                except Exception:
                    exif_data = None
                if exif_data:
                    for tag_id, value in exif_data.items():
                        tag_name = TAGS.get(tag_id, f"Tag_{tag_id}")
                        try:
                            metadata[f"EXIF.{tag_name}"] = str(value)[:200]
                        except Exception:
                            metadata[f"EXIF.{tag_name}"] = "<binary data>"
                if hasattr(img, "info") and img.info:
                    for k, v in img.info.items():
                        key = f"Info.{k}"
                        if key not in metadata:
                            metadata[key] = str(v)[:200]
            pillow_ok = True
        except Exception:
            pass
    # If Pillow failed, try binary-level analysis
    if not pillow_ok:
        try:
            with open(filepath, "rb") as f:
                header = f.read(4096)
            # Detect format from magic bytes
            if header[:8] == b"\x89PNG\r\n\x1a\n":
                metadata["Format"] = "PNG"
                metadata["MimeType"] = "image/png"
                # Parse PNG chunks to find metadata (only those whose header
                # lies inside the 4 KiB window that was read)
                pos = 8
                while pos + 8 <= len(header):
                    chunk_len = struct.unpack(">I", header[pos:pos + 4])[0]
                    chunk_type = header[pos + 4:pos + 8].decode("ascii", errors="replace")
                    if chunk_type == "IHDR" and pos + 24 <= len(header):
                        w = struct.unpack(">I", header[pos + 8:pos + 12])[0]
                        h = struct.unpack(">I", header[pos + 12:pos + 16])[0]
                        metadata["ImageSize"] = f"{w}x{h}"
                        bit_depth = header[pos + 16]
                        color_type = header[pos + 17]
                        metadata["BitDepth"] = str(bit_depth)
                        ct_map = {0: "Grayscale", 2: "RGB", 3: "Indexed",
                                  4: "Grayscale+Alpha", 6: "RGBA"}
                        metadata["ColorType"] = ct_map.get(color_type, str(color_type))
                    elif chunk_type in ("tEXt", "iTXt", "zTXt"):
                        chunk_data = header[pos + 8:pos + 8 + min(chunk_len, 200)]
                        text = chunk_data.decode("latin-1", errors="replace")
                        metadata[f"PNG.{chunk_type}"] = text[:200]
                    elif chunk_type == "tIME":
                        metadata["PNG.tIME"] = "Present"
                    elif chunk_type == "eXIf":
                        metadata["PNG.eXIf"] = "Present"
                    # 4 length + 4 type + data + 4 CRC
                    pos += 12 + chunk_len
            elif header[:2] == b"\xff\xd8":
                metadata["Format"] = "JPEG"
                metadata["MimeType"] = "image/jpeg"
            elif header[:4] == b"GIF8":
                metadata["Format"] = "GIF"
                metadata["MimeType"] = "image/gif"
            elif header[:4] == b"RIFF" and header[8:12] == b"WEBP":
                metadata["Format"] = "WebP"
                metadata["MimeType"] = "image/webp"
            else:
                metadata["Format"] = f"Unknown ({ext})"
            metadata["RawHeader"] = " ".join(f"{b:02X}" for b in header[:64])
            metadata["note"] = "Pillow could not open file — binary analysis only"
        except Exception as e:
            metadata["error"] = str(e)
    return metadata
@staticmethod
def process_image(filepath, action):
    """Dispatch an image file to the handler matching its extension.

    JPEG, PNG, TIFF and GIF have dedicated handlers; every other extension
    goes through the generic Pillow re-save.

    Returns:
        tuple: ``(success, message)`` as produced by the chosen handler.
    """
    suffix = Path(filepath).suffix.lower()
    if suffix in (".jpg", ".jpeg"):
        handler = MetadataProcessor._process_jpeg
    elif suffix == ".png":
        handler = MetadataProcessor._process_png
    elif suffix in (".tiff", ".tif"):
        handler = MetadataProcessor._process_tiff
    elif suffix == ".gif":
        handler = MetadataProcessor._process_gif
    else:
        handler = MetadataProcessor._process_generic_image
    return handler(filepath, action)
@staticmethod
def _process_jpeg(filepath, action):
    """Remove or obfuscate the EXIF block of a JPEG using piexif.

    For ACTION_REMOVE the EXIF segment is deleted. For every other action
    each value in the 0th, Exif, GPS and 1st IFDs is replaced with random
    data of the same type and shape, and the result is written back.

    Args:
        filepath: Path of the JPEG file (modified in place).
        action: One of the ACTION_* constants.

    Returns:
        tuple: ``(success, message)``; on failure the message is the
        exception text.
    """
    if not HAS_PIEXIF:
        return False, "piexif not installed"

    def _scramble(val):
        """Return random data with the same type and nesting as ``val``."""
        if isinstance(val, bytes):
            return MetadataProcessor.random_string(len(val)).encode()
        if isinstance(val, str):
            return MetadataProcessor.random_string(len(val))
        if isinstance(val, int):
            # Values that fit in one byte may belong to a BYTE-typed tag
            # (e.g. GPSAltitudeRef); keep them in range so the dump below
            # does not fail.
            return random.randint(0, 255 if 0 <= val <= 255 else 9999)
        if isinstance(val, tuple):
            # Keep the nesting: rationals are (num, den) pairs and GPS
            # coordinates are tuples of such pairs. Flattening them makes
            # the EXIF block impossible to serialise.
            return tuple(
                _scramble(item) if isinstance(item, tuple) else random.randint(1, 100)
                for item in val
            )
        return val

    try:
        if action == ACTION_REMOVE:
            piexif.remove(filepath)
            return True, "EXIF data removed"
        exif_dict = piexif.load(filepath)
        for ifd_name in ("0th", "Exif", "GPS", "1st"):
            ifd = exif_dict.get(ifd_name) or {}
            for key in list(ifd.keys()):
                ifd[key] = _scramble(ifd[key])
        piexif.insert(piexif.dump(exif_dict), filepath)
        return True, "EXIF data obfuscated"
    except Exception as e:
        return False, str(e)
@staticmethod
def _process_png(filepath, action):
    """Strip PNG metadata. Try Pillow first, fallback to binary chunk removal.

    The Pillow route copies the pixels into a fresh image and saves it over
    the original. The colour palette and the transparency entry are carried
    over because they define how the pixels are rendered; all other
    ancillary information is dropped.

    Args:
        filepath: Path of the PNG file (modified in place).
        action: One of the ACTION_* constants; only used by the binary
            fallback.

    Returns:
        tuple: ``(success, message)``.
    """
    # Try Pillow approach first
    if HAS_PILLOW:
        try:
            # The context manager closes the source handle before the file
            # is overwritten (required on Windows).
            with Image.open(filepath) as img:
                img.load()
                clean = Image.new(img.mode, img.size)
                if img.mode == "P":
                    # Indexed pixels are meaningless without their palette.
                    palette = img.getpalette()
                    if palette:
                        clean.putpalette(palette)
                if "transparency" in img.info:
                    clean.info["transparency"] = img.info["transparency"]
                clean.putdata(list(img.getdata()))
            clean.save(filepath)
            return True, "PNG metadata stripped"
        except Exception:
            pass  # Fall through to binary approach
    # Binary fallback: remove tEXt, iTXt, zTXt, eXIf chunks from PNG
    return MetadataProcessor._process_png_binary(filepath, action)
@staticmethod
def _process_png_binary(filepath, action):
    """
    Drop metadata chunks from a PNG by rewriting its chunk stream.

    Needs no image library, so it also works on files Pillow rejects.
    Layout: 8-byte signature, then chunks of length + type + data + CRC.
    Always dropped: tEXt, iTXt, zTXt, eXIf, tIME, iCCP.
    Dropped for 'remove'/'remove_obfuscate' only: sRGB, gAMA, cHRM, pHYs.
    A file without the PNG signature is handed to process_generic.
    The file is rewritten only when at least one chunk was dropped.
    """
    signature = b"\x89PNG\r\n\x1a\n"
    try:
        with open(filepath, "rb") as src:
            blob = src.read()
        if blob[:8] != signature:
            # Not a valid PNG — try generic approach
            return MetadataProcessor.process_generic(filepath, action)
        always_dropped = {b"tEXt", b"iTXt", b"zTXt", b"eXIf", b"tIME", b"iCCP"}
        colour_related = {b"sRGB", b"gAMA", b"cHRM", b"pHYs"}
        if action in (ACTION_REMOVE, ACTION_REMOVE_OBFUSCATE):
            dropped_types = always_dropped | colour_related
        else:
            dropped_types = always_dropped
        kept_parts = [signature]
        dropped = 0
        total = len(blob)
        cursor = 8
        while cursor + 8 <= total:
            (payload_len,) = struct.unpack_from(">I", blob, cursor)
            kind = blob[cursor + 4:cursor + 8]
            following = cursor + 12 + payload_len  # 4 len + 4 type + data + 4 CRC
            if following > total:
                # Truncated chunk — keep whatever is left untouched
                kept_parts.append(blob[cursor:])
                break
            if kind not in dropped_types:
                kept_parts.append(blob[cursor:following])
            else:
                dropped += 1
            cursor = following
        if dropped <= 0:
            return True, "PNG: no metadata chunks found"
        with open(filepath, "wb") as dst:
            dst.write(b"".join(kept_parts))
        return True, f"PNG: {dropped} metadata chunk(s) removed (binary)"
    except Exception as exc:
        return False, str(exc)
@staticmethod
def _process_tiff(filepath, action):
    """Strip or obfuscate TIFF metadata.

    piexif is tried first when installed. Its remove/insert functions reject
    files that are not JPEG (or WebP), so any failure of that route falls
    back to the Pillow re-save instead of being reported as an error.

    Args:
        filepath: Path of the TIFF file (modified in place).
        action: One of the ACTION_* constants.

    Returns:
        tuple: ``(success, message)``.
    """
    if HAS_PIEXIF:
        try:
            if action == ACTION_REMOVE:
                piexif.remove(filepath)
                return True, "TIFF EXIF removed"
            ok, message = MetadataProcessor._process_jpeg(filepath, action)
            if ok:
                return ok, message
        except Exception:
            # piexif could not handle this file; use the generic route.
            pass
    return MetadataProcessor._process_generic_image(filepath, action)
@staticmethod
def _process_generic_image(filepath, action):
    """Re-save an image through Pillow so embedded metadata is dropped.

    The pixels are copied into a fresh image which is saved over the
    original. The colour palette and the transparency entry are carried
    over because they define how the pixels are rendered.

    Args:
        filepath: Path of the image file (modified in place).
        action: Unused; accepted so all handlers share one signature.

    Returns:
        tuple: ``(success, message)``; on failure the message is the
        exception text.
    """
    if not HAS_PILLOW:
        return False, "Pillow not installed"
    try:
        # The context manager closes the source handle before the file is
        # overwritten (required on Windows).
        with Image.open(filepath) as img:
            img.load()
            clean = Image.new(img.mode, img.size)
            if img.mode == "P":
                # Indexed pixels are meaningless without their palette.
                palette = img.getpalette()
                if palette:
                    clean.putpalette(palette)
            if "transparency" in img.info:
                clean.info["transparency"] = img.info["transparency"]
            clean.putdata(list(img.getdata()))
        clean.save(filepath)
        return True, "Image re-saved without metadata"
    except Exception as e:
        return False, str(e)
@staticmethod
def _process_gif(filepath, action):
    """
    Remove metadata from GIF files at binary level, preserving all frames.
    GIF metadata lives in:
    - Comment Extension blocks (0x21 0xFE)
    - Application Extension blocks (0x21 0xFF) like XMP, NETSCAPE2.0
    - Plain Text Extension (0x21 0x01)
    We strip comment and non-essential application extensions while
    keeping NETSCAPE2.0 (needed for animation looping) and all image data.

    Args:
        filepath: Path of the GIF file (modified in place).
        action: One of the ACTION_* constants. For 'remove' and
            'remove_obfuscate' the file is rewritten even when no metadata
            block was found.

    Returns:
        tuple: ``(success, message)``; on failure the message is the
        exception text. A file without a GIF signature is handed to the
        generic Pillow handler when Pillow is available.
    """
    try:
        with open(filepath, "rb") as f:
            data = f.read()
        # Verify GIF signature
        if data[:3] != b"GIF" or data[3:6] not in (b"87a", b"89a"):
            # Not a valid GIF — try Pillow as fallback
            if HAS_PILLOW:
                return MetadataProcessor._process_generic_image(filepath, action)
            return False, "Not a valid GIF file"
        # GIF structure:
        # Header (6) + Logical Screen Descriptor (7) + [Global Color Table] + blocks
        header = data[:6]
        lsd = data[6:13]
        # Check for Global Color Table
        packed = lsd[4]
        has_gct = (packed >> 7) & 1
        # Table size is 3 bytes per entry, 2^(N+1) entries (N = low 3 bits)
        gct_size = 3 * (2 ** ((packed & 0x07) + 1)) if has_gct else 0
        pos = 13 + gct_size
        output = bytearray(data[:pos])  # Keep header + LSD + GCT
        removed_count = 0
        # Walk the block stream; each branch either copies the block to
        # `output` or skips it, and always advances `pos` past it.
        while pos < len(data):
            byte = data[pos]
            if byte == 0x3B:
                # Trailer — end of GIF (bytes after it are not copied)
                output.append(0x3B)
                break
            elif byte == 0x2C:
                # Image Descriptor — keep everything (this is actual frame data)
                # Image Descriptor: 10 bytes
                if pos + 10 > len(data):
                    output.extend(data[pos:])
                    break
                img_desc = data[pos:pos + 10]
                output.extend(img_desc)
                pos += 10
                # Local Color Table
                img_packed = img_desc[9]
                has_lct = (img_packed >> 7) & 1
                if has_lct:
                    lct_size = 3 * (2 ** ((img_packed & 0x07) + 1))
                    output.extend(data[pos:pos + lct_size])
                    pos += lct_size
                # LZW Minimum Code Size
                if pos < len(data):
                    output.append(data[pos])
                    pos += 1
                # Sub-blocks: size byte + payload, terminated by a 0 size
                while pos < len(data):
                    block_size = data[pos]
                    output.append(block_size)
                    pos += 1
                    if block_size == 0:
                        break
                    output.extend(data[pos:pos + block_size])
                    pos += block_size
            elif byte == 0x21:
                # Extension block
                if pos + 1 >= len(data):
                    output.extend(data[pos:])
                    break
                ext_label = data[pos + 1]
                if ext_label == 0xFE:
                    # Comment Extension — REMOVE
                    pos += 2
                    # Skip its sub-blocks without copying them
                    while pos < len(data):
                        sz = data[pos]
                        pos += 1
                        if sz == 0:
                            break
                        pos += sz
                    removed_count += 1
                elif ext_label == 0x01:
                    # Plain Text Extension — REMOVE (rarely used, contains text metadata)
                    pos += 2
                    while pos < len(data):
                        sz = data[pos]
                        pos += 1
                        if sz == 0:
                            break
                        pos += sz
                    removed_count += 1
                elif ext_label == 0xFF:
                    # Application Extension — check if it's NETSCAPE (keep) or XMP (remove)
                    # Save position to decide
                    saved_pos = pos
                    pos += 2
                    if pos < len(data):
                        # First sub-block carries the application identifier
                        block_size = data[pos]
                        pos += 1
                        app_id = data[pos:pos + block_size] if pos + block_size <= len(data) else b""
                        pos += block_size
                        is_netscape = app_id.startswith(b"NETSCAPE")
                        # Read remaining sub-blocks
                        sub_blocks = bytearray()
                        while pos < len(data):
                            sz = data[pos]
                            sub_blocks.append(sz)
                            pos += 1
                            if sz == 0:
                                break
                            sub_blocks.extend(data[pos:pos + sz])
                            pos += sz
                        if is_netscape:
                            # Keep NETSCAPE extension (animation looping)
                            output.append(0x21)
                            output.append(0xFF)
                            output.append(block_size)
                            output.extend(app_id)
                            output.extend(sub_blocks)
                        else:
                            # Remove XMP, ICC, etc.
                            removed_count += 1
                elif ext_label == 0xF9:
                    # Graphic Control Extension — KEEP (controls frame timing/transparency)
                    output.extend(data[pos:pos + 2])
                    pos += 2
                    while pos < len(data):
                        sz = data[pos]
                        output.append(sz)
                        pos += 1
                        if sz == 0:
                            break
                        output.extend(data[pos:pos + sz])
                        pos += sz
                else:
                    # Unknown extension — keep it
                    output.extend(data[pos:pos + 2])
                    pos += 2
                    while pos < len(data):
                        sz = data[pos]
                        output.append(sz)
                        pos += 1
                        if sz == 0:
                            break
                        output.extend(data[pos:pos + sz])
                        pos += sz
            else:
                # Unknown byte — keep and advance
                output.append(byte)
                pos += 1
        # Rewrite when something was stripped, or unconditionally for the
        # remove-style actions.
        if removed_count > 0 or action in (ACTION_REMOVE, ACTION_REMOVE_OBFUSCATE):
            with open(filepath, "wb") as f:
                f.write(output)
            return True, f"GIF: {removed_count} metadata block(s) removed (frames preserved)"
        else:
            return True, "GIF: no metadata blocks found (frames intact)"
    except Exception as e:
        return False, str(e)
@staticmethod
def get_audio_metadata(filepath):
    """Collect filesystem, checksum, tag and stream details for an audio file.

    Tags are read through mutagen's "easy" interface; duration, bitrate,
    sample rate and channel count come from the stream info when present.
    Without mutagen only the filesystem data and a ``warning`` entry are
    returned; a mutagen failure is reported under ``error``.
    """
    metadata = get_file_properties(filepath)
    checksum = get_file_checksum(filepath)
    if checksum:
        metadata["Checksum_MD5"] = checksum
    if not HAS_MUTAGEN:
        metadata["warning"] = "mutagen not installed"
        return metadata
    # (stream attribute, output key, formatter) in reporting order
    stream_fields = (
        ("length", "Duration", lambda v: f"{v:.1f}s"),
        ("bitrate", "Bitrate", lambda v: f"{v // 1000} kbps"),
        ("sample_rate", "SampleRate", lambda v: f"{v} Hz"),
        ("channels", "Channels", str),
    )
    try:
        tagged = mutagen.File(filepath, easy=True)
        if tagged and tagged.tags:
            for tag_name, tag_value in tagged.tags.items():
                metadata[f"Tag.{tag_name}"] = str(tag_value)[:200]
        plain = mutagen.File(filepath)
        if plain and hasattr(plain, "info"):
            stream = plain.info
            for attr, label, render in stream_fields:
                if hasattr(stream, attr):
                    metadata[label] = render(getattr(stream, attr))
    except Exception as exc:
        metadata["error"] = str(exc)
    return metadata
@staticmethod
def process_audio(filepath, action):
    """Delete or obfuscate the tags of an audio file with mutagen.

    ACTION_REMOVE deletes all tags; any other action overwrites every
    "easy" tag with an 8-character random string.

    Returns:
        tuple: ``(success, message)``; on failure the message is the
        exception text.
    """
    if not HAS_MUTAGEN:
        return False, "mutagen not installed"
    try:
        handle = mutagen.File(filepath)
        if handle is None:
            return False, "Unrecognized audio format"
        if action != ACTION_REMOVE:
            easy_view = mutagen.File(filepath, easy=True)
            if easy_view and easy_view.tags:
                for tag_name in list(easy_view.tags.keys()):
                    easy_view.tags[tag_name] = [MetadataProcessor.random_string(8)]
                easy_view.save()
            return True, "Audio tags obfuscated"
        handle.delete()
        handle.save()
        return True, "Audio tags removed"
    except Exception as exc:
        return False, str(exc)
# ---- VIDEO ----
@staticmethod
def get_video_metadata(filepath):
    """Collect filesystem, checksum and (via mutagen) container details.

    Duration, bitrate and tags are added when mutagen is installed and
    recognises the file; mutagen errors are ignored silently.
    """
    metadata = get_file_properties(filepath)
    checksum = get_file_checksum(filepath)
    if checksum:
        metadata["Checksum_MD5"] = checksum
    if not HAS_MUTAGEN:
        return metadata
    try:
        container = mutagen.File(filepath)
        if not container:
            return metadata
        if hasattr(container, "info"):
            stream = container.info
            if hasattr(stream, "length"):
                metadata["Duration"] = f"{stream.length:.1f}s"
            if hasattr(stream, "bitrate"):
                metadata["Bitrate"] = f"{stream.bitrate // 1000} kbps"
        if container.tags:
            for tag_name, tag_value in container.tags.items():
                metadata[f"Tag.{tag_name}"] = str(tag_value)[:200]
    except Exception:
        pass
    return metadata
@staticmethod
def process_video(filepath, action):
    """Delete or obfuscate the tags of a video container with mutagen.

    ACTION_REMOVE deletes all tags; any other action tries to overwrite
    each tag with an 8-character random string, skipping tags that reject
    the value.

    Returns:
        tuple: ``(success, message)``. When mutagen is missing or does not
        recognise the file the result is
        ``(False, "No video metadata handler available")``.
    """
    unavailable = (False, "No video metadata handler available")
    if not HAS_MUTAGEN:
        return unavailable
    try:
        container = mutagen.File(filepath)
        if container is None:
            return unavailable
        if action == ACTION_REMOVE:
            container.delete()
            container.save()
            return True, "Video tags removed"
        if hasattr(container, "tags") and container.tags:
            for tag_name in list(container.tags.keys()):
                try:
                    container.tags[tag_name] = [MetadataProcessor.random_string(8)]
                except Exception:
                    pass
            container.save()
        return True, "Video tags obfuscated"
    except Exception as exc:
        return False, str(exc)
# ---- PDF ----
@staticmethod
def get_pdf_metadata(filepath):
    """Collect filesystem, checksum and document-info entries of a PDF.

    The file is scanned as Latin-1 text. For each known info key the first
    occurrence of ``/Key`` is located and the parenthesised string that
    starts within 50 characters of it is reported (first 200 characters).
    """
    metadata = get_file_properties(filepath)
    checksum = get_file_checksum(filepath)
    if checksum:
        metadata["Checksum_MD5"] = checksum
    info_keys = ("Title", "Author", "Subject", "Creator", "Producer",
                 "CreationDate", "ModDate", "Keywords")
    try:
        with open(filepath, "rb") as src:
            raw = src.read()
        body = raw.decode("latin-1")
        for name in info_keys:
            key_at = body.find(f"/{name}")
            if key_at == -1:
                continue
            open_at = body.find("(", key_at)
            if open_at == -1 or (open_at - key_at) >= 50:
                continue
            close_at = body.find(")", open_at + 1)
            if close_at == -1:
                continue
            metadata[f"PDF.{name}"] = body[open_at + 1:close_at][:200]
    except Exception as exc:
        metadata["error"] = str(exc)
    return metadata
@staticmethod
def process_pdf(filepath, action):
    """Blank out or obfuscate the document-info strings of a PDF in place.

    For each known info key the first ``/Key (value)`` occurrence is found
    and the value is overwritten with text of the same length (spaces for
    ACTION_REMOVE, random characters otherwise), so the file size and all
    byte offsets stay unchanged.

    Returns:
        tuple: ``(success, message)``; on failure the message is the
        exception text.
    """
    info_keys = ("Title", "Author", "Subject", "Creator", "Producer",
                 "CreationDate", "ModDate", "Keywords")
    try:
        with open(filepath, "rb") as src:
            raw = src.read()
        body = raw.decode("latin-1")
        changed = False
        for name in info_keys:
            key_at = body.find(f"/{name}")
            if key_at == -1:
                continue
            open_at = body.find("(", key_at)
            if open_at == -1 or (open_at - key_at) >= 50:
                continue
            close_at = body.find(")", open_at + 1)
            if close_at == -1:
                continue
            width = len(body[open_at + 1:close_at])
            if action == ACTION_REMOVE:
                filler = " " * width
            else:
                filler = MetadataProcessor.random_string(width)
            body = body[:open_at + 1] + filler + body[close_at:]
            changed = True
        if not changed:
            return True, "No writable PDF metadata found"
        with open(filepath, "wb") as dst:
            dst.write(body.encode("latin-1"))
        return True, f"PDF metadata {'cleared' if action == ACTION_REMOVE else 'obfuscated'}"
    except Exception as exc:
        return False, str(exc)
# ---- OFFICE ----
@staticmethod
def get_office_metadata(filepath):
    """Collect filesystem, checksum and document properties of an Office file.

    The file is opened as a ZIP archive and every non-empty text element of
    ``docProps/core.xml`` and ``docProps/app.xml`` is reported under
    ``Office.<tag>`` (namespace stripped, first 200 characters).

    Args:
        filepath: Path of the document.

    Returns:
        dict: Metadata entries; an ``error`` entry is added when the file is
        not a readable ZIP archive or the XML cannot be parsed.
    """
    import zipfile
    import xml.etree.ElementTree as ET
    metadata = get_file_properties(filepath)
    # Same checksum entry the other get_*_metadata functions provide.
    cs = get_file_checksum(filepath)
    if cs:
        metadata["Checksum_MD5"] = cs
    try:
        with zipfile.ZipFile(filepath, "r") as zf:
            # List the archive once instead of once per lookup.
            members = set(zf.namelist())
            for pf in ("docProps/core.xml", "docProps/app.xml"):
                if pf not in members:
                    continue
                root = ET.fromstring(zf.read(pf).decode("utf-8", errors="replace"))
                for elem in root.iter():
                    tag = elem.tag.split("}")[-1] if "}" in elem.tag else elem.tag
                    if elem.text and elem.text.strip():
                        metadata[f"Office.{tag}"] = elem.text.strip()[:200]
    except Exception as e:
        metadata["error"] = str(e)
    return metadata
@staticmethod
def process_office(filepath, action):
    """Remove or obfuscate the document properties of an Office file.

    The archive is copied entry by entry into ``<filepath>.tmp``; the text of
    every non-empty element in ``docProps/core.xml`` and ``docProps/app.xml``
    is cleared (ACTION_REMOVE) or replaced with an 8-character random string
    (any other action). The temp file then replaces the original.

    Each entry is written with its original ZipInfo, so its compression
    method and stored timestamp are preserved. This keeps a stored
    ``mimetype`` entry uncompressed and avoids stamping every entry with the
    time of processing.

    Args:
        filepath: Path of the document (modified in place).
        action: One of the ACTION_* constants.

    Returns:
        tuple: ``(success, message)``; on failure the temp file is removed
        and the message is the exception text.
    """
    import zipfile
    import xml.etree.ElementTree as ET
    temp = filepath + ".tmp"
    try:
        modified = False
        with zipfile.ZipFile(filepath, "r") as zi:
            with zipfile.ZipFile(temp, "w", zipfile.ZIP_DEFLATED) as zo:
                for info in zi.infolist():
                    data = zi.read(info)
                    if info.filename in ("docProps/core.xml", "docProps/app.xml"):
                        root = ET.fromstring(data.decode("utf-8", errors="replace"))
                        for elem in root.iter():
                            if elem.text and elem.text.strip():
                                elem.text = "" if action == ACTION_REMOVE else MetadataProcessor.random_string(8)
                                modified = True
                        data = ET.tostring(root, encoding="unicode").encode("utf-8")
                    zo.writestr(info, data)
        shutil.move(temp, filepath)
        message = (
            f"Office metadata {'removed' if action == ACTION_REMOVE else 'obfuscated'}"
            if modified else "No metadata found"
        )
        return True, message
    except Exception as e:
        if os.path.exists(temp):
            os.remove(temp)
        return False, str(e)
# ---- EXECUTABLE / PE ----
@staticmethod
def get_executable_metadata(filepath):
    """Comprehensive PE metadata extraction: COFF + Optional Header + version info.

    Only the first 128 KiB of the file are read. OLE compound documents
    (e.g. MSI) are recognised by signature and scanned for summary keywords
    instead of being parsed as PE. Version-info strings are located by
    searching for their UTF-16-LE key names inside the bytes that were read.

    Args:
        filepath: Path of the executable.

    Returns:
        dict: Filesystem properties, checksum, ``FileType``/``MimeType`` and
        whatever ``PE.*``, ``VersionInfo.*`` or ``OLE.*`` entries could be
        extracted. An ``error`` entry is added when parsing raises.
    """
    metadata = get_file_properties(filepath)
    cs = get_file_checksum(filepath)
    if cs:
        metadata["Checksum_MD5"] = cs
    ext = Path(filepath).suffix.lower()
    metadata["FileType"] = {".exe": "Win Executable", ".dll": "Dynamic Link Library",
                            ".msi": "Windows Installer", ".sys": "System Driver",
                            ".scr": "Screen Saver", ".com": "DOS Executable"}.get(ext, "Executable")
    metadata["MimeType"] = "application/octet-stream"
    try:
        fsize = os.path.getsize(filepath)
        with open(filepath, "rb") as f:
            data = f.read(min(fsize, 131072))
        # Detect OLE Compound Document (MSI, old Office, etc.)
        OLE_SIG = b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1"
        if data[:8] == OLE_SIG:
            metadata["Format"] = "OLE Compound Document"
            if ext == ".msi":
                metadata["MimeType"] = "application/x-msi"
                metadata["FileType"] = "Windows Installer Package (MSI)"
            # Try to extract OLE summary info
            text = data.decode("latin-1", errors="replace")
            for kw in ("Author", "Title", "Subject", "Keywords", "Comments",
                       "Template", "LastSavedBy", "RevisionNumber",
                       "Company", "Manager"):
                idx = text.find(kw)
                if idx != -1:
                    snippet = text[idx:idx + 80]
                    printable = "".join(c if c.isprintable() else " " for c in snippet).strip()
                    if printable and printable != kw:
                        metadata[f"OLE.{kw}"] = printable[:200]
            metadata["RawHeader"] = " ".join(f"{b:02X}" for b in data[:128])
            return metadata
        if data[:2] != b"MZ":
            metadata["PE.Format"] = "Not a valid PE (no MZ header)"
            metadata["RawHeader"] = " ".join(f"{b:02X}" for b in data[:64])
            return metadata
        metadata["PE.Format"] = "MZ executable"
        if len(data) <= 0x40:
            return metadata
        # e_lfanew: file offset of the "PE\0\0" signature
        pe_offset = struct.unpack_from("<I", data, 0x3C)[0]
        if pe_offset + 24 >= len(data) or data[pe_offset:pe_offset + 4] != b"PE\x00\x00":
            metadata["RawHeader"] = " ".join(f"{b:02X}" for b in data[:64])
            return metadata
        metadata["PE.Signature"] = "PE"
        # === COFF HEADER (20 bytes at pe_offset + 4) ===
        coff = pe_offset + 4
        machine = struct.unpack_from("<H", data, coff)[0]
        num_sections = struct.unpack_from("<H", data, coff + 2)[0]
        timestamp_raw = struct.unpack_from("<I", data, coff + 4)[0]
        opt_hdr_size = struct.unpack_from("<H", data, coff + 16)[0]
        characteristics = struct.unpack_from("<H", data, coff + 18)[0]
        metadata["PE.MachineType"] = PE_MACHINE_TYPES.get(machine, f"0x{machine:04X}")
        metadata["PE.Sections"] = str(num_sections)
        metadata["PE.OptionalHeaderSize"] = str(opt_hdr_size)
        if timestamp_raw > 0:
            try:
                ts_dt = datetime.fromtimestamp(timestamp_raw, tz=timezone.utc)
                metadata["PE.Timestamp"] = ts_dt.strftime("%Y-%m-%d %H:%M:%S UTC")
            except Exception:
                metadata["PE.Timestamp"] = str(timestamp_raw)
        else:
            metadata["PE.Timestamp"] = "0 (stripped)"
        char_flags = []
        if characteristics & 0x0001: char_flags.append("RelocsStripped")
        if characteristics & 0x0002: char_flags.append("Executable")
        if characteristics & 0x0004: char_flags.append("LineNumsStripped")
        if characteristics & 0x0008: char_flags.append("LocalSymsStripped")
        if characteristics & 0x0020: char_flags.append("LargeAddressAware")
        if characteristics & 0x0100: char_flags.append("32bit")
        if characteristics & 0x0200: char_flags.append("DebugStripped")
        if characteristics & 0x2000: char_flags.append("DLL")
        if char_flags:
            metadata["PE.Characteristics"] = ", ".join(char_flags)
        # === OPTIONAL HEADER ===
        opt = coff + 20
        if opt + 2 > len(data):
            return metadata
        magic = struct.unpack_from("<H", data, opt)[0]
        is_pe32plus = (magic == 0x20B)
        metadata["PE.Type"] = "PE32+ (64-bit)" if is_pe32plus else ("PE32" if magic == 0x10B else f"Unknown (0x{magic:04X})")
        # Linker version (offset 2-3 from opt header start)
        if opt + 4 <= len(data):
            metadata["PE.LinkerVersion"] = f"{data[opt + 2]}.{data[opt + 3]}"
        # Code size, initialized data, uninitialized data (offsets 4, 8, 12)
        if opt + 16 <= len(data):
            code_size = struct.unpack_from("<I", data, opt + 4)[0]
            init_data = struct.unpack_from("<I", data, opt + 8)[0]
            uninit_data = struct.unpack_from("<I", data, opt + 12)[0]
            metadata["PE.CodeSize"] = f"{code_size:,} bytes"
            metadata["PE.InitializedDataSize"] = f"{init_data:,} bytes"
            metadata["PE.UninitializedDataSize"] = f"{uninit_data:,} bytes"
        # Entry point (offset 16)
        if opt + 20 <= len(data):
            entry = struct.unpack_from("<I", data, opt + 16)[0]
            metadata["PE.EntryPoint"] = f"0x{entry:X}"
        # OS Version, Image Version, Subsystem Version, Subsystem
        # For PE32: OS version at opt+40, Image version at opt+44,
        # Subsystem version at opt+48, Subsystem at opt+68
        # For PE32+: same offsets (they don't shift until BaseOfData/ImageBase)
        # The last field read here is the 2-byte minor subsystem version at
        # opt+50, so the header must extend to opt+52.
        if opt + 52 <= len(data):
            os_major = struct.unpack_from("<H", data, opt + 40)[0]
            os_minor = struct.unpack_from("<H", data, opt + 42)[0]
            metadata["PE.OSVersion"] = f"{os_major}.{os_minor}"
            img_major = struct.unpack_from("<H", data, opt + 44)[0]
            img_minor = struct.unpack_from("<H", data, opt + 46)[0]
            metadata["PE.ImageVersion"] = f"{img_major}.{img_minor}"
            sub_major = struct.unpack_from("<H", data, opt + 48)[0]
            sub_minor = struct.unpack_from("<H", data, opt + 50)[0]
            metadata["PE.SubsystemVersion"] = f"{sub_major}.{sub_minor}"
        # Subsystem field: PE32 at opt+68, PE32+ at opt+68 (same)
        if opt + 70 <= len(data):
            subsys = struct.unpack_from("<H", data, opt + 68)[0]
            subsys_map = {
                0: "Unknown", 1: "Native", 2: "Windows GUI", 3: "Windows Console",
                5: "OS/2 Console", 7: "POSIX Console", 9: "Windows CE",
                10: "EFI Application", 11: "EFI Boot Driver", 12: "EFI Runtime Driver",
                14: "Xbox", 16: "Windows Boot Application",
            }
            metadata["PE.Subsystem"] = subsys_map.get(subsys, f"Unknown ({subsys})")
        # DLL Characteristics
        dll_char_off = opt + 70
        if dll_char_off + 2 <= len(data):
            dll_chars = struct.unpack_from("<H", data, dll_char_off)[0]
            dc_flags = []
            if dll_chars & 0x0020: dc_flags.append("HighEntropyVA")
            if dll_chars & 0x0040: dc_flags.append("DynamicBase/ASLR")
            if dll_chars & 0x0080: dc_flags.append("ForceIntegrity")
            if dll_chars & 0x0100: dc_flags.append("NX/DEP")
            if dll_chars & 0x0200: dc_flags.append("NoIsolation")
            if dll_chars & 0x0400: dc_flags.append("NoSEH")
            if dll_chars & 0x0800: dc_flags.append("NoBind")
            if dll_chars & 0x4000: dc_flags.append("ControlFlowGuard")
            if dll_chars & 0x8000: dc_flags.append("TerminalServerAware")
            if dc_flags:
                metadata["PE.DLLCharacteristics"] = ", ".join(dc_flags)
        # Checksum in PE header (opt + 64)
        if opt + 68 <= len(data):
            pe_checksum = struct.unpack_from("<I", data, opt + 64)[0]
            metadata["PE.Checksum"] = f"0x{pe_checksum:08X}"
        # === VERSION INFO STRINGS ===
        for marker in ("CompanyName", "FileDescription", "FileVersion",
                       "InternalName", "LegalCopyright", "OriginalFilename",
                       "ProductName", "ProductVersion", "Comments",
                       "LegalTrademarks", "PrivateBuild", "SpecialBuild"):
            utf16 = marker.encode("utf-16-le")
            idx = data.find(utf16)
            if idx != -1:
                # Skip up to 16 bytes of NUL padding between key and value
                val_start = idx + len(utf16)
                while val_start < len(data) - 1 and val_start < idx + len(utf16) + 16:
                    if data[val_start] != 0 or data[val_start + 1] != 0:
                        break
                    val_start += 2
                val_bytes = data[val_start:val_start + 256]
                try:
                    val = val_bytes.decode("utf-16-le").split("\x00")[0].strip()
                    if val:
                        metadata[f"VersionInfo.{marker}"] = val[:200]
                except Exception:
                    pass
        # Raw header (first 128 bytes hex)
        metadata["RawHeader"] = " ".join(f"{b:02X}" for b in data[:128])
    except Exception as e:
        metadata["error"] = str(e)
    return metadata
@staticmethod
def process_executable(filepath, action):
    """Scrub build metadata from a PE executable, rewriting the file in place.

    Touches the COFF timestamp, the linker / OS / image / subsystem version
    fields, the optional-header checksum (always zeroed) and the first
    occurrence of each well-known version-info key. With ``ACTION_REMOVE``
    numeric fields are zeroed and version strings are blanked with spaces;
    any other action writes random values instead.

    Files that start with the OLE compound-document signature (e.g. MSI) are
    delegated to ``MetadataProcessor.process_generic``.

    Returns a ``(success, message)`` tuple; any exception is reported as
    ``(False, str(error))``.
    """
    ole_signature = b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1"
    version_keys = (
        "CompanyName", "FileDescription", "FileVersion",
        "InternalName", "LegalCopyright", "OriginalFilename",
        "ProductName", "ProductVersion", "Comments",
        "LegalTrademarks", "PrivateBuild", "SpecialBuild",
    )
    # (offset inside the optional header, major range, minor range)
    version_fields = (
        (40, (4, 10), (0, 3)),   # OS version
        (44, (0, 5), (0, 99)),   # image version
        (48, (4, 10), (0, 3)),   # subsystem version
    )
    try:
        with open(filepath, "rb") as src:
            data = bytearray(src.read())

        # OLE containers are not PE images: use the generic binary scrubber.
        if data[:8] == ole_signature:
            return MetadataProcessor.process_generic(filepath, action)

        modified = False
        pe_offset = None
        if data[:2] == b"MZ" and len(data) > 0x40:
            pe_offset = struct.unpack_from("<I", data, 0x3C)[0]
        has_pe_header = (
            pe_offset is not None
            and pe_offset + 24 <= len(data)
            and data[pe_offset:pe_offset + 4] == b"PE\x00\x00"
        )

        if has_pe_header:
            wipe = action == ACTION_REMOVE
            coff = pe_offset + 4
            opt = coff + 20

            # COFF header: link timestamp.
            stamp_at = coff + 4
            if stamp_at + 4 <= len(data):
                stamp = 0 if wipe else random.randint(946684800, 1893456000)
                struct.pack_into("<I", data, stamp_at, stamp)
                modified = True

            # Optional header: linker version, three version pairs, checksum.
            if opt + 72 <= len(data):
                if wipe:
                    data[opt + 2] = 0
                    data[opt + 3] = 0
                else:
                    data[opt + 2] = random.randint(8, 16)
                    data[opt + 3] = random.randint(0, 50)
                for rel, major_range, minor_range in version_fields:
                    if wipe:
                        major = minor = 0
                    else:
                        major = random.randint(*major_range)
                        minor = random.randint(*minor_range)
                    struct.pack_into("<HH", data, opt + rel, major, minor)
                struct.pack_into("<I", data, opt + 64, 0)
                modified = True

            # Version-info strings: keys are stored as UTF-16-LE text.
            for key in version_keys:
                needle = key.encode("utf-16-le")
                hit = data.find(needle)
                if hit == -1:
                    continue
                key_end = hit + len(needle)
                # Skip up to 16 bytes of zero code units (terminator/padding).
                start = key_end
                while start < len(data) - 1 and start < key_end + 16:
                    if data[start:start + 2] != b"\x00\x00":
                        break
                    start += 2
                # The value runs to the next zero code unit, at most 512 bytes.
                end = start
                while end + 1 < len(data) and end < start + 512:
                    if data[end:end + 2] == b"\x00\x00":
                        break
                    end += 2
                size = end - start
                if size <= 0:
                    continue
                units = size // 2
                if wipe:
                    filler = b"\x20\x00" * units
                else:
                    filler = MetadataProcessor.random_string(units).encode("utf-16-le")
                data[start:start + size] = filler[:size]
                modified = True

        if not modified:
            return True, "No writable PE metadata found"
        with open(filepath, "wb") as dst:
            dst.write(data)
        verb = "cleared" if action == ACTION_REMOVE else "obfuscated"
        return True, f"PE metadata {verb} (timestamp + headers + version info)"
    except Exception as e:
        return False, str(e)
# ---- SHORTCUTS ----
@staticmethod
def get_shortcut_metadata(filepath):
    """Collect metadata from a shortcut file (.url, .desktop or .lnk).

    Starts from ``get_file_properties(filepath)`` and adds:

    * .url / .desktop: every ``key=value`` line that does not start with
      ``#`` (value truncated to 200 characters);
    * .lnk: the format name, the decoded link flags and the three header
      timestamps, when the file starts with the shell-link signature.

    Errors are recorded under the ``"error"`` key instead of being raised.
    """
    metadata = get_file_properties(filepath)
    suffix = Path(filepath).suffix.lower()
    link_flags = (
        (0x01, "HasLinkTargetIDList"),
        (0x02, "HasLinkInfo"),
        (0x04, "HasName"),
        (0x08, "HasRelativePath"),
        (0x10, "HasWorkingDir"),
        (0x20, "HasArguments"),
        (0x40, "HasIconLocation"),
    )
    time_fields = (("LNK.Created", 0x1C), ("LNK.Accessed", 0x24), ("LNK.Written", 0x2C))
    try:
        if suffix in (".url", ".desktop"):
            with open(filepath, "r", errors="replace") as handle:
                for entry in handle:
                    if entry.startswith("#") or "=" not in entry:
                        continue
                    name, value = entry.strip().split("=", 1)
                    metadata[name] = value[:200]
        elif suffix == ".lnk":
            with open(filepath, "rb") as handle:
                header = handle.read(76)
            if header[:4] == b"\x4c\x00\x00\x00":
                metadata["Format"] = "Windows Shell Link"
                # Link flags live at offset 0x14.
                if len(header) >= 0x18:
                    flags = struct.unpack_from("<I", header, 0x14)[0]
                    present = [label for bit, label in link_flags if flags & bit]
                    if present:
                        metadata["LNK.Flags"] = ", ".join(present)
                # Creation / access / write times are 64-bit FILETIME values.
                for label, offset in time_fields:
                    if len(header) < offset + 8:
                        continue
                    filetime = struct.unpack_from("<Q", header, offset)[0]
                    if not filetime > 0:
                        continue
                    try:
                        # FILETIME (100 ns ticks since 1601) -> Unix seconds.
                        unix_ts = (filetime - 116444736000000000) / 10000000
                        metadata[label] = datetime.fromtimestamp(unix_ts).strftime("%Y-%m-%d %H:%M:%S")
                    except Exception:
                        metadata[label] = str(filetime)
    except Exception as e:
        metadata["error"] = str(e)
    return metadata
@staticmethod
def process_shortcut(filepath, action):
    """Remove or obfuscate metadata stored in a shortcut file.

    * .url / .desktop: every ``key=value`` line whose key is not one of the
      functional keys (url, exec, type, icon, terminal, encoding) gets its
      value emptied (``ACTION_REMOVE``) or replaced by an 8-character random
      string (any other action). Comment (``#``) and section (``[``) lines
      are kept as they are.
    * .lnk: the three header timestamps are zeroed (``ACTION_REMOVE``) or
      randomized (any other action).

    Returns ``(success, message)``. Unsupported extensions give
    ``(False, "Unsupported shortcut type")``; exceptions give
    ``(False, str(error))``.
    """
    suffix = Path(filepath).suffix.lower()
    try:
        if suffix in (".url", ".desktop"):
            preserved = {"url", "exec", "type", "icon", "terminal", "encoding"}
            with open(filepath, "r", errors="replace") as src:
                original = src.readlines()
            rewritten = []
            changed = False
            for entry in original:
                is_pair = "=" in entry and not entry.startswith(("#", "["))
                name = entry.split("=", 1)[0].strip() if is_pair else None
                if name is None or name.lower() in preserved:
                    rewritten.append(entry)
                    continue
                value = "" if action == ACTION_REMOVE else MetadataProcessor.random_string(8)
                rewritten.append(f"{name}={value}\n")
                changed = True
            if not changed:
                return True, "No modifiable metadata"
            with open(filepath, "w") as dst:
                dst.writelines(rewritten)
            outcome = "cleared" if action == ACTION_REMOVE else "obfuscated"
            return True, f"Shortcut metadata {outcome}"

        if suffix == ".lnk":
            with open(filepath, "rb") as src:
                blob = bytearray(src.read())
            # Need the shell-link signature and a header long enough to hold
            # all three timestamps (the last one ends at 0x34).
            if blob[:4] != b"\x4c\x00\x00\x00" or len(blob) < 0x34:
                return True, "Not a valid LNK file"
            for field_at in (0x1C, 0x24, 0x2C):  # creation, access, write times
                if action == ACTION_REMOVE:
                    stamp = b"\x00" * 8
                else:
                    stamp = struct.pack("<Q", random.randint(
                        129000000000000000, 133000000000000000))
                blob[field_at:field_at + 8] = stamp
            with open(filepath, "wb") as dst:
                dst.write(blob)
            outcome = "zeroed" if action == ACTION_REMOVE else "randomized"
            return True, f"LNK timestamps {outcome}"
    except Exception as e:
        return False, str(e)
    return False, "Unsupported shortcut type"
# ---- TEXT / CODE ----
@staticmethod
def get_text_metadata(filepath):
    """Get file properties plus content-level metadata for a text/code file.

    On top of ``get_file_properties`` / ``get_file_checksum`` this reports
    the file type and MIME type derived from the extension, the first 128
    bytes as hex, the BOM-derived encoding, the line-ending style, line and
    word counts, author/date/copyright style lines found near the top of the
    file and up to ten HTML ``<meta>`` tags.

    The text is decoded with the codec indicated by the BOM (UTF-8 when no
    BOM is present), so counts and pattern hits are meaningful for UTF-16
    files as well. Errors are stored under the ``"error"`` key.
    """
    metadata = get_file_properties(filepath)
    cs = get_file_checksum(filepath)
    if cs:
        metadata["Checksum_MD5"] = cs
    ext = Path(filepath).suffix.lower()
    # MIME type mapping
    mime_map = {
        ".txt": "text/plain", ".md": "text/markdown", ".csv": "text/csv",
        ".json": "application/json", ".xml": "application/xml",
        ".yaml": "text/yaml", ".yml": "text/yaml", ".html": "text/html",
        ".htm": "text/html", ".css": "text/css", ".js": "application/javascript",
        ".ts": "application/typescript", ".py": "text/x-python",
        ".java": "text/x-java", ".c": "text/x-c", ".cpp": "text/x-c++",
        ".h": "text/x-c", ".cs": "text/x-csharp", ".rb": "text/x-ruby",
        ".php": "text/x-php", ".go": "text/x-go", ".rs": "text/x-rust",
        ".sh": "text/x-shellscript", ".bat": "text/x-batch",
        ".ps1": "text/x-powershell", ".sql": "text/x-sql",
        ".ini": "text/plain", ".cfg": "text/plain", ".log": "text/plain",
        ".toml": "text/x-toml", ".lua": "text/x-lua",
    }
    metadata["FileType"] = ext.lstrip(".").upper()
    metadata["FileTypeExtension"] = ext.lstrip(".")
    metadata["MimeType"] = mime_map.get(ext, "text/plain")
    # (BOM bytes, codec used for decoding, Encoding label, ByteOrderMark label)
    bom_table = (
        (b"\xef\xbb\xbf", "utf-8-sig", "UTF-8 (BOM)", "Yes (UTF-8)"),
        (b"\xff\xfe", "utf-16", "UTF-16 LE", "Yes (UTF-16 LE)"),
        (b"\xfe\xff", "utf-16", "UTF-16 BE", "Yes (UTF-16 BE)"),
    )
    try:
        with open(filepath, "rb") as f:
            raw = f.read()
        # Raw header hex (first 128 bytes)
        metadata["RawHeader"] = " ".join(f"{b:02X}" for b in raw[:128])

        # Detect the encoding from the byte-order mark.
        codec = "utf-8"
        metadata["Encoding"] = "UTF-8 / ASCII (no BOM)"
        metadata["ByteOrderMark"] = "No"
        for bom, bom_codec, encoding_label, bom_label in bom_table:
            if raw.startswith(bom):
                codec = bom_codec
                metadata["Encoding"] = encoding_label
                metadata["ByteOrderMark"] = bom_label
                break

        # Decode with the detected codec. The "utf-16" codec reads the BOM to
        # pick the byte order; decoding UTF-16 data as UTF-8 would turn it
        # into NULs and replacement characters and skew every count below.
        full = raw.decode(codec, errors="replace")

        # Detect line endings on the decoded text: in UTF-16 the CR and LF
        # code units are not adjacent bytes, so a byte-level search fails.
        if "\r\n" in full:
            metadata["LineEndings"] = "Windows (CRLF)"
        elif "\r" in full:
            metadata["LineEndings"] = "Classic Mac (CR)"
        elif "\n" in full:
            metadata["LineEndings"] = "Unix (LF)"
        else:
            metadata["LineEndings"] = "None (single line)"

        metadata["LineCount"] = str(len(full.splitlines()))
        metadata["WordCount"] = str(len(full.split()))

        # Look for metadata-like lines in the first 60 lines of the first 4 KiB.
        head = full[:4096]
        head_lines = [(line, line.lower()) for line in head.split("\n")[:60]]
        for pattern in ("@author", "@version", "@date", "@copyright",
                        "Author:", "Date:", "Version:", "Copyright:",
                        "Created by", "Modified by", "Last modified"):
            wanted = pattern.lower()
            for line, lowered in head_lines:
                if wanted in lowered:
                    # No break: the last matching line wins, as before.
                    metadata[f"Content.{pattern}"] = line.strip()[:200]
        if "<meta" in head.lower():
            for i, m in enumerate(re.findall(r'<meta[^>]+>', head, re.IGNORECASE)[:10]):
                metadata[f"HTML.meta_{i}"] = m[:200]
    except Exception as e:
        metadata["error"] = str(e)
    return metadata
@staticmethod
def process_text(filepath, action):
    """Remove or obfuscate author/date/copyright style annotations in a text file.

    Values following ``@author``, ``@version``, ``@date``, ``@copyright``,
    ``Author:``, ``Created by:``, ``Modified by:``, ``Date:``,
    ``Last modified:`` and ``Copyright`` (case-insensitive), plus the
    ``content="..."`` value of HTML ``<meta>`` tags, are deleted
    (``ACTION_REMOVE``) or replaced with a random string of the same length
    (any other action).

    The file is handled as bytes on disk: UTF-16 files are recognised by
    their BOM, everything else is treated as UTF-8 with ``surrogateescape``
    so undecodable bytes and the original line endings are written back
    untouched. A value never extends past the end of its own line.

    Returns ``(success, message)``; exceptions are reported as
    ``(False, str(error))`` and leave the file unmodified.
    """
    # Only horizontal whitespace may separate a tag from its value. With
    # ``\s`` the match could run over the line break, so removing
    # "@author: X" let the later "Author:" pattern swallow the next line.
    patterns = [
        r'(@author[ \t]*[: \t]*)([^\r\n]+)', r'(@version[ \t]*[: \t]*)([^\r\n]+)',
        r'(@date[ \t]*[: \t]*)([^\r\n]+)', r'(@copyright[ \t]*[: \t]*)([^\r\n]+)',
        r'(Author[ \t]*:[ \t]*)([^\r\n]+)', r'(Created by[ \t]*:[ \t]*)([^\r\n]+)',
        r'(Modified by[ \t]*:[ \t]*)([^\r\n]+)', r'(Date[ \t]*:[ \t]*)([^\r\n]+)',
        r'(Last modified[ \t]*:[ \t]*)([^\r\n]+)', r'(Copyright[ \t]*[: \t]*)([^\r\n]+)',
    ]
    meta_pattern = r'(<meta\s+[^>]*content\s*=\s*")([^"]*)'
    try:
        with open(filepath, "rb") as f:
            raw = f.read()

        # Pick a codec that round-trips the file exactly. The BOM stays part
        # of the text, so it is written back as it was.
        if raw[:2] == b"\xff\xfe":
            encoding, errors = "utf-16-le", "strict"
        elif raw[:2] == b"\xfe\xff":
            encoding, errors = "utf-16-be", "strict"
        else:
            encoding, errors = "utf-8", "surrogateescape"
        content = raw.decode(encoding, errors)

        def _replace(match):
            """Keep the tag (group 1); drop or randomize the value (group 2)."""
            if action == ACTION_REMOVE:
                return match.group(1)
            return match.group(1) + MetadataProcessor.random_string(len(match.group(2)))

        modified = False
        for pat in patterns + [meta_pattern]:
            content, hits = re.subn(pat, _replace, content, flags=re.IGNORECASE)
            if hits:
                modified = True

        if modified:
            with open(filepath, "wb") as f:
                f.write(content.encode(encoding, errors))
            return True, f"Text metadata {'cleaned' if action == ACTION_REMOVE else 'obfuscated'}"
        return True, "No in-file metadata patterns found (file properties untouched)"
    except Exception as e:
        return False, str(e)
# ---- FONT ----
@staticmethod
def get_font_metadata(filepath):
    """Report which well-known name-table labels occur in a font file.

    Reads at most the first 32 KiB, decodes it as Latin-1 and does a
    case-insensitive substring search for each label. Every label found is
    recorded as ``"Font.<label>": "Present"`` on top of
    ``get_file_properties(filepath)``. Errors land under ``"error"``.
    """
    metadata = get_file_properties(filepath)
    labels = ("Copyright", "Font Family", "Full Name", "Version",
              "PostScript", "Trademark", "Manufacturer", "Designer")
    try:
        with open(filepath, "rb") as font_file:
            size_cap = min(os.path.getsize(filepath), 32768)
            head = font_file.read(size_cap)
        haystack = head.decode("latin-1", errors="replace").lower()
        for label in labels:
            if label.lower() in haystack:
                metadata[f"Font.{label}"] = "Present"
    except Exception as e:
        metadata["error"] = str(e)
    return metadata
@staticmethod
def process_font(filepath, action):
    """Placeholder handler: font files are never modified.

    Both arguments are ignored. Always returns success together with a
    message explaining why nothing was changed.
    """
    message = "Font metadata editing requires specialized tools for safety"
    return True, message
# ---- ARCHIVE ----
@staticmethod
def get_archive_metadata(filepath):
    """Return file properties plus ZIP comment and member count for .zip files.

    Other archive types only get the generic ``get_file_properties`` data.
    Errors are recorded under the ``"error"`` key.
    """
    metadata = get_file_properties(filepath)
    try:
        is_zip = Path(filepath).suffix.lower() == ".zip"
        if is_zip:
            import zipfile
            with zipfile.ZipFile(filepath, "r") as archive:
                comment = archive.comment
                if comment:
                    metadata["ZIP.Comment"] = comment.decode("utf-8", errors="replace")[:200]
                member_count = len(archive.namelist())
                metadata["ZIP.FileCount"] = str(member_count)
    except Exception as e:
        metadata["error"] = str(e)
    return metadata
@staticmethod
def process_archive(filepath, action):
    """Rewrite a ZIP archive without its archive-level and per-member comments.

    The archive comment is cleared for ``ACTION_REMOVE`` and replaced by a
    16-character random string for any other action; member comments are
    always cleared. Members are copied into ``<filepath>.tmp``, which then
    replaces the original file. Non-ZIP archives are left untouched.

    Returns ``(success, message)``. On failure the temporary file is removed
    and ``(False, str(error))`` is returned.
    """
    try:
        if Path(filepath).suffix.lower() != ".zip":
            return True, "Archive format not supported for modification"
        import zipfile
        temp = filepath + ".tmp"
        with zipfile.ZipFile(filepath, "r") as zi, \
                zipfile.ZipFile(temp, "w", zipfile.ZIP_DEFLATED) as zo:
            zo.comment = b"" if action == ACTION_REMOVE else MetadataProcessor.random_string(16).encode()
            for item in zi.infolist():
                item.comment = b""
                # Read through the ZipInfo, not the name: a lookup by name
                # returns the *last* member of that name, so archives with
                # duplicate names would lose the earlier members' data.
                zo.writestr(item, zi.read(item))
        # Both archives are closed here. os.replace swaps the file in one
        # step, also when the destination already exists.
        os.replace(temp, filepath)
        return True, "ZIP metadata cleaned"
    except Exception as e:
        try:
            os.remove(filepath + ".tmp")
        except (OSError, TypeError):
            # No temp file was left behind (or filepath is not a string).
            pass
        return False, str(e)
# ---- GENERIC BINARY ----
@staticmethod
def get_generic_metadata(filepath):
    """Best-effort metadata for files without a dedicated handler.

    Adds the MD5 checksum (when available), an 80-character printable
    snippet for each metadata-like keyword found in the first 4 KiB, and the
    first 16 bytes as hex. Errors are stored under ``"error"``.
    """
    metadata = get_file_properties(filepath)
    digest = get_file_checksum(filepath)
    if digest:
        metadata["Checksum_MD5"] = digest
    keywords = ("author", "creator", "producer", "copyright", "title", "comment", "software")
    try:
        with open(filepath, "rb") as src:
            head = src.read(4096)
        # Latin-1 maps every byte to one character, so offsets stay aligned.
        as_text = head.decode("latin-1", errors="replace")
        lowered = as_text.lower()
        for keyword in keywords:
            at = lowered.find(keyword)
            if at == -1:
                continue
            window = as_text[at:at + 80]
            cleaned = "".join(ch if ch.isprintable() else " " for ch in window)
            metadata[f"Found.{keyword}"] = cleaned.strip()[:200]
        metadata["RawHeader"] = " ".join(f"{b:02X}" for b in head[:16])
    except Exception as e:
        metadata["error"] = str(e)
    return metadata
@staticmethod
def process_generic(filepath, action):
    """Blank or obfuscate keyword-tagged values in the first 8 KiB of a file.

    For the first occurrence of each keyword (author, creator, copyright,
    comment, software) the value that follows, up to the next NUL, line
    break, quote or angle bracket, is overwritten in place with spaces
    (``ACTION_REMOVE``) or a random string of the same length (any other
    action). The file size never changes.

    Returns ``(success, message)``; exceptions give ``(False, str(error))``.
    """
    separators = ":= \t\"'"
    terminators = "\x00\n\r\"'<>"
    try:
        with open(filepath, "rb") as src:
            data = bytearray(src.read())
        limit = min(len(data), 8192)
        # Latin-1 is a 1:1 byte/character mapping, so string offsets are
        # valid byte offsets into ``data``.
        header = data[:limit].decode("latin-1", errors="replace")
        lowered = header.lower()
        modified = False
        for kw in ("author", "creator", "copyright", "comment", "software"):
            found = lowered.find(kw)
            if found == -1:
                continue
            start = found + len(kw)
            while start < limit and header[start] in separators:
                start += 1
            stop = start
            while stop < limit and header[stop] not in terminators:
                stop += 1
            if stop <= start:
                continue
            span = stop - start
            if action == ACTION_REMOVE:
                filler = b" " * span
            else:
                filler = MetadataProcessor.random_string(span).encode("latin-1")
            data[start:stop] = filler
            modified = True
        if not modified:
            return True, "No modifiable metadata found in header"
        with open(filepath, "wb") as dst:
            dst.write(data)
        outcome = "cleared" if action == ACTION_REMOVE else "obfuscated"
        return True, f"Generic metadata {outcome}"
    except Exception as e:
        return False, str(e)
# ---- DISPATCHER ----
@staticmethod
def get_metadata(filepath):
    """Read metadata with the reader registered for the file's extension.

    The first extension group containing the lower-cased suffix decides
    which reader runs; unknown extensions use ``get_generic_metadata``.
    """
    suffix = Path(filepath).suffix.lower()
    readers = (
        (IMAGE_EXTENSIONS, MetadataProcessor.get_image_metadata),
        (AUDIO_EXTENSIONS, MetadataProcessor.get_audio_metadata),
        (VIDEO_EXTENSIONS, MetadataProcessor.get_video_metadata),
        (PDF_EXTENSIONS, MetadataProcessor.get_pdf_metadata),
        (OFFICE_EXTENSIONS, MetadataProcessor.get_office_metadata),
        (EXECUTABLE_EXTENSIONS, MetadataProcessor.get_executable_metadata),
        (SHORTCUT_EXTENSIONS, MetadataProcessor.get_shortcut_metadata),
        (TEXT_CODE_EXTENSIONS, MetadataProcessor.get_text_metadata),
        (FONT_EXTENSIONS, MetadataProcessor.get_font_metadata),
        (ARCHIVE_EXTENSIONS, MetadataProcessor.get_archive_metadata),
    )
    reader = next(
        (fn for group, fn in readers if suffix in group),
        MetadataProcessor.get_generic_metadata,
    )
    return reader(filepath)
@staticmethod
def process_file(filepath, action):
    """Main dispatcher: run the handler for the file type, then fix timestamps.

    The handler is chosen by extension (``process_generic`` as fallback).
    For the combined ``ACTION_REMOVE_OBFUSCATE`` action:

    * image, audio, video, office and archive files get a single remove
      pass, because obfuscating afterwards would re-insert data;
    * everything else gets a remove pass followed by an obfuscate pass, and
      only the informative messages (those without "No ") are reported.

    Filesystem timestamps are modified as the very last step so that no
    later read of the file resets them.

    Returns ``(success, message)``.
    """
    ext = Path(filepath).suffix.lower()
    handlers = (
        (IMAGE_EXTENSIONS, MetadataProcessor.process_image),
        (AUDIO_EXTENSIONS, MetadataProcessor.process_audio),
        (VIDEO_EXTENSIONS, MetadataProcessor.process_video),
        (PDF_EXTENSIONS, MetadataProcessor.process_pdf),
        (OFFICE_EXTENSIONS, MetadataProcessor.process_office),
        (EXECUTABLE_EXTENSIONS, MetadataProcessor.process_executable),
        (SHORTCUT_EXTENSIONS, MetadataProcessor.process_shortcut),
        (TEXT_CODE_EXTENSIONS, MetadataProcessor.process_text),
        (FONT_EXTENSIONS, MetadataProcessor.process_font),
        (ARCHIVE_EXTENSIONS, MetadataProcessor.process_archive),
    )
    handler = next(
        (fn for group, fn in handlers if ext in group),
        MetadataProcessor.process_generic,
    )

    if action != ACTION_REMOVE_OBFUSCATE:
        ok, msg = handler(filepath, action)
    elif ext in (IMAGE_EXTENSIONS | AUDIO_EXTENSIONS | VIDEO_EXTENSIONS |
                 OFFICE_EXTENSIONS | ARCHIVE_EXTENSIONS):
        # Removal already strips everything for these types.
        ok, msg = handler(filepath, ACTION_REMOVE)
        msg = f"Remove+Obfuscate: {msg} (clean strip)"
    else:
        # PE, shortcuts, text, generic: remove first, then obfuscate.
        ok_remove, msg_remove = handler(filepath, ACTION_REMOVE)
        ok_obfuscate, msg_obfuscate = handler(filepath, ACTION_OBFUSCATE)
        ok = ok_remove or ok_obfuscate
        informative = [m for m in (msg_remove, msg_obfuscate) if "No " not in m]
        # When neither pass did anything, report the first message.
        msgs = informative or [msg_remove]
        msg = f"Remove+Obfuscate: {' → '.join(msgs)}"

    # Filesystem timestamps go last.
    ts_ok = modify_file_timestamps(filepath, action)
    if ts_ok:
        if action in (ACTION_REMOVE, ACTION_REMOVE_OBFUSCATE):
            ts_label = "zeroed"
        else:
            ts_label = "randomized"
        msg += f" | timestamps {ts_label}"
    return ok, msg
# ============================================================
# SYSTEM TRAY
# ============================================================
class TrayManager:
    """System-tray icon for the application window (needs pystray and Pillow)."""

    def __init__(self, app):
        self.app = app
        self.icon = None

    def show_tray(self):
        """Hide the main window and show a tray icon.

        Returns False without doing anything when pystray or Pillow is
        unavailable, True once the icon thread has been started.
        """
        if not (HAS_PYSTRAY and HAS_PILLOW):
            return False

        # Draw the 64x64 icon: a badge shape with a check mark.
        dark = (26, 27, 38, 255)
        accent = (122, 162, 247, 255)
        picture = Image.new("RGBA", (64, 64), dark)
        pen = ImageDraw.Draw(picture)
        pen.rounded_rectangle([12, 8, 52, 48], radius=6, fill=accent)
        pen.polygon([(12, 42), (32, 58), (52, 42)], fill=accent)
        pen.line([(22, 30), (30, 38), (44, 20)], fill=dark, width=4)

        entries = (
            pystray.MenuItem("Show Window", self._show, default=True),
            pystray.Menu.SEPARATOR,
            pystray.MenuItem("Exit", self._quit),
        )
        self.icon = pystray.Icon("metadata_tool", picture, "Metadata Cleaner",
                                 pystray.Menu(*entries))
        self.app.root.withdraw()
        # The icon runs its own loop on a daemon thread.
        runner = threading.Thread(target=self.icon.run, daemon=True)
        runner.start()
        return True

    def hide_tray(self):
        """Stop the tray icon (if visible) and bring the window back."""
        icon = self.icon
        if icon and icon.visible:
            icon.stop()
        window = self.app.root
        window.deiconify()
        window.lift()

    def _show(self, *a):
        """Tray menu callback: restore the window via the Tk event loop."""
        self.app.root.after(0, self.hide_tray)

    def _quit(self, *a):
        """Tray menu callback: stop the icon and schedule window destruction."""
        icon = self.icon
        if icon and icon.visible:
            icon.stop()
        window = self.app.root
        window.after(0, window.destroy)
# ============================================================
# FOLDER WATCHER
# ============================================================
class FolderWatcher:
    """Poll the app's source list and auto-process files that newly appear.

    ``start()`` snapshots the current file set and launches a daemon thread
    that re-collects the files every ``WATCH_INTERVAL`` seconds and runs
    ``MetadataProcessor.process_file`` on each new path. Log lines are
    handed to the Tk thread with ``root.after``.

    Every ``start()`` gets its own stop event. A worker that was stopped
    therefore exits even if ``running`` has already been set to True again
    by a quick restart; previously such a worker kept polling next to the
    new one.
    """

    def __init__(self, app):
        self.app = app
        self.running = False
        self.known_files = set()
        # Stop signal of the current run; None until the first start().
        self._stop_event = None

    def start(self):
        """Begin watching; does nothing when already running."""
        if self.running:
            return
        self.running = True
        self.known_files = set(self.app._collect_files())
        self._stop_event = threading.Event()
        threading.Thread(target=self._loop, args=(self._stop_event,),
                         daemon=True).start()
        self.app._log("Auto-watch started — monitoring for new files...", "cyan_tag")

    def stop(self):
        """Stop watching; does nothing when not running."""
        if self.running:
            self.running = False
            if self._stop_event is not None:
                # Wakes the worker immediately instead of after its sleep.
                self._stop_event.set()
            self.app._log("Auto-watch stopped.", "dim")

    def _loop(self, stop_event=None):
        """Worker body: poll until stopped.

        ``stop_event`` is the per-run stop signal created by ``start()``.
        When omitted, a private event is used and the loop is controlled by
        ``self.running`` alone.
        """
        if stop_event is None:
            stop_event = threading.Event()
        while self.running and not stop_event.is_set():
            try:
                current = set(self.app._collect_files())
                new = current - self.known_files
                if new:
                    # NOTE(review): reads a Tk variable from the worker
                    # thread; confirm this is safe on the targeted Tk build.
                    action = self.app.action_var.get()
                    self.app.root.after(0, self.app._log,
                                        f"Auto-watch: {len(new)} new file(s)", "info")
                    for fp in new:
                        try:
                            ok, msg = MetadataProcessor.process_file(fp, action)
                            sym = "\u2713" if ok else "\u2717"
                            tag = "success" if ok else "error"
                            self.app.root.after(0, self.app._log,
                                                f" {sym} {os.path.basename(fp)}: {msg}", tag)
                        except Exception as e:
                            self.app.root.after(0, self.app._log,
                                                f" \u2717 {os.path.basename(fp)}: {e}", "error")
                if not stop_event.is_set():
                    # A superseded worker must not overwrite the snapshot
                    # taken by a newer start().
                    self.known_files = current
            except Exception:
                # Best effort: one failed poll must not end the watcher.
                pass
            # Sleeps like time.sleep but returns as soon as stop() is called.
            stop_event.wait(WATCH_INTERVAL)
# ============================================================
# THREAD-SAFE DRAG & DROP
# ============================================================
class DragDropManager:
    """
    Thread-safe drag-and-drop using a queue.

    windnd calls its callback from a COM thread — we push paths into
    a queue and poll it from the tkinter main loop via root.after().
    tkdnd delivers its ``<<Drop>>`` event on the main thread, so that path
    calls the app directly.

    ``backend`` is ``"windnd"``, ``"tkdnd2"`` or ``None`` when no
    drag-and-drop support could be set up.
    """

    def __init__(self, app, target_widget):
        self.app = app
        self.widget = target_widget
        self.drop_queue = queue.Queue()
        self.backend = None
        self._setup()
        # Start polling the queue from the main thread
        self._poll_queue()

    def _setup(self):
        """Register the drop target with the first backend that works."""
        if HAS_WINDND:
            try:
                # Hook the target widget — windnd will call the callback
                # from a COM thread.
                windnd.hook_dropfiles(self.widget, func=self._on_windnd_drop)
                self.backend = "windnd"
                return
            except Exception:
                pass
        if HAS_TKDND2:
            try:
                self.widget.tk.eval('package require tkdnd')
                self.widget.tk.eval(f'tkdnd::drop_target register {self.widget._w} *')
                self.widget.bind('<<Drop>>', self._on_tkdnd_drop)
                self.backend = "tkdnd2"
                return
            except Exception:
                pass
        self.backend = None

    @staticmethod
    def _decode_dropped_path(raw):
        """Decode a byte path: UTF-8 first, then the locale's preferred encoding.

        NOTE(review): windnd is understood to deliver paths in the system
        ANSI code page unless asked for unicode — confirm against the
        windnd documentation. Forcing UTF-8 with ``errors="replace"``
        mangled non-ASCII names in that case.
        """
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            import locale
            return raw.decode(locale.getpreferredencoding(False), errors="replace")

    def _on_windnd_drop(self, raw_paths):
        """
        Called from COM thread by windnd — do NOT touch tkinter here!
        Just push paths into the thread-safe queue.
        """
        paths = []
        for p in raw_paths:
            if isinstance(p, bytes):
                p = self._decode_dropped_path(p)
            p = p.strip().strip('"').strip("'")
            if p:
                paths.append(p)
        if paths:
            self.drop_queue.put(paths)

    def _on_tkdnd_drop(self, event):
        """Called from main thread by tkdnd — safe to use tkinter.

        ``event.data`` is a Tcl-style list in which elements containing
        spaces are wrapped in braces and the others are bare. Both kinds may
        appear in one drop, so each token is parsed on its own rather than
        choosing one format from the first character.
        """
        raw = event.data
        tokens = re.findall(r'\{([^}]+)\}|(\S+)', raw)
        paths = [braced or bare for braced, bare in tokens]
        paths = [p for p in paths if os.path.exists(p)]
        if paths:
            self.app._handle_drop(paths)
        return event.action

    def _poll_queue(self):
        """Drain the queue on the main thread — safe for tkinter calls."""
        try:
            while True:
                paths = self.drop_queue.get_nowait()
                self.app._handle_drop(paths)
        except queue.Empty:
            pass
        finally:
            # Re-schedule even when a drop handler raised, so one bad drop
            # does not disable drag-and-drop for the rest of the session.
            self.app.root.after(100, self._poll_queue)
# ============================================================
# MAIN GUI
# ============================================================
class MetadataToolApp:
def __init__(self, root, start_minimized=False):
    """Build the main window, wire up drag-and-drop and restore saved settings.

    ``root`` is the Tk root window. When ``start_minimized`` is true the
    window is sent to the tray shortly after start-up.
    """
    self.root = root
    root.title("Metadata Cleaner & Obfuscator v4")
    root.geometry("1080x840")
    root.minsize(920, 700)
    root.configure(bg=COLORS["bg"])

    # Sources selected by the user and run state.
    self.folders = []
    self.individual_files = []
    self.processing = False
    self.custom_extensions = set()

    # Background helpers.
    self.watcher = FolderWatcher(self)
    self.tray_manager = TrayManager(self)

    theme = ttk.Style()
    self.style = theme
    theme.theme_use("clam")
    theme.configure("green.Horizontal.TProgressbar",
                    troughcolor=COLORS["surface2"],
                    background=COLORS["success"])

    for build_section in (self._build_header,
                          self._build_main_content,
                          self._build_footer):
        build_section()

    # Drag-and-drop needs the drop zone, so it is set up after the UI exists.
    self.dnd_manager = DragDropManager(self, self.dropzone)
    # The drop-zone caption depends on which backend (if any) is active.
    self._update_dropzone_text()
    self._load_settings()
    root.protocol("WM_DELETE_WINDOW", self._on_close)
    if start_minimized:
        root.after(200, self._minimize_to_tray)
def _update_dropzone_text(self):
    """Set the drop-zone caption: a drag hint if a DnD backend is active, else a click hint."""
    dnd_ready = self.dnd_manager.backend
    if dnd_ready:
        caption = "\u2b07 Drag & drop files or folders here \u2b07"
    else:
        caption = "\U0001F4C2 Click here to browse for files or folders"
    tint = COLORS["accent" if dnd_ready else "text_dim"]
    self.drop_canvas.itemconfig(self.drop_label_id, text=caption, fill=tint)
# ---- SETTINGS ----
def _gather_settings(self):
    """Return the current UI state as a JSON-serialisable dict.

    Keys are emitted in a fixed order: sources, action, the option
    checkboxes, the file-type filters, custom extensions and finally the
    window geometry.
    """
    settings = {
        "folders": list(self.folders),
        "individual_files": list(self.individual_files),
        "action": self.action_var.get(),
    }
    # Option checkboxes are stored on self as "<name>_var".
    for option in ("recursive", "auto_watch", "minimize_tray", "start_icon"):
        settings[option] = getattr(self, f"{option}_var").get()
    # File-type filters are stored on self under their own key.
    for kind in ("images", "audio", "video", "pdf", "office", "exe",
                 "shortcuts", "text", "fonts", "archives", "custom"):
        key = f"filter_{kind}"
        settings[key] = getattr(self, key).get()
    settings["custom_extensions_text"] = self.custom_ext_text.get("1.0", "end").strip()
    settings["custom_extensions"] = sorted(self.custom_extensions)
    settings["window_geometry"] = self.root.geometry()
    return settings
def _save_settings(self):
    """Persist the current settings; any failure is deliberately ignored (best effort)."""
    try:
        snapshot = self._gather_settings()
        save_config(snapshot)
    except Exception:
        return
def _load_settings(self):
    """Apply the saved configuration to the UI; does nothing when none exists.

    Restores window geometry, action, checkboxes, custom extensions and the
    source list (only folders/files that still exist). If auto-watch was
    enabled it is re-armed shortly after the UI has finished loading.
    """
    cfg = load_config()
    if not cfg:
        return

    if "window_geometry" in cfg:
        try:
            self.root.geometry(cfg["window_geometry"])
        except Exception:
            pass

    if "action" in cfg:
        self.action_var.set(cfg["action"])

    # Boolean options and filters: config key -> Tk variable.
    toggles = [
        ("recursive", self.recursive_var),
        ("minimize_tray", self.minimize_tray_var),
        ("start_icon", self.start_icon_var),
        ("filter_images", self.filter_images),
        ("filter_audio", self.filter_audio),
        ("filter_video", self.filter_video),
        ("filter_pdf", self.filter_pdf),
        ("filter_office", self.filter_office),
        ("filter_exe", self.filter_exe),
        ("filter_shortcuts", self.filter_shortcuts),
        ("filter_text", self.filter_text),
        ("filter_fonts", self.filter_fonts),
        ("filter_archives", self.filter_archives),
        ("filter_custom", self.filter_custom),
    ]
    for key, var in toggles:
        if key in cfg:
            var.set(bool(cfg[key]))

    if "custom_extensions_text" in cfg:
        editor = self.custom_ext_text
        editor.delete("1.0", "end")
        editor.insert("1.0", cfg["custom_extensions_text"])
    if "custom_extensions" in cfg:
        self.custom_extensions = set(cfg["custom_extensions"])

    # Re-add only sources that still exist and are not listed yet.
    for folder in cfg.get("folders", []):
        if not os.path.isdir(folder) or folder in self.folders:
            continue
        self.folders.append(folder)
        self.source_listbox.insert("end", f"\U0001F4C1 {folder}")
    for fp in cfg.get("individual_files", []):
        if not os.path.isfile(fp) or fp in self.individual_files:
            continue
        self.individual_files.append(fp)
        self.source_listbox.insert("end", f"\U0001F4C4 {fp}")

    # Restore auto-watch and start it if it was enabled.
    if cfg.get("auto_watch", False):
        self.auto_watch_var.set(True)
        # Start the watcher only AFTER the UI has been fully loaded.
        self.root.after(500, self._start_auto_watch_if_enabled)

    self._log("Settings loaded from config file", "dim")
def _start_auto_watch_if_enabled(self):
    """Start auto-watch if its checkbox is ticked and there are folders to watch."""
    if not self.auto_watch_var.get():
        return
    if self.folders:
        self.watcher.start()
        self._log("Auto-watch automatically started from saved settings", "cyan_tag")
        return
    # No folders available: untick auto-watch and persist that choice.
    self.auto_watch_var.set(False)
    self._log("Auto-watch disabled: no folders available", "warning")
    self._save_settings()
def _check_auto_watch_status(self):
    """Debug helper: log whether the watcher is running and how many folders are set."""
    state = "stopped"
    if self.watcher.running:
        state = "running"
    folder_count = len(self.folders)
    self._log(f"Auto-watch status: {state}, folders: {folder_count}", "dim")
# ---- UI BUILDING ----
def _build_header(self):
    """Create the title bar: app title, the two tray checkboxes and a subtitle.

    Creates ``self.minimize_tray_var`` and ``self.start_icon_var``; toggling
    either checkbox saves the settings.
    """
    background = COLORS["bg"]
    bar = tk.Frame(self.root, bg=background)
    bar.pack(fill="x", padx=20, pady=(14, 2))

    title_row = tk.Frame(bar, bg=background)
    title_row.pack(fill="x")
    title = tk.Label(title_row, text="\u2728 Metadata Cleaner & Obfuscator",
                     bg=background, fg=COLORS["accent"],
                     font=("Segoe UI", 18, "bold"))
    title.pack(side="left")

    toggles = tk.Frame(title_row, bg=background)
    toggles.pack(side="right")
    # Look shared by both tray checkboxes.
    toggle_style = dict(
        bg=background, fg=COLORS["text_dim"], selectcolor=COLORS["surface2"],
        activebackground=background, activeforeground=COLORS["text"],
        font=("Segoe UI", 9), cursor="hand2", command=self._save_settings,
    )

    self.minimize_tray_var = tk.BooleanVar(value=False)
    minimize_box = tk.Checkbutton(toggles, text="Minimize to Tray",
                                  variable=self.minimize_tray_var, **toggle_style)
    minimize_box.pack(side="left", padx=(0, 12))

    self.start_icon_var = tk.BooleanVar(value=False)
    start_icon_box = tk.Checkbutton(toggles, text="Start as Icon",
                                    variable=self.start_icon_var, **toggle_style)
    start_icon_box.pack(side="left")

    subtitle = tk.Label(bar, text="Remove, obfuscate, or randomize file metadata in bulk",
                        bg=background, fg=COLORS["text_dim"],
                        font=("Segoe UI", 9))
    subtitle.pack(anchor="w", pady=(2, 0))
def _build_main_content(self):
    """Create the central grid container and fill it with the three panels.

    Column 0 gets weight 3 and column 1 weight 2; row 0 is fixed (minimum
    340 px) while row 1 takes the remaining height.
    """
    body = tk.Frame(self.root, bg=COLORS["bg"])
    body.pack(fill="both", expand=True, padx=20, pady=6)
    for column, weight in enumerate((3, 2)):
        body.columnconfigure(column, weight=weight)
    body.rowconfigure(0, weight=0, minsize=340)
    body.rowconfigure(1, weight=1)
    for build_panel in (self._build_source_panel,
                        self._build_options_panel,
                        self._build_log_panel):
        build_panel(body)
def _build_source_panel(self, parent):
    """Build the "Sources" card in grid cell (0, 0) of ``parent``.

    Creates the source list (``self.source_listbox``), the clickable drop
    zone (``self.dropzone``, ``self.drop_canvas``, ``self.drop_label_id``)
    and the "Include subfolders" / "Auto-watch" checkboxes
    (``self.recursive_var``, ``self.auto_watch_var``).
    """
    surface = COLORS["surface"]
    field = COLORS["surface2"]

    panel = tk.Frame(parent, bg=surface,
                     highlightbackground=COLORS["border"], highlightthickness=1,
                     padx=12, pady=10)
    panel.grid(row=0, column=0, sticky="nsew", padx=(0, 6), pady=(0, 6))

    # Title row with the action buttons on the right.
    title_row = tk.Frame(panel, bg=surface)
    title_row.pack(fill="x")
    tk.Label(title_row, text="\U0001F4C1 Sources", bg=surface,
             fg=COLORS["text"], font=("Segoe UI", 11, "bold")).pack(side="left")
    button_bar = tk.Frame(title_row, bg=surface)
    button_bar.pack(side="right")
    button_specs = (
        ("+ Folder", COLORS["accent"], self._add_folder),
        ("+ Files", COLORS["cyan"], self._add_files),
        ("Remove", COLORS["surface2"], self._remove_selected),
        ("Clear All", COLORS["danger"], self._clear_sources),
    )
    for caption, fill, callback in button_specs:
        # Dark caption on coloured buttons, normal text on the neutral one.
        caption_color = COLORS["text"] if fill == COLORS["surface2"] else "#1a1b26"
        button = tk.Button(button_bar, text=caption, bg=fill, fg=caption_color,
                           font=("Segoe UI", 8, "bold"), relief="flat",
                           cursor="hand2", padx=6, pady=2, command=callback)
        button.pack(side="left", padx=1)

    # Source list with its scrollbar.
    list_frame = tk.Frame(panel, bg=field,
                          highlightbackground=COLORS["border"], highlightthickness=1)
    list_frame.pack(fill="both", expand=True, pady=(8, 4))
    listbox = tk.Listbox(
        list_frame, bg=field, fg=COLORS["text"],
        selectbackground=COLORS["accent"], selectforeground="#1a1b26",
        font=("Consolas", 9), relief="flat", borderwidth=0,
        activestyle="none", selectmode="extended")
    self.source_listbox = listbox
    listbox.pack(fill="both", expand=True, padx=2, pady=2)
    scrollbar = tk.Scrollbar(listbox, command=listbox.yview)
    scrollbar.pack(side="right", fill="y")
    listbox.config(yscrollcommand=scrollbar.set)

    # Drop zone
    self.dropzone = tk.Frame(panel, bg=COLORS["dropzone"],
                             highlightbackground=COLORS["dropzone_border"],
                             highlightthickness=2, height=70, cursor="hand2")
    self.dropzone.pack(fill="x", pady=(4, 4))
    self.dropzone.pack_propagate(False)
    self.drop_canvas = tk.Canvas(self.dropzone, bg=COLORS["dropzone"],
                                 highlightthickness=0, bd=0)
    self.drop_canvas.pack(fill="both", expand=True, padx=4, pady=4)
    self.drop_label_id = self.drop_canvas.create_text(
        0, 0, text="", fill=COLORS["accent"], font=("Segoe UI", 10), anchor="center")

    def _redraw(event=None):
        """Redraw the dashed border and re-centre the caption on resize."""
        width = self.drop_canvas.winfo_width()
        height = self.drop_canvas.winfo_height()
        self.drop_canvas.delete("border")
        self.drop_canvas.create_rectangle(4, 4, width - 4, height - 4,
                                          outline=COLORS["dropzone_border"],
                                          width=2, dash=(8, 4), tags="border")
        self.drop_canvas.coords(self.drop_label_id, width // 2, height // 2)

    def _open_browser(_event):
        self._drop_click()

    def _highlight(_event):
        self.drop_canvas.config(bg=COLORS["dropzone_hover"])
        self.dropzone.config(bg=COLORS["dropzone_hover"])

    def _unhighlight(_event):
        self.drop_canvas.config(bg=COLORS["dropzone"])
        self.dropzone.config(bg=COLORS["dropzone"])

    self.drop_canvas.bind("<Configure>", _redraw)
    self.drop_canvas.bind("<Button-1>", _open_browser)
    self.dropzone.bind("<Button-1>", _open_browser)
    for zone in (self.drop_canvas, self.dropzone):
        zone.bind("<Enter>", _highlight)
        zone.bind("<Leave>", _unhighlight)

    # Checkboxes
    options_row = tk.Frame(panel, bg=surface)
    options_row.pack(fill="x", pady=(4, 0))
    self.recursive_var = tk.BooleanVar(value=True)
    recursive_box = tk.Checkbutton(
        options_row, text=" Include subfolders", variable=self.recursive_var,
        bg=surface, fg=COLORS["text"], selectcolor=field,
        activebackground=surface, activeforeground=COLORS["text"],
        font=("Segoe UI", 9), cursor="hand2",
        command=self._save_settings)
    recursive_box.pack(side="left")
    self.auto_watch_var = tk.BooleanVar(value=False)
    watch_box = tk.Checkbutton(
        options_row, text=" Auto-watch (auto-process new files)",
        variable=self.auto_watch_var, bg=surface, fg=COLORS["warning"],
        selectcolor=field, activebackground=surface,
        activeforeground=COLORS["warning"], font=("Segoe UI", 9, "bold"),
        cursor="hand2", command=self._toggle_auto_watch)
    watch_box.pack(side="left", padx=(14, 0))
def _build_options_panel(self, parent):
    """Build the scrollable "Options" card in grid cell (row 0, column 1).

    The card contains the action radio buttons, the per-category file-type
    check boxes, the custom-extension editor and the "Preview Metadata"
    button.  The Tk variables (``action_var``, ``filter_*``) and the
    ``custom_ext_text`` widget are stored on ``self``.

    Args:
        parent: Container widget; the card is placed with ``grid`` inside it.
    """
    surface = COLORS["surface"]
    dim = COLORS["text_dim"]

    card = tk.Frame(parent, bg=surface,
                    highlightbackground=COLORS["border"], highlightthickness=1,
                    padx=12, pady=10)
    card.grid(row=0, column=1, sticky="nsew", padx=(6, 0), pady=(0, 6))

    # Scrollable area: a canvas hosting one inner frame, plus a scrollbar.
    canvas = tk.Canvas(card, bg=surface, highlightthickness=0, bd=0)
    inner = tk.Frame(canvas, bg=surface)
    vsb = tk.Scrollbar(card, orient="vertical", command=canvas.yview)
    canvas.configure(yscrollcommand=vsb.set)
    vsb.pack(side="right", fill="y")
    canvas.pack(side="left", fill="both", expand=True)
    window_id = canvas.create_window((0, 0), window=inner, anchor="nw")

    def _sync_scrollregion(_event=None):
        canvas.configure(scrollregion=canvas.bbox("all"))

    def _on_canvas_configure(event):
        # Keep the inner frame as wide as the canvas, then refresh the region.
        canvas.itemconfig(window_id, width=event.width)
        _sync_scrollregion()

    canvas.bind("<Configure>", _on_canvas_configure)
    # The scrollable extent depends on the inner frame's size, so track it too.
    inner.bind("<Configure>", _sync_scrollregion)

    canvas_path = str(canvas)

    def _on_mousewheel(event):
        # The binding is application-wide, so only react when the pointer is
        # over this canvas or one of its descendants; otherwise wheeling over
        # other widgets (e.g. the log) would also scroll this panel.
        try:
            target = canvas.winfo_containing(event.x_root, event.y_root)
        except (KeyError, tk.TclError):
            return
        if target is None:
            return
        target_path = str(target)
        if target_path != canvas_path and not target_path.startswith(canvas_path + "."):
            return
        canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")

    canvas.bind_all("<MouseWheel>", _on_mousewheel, add="+")

    o = inner
    tk.Label(o, text="\u2699\ufe0f Options", bg=surface,
             fg=COLORS["text"], font=("Segoe UI", 11, "bold")).pack(anchor="w")

    # --- Action selection ---
    tk.Label(o, text="Action:", bg=surface,
             fg=dim, font=("Segoe UI", 9)).pack(anchor="w", pady=(8, 3))
    self.action_var = tk.StringVar(value=ACTION_REMOVE)
    actions = [
        (ACTION_REMOVE, "Remove metadata", "Strip all removable metadata"),
        (ACTION_OBFUSCATE, "Obfuscate metadata", "Replace all with random data"),
        (ACTION_RANDOMIZE, "Randomize metadata", "Fill with random values"),
        (ACTION_REMOVE_OBFUSCATE, "Remove + Obfuscate",
         "Remove what's possible, obfuscate the rest"),
    ]
    for val, label, desc in actions:
        row = tk.Frame(o, bg=surface)
        row.pack(fill="x", pady=1)
        tk.Radiobutton(row, text=f" {label}", variable=self.action_var, value=val,
                       bg=surface, fg=COLORS["text"], selectcolor=COLORS["surface2"],
                       activebackground=surface, activeforeground=COLORS["text"],
                       font=("Segoe UI", 10), cursor="hand2",
                       command=self._save_settings).pack(anchor="w")
        tk.Label(row, text=f" {desc}", bg=surface,
                 fg=dim, font=("Segoe UI", 8)).pack(anchor="w")

    tk.Frame(o, bg=COLORS["border"], height=1).pack(fill="x", pady=(10, 6))

    # --- File-type filters ---
    tk.Label(o, text="File types:", bg=surface,
             fg=dim, font=("Segoe UI", 9)).pack(anchor="w", pady=(2, 3))
    self.filter_images = tk.BooleanVar(value=True)
    self.filter_audio = tk.BooleanVar(value=True)
    self.filter_video = tk.BooleanVar(value=True)
    self.filter_pdf = tk.BooleanVar(value=True)
    self.filter_office = tk.BooleanVar(value=True)
    self.filter_exe = tk.BooleanVar(value=False)
    self.filter_shortcuts = tk.BooleanVar(value=False)
    self.filter_text = tk.BooleanVar(value=False)
    self.filter_fonts = tk.BooleanVar(value=False)
    self.filter_archives = tk.BooleanVar(value=False)
    self.filter_custom = tk.BooleanVar(value=False)
    categories = [
        (self.filter_images, "Images", "jpg, png, tiff, webp, bmp, gif, ico"),
        (self.filter_audio, "Audio", "mp3, flac, ogg, m4a, wav, opus"),
        (self.filter_video, "Video", "mp4, mkv, avi, mov, wmv, webm"),
        (self.filter_pdf, "PDF", "pdf"),
        (self.filter_office, "Office", "docx, xlsx, pptx, doc, odt"),
        (self.filter_exe, "Executables", "exe, dll, msi, sys"),
        (self.filter_shortcuts, "Shortcuts", "lnk, url, desktop"),
        (self.filter_text, "Text / Code", "txt, md, html, css, js, py..."),
        (self.filter_fonts, "Fonts", "ttf, otf, woff, woff2"),
        (self.filter_archives, "Archives", "zip, rar, 7z, tar, gz"),
    ]
    for var, label, desc in categories:
        row = tk.Frame(o, bg=surface)
        row.pack(fill="x")
        tk.Checkbutton(row, text=f" {label}", variable=var, bg=surface,
                       fg=COLORS["text"], selectcolor=COLORS["surface2"],
                       activebackground=surface, activeforeground=COLORS["text"],
                       font=("Segoe UI", 9), cursor="hand2",
                       command=self._save_settings).pack(side="left")
        tk.Label(row, text=f" ({desc})", bg=surface,
                 fg=dim, font=("Segoe UI", 7)).pack(side="left")

    tk.Frame(o, bg=COLORS["border"], height=1).pack(fill="x", pady=(8, 6))

    # --- Custom extensions ---
    tk.Checkbutton(o, text=" Custom Extensions", variable=self.filter_custom,
                   bg=surface, fg=COLORS["orange"], selectcolor=COLORS["surface2"],
                   activebackground=surface, activeforeground=COLORS["orange"],
                   font=("Segoe UI", 9, "bold"), cursor="hand2",
                   command=self._save_settings).pack(anchor="w")
    tk.Label(o, text="One per line (e.g. .dat):", bg=surface,
             fg=dim, font=("Segoe UI", 8)).pack(anchor="w", pady=(2, 2))
    ef = tk.Frame(o, bg=COLORS["surface2"], highlightbackground=COLORS["border"],
                  highlightthickness=1)
    ef.pack(fill="x", pady=(0, 4))
    self.custom_ext_text = tk.Text(ef, bg=COLORS["surface2"], fg=COLORS["orange"],
                                   font=("Consolas", 9), relief="flat", borderwidth=0,
                                   height=3, insertbackground=COLORS["orange"],
                                   selectbackground=COLORS["accent"],
                                   selectforeground="#1a1b26")
    self.custom_ext_text.pack(fill="x", padx=4, pady=4)
    self.custom_ext_text.insert("1.0", ".dat\n.bin\n.raw")
    tk.Button(o, text="Apply Custom Extensions", bg=COLORS["orange"],
              fg="#1a1b26", font=("Segoe UI", 8, "bold"), relief="flat", cursor="hand2",
              padx=8, pady=2,
              command=self._apply_custom_extensions).pack(anchor="w", pady=(2, 4))

    tk.Frame(o, bg=COLORS["border"], height=1).pack(fill="x", pady=(6, 6))

    # --- Preview ---
    tk.Button(o, text="\U0001F50D Preview Metadata", bg=COLORS["purple"],
              fg="#1a1b26", font=("Segoe UI", 9, "bold"), relief="flat", cursor="hand2",
              padx=10, pady=4, command=self._preview_metadata).pack(fill="x")
def _build_log_panel(self, parent):
    """Build the "Log" card spanning both columns of grid row 1.

    Creates the header with a "Clear" button, the read-only log Text widget
    (stored as ``self.log_text``) with its colour tags, a vertical scrollbar
    and the progress bar driven by ``self.progress_var``.

    Args:
        parent: Container widget; the card is placed with ``grid`` inside it.
    """
    card = tk.Frame(parent, bg=COLORS["surface"],
                    highlightbackground=COLORS["border"], highlightthickness=1,
                    padx=12, pady=10)
    card.grid(row=1, column=0, columnspan=2, sticky="nsew", pady=(6, 0))

    top = tk.Frame(card, bg=COLORS["surface"])
    top.pack(fill="x")
    tk.Label(top, text="\U0001F4CB Log", bg=COLORS["surface"],
             fg=COLORS["text"], font=("Segoe UI", 11, "bold")).pack(side="left")
    tk.Button(top, text="Clear", bg=COLORS["surface2"], fg=COLORS["text"],
              font=("Segoe UI", 8), relief="flat", cursor="hand2",
              padx=8, pady=2, command=self._clear_log).pack(side="right")

    lf = tk.Frame(card, bg=COLORS["surface2"],
                  highlightbackground=COLORS["border"], highlightthickness=1)
    lf.pack(fill="both", expand=True, pady=(6, 0))
    self.log_text = tk.Text(lf, bg=COLORS["surface2"], fg=COLORS["text"],
                            font=("Consolas", 9), relief="flat", borderwidth=0,
                            wrap="word", state="disabled", insertbackground=COLORS["text"],
                            selectbackground=COLORS["accent"], selectforeground="#1a1b26")
    # The scrollbar lives in the surrounding frame, beside the text widget,
    # rather than inside the Text itself where it would cover the text's
    # right edge.  It is packed first so it keeps its width on resize.
    sb = tk.Scrollbar(lf, command=self.log_text.yview)
    sb.pack(side="right", fill="y")
    self.log_text.pack(side="left", fill="both", expand=True, padx=4, pady=4)
    self.log_text.config(yscrollcommand=sb.set)

    # One foreground-colour tag per log severity used by _log().
    for tag, color in [("success", COLORS["success"]), ("error", COLORS["danger"]),
                       ("warning", COLORS["warning"]), ("info", COLORS["accent"]),
                       ("dim", COLORS["text_dim"]), ("cyan_tag", COLORS["cyan"])]:
        self.log_text.tag_configure(tag, foreground=color)

    self.progress_var = tk.DoubleVar(value=0)
    ttk.Progressbar(card, variable=self.progress_var, maximum=100,
                    style="green.Horizontal.TProgressbar").pack(fill="x", pady=(6, 0))
def _build_footer(self):
    """Build the bottom bar: status text, config-path hint and run button.

    Stores the status label as ``self.status_label`` and the
    "Process Files" button as ``self.run_btn``.
    """
    background = COLORS["bg"]
    muted = COLORS["text_dim"]

    bar = tk.Frame(self.root, bg=background)
    bar.pack(fill="x", padx=20, pady=(6, 14))

    # Pack order matters: status (left), run button (right), then the
    # config-path hint to the right of the status label.
    self.status_label = tk.Label(bar, text="Ready", bg=background,
                                 fg=muted, font=("Segoe UI", 9))
    self.status_label.pack(side="left")

    self.run_btn = tk.Button(
        bar,
        text="\u25B6 Process Files",
        bg=COLORS["success"],
        fg="#1a1b26",
        font=("Segoe UI", 11, "bold"),
        relief="flat",
        cursor="hand2",
        padx=24,
        pady=8,
        command=self._start_processing,
    )
    self.run_btn.pack(side="right")

    config_hint = tk.Label(bar, text=f"Config: {get_config_path()}", bg=background,
                           fg=muted, font=("Segoe UI", 7))
    config_hint.pack(side="left", padx=(12, 0))
# ---- CALLBACKS ----
def _add_folder(self):
    """Ask for a directory and register it as a source (no-op on cancel)."""
    chosen = filedialog.askdirectory(title="Select source folder")
    if not chosen:
        return
    self._add_path(chosen)
def _add_files(self):
    """Ask for one or more files and register each of them as a source."""
    selected = filedialog.askopenfilenames(title="Select files")
    for file_path in selected:
        self._add_path(file_path)
def _add_path(self, path):
    """Register *path* as a source folder or an individual file.

    A directory not yet in ``self.folders`` is appended there; a regular
    file not yet in ``self.individual_files`` is appended there.  In both
    cases a list-box row is added, a log line is written and the settings
    are saved.  Anything else (duplicate, missing path) is ignored.
    """
    if os.path.isdir(path) and path not in self.folders:
        bucket = self.folders
        icon = "\U0001F4C1"
        message = f"Added folder: {path}"
    elif os.path.isfile(path) and path not in self.individual_files:
        bucket = self.individual_files
        icon = "\U0001F4C4"
        message = f"Added file: {os.path.basename(path)}"
    else:
        return

    bucket.append(path)
    self.source_listbox.insert("end", f"{icon} {path}")
    self._log(message, "info")
    self._save_settings()
def _remove_selected(self):
    """Remove the rows selected in the source list box.

    Each row has the form ``"<icon> <path>"``.  The path is recovered by
    splitting on the first space instead of slicing a fixed number of
    characters, so it is still found if the icon prefix is returned as
    more than one code unit (e.g. a surrogate pair).  The matching entry
    is dropped from ``self.folders`` or ``self.individual_files`` and the
    settings are saved.  Does nothing when no row is selected.
    """
    selection = self.source_listbox.curselection()
    if not selection:
        return

    entries = list(self.source_listbox.get(0, "end"))
    # Delete from the bottom up so earlier indices stay valid.
    for idx in sorted(selection, reverse=True):
        path = entries[idx].partition(" ")[2]
        if path in self.folders:
            self.folders.remove(path)
        elif path in self.individual_files:
            self.individual_files.remove(path)
        self.source_listbox.delete(idx)
    self._save_settings()
def _clear_sources(self):
    """Forget every source folder and file, empty the list box and save."""
    self.source_listbox.delete(0, "end")
    # Clear in place so other references to these lists see the change.
    for bucket in (self.folders, self.individual_files):
        bucket.clear()
    self._log("All sources cleared", "dim")
    self._save_settings()
def _handle_drop(self, paths):
    """Register every existing path from a drag-and-drop payload.

    Args:
        paths: Iterable of dropped items.  Each item may be a ``str`` or a
            bytes object; surrounding whitespace and quotes are stripped.

    Items that do not exist on disk are ignored.
    """
    for item in paths:
        if isinstance(item, (bytes, bytearray)):
            # NOTE(review): some drop backends (windnd by default) hand over
            # byte strings; str(b"...") would yield "b'...'" and never match
            # a real path, so decode first.  On Windows the ANSI code page
            # ("mbcs") is assumed for such payloads -- confirm against caller.
            encoding = "mbcs" if sys.platform == "win32" else sys.getfilesystemencoding()
            item = bytes(item).decode(encoding, errors="replace")
        candidate = str(item).strip().strip('"').strip("'")
        if os.path.exists(candidate):
            self._add_path(candidate)
def _drop_click(self):
    """Handle a click on the drop zone.

    First offers a folder dialog; if that is cancelled, falls back to a
    multi-file dialog.  Everything chosen is passed to ``_add_path``.
    """
    folder = filedialog.askdirectory(title="Select folder (Cancel for files)")
    if folder:
        self._add_path(folder)
        return
    for file_path in filedialog.askopenfilenames(title="Select files"):
        self._add_path(file_path)
def _apply_custom_extensions(self):
    """Rebuild ``self.custom_extensions`` from the custom-extension editor.

    One extension per line; blank lines and lines starting with ``#`` are
    skipped, entries are lower-cased and given a leading dot if missing.
    The resulting set is logged when non-empty and the settings are saved.
    """
    text = self.custom_ext_text.get("1.0", "end").strip()
    # Mutate the existing set in place rather than rebinding it.
    self.custom_extensions.clear()
    for entry in (line.strip().lower() for line in text.split("\n")):
        if not entry or entry.startswith("#"):
            continue
        self.custom_extensions.add(entry if entry.startswith(".") else "." + entry)

    if self.custom_extensions:
        listing = ", ".join(sorted(self.custom_extensions))
        self._log(f"Custom extensions: {listing}", "info")
    self._save_settings()
def _toggle_auto_watch(self):
    """React to the auto-watch check box being toggled.

    Turning it off stops the watcher.  Turning it on requires at least one
    source folder that still exists on disk; otherwise a warning is shown,
    the check box is reset and nothing is saved.  On a successful change
    the event is logged and the settings are saved.
    """
    if not self.auto_watch_var.get():
        self.watcher.stop()
        self._log("Auto-watch stopped", "dim")
        self._save_settings()
        return

    if not self.folders:
        messagebox.showwarning("No Folders",
                               "Add at least one folder for auto-watch.\n\n"
                               "Auto-watch will be automatically enabled when you add folders.")
        self.auto_watch_var.set(False)
        return

    # Check whether any of the configured folders is still a valid directory.
    if not any(os.path.isdir(folder) for folder in self.folders):
        messagebox.showwarning("Invalid Folders",
                               "None of the saved folders exist anymore.\n"
                               "Please add new folders for auto-watch.")
        self.auto_watch_var.set(False)
        return

    self.watcher.start()
    self._log("Auto-watch started", "cyan_tag")
    self._save_settings()
def _minimize_to_tray(self):
    """Hide the window in the system tray, or iconify it as a fallback.

    The tray is attempted only when both pystray and Pillow are available;
    if they are not, or ``show_tray()`` returns a falsy value, the window
    is minimised normally.
    """
    in_tray = HAS_PYSTRAY and HAS_PILLOW and self.tray_manager.show_tray()
    if not in_tray:
        self.root.iconify()
def _on_close(self):
    """Handle the window-close request.

    Settings are always saved first.  If "minimize to tray" is enabled and
    the tray dependencies are present, the window is sent to the tray;
    otherwise the watcher is stopped and the root window is destroyed.
    """
    self._save_settings()
    send_to_tray = self.minimize_tray_var.get() and HAS_PYSTRAY and HAS_PILLOW
    if send_to_tray:
        self._minimize_to_tray()
        return
    self.watcher.stop()
    self.root.destroy()
def _clear_log(self):
    """Erase the log widget's contents."""
    log = self.log_text
    # The widget is kept read-only, so unlock it just for the deletion.
    log.config(state="normal")
    log.delete("1.0", "end")
    log.config(state="disabled")
def _log(self, msg, tag="info"):
    """Append a time-stamped line to the log widget and scroll to it.

    Args:
        msg: Text to append (a newline is added).
        tag: Text tag applied to the message; the timestamp uses "dim".
    """
    stamp = datetime.now().strftime("%H:%M:%S")
    widget = self.log_text
    # The widget is kept read-only; unlock it only while inserting.
    widget.config(state="normal")
    widget.insert("end", f"[{stamp}] ", "dim")
    widget.insert("end", f"{msg}\n", tag)
    widget.see("end")
    widget.config(state="disabled")
def _get_allowed_extensions(self):
    """Return the set of file extensions enabled by the filter check boxes.

    Each ticked category contributes its extension set; the custom
    extensions are included when the "Custom Extensions" box is ticked.
    """
    groups = (
        (self.filter_images, IMAGE_EXTENSIONS),
        (self.filter_audio, AUDIO_EXTENSIONS),
        (self.filter_video, VIDEO_EXTENSIONS),
        (self.filter_pdf, PDF_EXTENSIONS),
        (self.filter_office, OFFICE_EXTENSIONS),
        (self.filter_exe, EXECUTABLE_EXTENSIONS),
        (self.filter_shortcuts, SHORTCUT_EXTENSIONS),
        (self.filter_text, TEXT_CODE_EXTENSIONS),
        (self.filter_fonts, FONT_EXTENSIONS),
        (self.filter_archives, ARCHIVE_EXTENSIONS),
        (self.filter_custom, self.custom_extensions),
    )
    enabled = set()
    for toggle, extensions in groups:
        if toggle.get():
            enabled |= extensions
    return enabled
def _collect_files(self):
    """Return the files to process, filtered by the enabled extensions.

    Source folders are scanned (recursively when ``recursive_var`` is set),
    then the individually added files are appended.  Each file is returned
    once, in discovery order, even when source folders overlap or a file
    was also added individually.  Folders that cannot be listed (missing,
    not a directory, permission denied) are skipped.

    Returns:
        list[str]: Paths whose lower-cased suffix is in the allowed set.
    """
    allowed = self._get_allowed_extensions()
    recursive = self.recursive_var.get()
    found = []
    seen = set()

    def _take(path):
        # Compare normalised paths so the same file reached through
        # differently spelled folders (mixed separators, case on Windows)
        # is only collected once.
        key = os.path.normcase(os.path.normpath(path))
        if key not in seen:
            seen.add(key)
            found.append(path)

    for folder in self.folders:
        if recursive:
            # os.walk skips unreadable directories by default.
            for dirpath, _dirs, names in os.walk(folder):
                for name in names:
                    if Path(name).suffix.lower() in allowed:
                        _take(os.path.join(dirpath, name))
            continue
        try:
            names = os.listdir(folder)
        except OSError:
            # A saved folder may have been removed or be unreadable.
            continue
        for name in names:
            candidate = os.path.join(folder, name)
            if os.path.isfile(candidate) and Path(name).suffix.lower() in allowed:
                _take(candidate)

    for fp in self.individual_files:
        if os.path.isfile(fp) and Path(fp).suffix.lower() in allowed:
            _take(fp)
    return found
def _preview_metadata(self):
    """Log the metadata of up to eight matching files without processing them.

    Timestamps of the previewed files are captured before reading and
    restored shortly afterwards, since reading may touch access times.
    """
    if not (self.folders or self.individual_files):
        messagebox.showwarning("No Sources", "Add at least one source.")
        return

    files = self._collect_files()
    if not files:
        self._log("No supported files found.", "warning")
        return

    n = min(8, len(files))
    sample = files[:n]

    # Step 1: capture every timestamp BEFORE any file I/O happens.
    saved_timestamps = {}
    for fp in sample:
        try:
            info = os.stat(fp)
            saved_timestamps[fp] = (info.st_ctime, info.st_atime, info.st_mtime)
        except Exception:
            pass

    # Step 2: read and display the metadata (this may touch access times).
    self._log(f"--- Preview ({n} of {len(files)} files) ---", "info")
    for fp in sample:
        self._log(f"\n \U0001F4C4 {fp}", "info")
        metadata = MetadataProcessor.get_metadata(fp)
        for key, value in metadata.items():
            self._log(f" {key}: {str(value)[:120]}", "dim")

    remaining = len(files) - n
    if remaining > 0:
        self._log(f" ... and {remaining} more", "dim")

    # Step 3: restore ALL timestamps once the reads are completely done.
    # The short delay gives lingering file handles time to close.
    self.root.after(200, self._restore_saved_timestamps, saved_timestamps)
def _restore_saved_timestamps(self, saved_timestamps):
    """Write back the timestamps captured before a preview.

    Args:
        saved_timestamps: Mapping of file path to a
            ``(ctime, atime, mtime)`` tuple.

    Restoration is best effort: a failure on one file is ignored and the
    remaining files are still handled.
    """
    for path, (created, accessed, modified) in saved_timestamps.items():
        try:
            _restore_timestamps_win32(path, created, accessed, modified)
        except Exception:
            continue
def _start_processing(self):
    """Validate, confirm with the user and launch the background worker.

    Does nothing if a run is already in progress.  Warns when there are no
    sources or no matching files.  After confirmation the run button is
    disabled, the progress bar is reset and ``_worker`` is started on a
    daemon thread.
    """
    if self.processing:
        return
    if not (self.folders or self.individual_files):
        messagebox.showwarning("No Sources", "Add at least one source.")
        return

    files = self._collect_files()
    if not files:
        messagebox.showinfo("No Files", "No matching files found.")
        return

    action = self.action_var.get()
    labels = {
        "remove": "REMOVE",
        "obfuscate": "OBFUSCATE",
        "randomize": "RANDOMIZE",
        "remove_obfuscate": "REMOVE + OBFUSCATE",
    }
    prompt = (
        f"This will {labels[action]} metadata for {len(files)} file(s).\n\n"
        "Files are modified IN PLACE — cannot be undone.\nMake backups!\n\nContinue?"
    )
    confirmed = messagebox.askyesno("Confirm", prompt)
    if not confirmed:
        return

    self.processing = True
    self.run_btn.config(state="disabled", bg=COLORS["border"])
    self.progress_var.set(0)
    worker = threading.Thread(target=self._worker, args=(files, action), daemon=True)
    worker.start()
def _worker(self, files, action):
    """Process *files* with *action*; runs on the background thread.

    All widget updates are handed to the Tk event loop via ``root.after``.
    Per-file failures are counted and logged without aborting the run.
    Afterwards every file's timestamps are re-applied, a summary is logged
    and ``_finish`` is scheduled.

    Args:
        files: Paths to process.
        action: Action identifier passed to ``MetadataProcessor.process_file``
            and ``modify_file_timestamps``.
    """
    schedule = self.root.after
    total = len(files)
    succeeded = 0
    failed = 0
    schedule(0, self._log, f"Starting {action} on {total} files...", "info")

    for done, fp in enumerate(files, start=1):
        try:
            ok, msg = MetadataProcessor.process_file(fp, action)
            if ok:
                sym, tag = "\u2713", "success"
                succeeded += 1
            else:
                sym, tag = "\u2717", "error"
                failed += 1
            schedule(0, self._log, f"{sym} {os.path.basename(fp)}: {msg}", tag)
        except Exception as e:
            failed += 1
            schedule(0, self._log, f"\u2717 {os.path.basename(fp)}: {e}", "error")
        schedule(0, self.progress_var.set, (done / total) * 100)
        schedule(0, self.status_label.config, {"text": f"Processing {done}/{total}..."})

    # Final pass: re-set timestamps for ALL files so the "Accessed" value is
    # right, since access times may have been updated during processing.
    time.sleep(0.3)  # let the OS flush any pending file-handle operations
    for fp in files:
        try:
            modify_file_timestamps(fp, action)
        except Exception:
            pass

    summary_tag = "warning" if failed else "success"
    schedule(0, self._log,
             f"Done! {succeeded} OK, {failed} failed / {total} total.", summary_tag)
    schedule(0, self.status_label.config, {"text": "Ready"})
    schedule(0, self._finish)
def _finish(self):
    """Mark the run as finished and re-enable the "Process Files" button."""
    self.processing = False
    self.run_btn.configure(state="normal", bg=COLORS["success"])
# ============================================================
# ENTRY POINT
# ============================================================
def main():
    """Create the Tk root window, build the application and run the loop.

    The app starts minimised when ``--start-icon`` or ``--minimized`` is on
    the command line, or when the config has a truthy ``start_icon`` entry.
    """
    cfg = load_config()
    flag_given = any(flag in sys.argv for flag in ("--start-icon", "--minimized"))
    start_min = flag_given or cfg.get("start_icon", False)

    root = tk.Tk()
    try:
        # Best-effort title-bar tweak through the Windows DWM API; any
        # failure (e.g. ctypes.windll missing off Windows) is ignored.
        # NOTE(review): attribute 20 is presumably
        # DWMWA_USE_IMMERSIVE_DARK_MODE -- confirm against the DWM docs.
        import ctypes
        root.update_idletasks()
        hwnd = ctypes.windll.user32.GetParent(root.winfo_id())
        enabled = ctypes.c_int(1)
        ctypes.windll.dwmapi.DwmSetWindowAttribute(
            hwnd, 20, ctypes.byref(enabled), ctypes.sizeof(ctypes.c_int))
    except Exception:
        pass

    MetadataToolApp(root, start_minimized=start_min)
    root.mainloop()


if __name__ == "__main__":
    main()
# (Web-page residue, not part of the program: "Commenti" / "Posta un commento")