Files
ApiGatewayTest/service_1_main.py
2026-01-10 23:37:18 +03:00

126 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import List, Optional
import logging
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI(
title="Service 1 - User Service",
description="Микросервис для управления пользователями",
version="1.0.0"
)
# Логирование
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Модели данных
class User(BaseModel):
id: int
name: str
email: str
age: Optional[int] = None
class UserCreate(BaseModel):
name: str
email: str
age: Optional[int] = None
# Имитация базы данных
users_db = {
1: {"id": 1, "name": "Иван Петров", "email": "ivan@example.com", "age": 28},
2: {"id": 2, "name": "Мария Сидорова", "email": "maria@example.com", "age": 25},
}
next_user_id = 3
# Маршруты
@app.get("/", tags=["Health"])
async def root():
"""Проверка работоспособности Service 1"""
logger.info("Health check для Service 1")
return {
"service": "Service 1 - User Service",
"status": "operational",
"version": "1.0.0"
}
@app.get("/health", tags=["Health"])
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "service": "user-service"}
@app.get("/users", response_model=List[User], tags=["Users"])
async def list_users(skip: int = 0, limit: int = 10):
"""Получить список всех пользователей"""
logger.info(f"Получение списка пользователей (skip={skip}, limit={limit})")
users_list = list(users_db.values())
return users_list[skip:skip + limit]
@app.get("/users/{user_id}", response_model=User, tags=["Users"])
async def get_user(user_id: int):
"""Получить пользователя по ID"""
logger.info(f"Получение пользователя с ID: {user_id}")
if user_id not in users_db:
logger.warning(f"Пользователь с ID {user_id} не найден")
raise HTTPException(status_code=404, detail="Пользователь не найден")
return users_db[user_id]
@app.post("/users", response_model=User, tags=["Users"])
async def create_user(user: UserCreate):
"""Создать нового пользователя"""
global next_user_id
logger.info(f"Создание нового пользователя: {user.name}")
new_user = {
"id": next_user_id,
"name": user.name,
"email": user.email,
"age": user.age
}
users_db[next_user_id] = new_user
next_user_id += 1
logger.info(f"Пользователь создан с ID: {new_user['id']}")
return new_user
@app.put("/users/{user_id}", response_model=User, tags=["Users"])
async def update_user(user_id: int, user: UserCreate):
"""Обновить пользователя"""
logger.info(f"Обновление пользователя с ID: {user_id}")
if user_id not in users_db:
raise HTTPException(status_code=404, detail="Пользователь не найден")
updated_user = {
"id": user_id,
"name": user.name,
"email": user.email,
"age": user.age
}
users_db[user_id] = updated_user
return updated_user
@app.delete("/users/{user_id}", tags=["Users"])
async def delete_user(user_id: int):
"""Удалить пользователя"""
logger.info(f"Удаление пользователя с ID: {user_id}")
if user_id not in users_db:
raise HTTPException(status_code=404, detail="Пользователь не найден")
del users_db[user_id]
return {"message": f"Пользователь {user_id} удален"}
@app.get("/stats", tags=["Stats"])
async def get_stats():
"""Получить статистику по пользователям"""
logger.info("Получение статистики")
return {
"total_users": len(users_db),
"service": "user-service",
"endpoint": "/users"
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8001)