copilot-meeting/audio-recorder/record.py
Michael Bobbitt 42c5f87a1d init
2025-11-10 22:34:17 -05:00

222 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import queue
import threading
import tempfile
import subprocess
import sounddevice as sd
import soundfile as sf
import keyboard
# PARAMETERS: adjust as needed
CHANNELS = 2
SAMPLE_RATE = 44100
BLOCKSIZE = 1024  # frames per chunk

# Device selection runs at import time. Its result is left in the module-level
# DEVICE_ID / WASAPI_METHOD names, which record_loop() reads.
# Priority: "Stereo Mix" input -> WASAPI "Speakers" output -> any WASAPI
# output -> ask the user.
devices = sd.query_devices()
# Must start as None: every fallback below is gated on `DEVICE_ID is None`, so
# a hard-coded index would silently disable all automatic selection.
DEVICE_ID = None
WASAPI_METHOD = False  # First try with Stereo Mix, then try WASAPI if available

# List every device first so the listing is complete even when a match is
# found early.
print("Available audio devices:")
for i, device in enumerate(devices):
    print(f" {i} {device['name']}, {device['hostapi']} ({device['max_input_channels']} in, {device['max_output_channels']} out)")

# First priority: Stereo Mix
for i, device in enumerate(devices):
    if "Stereo Mix" in device['name'] and device['max_input_channels'] > 0:
        DEVICE_ID = i
        print(f"Selected device {i}: {device['name']} for system audio recording")
        break

# If no Stereo Mix, try to find WASAPI devices
if DEVICE_ID is None:
    hostapis = sd.query_hostapis()
    wasapi_index = None
    for i, api in enumerate(hostapis):
        if 'WASAPI' in api['name']:
            wasapi_index = i
            print(f"Found WASAPI at index {i}")
            break
    if wasapi_index is not None:
        # All WASAPI devices that can play audio, in device order.
        wasapi_outputs = [
            i for i, device in enumerate(devices)
            if device['hostapi'] == wasapi_index and device['max_output_channels'] > 0
        ]
        # Prefer a device named "Speakers"; otherwise fall back to any
        # WASAPI output device.
        speaker_outputs = [i for i in wasapi_outputs if 'Speakers' in devices[i]['name']]
        candidates = speaker_outputs or wasapi_outputs
        if candidates:
            DEVICE_ID = candidates[0]
            print(f"Selected device {DEVICE_ID}: {devices[DEVICE_ID]['name']} (will try WASAPI method)")
            WASAPI_METHOD = True

# If still no device found, prompt user
if DEVICE_ID is None:
    print("No suitable recording device found. Please select a device manually:")
    for i, device in enumerate(devices):
        print(f" {i} {device['name']}, {device['hostapi']} ({device['max_input_channels']} in, {device['max_output_channels']} out)")
    device_id_input = input("Enter device ID to use: ")
    try:
        DEVICE_ID = int(device_id_input)
    except ValueError:
        DEVICE_ID = None
    # Reject non-numeric input and indices outside the listed devices.
    if DEVICE_ID is None or not 0 <= DEVICE_ID < len(devices):
        print("Invalid device ID")
        # SystemExit instead of exit(): exit() is a helper installed by the
        # `site` module and is not guaranteed to exist in every interpreter.
        raise SystemExit(1)
def record_loop(cut_event: threading.Event):
    """
    Record audio from DEVICE_ID into successive segments until interrupted.

    Each segment is written to a temporary WAV file. When ``cut_event`` is
    set, the current WAV is closed, converted to MP3 with ffmpeg, optionally
    renamed/moved by the user, and a new segment is started. The event is
    cleared before the next segment begins.

    The stream callback keeps queueing audio while the conversion and the
    rename prompt run; those blocks are written at the start of the next
    segment.

    Args:
        cut_event: Event set by another thread to end the current segment.

    Raises:
        SystemExit: With status 1, after printing troubleshooting help, if
            the audio stream cannot be opened or started.
        Exception: Errors raised while recording propagate to the caller
            after the stream and the current WAV file have been closed.
    """
    q = queue.Queue()

    def callback(indata, frames, time_info, status):
        # Runs on the audio thread: report over/underflows and hand a copy of
        # the block to the writer loop (indata is reused by the stream).
        if status:
            print(f"⚠️ {status}")
        q.put(indata.copy())

    # --- Stream setup: only failures here get the device troubleshooting help.
    stream = None
    try:
        device_info = sd.query_devices(DEVICE_ID)
        print(f"Using device: {device_info['name']}")
        stream = _open_input_stream(callback)
        stream.start()
    except Exception as e:
        if stream is not None:
            stream.close()
        _print_stream_setup_help(e)
        raise SystemExit(1) from e

    # --- Recording: the stream is always closed, even on Ctrl+C or errors.
    recording_index = 1
    try:
        print("▶️ Recording... Press 'c' to cut.")
        while True:
            # Reserve a unique temp path and close the handle immediately:
            # soundfile reopens the file by name, and a lingering handle
            # would block os.remove() on Windows.
            fd, wav_path = tempfile.mkstemp(
                prefix=f"segment_{recording_index:02d}_", suffix=".wav"
            )
            os.close(fd)

            # The context manager closes the WAV on a normal cut as well as
            # on an exception or KeyboardInterrupt.
            with sf.SoundFile(
                wav_path, mode='w',
                samplerate=SAMPLE_RATE, channels=CHANNELS, subtype='FLOAT'
            ) as wav_file:
                # Write until cut_event; the short timeout keeps the loop
                # responsive to the event when no audio is arriving.
                while not cut_event.is_set():
                    try:
                        wav_file.write(q.get(timeout=0.1))
                    except queue.Empty:
                        continue

            # Cut triggered
            print(f"✂️ Segment {recording_index} captured: {wav_path}")

            mp3_path = _convert_to_mp3(wav_path)
            if mp3_path is None:
                # Conversion failed: keep the WAV so the audio is not lost.
                print(f"➔ Keeping WAV as {wav_path}")
            else:
                print(f"🔊 Converted to MP3: {mp3_path}")
                # Prompt for rename/move
                new_path = input("Enter new filename (with .mp3) or full path (leave blank to keep): ").strip()
                if new_path:
                    try:
                        os.replace(mp3_path, new_path)
                        print(f"✅ Moved to {new_path}")
                    except OSError as e:
                        print(f"❌ Could not move file: {e}")
                else:
                    print(f"➔ Keeping as {mp3_path}")
                # Clean up WAV only once an MP3 exists.
                os.remove(wav_path)

            recording_index += 1
            # Reset for next segment. This also discards any 'c' typed while
            # answering the rename prompt.
            cut_event.clear()
            print("▶️ Recording next segment... Press 'c' to cut.")
    finally:
        stream.close()


