222 lines
8.4 KiB
Python
222 lines
8.4 KiB
Python
import os
|
||
import queue
|
||
import threading
|
||
import tempfile
|
||
import subprocess
|
||
import sounddevice as sd
|
||
import soundfile as sf
|
||
import keyboard
|
||
|
||
# PARAMETERS – adjust as needed
CHANNELS = 2
SAMPLE_RATE = 44100
BLOCKSIZE = 1024  # frames per chunk

# Snapshot of every audio device reported by sounddevice; the position of a
# device in this list is the ID used to open it.
devices = sd.query_devices()

# Capture device ID.  It must start as None: the fallback chain below
# (Stereo Mix -> WASAPI output device -> manual prompt) only runs while no
# device has been chosen, so a hard-coded number here disables all of it.
DEVICE_ID = None
# True when a WASAPI output device was selected instead of a real input device.
WASAPI_METHOD = False

# Show the complete device list before selecting anything, so the listing is
# not cut short by the first match.
print("Available audio devices:")
for i, device in enumerate(devices):
    print(f" {i} {device['name']}, {device['hostapi']} ({device['max_input_channels']} in, {device['max_output_channels']} out)")

# First priority: "Stereo Mix", a common input device for capturing system audio.
for i, device in enumerate(devices):
    if "Stereo Mix" in device['name'] and device['max_input_channels'] > 0:
        DEVICE_ID = i
        print(f"Selected device {i}: {device['name']} for system audio recording")
        break

# If no Stereo Mix, try to find WASAPI devices
if DEVICE_ID is None:
    hostapis = sd.query_hostapis()
    wasapi_index = None

    for i, api in enumerate(hostapis):
        if 'WASAPI' in api['name']:
            wasapi_index = i
            print(f"Found WASAPI at index {i}")
            break

    if wasapi_index is not None:
        # Every WASAPI device that can play audio, in device-list order.
        wasapi_outputs = [
            (index, info)
            for index, info in enumerate(devices)
            if info['hostapi'] == wasapi_index and info['max_output_channels'] > 0
        ]
        # Prefer a device whose name mentions "Speakers"; otherwise fall back
        # to the first WASAPI output device of any name.
        speaker_outputs = [
            entry for entry in wasapi_outputs if 'Speakers' in entry[1]['name']
        ]
        candidates = speaker_outputs or wasapi_outputs
        if candidates:
            DEVICE_ID, device = candidates[0]
            print(f"Selected device {DEVICE_ID}: {device['name']} (will try WASAPI method)")
            WASAPI_METHOD = True

# If still no device found, prompt user
if DEVICE_ID is None:
    print("No suitable recording device found. Please select a device manually:")
    for i, device in enumerate(devices):
        print(f" {i} {device['name']}, {device['hostapi']} ({device['max_input_channels']} in, {device['max_output_channels']} out)")
    device_id_input = input("Enter device ID to use: ")
    try:
        DEVICE_ID = int(device_id_input)
    except ValueError:
        DEVICE_ID = None
    # Reject non-numeric input and indices that do not refer to a listed device.
    if DEVICE_ID is None or not 0 <= DEVICE_ID < len(devices):
        print("Invalid device ID")
        # SystemExit works even where the site-provided exit() helper is absent.
        raise SystemExit(1)
|
||
|
||
def _open_input_stream(callback):
    """Create (without starting) the input stream for the selected device.

    Uses the module-level DEVICE_ID, CHANNELS, SAMPLE_RATE and BLOCKSIZE.
    When WASAPI_METHOD is set, a plain InputStream is tried first and a
    second attempt with a loopback option is made if that fails.

    Raises:
        Exception: if neither WASAPI attempt succeeds, or whatever
            sd.InputStream raises for a regular input device.
    """
    stream_kwargs = {
        'device': DEVICE_ID,
        'channels': CHANNELS,
        'samplerate': SAMPLE_RATE,
        'blocksize': BLOCKSIZE,
        'callback': callback,
        'dtype': 'float32',
    }

    if not WASAPI_METHOD:
        # Regular input device like Stereo Mix
        return sd.InputStream(**stream_kwargs)

    print("Using WASAPI device - if this doesn't work, try enabling 'Stereo Mix' in Windows sound settings")
    try:
        return sd.InputStream(**stream_kwargs)
    except Exception as e:
        print(f"Standard InputStream failed: {e}")
        print("Trying with loopback option...")
        try:
            # NOTE(review): sounddevice documents extra_settings as taking
            # host-API settings objects (e.g. sd.WasapiSettings); a plain dict
            # is presumably rejected, so this retry may never succeed — confirm
            # against the installed sounddevice version.
            stream_kwargs['extra_settings'] = {'wasapi_loopback': True}
            return sd.InputStream(**stream_kwargs)
        except Exception as e2:
            print(f"WASAPI loopback failed: {e2}")
            raise Exception("Could not initialize audio stream with WASAPI device") from e2


def _print_stream_troubleshooting(error):
    """Print the stream setup error, remediation hints, and the device list."""
    print(f"❌ Error occurred setting up audio stream: {error}")
    print("\nThere are several ways to fix this:")
    print("1. Try enabling 'Stereo Mix' in Windows Sound settings")
    print(" - Right-click the speaker icon in system tray")
    print(" - Select 'Sound settings' -> 'Sound Control Panel'")
    print(" - Go to 'Recording' tab")
    print(" - Right-click empty space and check 'Show Disabled Devices'")
    print(" - Enable 'Stereo Mix' if available")
    print("2. Try a different device ID")
    print("\nAvailable devices:")
    for i, device in enumerate(sd.query_devices()):
        hostapi_name = sd.query_hostapis(device['hostapi'])['name']
        print(f" {i} {device['name']}, {hostapi_name} ({device['max_input_channels']} in, {device['max_output_channels']} out)")

    print("\nRun the script again with a specific device ID:")
    print(" .venv/Scripts/python record.py")


