nself provides comprehensive support for Python services built with FastAPI, enabling you to create high-performance, modern web APIs with automatic documentation, type hints, and async capabilities. Perfect for data processing, machine learning APIs, and rapid prototyping.
Python services in nself leverage FastAPI's strengths:
Add Python services to your .env.local
:
# Enable Python services
PYTHON_SERVICES=ml-api,data-processor,analytics
# Optional: Specify Python version
PYTHON_VERSION=3.11
# Optional: Enable additional features
PYTHON_ASYNC=true
PYTHON_DOCS=true
PYTHON_CORS=true
# Generate Python services
nself build
# Start services
nself up
This creates the following structure:
services/
├── python/
│ ├── ml-api/ # Machine Learning API
│ │ ├── app/
│ │ ├── models/
│ │ ├── requirements.txt
│ │ ├── Dockerfile
│ │ └── main.py
│ ├── data-processor/ # Data processing service
│ └── analytics/ # Analytics service
└── shared/
└── python/ # Shared Python packages
├── database/
├── models/
└── utils/
# main.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
import os
from app.database import get_db, Database
from app.models import User, UserCreate, UserResponse
from app.auth import get_current_user
app = FastAPI(
title="nself Python API",
description="A high-performance Python API built with FastAPI",
version="1.0.0",
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health")
async def health_check():
return {"status": "healthy", "service": "python-api"}
@app.post("/users", response_model=UserResponse)
async def create_user(
user: UserCreate,
db: Database = Depends(get_db)
):
# Check if user exists
existing_user = await db.get_user_by_email(user.email)
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
# Create new user
new_user = await db.create_user(user)
return UserResponse.from_orm(new_user)
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
user_id: str,
db: Database = Depends(get_db),
current_user: User = Depends(get_current_user)
):
user = await db.get_user(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return UserResponse.from_orm(user)
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=int(os.getenv("PORT", 8000)),
reload=os.getenv("ENVIRONMENT") == "development"
)
# app/database.py
import asyncpg
import os
from typing import Optional, List
from app.models import User, UserCreate
class Database:
def __init__(self):
self.pool = None
async def connect(self):
database_url = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_DB')}"
self.pool = await asyncpg.create_pool(
database_url,
min_size=1,
max_size=10,
command_timeout=60,
)
async def disconnect(self):
if self.pool:
await self.pool.close()
async def get_user(self, user_id: str) -> Optional[User]:
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT id, email, first_name, last_name, is_active, created_at, updated_at FROM users WHERE id = $1",
user_id
)
return User(**row) if row else None
async def get_user_by_email(self, email: str) -> Optional[User]:
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT id, email, first_name, last_name, is_active, created_at, updated_at FROM users WHERE email = $1",
email
)
return User(**row) if row else None
async def create_user(self, user: UserCreate) -> User:
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO users (email, first_name, last_name)
VALUES ($1, $2, $3)
RETURNING id, email, first_name, last_name, is_active, created_at, updated_at
""",
user.email, user.first_name, user.last_name
)
return User(**row)
# Dependency injection
database = Database()
async def get_db() -> Database:
if not database.pool:
await database.connect()
return database
# app/models.py
from pydantic import BaseModel, EmailStr, validator
from typing import Optional
from datetime import datetime
class UserBase(BaseModel):
email: EmailStr
first_name: Optional[str] = None
last_name: Optional[str] = None
class UserCreate(UserBase):
@validator('email')
def email_must_be_valid(cls, v):
if not v or '@' not in v:
raise ValueError('Invalid email address')
return v.lower()
class User(UserBase):
id: str
is_active: bool = True
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
class UserResponse(BaseModel):
id: str
email: str
first_name: Optional[str]
last_name: Optional[str]
is_active: bool
created_at: datetime
class Config:
orm_mode = True
# Machine Learning Models
class PredictionRequest(BaseModel):
features: List[float]
model_version: Optional[str] = "latest"
class PredictionResponse(BaseModel):
prediction: float
confidence: float
model_version: str
processing_time_ms: int
# app/ml_service.py
import joblib
import numpy as np
from typing import List, Tuple
import time
from app.models import PredictionRequest, PredictionResponse
class MLModelService:
def __init__(self):
self.models = {}
self.load_models()
def load_models(self):
"""Load pre-trained models"""
try:
self.models['regression'] = joblib.load('models/regression_model.pkl')
self.models['classification'] = joblib.load('models/classification_model.pkl')
print("Models loaded successfully")
except Exception as e:
print(f"Error loading models: {e}")
async def predict(self, request: PredictionRequest) -> PredictionResponse:
start_time = time.time()
# Convert to numpy array
features = np.array(request.features).reshape(1, -1)
# Get model (default to regression)
model = self.models.get('regression')
if not model:
raise ValueError("Model not available")
# Make prediction
prediction = model.predict(features)[0]
# Calculate confidence (mock for regression)
confidence = min(0.95, max(0.1, abs(prediction) / 100))
processing_time = int((time.time() - start_time) * 1000)
return PredictionResponse(
prediction=float(prediction),
confidence=confidence,
model_version=request.model_version or "v1.0",
processing_time_ms=processing_time
)
# FastAPI endpoints
ml_service = MLModelService()
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
return await ml_service.predict(request)
@app.get("/models")
async def list_models():
return {"available_models": list(ml_service.models.keys())}
# app/data_processor.py
import pandas as pd
import numpy as np
from typing import Dict, List, Any
import asyncio
from app.database import get_db
class DataProcessor:
def __init__(self):
self.db = None
async def process_csv_data(self, file_path: str) -> Dict[str, Any]:
"""Process CSV data and return statistics"""
try:
# Read CSV
df = pd.read_csv(file_path)
# Basic statistics
stats = {
"total_rows": len(df),
"total_columns": len(df.columns),
"columns": list(df.columns),
"missing_values": df.isnull().sum().to_dict(),
"data_types": df.dtypes.astype(str).to_dict(),
}
# Numeric statistics
numeric_columns = df.select_dtypes(include=[np.number]).columns
if len(numeric_columns) > 0:
stats["numeric_summary"] = df[numeric_columns].describe().to_dict()
return stats
except Exception as e:
raise ValueError(f"Error processing CSV: {str(e)}")
async def transform_data(self, data: List[Dict], transformations: List[str]) -> List[Dict]:
"""Apply transformations to data"""
df = pd.DataFrame(data)
for transformation in transformations:
if transformation == "normalize":
numeric_cols = df.select_dtypes(include=[np.number]).columns
df[numeric_cols] = (df[numeric_cols] - df[numeric_cols].mean()) / df[numeric_cols].std()
elif transformation == "remove_nulls":
df = df.dropna()
elif transformation == "lowercase_strings":
string_cols = df.select_dtypes(include=['object']).columns
for col in string_cols:
df[col] = df[col].str.lower()
return df.to_dict(orient='records')
# FastAPI endpoints
data_processor = DataProcessor()
@app.post("/process-csv")
async def process_csv(file_path: str):
return await data_processor.process_csv_data(file_path)
@app.post("/transform")
async def transform_data(data: List[Dict], transformations: List[str]):
return await data_processor.transform_data(data, transformations)
# app/tasks.py
from fastapi import BackgroundTasks
import asyncio
import aiofiles
from app.database import get_db
async def process_large_file(file_path: str, user_id: str):
"""Background task for processing large files"""
try:
db = await get_db()
# Update job status
await db.update_job_status(user_id, "processing")
# Simulate long-running task
await asyncio.sleep(10)
# Process file
async with aiofiles.open(file_path, 'r') as f:
content = await f.read()
# Process content...
# Update completion status
await db.update_job_status(user_id, "completed")
except Exception as e:
await db.update_job_status(user_id, f"failed: {str(e)}")
@app.post("/process-file")
async def start_file_processing(
file_path: str,
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_user)
):
background_tasks.add_task(process_large_file, file_path, current_user.id)
return {"message": "File processing started", "status": "queued"}
# app/websockets.py
from fastapi import WebSocket, WebSocketDisconnect
from typing import List
import json
import asyncio
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
message = f"Client #{client_id}: {data}"
await manager.broadcast(message)
except WebSocketDisconnect:
manager.disconnect(websocket)
message = f"Client #{client_id} left the chat"
await manager.broadcast(message)
# tests/test_api.py
import pytest
from fastapi.testclient import TestClient
from main import app
from app.database import get_db
from unittest.mock import AsyncMock
client = TestClient(app)
# Mock database
async def override_get_db():
mock_db = AsyncMock()
mock_db.get_user.return_value = None
return mock_db
app.dependency_overrides[get_db] = override_get_db
def test_health_check():
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "healthy", "service": "python-api"}
def test_create_user():
user_data = {
"email": "test@example.com",
"first_name": "Test",
"last_name": "User"
}
response = client.post("/users", json=user_data)
assert response.status_code == 200
assert response.json()["email"] == "test@example.com"
@pytest.mark.asyncio
async def test_data_processor():
from app.data_processor import DataProcessor
processor = DataProcessor()
data = [
{"name": "John", "age": 25, "score": 85.5},
{"name": "Jane", "age": 30, "score": 92.0},
]
result = await processor.transform_data(data, ["lowercase_strings"])
assert result[0]["name"] == "john"
assert result[1]["name"] == "jane"
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd --create-home --shell /bin/bash appuser
USER appuser
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
# Generate new Python service
nself generate python --name ml-service
# Install Python dependencies
nself python install --service ml-api --package scikit-learn
# Run Python tests
nself test python --service ml-api
# View service logs
nself logs python --service ml-api --follow
# Python shell access
nself exec python ml-api python -c "import app; print(app.__version__)"
Now that you understand Python services:
Python services provide powerful capabilities for data processing, machine learning, and rapid API development in your nself stack.