# Source listing metadata: 451 lines, 16 KiB, Python
import os
|
||
import queue
|
||
import threading
|
||
import subprocess
|
||
import sounddevice as sd
|
||
import soundfile as sf
|
||
import keyboard
|
||
import pygame
|
||
import numpy as np
|
||
from collections import deque
|
||
|
||
# PARAMETERS – adjust as needed
CHANNELS = 2
SAMPLE_RATE = 44100
BLOCKSIZE = 1024  # frames per chunk

# Look for "Stereo Mix" which is a common device for capturing system audio
devices = sd.query_devices()
# Index of the capture device. It starts as None so that the WASAPI and
# manual-prompt fallbacks below actually run when no "Stereo Mix" input exists.
DEVICE_ID = None
WASAPI_METHOD = False  # Set to True when a WASAPI output device is chosen instead of Stereo Mix

# List every device first, so the full table is shown even when a match is
# found early.
print("Available audio devices:")
for i, device in enumerate(devices):
    print(f" {i} {device['name']}, {device['hostapi']} ({device['max_input_channels']} in, {device['max_output_channels']} out)")

# First priority: Stereo Mix
for i, device in enumerate(devices):
    if "Stereo Mix" in device['name'] and device['max_input_channels'] > 0:
        DEVICE_ID = i
        print(f"Selected device {i}: {device['name']} for system audio recording")
        break

# If no Stereo Mix, try to find WASAPI devices
# NOTE(review): the devices picked below are *output* devices, while
# record_loop opens DEVICE_ID with sd.InputStream — confirm that this works
# on the target machine.
if DEVICE_ID is None:
    hostapis = sd.query_hostapis()
    wasapi_index = None

    for i, api in enumerate(hostapis):
        if 'WASAPI' in api['name']:
            wasapi_index = i
            print(f"Found WASAPI at index {i}")
            break

    if wasapi_index is not None:
        # Try to find a WASAPI output device
        for i, device in enumerate(devices):
            if device['hostapi'] == wasapi_index and device['max_output_channels'] > 0 and 'Speakers' in device['name']:
                DEVICE_ID = i
                print(f"Selected device {i}: {device['name']} (will try WASAPI method)")
                WASAPI_METHOD = True
                break

        # Fallback to any WASAPI output device
        if DEVICE_ID is None:
            for i, device in enumerate(devices):
                if device['hostapi'] == wasapi_index and device['max_output_channels'] > 0:
                    DEVICE_ID = i
                    print(f"Selected device {i}: {device['name']} (will try WASAPI method)")
                    WASAPI_METHOD = True
                    break

# If still no device found, prompt user
if DEVICE_ID is None:
    print("No suitable recording device found. Please select a device manually:")
    for i, device in enumerate(devices):
        print(f" {i} {device['name']}, {device['hostapi']} ({device['max_input_channels']} in, {device['max_output_channels']} out)")
    device_id_input = input("Enter device ID to use: ")
    try:
        DEVICE_ID = int(device_id_input)
    except ValueError:
        print("Invalid device ID")
        raise SystemExit(1)
    # Reject indices that do not correspond to a listed device.
    if not 0 <= DEVICE_ID < len(devices):
        print("Invalid device ID")
        raise SystemExit(1)

# Create a local cache directory
CACHE_DIR = os.path.join(os.getcwd(), "cache")
os.makedirs(CACHE_DIR, exist_ok=True)

# Global variables for visualization
audio_buffer = deque(maxlen=BLOCKSIZE * 10)  # Store more samples for visualization
|
||
|
||
def _convert_segment_to_mp3(wav_path):
    """Convert one WAV segment to MP3 with ffmpeg; delete the WAV on success.

    Returns the MP3 path, or None when ffmpeg could not be started or exited
    with a non-zero status. On failure the WAV file is kept so the recorded
    audio is not lost.
    """
    # splitext only touches the extension; str.replace would also rewrite a
    # ".wav" occurring elsewhere in the path.
    mp3_path = os.path.splitext(wav_path)[0] + '.mp3'
    cmd = [
        'ffmpeg', '-y',
        '-i', wav_path,
        '-codec:a', 'libmp3lame',
        '-qscale:a', '2',
        mp3_path
    ]
    try:
        result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError as e:
        print(f"❌ Could not run ffmpeg ({e}). Keeping WAV: {wav_path}")
        return None
    if result.returncode != 0:
        print(f"❌ ffmpeg exited with code {result.returncode}. Keeping WAV: {wav_path}")
        return None

    print(f"🔊 Converted to MP3: {mp3_path}")
    # Clean up WAV
    os.remove(wav_path)
    return mp3_path