def _convert_to_mp3(wav_path, mp3_path):
    """Run ffmpeg to encode wav_path as MP3; return True only on success."""
    cmd = [
        'ffmpeg', '-y',
        '-i', wav_path,
        '-codec:a', 'libmp3lame',
        '-qscale:a', '2',
        mp3_path,
    ]
    try:
        result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError as e:
        # Raised (as FileNotFoundError) when the ffmpeg executable is missing.
        print(f"❌ Could not run ffmpeg: {e}")
        return False
    if result.returncode != 0:
        print(f"❌ ffmpeg failed with exit code {result.returncode}")
        return False
    return True


def record_loop(cut_event: threading.Event):
    """
    Records audio into successive WAV files.

    Each time cut_event is set, the current WAV file is closed, converted to
    MP3 with ffmpeg, optionally renamed/moved via an interactive prompt, and
    a new segment is started.  The WAV file is deleted only after a
    successful conversion.  Audio that arrives while converting or prompting
    stays in the queue and becomes the beginning of the next segment.

    Args:
        cut_event: Event set by another thread to request a cut; it is
            cleared here once the finished segment has been handled.

    Raises:
        SystemExit: with code 1 if the audio stream cannot be opened or
            started (troubleshooting hints are printed first).

    This function loops until an exception (e.g. KeyboardInterrupt)
    propagates out; the audio stream is stopped and closed on the way out.
    """
    q = queue.Queue()
    recording_index = 1

    def callback(indata, frames, time, status):
        # Called by sounddevice for every captured block; copy the buffer
        # because the queue is consumed later, outside this call.
        if status:
            print(f"⚠️ {status}")
        q.put(indata.copy())

    # Set up recording mode
    device_info = sd.query_devices(DEVICE_ID)
    print(f"Using device: {device_info['name']}")

    # Only stream creation/start is treated as a setup failure, so later
    # errors (disk, ffmpeg, ...) are not misreported as audio-device problems.
    try:
        stream = _open_input_stream(callback)
        stream.start()
    except Exception as e:
        _print_stream_troubleshooting(e)
        raise SystemExit(1)

    print("▶️ Recording... Press 'c' to cut.")
    try:
        while True:
            # Reserve a unique file name, then release the handle right away:
            # a lingering open handle blocks os.remove() on Windows.
            with tempfile.NamedTemporaryFile(
                prefix=f"segment_{recording_index:02d}_", suffix=".wav", delete=False
            ) as placeholder:
                wav_path = placeholder.name

            # Write until cut_event; the context manager closes the file even
            # if recording is interrupted mid-segment.
            with sf.SoundFile(
                wav_path, mode='w',
                samplerate=SAMPLE_RATE, channels=CHANNELS, subtype='FLOAT'
            ) as wav_file:
                while not cut_event.is_set():
                    try:
                        data = q.get(timeout=0.1)
                    except queue.Empty:
                        continue
                    wav_file.write(data)

            # Cut triggered
            print(f"✂️ Segment {recording_index} captured: {wav_path}")

            # Swap only the extension; str.replace would also rewrite a
            # ".wav" occurring elsewhere in the path.
            mp3_path = os.path.splitext(wav_path)[0] + '.mp3'
            if _convert_to_mp3(wav_path, mp3_path):
                print(f"🔊 Converted to MP3: {mp3_path}")

                # Prompt for rename/move
                new_path = input("Enter new filename (with .mp3) or full path (leave blank to keep): ").strip()
                if new_path:
                    try:
                        os.replace(mp3_path, new_path)
                        print(f"✅ Moved to {new_path}")
                    except Exception as e:
                        print(f"❌ Could not move file: {e}")
                else:
                    print(f"➔ Keeping as {mp3_path}")

                # Clean up WAV
                os.remove(wav_path)
            else:
                # Conversion failed: the WAV is the only copy of the audio.
                print(f"➔ Keeping WAV recording at {wav_path}")

            recording_index += 1

            # Reset for next segment
            cut_event.clear()
            print("▶️ Recording next segment... Press 'c' to cut.")
    finally:
        # Release the audio device however the loop ends.
        stream.stop()
        stream.close()
|
||
|
||
def listen_for_cut(cut_event: threading.Event):
    """Set ``cut_event`` every time the user presses 'c'.

    Loops indefinitely and never returns, so it is meant to be run on its
    own thread.
    """
    cut_key = 'c'
    while True:
        # Wait for the next press of the cut key, then flag it.
        keyboard.wait(cut_key)
        cut_event.set()
|
||
|
||
if __name__ == '__main__':
    # Event shared between the key listener and the recording loop.
    cut_requested = threading.Event()

    # Background daemon thread that watches for the 'c' key.
    threading.Thread(
        target=listen_for_cut,
        args=(cut_requested,),
        daemon=True,
    ).start()

    # The recording loop itself runs on the main thread.
    try:
        record_loop(cut_requested)
    except KeyboardInterrupt:
        print("\n🛑 Recording stopped by user.")
    except Exception as err:
        print(f"❌ Error occurred while recording: {err}")
|