AI Security and Resilience: A Deep Dive
Hey guys! Let's dive deep into making our AI applications secure and resilient. This improved version comes with extra protection, handles tricky edge cases, and has a much better structure. We'll go through the updated .env, security.py, assistant.py, and api.py files. The goal is to make your AI not just smart, but also safe and always available: fortified against potential threats and running smoothly no matter what. Let's get started!
1. Improved .env (with comments)
First up, the .env file, which is super important for keeping sensitive stuff safe. It now has extra comments explaining what each setting does. Remember: never put API keys directly into your code. Keep them in this .env file and add it to .gitignore so it never lands in your repository. This is crucial for your AI's security; think of it as the secret handshake to your AI's core functionality.
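The comment on SECRET_KEY below suggests generating it with secrets.token_urlsafe(32); here's a quick one-liner (standard library only) that does the trick:

```python
import secrets

# 32 random bytes, URL-safe base64 encoded (roughly 43 characters)
print(secrets.token_urlsafe(32))
```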
```env
# OpenAI API key (must be outside the repository)
OPENAI_API_KEY=your_openai_api_key_here

# Secret key for JWT (generate via secrets.token_urlsafe(32))
SECRET_KEY=your_random_secret_key_here

# JWT signature algorithm
JWT_ALGORITHM=HS256

# Path to the audit file (recommended outside the web root)
AUDIT_LOG_PATH=./logs/audit_log.json

# Maximum token lifetime (hours)
TOKEN_EXPIRY_HOURS=1

# Request limit per minute (anti-flooding protection)
RATE_LIMIT=60
```
This setup keeps all sensitive configuration separate from the code, making the system more secure and easier to manage. Properly configuring your .env file is the first step towards a robust and secure AI system; think of it as the foundation upon which your secure AI house is built. It is especially important to protect your OpenAI API key, since a leaked key can rack up unintended charges.
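To make sure these values are actually present at startup, you can load and validate them with python-dotenv. Here's a minimal sketch; the fail-fast loop is my own convention, not part of the project's code:

```python
import os
from dotenv import load_dotenv  # pip install python-dotenv

load_dotenv()  # reads key=value pairs from .env into the process environment

# Fail fast if a required setting is missing, instead of crashing
# later with a confusing error deep in the stack.
for var in ("OPENAI_API_KEY", "SECRET_KEY", "AUDIT_LOG_PATH"):
    if not os.getenv(var):
        raise RuntimeError(f"Missing required environment variable: {var}")

RATE_LIMIT = int(os.getenv("RATE_LIMIT", "60"))            # requests per minute
TOKEN_EXPIRY_HOURS = int(os.getenv("TOKEN_EXPIRY_HOURS", "1"))
```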
2. Updated security.py (with validation and protection)
Now, let's talk about the security.py file. This one is all about keeping things locked down. It's been updated with some serious validation checks and protections to make sure everything is on the up-and-up. The main changes are stricter input validation, more detailed error messages, and dedicated security logging. This helps in catching suspicious activity or attempts to misuse the system. The SecurityManager class is your go-to for all things security.
```python
import jwt
import hashlib
import json
import time
import logging
import os
import re
import secrets
from datetime import datetime, timedelta
from typing import Dict, Any, List


class SecurityManager:
    def __init__(self, secret_key: str, log_path: str, expiry_hours: int = 1):
        self.secret_key = secret_key
        self.log_path = log_path
        self.expiry_hours = expiry_hours
        self._setup_logging()

    def _setup_logging(self):
        """Setting up security logging"""
        os.makedirs('./logs', exist_ok=True)  # ensure the log directory exists
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s | %(levelname)s | %(message)s',
            handlers=[
                logging.FileHandler('./logs/security.log', encoding='utf-8')
            ]
        )
        self.logger = logging.getLogger(__name__)

    def generate_token(self, user_id: str, roles: List[str]) -> str:
        """Issuing a JWT token with input validation"""
        # user_id: 3-32 chars, alphanumeric plus underscore/hyphen only
        if not re.match(r'^[a-zA-Z0-9_-]{3,32}$', user_id):
            raise ValueError("Invalid user_id format")
        # roles: non-empty list of dot-separated lowercase names
        if not roles or not all(re.match(r'^[a-z.]+$', r) for r in roles):
            raise ValueError("Invalid roles format")
        payload = {
            "user_id": user_id,
            "roles": roles,
            "iat": datetime.utcnow(),
            "exp": datetime.utcnow() + timedelta(hours=self.expiry_hours),
            "nonce": secrets.token_hex(16)  # 16 random bytes -> 32 hex chars
        }
        try:
            return jwt.encode(payload, self.secret_key, algorithm="HS256")
        except Exception as e:
            self.logger.error(f"Token generation failed: {e}")
            raise

    def verify_token(self, token: str) -> Dict[str, Any]:
        """Token verification with detailed errors"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
            # 1. Expiration check (defense in depth: jwt.decode already enforces exp)
            if payload["exp"] < time.time():
                raise jwt.ExpiredSignatureError("Token expired")
            # 2. Nonce check (must be the 32-hex-char value set at issue time)
            if not payload.get("nonce") or not re.match(r'^[0-9a-f]{32}$', payload["nonce"]):
                raise ValueError("Invalid nonce")
            # 3. Roles check
            if not payload.get("roles") or not isinstance(payload["roles"], list):
                raise ValueError("Roles missing or invalid")
            return payload
        except jwt.ExpiredSignatureError:
            raise ValueError("Token expired")
        except jwt.DecodeError:
            raise ValueError("Invalid token format")
        except jwt.InvalidTokenError as e:
            raise ValueError(f"Token invalid: {str(e)}")
        except Exception as e:
            self.logger.error(f"Token verification error: {e}")
            raise ValueError("Token validation failed")

    def log_action(self, user_id: str, action: str, details: Dict):
        """Append-only audit logging; each entry carries a SHA-256 content hash"""
        # Normalize details so identical content always hashes identically
        normalized_details = json.dumps(details, sort_keys=True, ensure_ascii=False)
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "user_id": user_id,
            "action": action,
            "details": details,
            "hash": hashlib.sha256(
                f"{user_id}{action}{normalized_details}".encode()
            ).hexdigest()
        }
        try:
            with open(self.log_path, "a", encoding="utf-8") as f:
                f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
        except IOError as e:
            self.logger.critical(f"Audit log write failed: {e}")
            raise
```
The generate_token function now validates user_id and roles to make sure they're in the right format, which prevents certain types of injection attacks. The verify_token function tells you exactly why a token is invalid, so you can pinpoint the problem fast. The log_action method normalizes each entry's details and stamps it with a SHA-256 content hash, which makes tampering detectable and is crucial for auditing and security investigations: it's much easier to track what's happening and spot any problems.
Key improvements:
- user_id and roles Format Validation: Ensures that these crucial user identifiers and roles are in the correct format, helping to prevent unauthorized access and other security risks.
- Detailed Error Messages: Provides specific error messages for various token verification failures, making it easier to troubleshoot and identify the cause of issues.
- Security Logging: Logs security-related events to a separate file (./logs/security.log), enabling detailed auditing and analysis so you can identify and respond to security incidents.
- Protection against Invalid Nonces: Verifies the nonce value in the token, which adds an extra layer of security and helps prevent replay attacks.
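To see the whole flow end to end, here's a minimal usage sketch. The user ID, role name, and action details are invented for illustration; in a real app they come from your auth layer:

```python
import os
from dotenv import load_dotenv
from security import SecurityManager

load_dotenv()

sm = SecurityManager(
    secret_key=os.getenv("SECRET_KEY"),
    log_path=os.getenv("AUDIT_LOG_PATH"),
    expiry_hours=int(os.getenv("TOKEN_EXPIRY_HOURS", 1)),
)

# Issue a token for a hypothetical user holding the playground role
token = sm.generate_token("demo_user", ["ai.playground.user"])

# Verify it and record the action in the audit log
try:
    payload = sm.verify_token(token)
    sm.log_action(payload["user_id"], "demo_login", {"source": "usage_sketch"})
    print("Token OK for", payload["user_id"])
except ValueError as e:
    print("Rejected:", e)
```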
3. Enhanced assistant.py (with error handling)
Now, let's look at assistant.py. This is where the AI actually does its work. We've made it more robust by adding error handling and input validation. The SecureAIAssistant class is designed to handle user queries securely and efficiently. This enhancement ensures that the AI assistant remains operational and secure under various conditions.
```python
import os
from typing import Optional

from dotenv import load_dotenv
from langchain.llms import OpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

from security import SecurityManager

load_dotenv()


class SecureAIAssistant:
    def __init__(self):
        self.security = SecurityManager(
            secret_key=os.getenv("SECRET_KEY"),
            log_path=os.getenv("AUDIT_LOG_PATH"),
            expiry_hours=int(os.getenv("TOKEN_EXPIRY_HOURS", 1))
        )
        try:
            self.llm = OpenAI(
                model_name="gpt-3.5-turbo",
                openai_api_key=os.getenv("OPENAI_API_KEY"),
                temperature=0.7,
                max_tokens=1000  # restriction to protect against overspending
            )
            self.prompt_template = PromptTemplate(
                input_variables=["query"],
                template="You are a protected assistant. Answer the question: {query}"
            )
            self.chain = LLMChain(llm=self.llm, prompt=self.prompt_template)
        except Exception as e:
            raise RuntimeError(f"LLM initialization failed: {e}")

    def ask(self, query: str, token: str) -> Optional[str]:
        payload = None  # defined up front so the error handlers can use it safely
        try:
            # 1. Request validation
            if not query or len(query) > 2000:
                raise ValueError("Query too long or empty")
            # 2. Token verification
            payload = self.security.verify_token(token)
            # 3. Logging
            self.security.log_action(
                user_id=payload["user_id"],
                action="ask",
                details={"query": query, "roles": payload["roles"]}
            )
            # 4. Permissions check
            required_role = "ai.playground.user"
            if required_role not in payload["roles"]:
                raise PermissionError(f"Missing role: {required_role}")
            # 5. Execute the request
            response = self.chain.run(query)
            return response.strip()
        except PermissionError as e:
            self.security.log_action(payload["user_id"], "permission_denied", {"error": str(e)})
            raise
        except Exception as e:
            # payload is None when token verification itself failed
            user_id = payload["user_id"] if payload else "unknown"
            self.security.log_action(user_id, "error", {"error": str(e), "query": query})
            raise
```
Here's what changed:
- Input Length Limitation: The code now checks the length of the query to prevent overly long requests. This is a simple but effective measure to limit resource usage and reduce the risk of denial-of-service attacks.
- LLM Initialization Error Handling: The __init__ method now catches and re-raises exceptions during LLM (Language Model) initialization. If something goes wrong when setting up the AI model, the system won't crash silently; it will let you know there's a problem, making it easier to debug.
- Clearer Stage Separation: The code now clearly separates the different steps: validating requests, checking tokens, logging actions, and verifying permissions. This makes it easier to read and understand, and simplifies debugging.
- Use of Optional[str]: The function now returns Optional[str], which shows that it might not always return a response and makes the intent much clearer.
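Putting it together, here's a minimal sketch of how a caller might use the assistant, assuming your .env is populated. The user ID and query are invented for illustration, and the token must carry the ai.playground.user role:

```python
from assistant import SecureAIAssistant

assistant = SecureAIAssistant()

# Issue a token through the same SecurityManager the assistant uses
token = assistant.security.generate_token("demo_user", ["ai.playground.user"])

try:
    answer = assistant.ask("What is rate limiting?", token)
    print(answer)
except (ValueError, PermissionError) as e:
    # Oversized queries, bad tokens, and missing roles all land here
    print("Request rejected:", e)
```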
4. Strengthened api.py (with Rate Limiting)
Lastly, let's talk about api.py, the point of contact between the user and the AI. We've added rate limiting, which restricts the number of requests a user can make within a certain time frame. It's a very common technique to prevent abuse, protect your resources, and keep your AI running smoothly, especially in the face of attacks or high traffic.
The api.py listing in the original draft is cut off right after its first imports (FastAPI and the CORS middleware), so the full code isn't reproduced here. The idea is straightforward, though: the API keeps track of incoming requests per client and, once the RATE_LIMIT from .env (60 requests per minute) is exceeded, returns an error instead of passing the request on to the assistant. This protects your API from being overwhelmed by too many requests in a short time, which is critical for keeping your AI service available and responsive even under heavy load.
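To make this concrete, here's a minimal sketch of what such an endpoint could look like. This is a reconstruction under stated assumptions, not the article's original api.py: the /ask route, the in-memory sliding-window limiter, and the placeholder CORS origin are all invented for illustration.

```python
import os
import time
from collections import defaultdict, deque

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from assistant import SecureAIAssistant

load_dotenv()
RATE_LIMIT = int(os.getenv("RATE_LIMIT", "60"))  # requests per minute

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://your-frontend.example"],  # placeholder origin
    allow_methods=["POST"],
    allow_headers=["Authorization", "Content-Type"],
)

assistant = SecureAIAssistant()

# Per-client timestamps of requests made in the last 60 seconds
request_log = defaultdict(deque)


def check_rate_limit(client_id: str) -> None:
    """Sliding-window limiter: reject once RATE_LIMIT requests/minute is hit."""
    now = time.time()
    window = request_log[client_id]
    while window and now - window[0] > 60:  # evict entries older than a minute
        window.popleft()
    if len(window) >= RATE_LIMIT:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    window.append(now)


class AskRequest(BaseModel):
    query: str
    token: str


@app.post("/ask")
async def ask(body: AskRequest, request: Request):
    # Use the client IP as a simple stand-in for a real client identity
    check_rate_limit(request.client.host)
    try:
        return {"response": assistant.ask(body.query, body.token)}
    except PermissionError as e:
        raise HTTPException(status_code=403, detail=str(e))
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
```

One design note: the in-memory deque works for a single process, but if you run multiple workers or servers, each one keeps its own counters, so you'd typically back the limiter with something shared like Redis.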
In essence, these changes ensure that your AI is robust, secure, and ready to handle whatever comes its way. This makes your AI not just smarter but also a lot more resilient.