Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add fast_api and example #214

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions podcastfy/api/fast_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""
FastAPI implementation for Podcastify podcast generation service.

This module provides REST endpoints for podcast generation and audio serving,
with configuration management and temporary file handling.
"""

from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, JSONResponse
import os
import shutil
import yaml
from typing import Dict, Any
from pathlib import Path
from ..client import generate_podcast
import uvicorn

def load_base_config() -> Dict[Any, Any]:
config_path = Path(__file__).parent / "podcastfy" / "conversation_config.yaml"
try:
with open(config_path, 'r') as file:
return yaml.safe_load(file)
except Exception as e:
print(f"Warning: Could not load base config: {e}")
return {}

def merge_configs(base_config: Dict[Any, Any], user_config: Dict[Any, Any]) -> Dict[Any, Any]:
"""Merge user configuration with base configuration, preferring user values."""
merged = base_config.copy()

# Handle special cases for nested dictionaries
if 'text_to_speech' in merged and 'text_to_speech' in user_config:
merged['text_to_speech'].update(user_config.get('text_to_speech', {}))

# Update top-level keys
for key, value in user_config.items():
if key != 'text_to_speech': # Skip text_to_speech as it's handled above
if value is not None: # Only update if value is not None
merged[key] = value

return merged

app = FastAPI()

TEMP_DIR = os.path.join(os.path.dirname(__file__), "temp_audio")
os.makedirs(TEMP_DIR, exist_ok=True)

@app.post("/generate")
async def generate_podcast_endpoint(data: dict):
""""""
try:
# Set environment variables
os.environ['OPENAI_API_KEY'] = data.get('openai_key')
os.environ['GEMINI_API_KEY'] = data.get('google_key')

# Load base configuration
base_config = load_base_config()

# Get TTS model and its configuration from base config
tts_model = data.get('tts_model', base_config.get('text_to_speech', {}).get('default_tts_model', 'openai'))
tts_base_config = base_config.get('text_to_speech', {}).get(tts_model, {})

# Get voices (use user-provided voices or fall back to defaults)
voices = data.get('voices', {})
default_voices = tts_base_config.get('default_voices', {})

# Prepare user configuration
user_config = {
'creativity': float(data.get('creativity', base_config.get('creativity', 0.7))),
'conversation_style': data.get('conversation_style', base_config.get('conversation_style', [])),
'roles_person1': data.get('roles_person1', base_config.get('roles_person1')),
'roles_person2': data.get('roles_person2', base_config.get('roles_person2')),
'dialogue_structure': data.get('dialogue_structure', base_config.get('dialogue_structure', [])),
'podcast_name': data.get('name', base_config.get('podcast_name')),
'podcast_tagline': data.get('tagline', base_config.get('podcast_tagline')),
'output_language': data.get('output_language', base_config.get('output_language', 'English')),
'user_instructions': data.get('user_instructions', base_config.get('user_instructions', '')),
'engagement_techniques': data.get('engagement_techniques', base_config.get('engagement_techniques', [])),
'text_to_speech': {
'default_tts_model': tts_model,
'model': tts_base_config.get('model'),
'voices': {
'question': voices.get('question', default_voices.get('question')),
'answer': voices.get('answer', default_voices.get('answer'))
}
}
}

# Merge configurations
conversation_config = merge_configs(base_config, user_config)

# Generate podcast
result = generate_podcast(
urls=data.get('urls', []),
conversation_config=conversation_config,
tts_model=tts_model,
longform=bool(data.get('is_long_form', False)),
)
# Handle the result
if isinstance(result, str) and os.path.isfile(result):
filename = f"podcast_{os.urandom(8).hex()}.mp3"
output_path = os.path.join(TEMP_DIR, filename)
shutil.copy2(result, output_path)
return {"audioUrl": f"/audio/{filename}"}
elif hasattr(result, 'audio_path'):
filename = f"podcast_{os.urandom(8).hex()}.mp3"
output_path = os.path.join(TEMP_DIR, filename)
shutil.copy2(result.audio_path, output_path)
return {"audioUrl": f"/audio/{filename}"}
else:
raise HTTPException(status_code=500, detail="Invalid result format")

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

@app.get("/audio/{filename}")
async def serve_audio(filename: str):
""" Get File Audio From ther Server"""
file_path = os.path.join(TEMP_DIR, filename)
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(file_path)

if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)

Check warning on line 125 in podcastfy/api/fast_app.py

View check run for this annotation

codefactor.io / CodeFactor

podcastfy/api/fast_app.py#L125

Possible binding to all interfaces. (B104)
18 changes: 18 additions & 0 deletions usage/fast_api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# FastAPI Implementation for Podcastify

This PR adds a FastAPI implementation for serving the Podcastify functionality via REST API.

## Features
- Podcast generation endpoint
- Audio file serving
- Configuration merging
- Environment variable handling

## Usage
See `usage/fast_api_example.py` for usage example.

## Requirements
- Uvicorn
- FastAPI
- aiohttp
- pyyaml
101 changes: 101 additions & 0 deletions usage/fast_api_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""
Example implementation of the Podcastify FastAPI client.

This module demonstrates how to interact with the Podcastify API
to generate and download podcasts.
"""