def _print_stream_setup_help(error):
    """Print troubleshooting hints after the audio stream failed to open."""
    print(f"❌ Error occurred setting up audio stream: {error}")
    print("\nThere are several ways to fix this:")
    print("1. Try enabling 'Stereo Mix' in Windows Sound settings")
    print(" - Right-click the speaker icon in system tray")
    print(" - Select 'Sound settings' -> 'Sound Control Panel'")
    print(" - Go to 'Recording' tab")
    print(" - Right-click empty space and check 'Show Disabled Devices'")
    print(" - Enable 'Stereo Mix' if available")
    print("2. Try a different device ID")
    print("\nAvailable devices:")
    for i, device in enumerate(sd.query_devices()):
        hostapi_name = sd.query_hostapis(device['hostapi'])['name']
        print(f" {i} {device['name']}, {hostapi_name} ({device['max_input_channels']} in, {device['max_output_channels']} out)")

    print("\nRun the script again with a specific device ID:")
    print(" .venv/Scripts/python record.py")


# Updated record_loop to use local cache and avoid file access conflicts
def record_loop(cut_event: threading.Event):
    """
    Records audio into successive WAV files in CACHE_DIR.

    Each time cut_event is set, the current WAV file is closed, converted to
    MP3 and a new segment is started; cut_event is cleared afterwards.

    The loop ends after a cut when the module-level `recording` flag is False
    (the STOP button sets that flag to False before setting cut_event). The
    input stream is stopped and closed whenever the loop ends.

    Raises SystemExit(1) after printing troubleshooting hints when the input
    stream cannot be opened or started.
    """
    q = queue.Queue()
    recording_index = 1

    def callback(indata, frames, time, status):
        if status:
            print(f"⚠️ {status}")

        # Copy the block before handing it to the writer.
        audio_data = indata.copy()
        q.put(audio_data)

        # Update audio buffer for visualization (first channel only).
        audio_buffer.extend(audio_data[:, 0])

    # Set up recording mode. Only stream setup is guarded here, so later
    # failures are not misreported as a device problem.
    stream = None
    try:
        device_info = sd.query_devices(DEVICE_ID)
        print(f"Using device: {device_info['name']}")
        stream = sd.InputStream(
            device=DEVICE_ID,
            channels=CHANNELS,
            samplerate=SAMPLE_RATE,
            blocksize=BLOCKSIZE,
            callback=callback,
            dtype='float32'
        )
        stream.start()
    except Exception as e:
        if stream is not None:
            stream.close()
        _print_stream_setup_help(e)
        raise SystemExit(1)

    print("▶️ Recording... Press 'c' to cut.")

    try:
        while True:
            # Start a new WAV file in the cache directory
            current_wav_path = os.path.join(CACHE_DIR, f"segment_{recording_index:02d}.wav")
            # The context manager closes the file even if a write fails.
            with sf.SoundFile(
                current_wav_path, mode='w',
                samplerate=SAMPLE_RATE, channels=CHANNELS, subtype='FLOAT'
            ) as wav_file:
                # Write until cut_event
                while not cut_event.is_set():
                    try:
                        data = q.get(timeout=0.1)
                    except queue.Empty:
                        continue
                    wav_file.write(data)

            # Cut triggered
            print(f"✂️ Segment {recording_index} captured: {current_wav_path}")

            # Convert to MP3 (the WAV is kept if the conversion fails)
            _convert_segment_to_mp3(current_wav_path)
            recording_index += 1

            # Reset for next segment
            cut_event.clear()

            # Checked after clear() so a stop requested during the conversion
            # is still seen. Blocks still queued at this point are discarded.
            if not globals().get("recording", True):
                print("⏹️ Recording stopped.")
                break

            print("▶️ Recording next segment... Press 'c' to cut.")
    finally:
        stream.stop()
        stream.close()
|
||
|
||
def listen_for_cut(cut_event: threading.Event):
    """Block forever, setting cut_event each time the 'c' key is pressed."""
    cut_key = 'c'
    while True:
        # keyboard.wait returns once the key has been pressed.
        keyboard.wait(cut_key)
        cut_event.set()