def _open_input_stream(callback):
    """Create (without starting) the sounddevice InputStream for DEVICE_ID."""
    stream_kwargs = {
        'device': DEVICE_ID,
        'channels': CHANNELS,
        'samplerate': SAMPLE_RATE,
        'blocksize': BLOCKSIZE,
        'callback': callback,
        'dtype': 'float32'
    }
    if not WASAPI_METHOD:
        # Regular input device like Stereo Mix
        return sd.InputStream(**stream_kwargs)

    print("Using WASAPI device - if this doesn't work, try enabling 'Stereo Mix' in Windows sound settings")
    try:
        return sd.InputStream(**stream_kwargs)
    except Exception as e:
        print(f"Standard InputStream failed: {e}")
        print("Trying with loopback option...")
        try:
            # NOTE(review): sounddevice documents `extra_settings` as a
            # host-API settings object (e.g. sd.WasapiSettings); confirm that
            # a plain dict is accepted by the installed version.
            stream_kwargs['extra_settings'] = {'wasapi_loopback': True}
            return sd.InputStream(**stream_kwargs)
        except Exception as e2:
            print(f"WASAPI loopback failed: {e2}")
            raise Exception("Could not initialize audio stream with WASAPI device") from e2


def _convert_to_mp3(wav_path):
    """Convert wav_path to an MP3 beside it via ffmpeg; return its path, or None on failure."""
    # splitext swaps only the extension; str.replace would also rewrite a
    # ".wav" occurring elsewhere in the path.
    mp3_path = os.path.splitext(wav_path)[0] + '.mp3'
    cmd = [
        'ffmpeg', '-y',
        '-i', wav_path,
        '-codec:a', 'libmp3lame',
        '-qscale:a', '2',
        mp3_path
    ]
    try:
        result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError as e:
        # Typically ffmpeg is not installed or not on PATH.
        print(f"❌ Could not run ffmpeg: {e}")
        return None
    if result.returncode != 0:
        print(f"❌ ffmpeg exited with code {result.returncode}")
        return None
    return mp3_path


def _print_stream_setup_help(error):
    """Print the stream setup error followed by troubleshooting steps and the device list."""
    print(f"❌ Error occurred setting up audio stream: {error}")
    print("\nThere are several ways to fix this:")
    print("1. Try enabling 'Stereo Mix' in Windows Sound settings")
    print(" - Right-click the speaker icon in system tray")
    print(" - Select 'Sound settings' -> 'Sound Control Panel'")
    print(" - Go to 'Recording' tab")
    print(" - Right-click empty space and check 'Show Disabled Devices'")
    print(" - Enable 'Stereo Mix' if available")
    print("2. Try a different device ID")
    print("\nAvailable devices:")
    for i, device in enumerate(sd.query_devices()):
        hostapi_name = sd.query_hostapis(device['hostapi'])['name']
        print(f" {i} {device['name']}, {hostapi_name} ({device['max_input_channels']} in, {device['max_output_channels']} out)")
    print("\nRun the script again with a specific device ID:")
    print(" .venv/Scripts/python record.py")
def listen_for_cut(cut_event: threading.Event):
    """
    Signal a segment cut every time the 'c' key is pressed.

    Loops forever: blocks in ``keyboard.wait`` until 'c' is pressed, then
    sets ``cut_event``. The function never returns.
    """
    cut_key = 'c'
    while True:
        # Blocks this thread until the key is pressed.
        keyboard.wait(cut_key)
        cut_event.set()
if __name__ == '__main__':
    # Shared flag: set by the key listener, consumed and cleared by record_loop.
    cut_requested = threading.Event()

    # Daemon thread watching for 'c', so it never keeps the process alive
    # once the main thread finishes.
    threading.Thread(
        target=listen_for_cut,
        args=(cut_requested,),
        daemon=True,
    ).start()

    # Record on the main thread until interrupted or an error occurs.
    try:
        record_loop(cut_requested)
    except KeyboardInterrupt:
        print("\n🛑 Recording stopped by user.")
    except Exception as e:
        print(f"❌ Error occurred while recording: {e}")