Python/FastAPI Services


nself provides comprehensive support for Python services built with FastAPI, enabling you to create high-performance, modern web APIs with automatic documentation, type hints, and async capabilities. Python services are a good fit for data processing, machine learning APIs, and rapid prototyping.

Overview

Python services in nself leverage FastAPI's strengths:

Key Benefits

  • Fast Performance: Throughput comparable to Node.js and Go web frameworks
  • Automatic Documentation: Interactive API docs with Swagger UI and ReDoc
  • Type Safety: Full Python type hints with runtime validation
  • Async Support: Native asyncio support for high concurrency
  • Easy Development: Python's simplicity with modern web frameworks
  • Rich Ecosystem: Access to extensive Python libraries

Ideal Use Cases

  • REST APIs: Full-featured web APIs with automatic validation
  • Machine Learning APIs: Model serving and inference endpoints
  • Data Processing: ETL pipelines and data transformation services
  • Analytics Services: Statistical analysis and reporting APIs
  • Scientific Computing: Mathematical and scientific computation services
  • Rapid Prototyping: Quick API development and testing

Getting Started

Enable Python Services

Add Python services to your .env.local:

# Enable Python services
PYTHON_SERVICES=ml-api,data-processor,analytics

# Optional: Specify Python version
PYTHON_VERSION=3.11

# Optional: Enable additional features
PYTHON_ASYNC=true
PYTHON_DOCS=true
PYTHON_CORS=true

Generate Service Structure

# Generate Python services
nself build

# Start services
nself up

This creates the following structure:

services/
├── python/
│   ├── ml-api/             # Machine Learning API
│   │   ├── app/
│   │   ├── models/
│   │   ├── requirements.txt
│   │   ├── Dockerfile
│   │   └── main.py
│   ├── data-processor/     # Data processing service
│   └── analytics/          # Analytics service
└── shared/
    └── python/             # Shared Python packages
        ├── database/
        ├── models/
        └── utils/

Service Templates

FastAPI REST API

# main.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import os
from app.database import get_db, Database
from app.models import User, UserCreate, UserResponse
from app.auth import get_current_user

app = FastAPI(
    title="nself Python API",
    description="A high-performance Python API built with FastAPI",
    version="1.0.0",
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/health")
async def health_check():
    return {"status": "healthy", "service": "python-api"}

@app.post("/users", response_model=UserResponse)
async def create_user(
    user: UserCreate, 
    db: Database = Depends(get_db)
):
    # Check if user exists
    existing_user = await db.get_user_by_email(user.email)
    if existing_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    
    # Create new user
    new_user = await db.create_user(user)
    return UserResponse.from_orm(new_user)

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: str, 
    db: Database = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    user = await db.get_user(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    return UserResponse.from_orm(user)

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=int(os.getenv("PORT", 8000)),
        reload=os.getenv("ENVIRONMENT") == "development"
    )
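
The main.py template above depends on a handful of packages. A minimal requirements.txt might look like the following; the entries and version pins are illustrative, not the exact contents nself generates, and the ml-api and data-processor templates additionally need scikit-learn, joblib, and pandas:

# requirements.txt (illustrative - pin the versions your project actually uses)
fastapi
uvicorn[standard]
pydantic[email]<2        # the examples use Pydantic v1 style (orm_mode, validator)
asyncpg
aiofiles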

Database Integration

# app/database.py
import asyncpg
import os
from typing import Optional, List
from app.models import User, UserCreate

class Database:
    def __init__(self):
        self.pool = None
    
    async def connect(self):
        database_url = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_DB')}"
        
        self.pool = await asyncpg.create_pool(
            database_url,
            min_size=1,
            max_size=10,
            command_timeout=60,
        )
    
    async def disconnect(self):
        if self.pool:
            await self.pool.close()
    
    async def get_user(self, user_id: str) -> Optional[User]:
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT id, email, first_name, last_name, is_active, created_at, updated_at FROM users WHERE id = $1",
                user_id
            )
            return User(**row) if row else None
    
    async def get_user_by_email(self, email: str) -> Optional[User]:
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT id, email, first_name, last_name, is_active, created_at, updated_at FROM users WHERE email = $1",
                email
            )
            return User(**row) if row else None
    
    async def create_user(self, user: UserCreate) -> User:
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                INSERT INTO users (email, first_name, last_name) 
                VALUES ($1, $2, $3) 
                RETURNING id, email, first_name, last_name, is_active, created_at, updated_at
                """,
                user.email, user.first_name, user.last_name
            )
            return User(**row)

# Dependency injection
database = Database()

async def get_db() -> Database:
    if not database.pool:
        await database.connect()
    return database
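
The pool above is created lazily the first time get_db runs. If you prefer explicit lifecycle management, the pool can instead be opened and closed with FastAPI startup/shutdown hooks. A minimal sketch, assuming the app instance and the database object shown above:

# main.py (excerpt)
from app.database import database

@app.on_event("startup")
async def startup():
    # Open the asyncpg pool once when the service starts
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    # Release all connections on shutdown
    await database.disconnect()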

Pydantic Models

# app/models.py
from pydantic import BaseModel, EmailStr, validator
from typing import List, Optional
from datetime import datetime

class UserBase(BaseModel):
    email: EmailStr
    first_name: Optional[str] = None
    last_name: Optional[str] = None

class UserCreate(UserBase):
    @validator('email')
    def email_must_be_valid(cls, v):
        if not v or '@' not in v:
            raise ValueError('Invalid email address')
        return v.lower()

class User(UserBase):
    id: str
    is_active: bool = True
    created_at: datetime
    updated_at: datetime
    
    class Config:
        orm_mode = True

class UserResponse(BaseModel):
    id: str
    email: str
    first_name: Optional[str]
    last_name: Optional[str]
    is_active: bool
    created_at: datetime
    
    class Config:
        orm_mode = True

# Machine Learning Models
class PredictionRequest(BaseModel):
    features: List[float]
    model_version: Optional[str] = "latest"

class PredictionResponse(BaseModel):
    prediction: float
    confidence: float
    model_version: str
    processing_time_ms: int
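
Because UserCreate is a Pydantic model, invalid payloads are rejected before your endpoint code runs. A quick illustration (EmailStr requires the email-validator package, included via the pydantic[email] extra):

# Example: validation happens at model construction time
from app.models import UserCreate

user = UserCreate(email="Alice@Example.COM", first_name="Alice")
print(user.email)  # "alice@example.com" - lowercased by the custom validator

UserCreate(email="not-an-email")  # raises a pydantic ValidationError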

Machine Learning Integration

Model Serving API

# app/ml_service.py
import joblib
import numpy as np
from typing import List, Tuple
import time
from app.models import PredictionRequest, PredictionResponse

class MLModelService:
    def __init__(self):
        self.models = {}
        self.load_models()
    
    def load_models(self):
        """Load pre-trained models"""
        try:
            self.models['regression'] = joblib.load('models/regression_model.pkl')
            self.models['classification'] = joblib.load('models/classification_model.pkl')
            print("Models loaded successfully")
        except Exception as e:
            print(f"Error loading models: {e}")
    
    async def predict(self, request: PredictionRequest) -> PredictionResponse:
        start_time = time.time()
        
        # Convert to numpy array
        features = np.array(request.features).reshape(1, -1)
        
        # Get model (default to regression)
        model = self.models.get('regression')
        if not model:
            raise ValueError("Model not available")
        
        # Make prediction
        prediction = model.predict(features)[0]
        
        # Calculate confidence (mock for regression)
        confidence = min(0.95, max(0.1, abs(prediction) / 100))
        
        processing_time = int((time.time() - start_time) * 1000)
        
        return PredictionResponse(
            prediction=float(prediction),
            confidence=confidence,
            model_version=request.model_version or "v1.0",
            processing_time_ms=processing_time
        )

# FastAPI endpoints (assumes the app instance from main.py is in scope)
ml_service = MLModelService()

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    return await ml_service.predict(request)

@app.get("/models")
async def list_models():
    return {"available_models": list(ml_service.models.keys())}

Data Processing Pipeline

# app/data_processor.py
import pandas as pd
import numpy as np
from typing import Dict, List, Any

class DataProcessor:
    def __init__(self):
        self.db = None
    
    async def process_csv_data(self, file_path: str) -> Dict[str, Any]:
        """Process CSV data and return statistics"""
        try:
            # Read CSV
            df = pd.read_csv(file_path)
            
            # Basic statistics
            stats = {
                "total_rows": len(df),
                "total_columns": len(df.columns),
                "columns": list(df.columns),
                "missing_values": df.isnull().sum().to_dict(),
                "data_types": df.dtypes.astype(str).to_dict(),
            }
            
            # Numeric statistics
            numeric_columns = df.select_dtypes(include=[np.number]).columns
            if len(numeric_columns) > 0:
                stats["numeric_summary"] = df[numeric_columns].describe().to_dict()
            
            return stats
            
        except Exception as e:
            raise ValueError(f"Error processing CSV: {str(e)}")
    
    async def transform_data(self, data: List[Dict], transformations: List[str]) -> List[Dict]:
        """Apply transformations to data"""
        df = pd.DataFrame(data)
        
        for transformation in transformations:
            if transformation == "normalize":
                numeric_cols = df.select_dtypes(include=[np.number]).columns
                df[numeric_cols] = (df[numeric_cols] - df[numeric_cols].mean()) / df[numeric_cols].std()
            
            elif transformation == "remove_nulls":
                df = df.dropna()
            
            elif transformation == "lowercase_strings":
                string_cols = df.select_dtypes(include=['object']).columns
                for col in string_cols:
                    df[col] = df[col].str.lower()
        
        return df.to_dict(orient='records')

# FastAPI endpoints (assumes the app instance from main.py is in scope)
data_processor = DataProcessor()

@app.post("/process-csv")
async def process_csv(file_path: str):
    return await data_processor.process_csv_data(file_path)

@app.post("/transform")
async def transform_data(data: List[Dict], transformations: List[str]):
    return await data_processor.transform_data(data, transformations)
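
As a quick sanity check, the transform endpoint can be exercised with any HTTP client. A sketch using httpx; the URL is an assumption, so adjust it to your routing setup:

# Example client call (URL is illustrative)
import httpx

payload = {
    "data": [{"name": "John", "age": 25}, {"name": "JANE", "age": 30}],
    "transformations": ["lowercase_strings"],
}
response = httpx.post("http://localhost:8000/transform", json=payload)
print(response.json())  # names are now lowercase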

Advanced Features

Background Tasks

# app/tasks.py
from fastapi import BackgroundTasks, Depends
import asyncio
import aiofiles
from app.database import get_db
from app.models import User
from app.auth import get_current_user

async def process_large_file(file_path: str, user_id: str):
    """Background task for processing large files"""
    try:
        db = await get_db()
        
        # Update job status
        await db.update_job_status(user_id, "processing")
        
        # Simulate long-running task
        await asyncio.sleep(10)
        
        # Process file
        async with aiofiles.open(file_path, 'r') as f:
            content = await f.read()
            # Process content...
        
        # Update completion status
        await db.update_job_status(user_id, "completed")
        
    except Exception as e:
        await db.update_job_status(user_id, f"failed: {str(e)}")

@app.post("/process-file")
async def start_file_processing(
    file_path: str, 
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_user)
):
    background_tasks.add_task(process_large_file, file_path, current_user.id)
    return {"message": "File processing started", "status": "queued"}

WebSocket Support

# app/websockets.py
from fastapi import WebSocket, WebSocketDisconnect
from typing import List
import json
import asyncio

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
    
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
    
    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
    
    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)
    
    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            message = f"Client #{client_id}: {data}"
            await manager.broadcast(message)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        message = f"Client #{client_id} left the chat"
        await manager.broadcast(message)
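
To try the endpoint from Python, the websockets client library works well. A small sketch; the host and port are assumptions based on the service's default configuration:

# Example WebSocket client (illustrative)
import asyncio
import websockets

async def main():
    async with websockets.connect("ws://localhost:8000/ws/1") as ws:
        await ws.send("hello")
        reply = await ws.recv()
        print(reply)  # "Client #1: hello" (broadcast back by the server)

asyncio.run(main())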

Testing

Unit Tests

# tests/test_api.py
import pytest
from fastapi.testclient import TestClient
from main import app
from app.database import get_db
from unittest.mock import AsyncMock
from datetime import datetime
from app.models import User

client = TestClient(app)

# Mock database so tests run without a live PostgreSQL instance
async def override_get_db():
    mock_db = AsyncMock()
    mock_db.get_user.return_value = None
    mock_db.get_user_by_email.return_value = None
    mock_db.create_user.return_value = User(
        id="1",
        email="test@example.com",
        first_name="Test",
        last_name="User",
        created_at=datetime.utcnow(),
        updated_at=datetime.utcnow(),
    )
    return mock_db

app.dependency_overrides[get_db] = override_get_db

def test_health_check():
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json() == {"status": "healthy", "service": "python-api"}

def test_create_user():
    user_data = {
        "email": "test@example.com",
        "first_name": "Test",
        "last_name": "User"
    }
    response = client.post("/users", json=user_data)
    assert response.status_code == 200
    assert response.json()["email"] == "test@example.com"

@pytest.mark.asyncio
async def test_data_processor():
    from app.data_processor import DataProcessor
    
    processor = DataProcessor()
    data = [
        {"name": "John", "age": 25, "score": 85.5},
        {"name": "Jane", "age": 30, "score": 92.0},
    ]
    
    result = await processor.transform_data(data, ["lowercase_strings"])
    assert result[0]["name"] == "john"
    assert result[1]["name"] == "jane"

Docker Configuration

Optimized Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install system dependencies (curl is required by the health check below)
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd --create-home --shell /bin/bash appuser
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

CLI Commands

Service Management

# Generate new Python service
nself generate python --name ml-service

# Install Python dependencies
nself python install --service ml-api --package scikit-learn

# Run Python tests
nself test python --service ml-api

# View service logs
nself logs python --service ml-api --follow

# Python shell access
nself exec python ml-api python -c "import app; print(app.__version__)"

Best Practices

Performance Optimization

  • Async/Await: Use async operations for I/O bound tasks
  • Connection Pooling: Pool database connections
  • Caching: Cache expensive computations (see the sketch after this list)
  • Pydantic: Use for data validation and serialization
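
For expensive, deterministic work such as loading serialized models, a simple in-process cache goes a long way. A minimal sketch using functools; for cross-process or per-user caching you would typically reach for Redis instead:

# Example: cache expensive model loading so it happens once per process
from functools import lru_cache
import joblib

@lru_cache(maxsize=None)
def load_model(path: str):
    # First call loads from disk; subsequent calls return the cached object
    return joblib.load(path)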

Code Organization

  • Modular Design: Separate concerns into modules
  • Type Hints: Use comprehensive type annotations
  • Error Handling: Implement proper exception handling
  • Documentation: Document APIs with docstrings

Next Steps

Now that you understand Python services, you can use the templates and commands above to add data processing, machine learning, and rapid API development capabilities to your nself stack.