|
||
|
||
# Initialize pygame
pygame.init()

# Screen dimensions
SCREEN_WIDTH = 1000
SCREEN_HEIGHT = 700

# Modern Colors
BACKGROUND = (30, 30, 38)
TEXT_COLOR = (230, 230, 230)
ACCENT1 = (75, 207, 250)  # Light blue
ACCENT2 = (255, 110, 110)  # Soft red
ACCENT3 = (131, 232, 90)  # Green
ACCENT4 = (245, 194, 66)  # Gold
PANEL_BG = (45, 45, 60)

# Create the screen with a modern resolution
screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
pygame.display.set_caption("Modern Audio Recorder")

# Try to load a nicer font
try:
    font_file = pygame.font.get_default_font()
    main_font = pygame.font.Font(font_file, 18)
    title_font = pygame.font.Font(font_file, 24)
    button_font = pygame.font.Font(font_file, 20)
except Exception as font_error:
    # Catch Exception rather than using a bare except, so KeyboardInterrupt
    # and SystemExit are not swallowed.
    print(f"⚠️ Could not load default font file ({font_error}); using pygame's built-in font.")
    main_font = pygame.font.Font(None, 24)
    title_font = pygame.font.Font(None, 36)
    button_font = pygame.font.Font(None, 28)

# Modern UI Layout
header_rect = pygame.Rect(0, 0, SCREEN_WIDTH, 60)
footer_rect = pygame.Rect(0, SCREEN_HEIGHT - 80, SCREEN_WIDTH, 80)
waveform_rect = pygame.Rect(20, 80, SCREEN_WIDTH - 40, 200)
|
||
|
||
# Modern rounded button class
|
||
class RoundedButton:
    """Rounded rectangular button with a caption and a hover colour."""

    def __init__(self, x, y, width, height, text, color, hover_color):
        self.radius = 10
        self.is_hovered = False
        self.text = text
        self.color = color
        self.hover_color = hover_color
        self.rect = pygame.Rect(x, y, width, height)

    def draw(self, surface):
        """Draw the button body, its highlight/shadow strips and its caption."""
        fill = self.color
        if self.is_hovered:
            fill = self.hover_color
        pygame.draw.rect(surface, fill, self.rect, border_radius=self.radius)

        # Add a slight 3D effect: each RGB channel shifted by 40, clamped to 0..255.
        lighter = tuple(min(fill[k] + 40, 255) for k in range(3))
        darker = tuple(max(fill[k] - 40, 0) for k in range(3))

        corner = self.radius
        strip = 3  # height in pixels of the highlight and shadow strips
        box = self.rect

        # Top highlight
        pygame.draw.rect(
            surface, lighter,
            (box.x, box.y, box.width, strip),
            border_top_left_radius=corner, border_top_right_radius=corner,
        )

        # Bottom shadow
        pygame.draw.rect(
            surface, darker,
            (box.x, box.y + box.height - strip, box.width, strip),
            border_bottom_left_radius=corner, border_bottom_right_radius=corner,
        )

        caption = button_font.render(self.text, True, TEXT_COLOR)
        surface.blit(caption, caption.get_rect(center=box.center))

    def update(self, mouse_pos):
        """Record whether mouse_pos is currently over the button."""
        hovering = self.rect.collidepoint(mouse_pos)
        self.is_hovered = hovering

    def is_clicked(self, mouse_pos, mouse_click):
        """Return a truthy value when mouse_pos is inside the button and mouse_click is truthy."""
        inside = self.rect.collidepoint(mouse_pos)
        return inside and mouse_click