import asyncio
import aiohttp
import json
import os
from pathlib import Path
from typing import Dict, Any


def get_default_config() -> Dict[str, Any]:
"""
Returns default configuration for podcast generation.

Returns:
Dict[str, Any]: Default configuration dictionary
"""
return {
"generate_podcast": True,
"google_key": "YOUR_GEMINI_API_KEY",
"openai_key": "YOUR_OPENAI_API_KEY",
"urls": ["https://www.phenomenalworld.org/interviews/swap-structure/"],
"name": "Central Clearing Risks",
"tagline": "Exploring the complexities of financial systemic risk",
"creativity": 0.8,
"conversation_style": ["engaging", "informative"],
"roles_person1": "main summarizer",
"roles_person2": "questioner",
"dialogue_structure": ["Introduction", "Content", "Conclusion"],
"tts_model": "openai",
"is_long_form": False,
"engagement_techniques": ["questions", "examples", "analogies"],
"user_instructions": "Dont use the world Dwelve",
"output_language": "English"
}


async def generate_podcast() -> None:
"""
Generates a podcast using the Podcastify API and downloads the result.
"""
async with aiohttp.ClientSession() as session:
try:
print("Starting podcast generation...")
async with session.post(
"http://localhost:8080/generate",
json=get_default_config()
) as response:
if response.status != 200:
print(f"Error: Server returned status {response.status}")
return

result = await response.json()
if "error" in result:
print(f"Error: {result['error']}")
return

await download_podcast(session, result)

except aiohttp.ClientError as e:
print(f"Network error: {str(e)}")
except Exception as e:
print(f"Unexpected error: {str(e)}")


async def download_podcast(session: aiohttp.ClientSession, result: Dict[str, str]) -> None:
"""
Downloads the generated podcast file.

Args:
session (aiohttp.ClientSession): Active client session
result (Dict[str, str]): API response containing audioUrl
"""
audio_url = f"http://localhost:8080{result['audioUrl']}"
print(f"Podcast generated! Downloading from: {audio_url}")

async with session.get(audio_url) as audio_response:
if audio_response.status == 200:
filename = os.path.join(
str(Path.home() / "Downloads"),
result['audioUrl'].split('/')[-1]
)
with open(filename, 'wb') as f:
f.write(await audio_response.read())
print(f"Downloaded to: {filename}")
else:
print(f"Failed to download audio. Status: {audio_response.status}")


if __name__ == "__main__":
try:
asyncio.run(generate_podcast())
except KeyboardInterrupt:
print("\nProcess interrupted by user")
except Exception as e:
print(f"Error: {str(e)}")
Loading