|
||
|
||
# Footer buttons: START, CUT and STOP laid out around the horizontal centre.
start_button = RoundedButton(
    x=SCREEN_WIDTH//2 - 160, y=SCREEN_HEIGHT - 65, width=100, height=40,
    text="START", color=ACCENT3, hover_color=(160, 255, 130),
)
cut_button = RoundedButton(
    x=SCREEN_WIDTH//2 - 45, y=SCREEN_HEIGHT - 65, width=90, height=40,
    text="CUT", color=ACCENT4, hover_color=(255, 220, 100),
)
stop_button = RoundedButton(
    x=SCREEN_WIDTH//2 + 60, y=SCREEN_HEIGHT - 65, width=100, height=40,
    text="STOP", color=ACCENT2, hover_color=(255, 150, 150),
)
|
||
|
||
# Modern input box
|
||
class ModernInputBox:
    """Single-line text input box with a placeholder and an active/inactive border."""

    def __init__(self, x, y, width, height, placeholder="Enter filename..."):
        self.rect = pygame.Rect(x, y, width, height)
        self.color_inactive = (100, 100, 120)
        self.color_active = ACCENT1
        self.color = self.color_inactive  # current border colour
        self.text = ""
        self.placeholder = placeholder  # shown while self.text is empty
        self.active = False  # True while the box has keyboard focus
        self.radius = 5

    def handle_event(self, event):
        """Process one pygame event.

        A mouse press gives the box focus when it lands inside the box and
        removes focus otherwise. While focused, key presses edit the text.

        Returns the current text when Return is pressed while focused,
        otherwise None.
        """
        if event.type == pygame.MOUSEBUTTONDOWN:
            self.active = self.rect.collidepoint(event.pos)
            self.color = self.color_active if self.active else self.color_inactive
        if event.type == pygame.KEYDOWN and self.active:
            if event.key == pygame.K_RETURN:
                print(f"File name set to: {self.text}")
                return self.text
            elif event.key == pygame.K_BACKSPACE:
                self.text = self.text[:-1]
            elif event.unicode.isprintable():
                # Skip control characters (e.g. Escape, Tab) so they do not
                # end up in the text.
                self.text += event.unicode
        return None

    def draw(self, surface):
        """Draw the box background, border and either the text or the placeholder."""
        pygame.draw.rect(surface, PANEL_BG, self.rect, border_radius=self.radius)
        pygame.draw.rect(surface, self.color, self.rect, 2, border_radius=self.radius)

        if self.text:
            text_surface = main_font.render(self.text, True, TEXT_COLOR)
        else:
            text_surface = main_font.render(self.placeholder, True, (130, 130, 140))

        # Left-aligned with a 10 px inset, vertically centred in the box.
        surface.blit(text_surface, (self.rect.x + 10, self.rect.y + (self.rect.height - text_surface.get_height()) // 2))
|
||
|
||
# Filename input box, placed at the right-hand side of the footer.
input_box = ModernInputBox(
    x=SCREEN_WIDTH - 260, y=SCREEN_HEIGHT - 65, width=240, height=40,
)

# Application state shared between the main loop and the drawing helpers.
cut_event = threading.Event()  # set to request a cut of the current segment
recording_thread = None  # thread running record_loop, once started
recording = False  # True while a recording is in progress
running = True  # main-loop flag
silence_counter = 0  # consecutive silent frames counted by draw_waveform
|
||
|
||
# Function to draw modern waveform
|
||
def draw_waveform():
    """Draw the waveform panel and run silence-based auto-cut.

    Draws the panel frame and title, then plots the contents of audio_buffer
    scaled to its own peak. While `recording` is true, it also advances
    silence_counter by BLOCKSIZE on every call in which the newest BLOCKSIZE
    samples are below the silence threshold, and sets cut_event once the
    counter reaches 3 seconds' worth of frames. The main loop calls this
    function once per UI frame, so the counter advances per frame drawn.
    """
    global silence_counter

    pygame.draw.rect(screen, PANEL_BG, waveform_rect, border_radius=10)
    pygame.draw.rect(screen, ACCENT1, waveform_rect, 2, border_radius=10)

    # Add waveform title
    title = title_font.render("Waveform", True, TEXT_COLOR)
    screen.blit(title, (waveform_rect.x + 10, waveform_rect.y + 10))

    if len(audio_buffer) < 2:
        return

    # Snapshot of the buffer for this frame.
    raw_data = np.array(list(audio_buffer), dtype=float)

    # Silence detection uses the raw samples. After peak normalisation the
    # loudest sample is always 1.0, which would make the threshold relative
    # to the buffer's own peak instead of an absolute level.
    silence_threshold = 0.05
    silence_duration = 3  # seconds
    silence_frames = int(silence_duration * SAMPLE_RATE)

    if recording:
        if np.all(np.abs(raw_data[-BLOCKSIZE:]) < silence_threshold):
            silence_counter += BLOCKSIZE
        else:
            silence_counter = 0

        if silence_counter >= silence_frames:
            cut_event.set()
            silence_counter = 0

    # Normalize to [-1, 1] for display only.
    max_amplitude = np.max(np.abs(raw_data))
    display_data = raw_data / max_amplitude if max_amplitude > 0 else raw_data

    # Thin the data out to roughly 1000 points.
    samples_to_show = min(len(display_data), 1000)
    step = max(1, len(display_data) // samples_to_show)
    shown = display_data[::step]
    if len(shown) < 2:
        return

    # Scale factor to fit within our display area
    scale = 80

    # Spread the points over exactly the inner width of the panel, so the
    # trace cannot run past the right edge however many points there are.
    inner_left = waveform_rect.x + 10
    inner_width = waveform_rect.width - 20
    center_y = waveform_rect.y + waveform_rect.height // 2
    xs = inner_left + np.linspace(0, inner_width, num=len(shown))
    ys = center_y - shown * scale

    # pygame expects plain (float, float) tuples.
    points = [(float(x), float(y)) for x, y in zip(xs, ys)]

    # Draw waveform lines
    pygame.draw.lines(screen, ACCENT1, False, points, 2)
|
||
|
||
# Draw header with title and status
|
||
def draw_header():
    """Draw the header bar: app title on the left, recording status on the right."""
    pygame.draw.rect(screen, PANEL_BG, header_rect)
    half_height = header_rect.height // 2

    # App Title
    title_surface = title_font.render("MODERN AUDIO RECORDER", True, TEXT_COLOR)
    title_y = half_height - title_surface.get_height() // 2
    screen.blit(title_surface, (20, title_y))

    # Status indicator
    if recording:
        label, label_color = "⚫ RECORDING", ACCENT3
    else:
        label, label_color = "⚪ READY", TEXT_COLOR
    status_surface = main_font.render(label, True, label_color)
    status_x = SCREEN_WIDTH - status_surface.get_width() - 20
    status_y = half_height - status_surface.get_height() // 2
    screen.blit(status_surface, (status_x, status_y))
|
||
|
||
# Main loop: handle events, react to the footer buttons, redraw the UI.
while running:
    screen.fill(BACKGROUND)

    mouse_pos = pygame.mouse.get_pos()
    mouse_clicked = False

    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False
        elif event.type == pygame.MOUSEBUTTONDOWN:
            mouse_clicked = True
            input_box.handle_event(event)
        elif event.type == pygame.KEYDOWN:
            input_box.handle_event(event)

    # Update buttons
    start_button.update(mouse_pos)
    cut_button.update(mouse_pos)
    stop_button.update(mouse_pos)

    # If the recording thread has ended on its own (for example because the
    # audio stream could not be opened), return the UI to the READY state.
    if recording and recording_thread is not None and not recording_thread.is_alive():
        print("⚠️ Recording thread is no longer running.")
        recording = False

    # Handle button clicks
    if mouse_clicked:
        if start_button.is_clicked(mouse_pos, mouse_clicked) and not recording:
            recording = True
            cut_event.clear()
            recording_thread = threading.Thread(target=record_loop, args=(cut_event,), daemon=True)
            recording_thread.start()
        elif cut_button.is_clicked(mouse_pos, mouse_clicked) and recording:
            cut_event.set()
        elif stop_button.is_clicked(mouse_pos, mouse_clicked) and recording:
            # STOP ends the application; the recording itself is finalised
            # by the shutdown code after the loop.
            running = False

    # audio_buffer is deliberately left untouched here. draw_waveform scales
    # the data for display itself, and replacing the deque every frame would
    # overwrite the raw levels and drop samples appended by the audio callback.

    # Draw UI components
    draw_header()
    draw_waveform()

    # Draw controls in the footer area
    pygame.draw.rect(screen, PANEL_BG, footer_rect)
    start_button.draw(screen)
    cut_button.draw(screen)
    stop_button.draw(screen)
    input_box.draw(screen)

    pygame.display.flip()
    pygame.time.delay(30)  # Cap at roughly 30 fps for UI

# Shutdown, used for both STOP and closing the window: request a final cut so
# the current segment is closed and converted, then give the thread a moment.
if recording:
    recording = False
    cut_event.set()
    if recording_thread and recording_thread.is_alive():
        recording_thread.join(timeout=5)  # Wait for the thread to finish with a timeout
        if recording_thread.is_alive():
            print("⚠️ Recording thread did not terminate in time.")

# Clear visualization data when stopping
audio_buffer.clear()

pygame.quit()
|