1+ """
2+ Security utilities and best practices for the Binance MCP Server.
3+
4+ This module implements security best practices including input validation,
5+ error sanitization, rate limiting, and secure configuration management.
6+ """
7+
8+ import os
9+ import logging
10+ import hashlib
11+ import secrets
12+ from typing import Dict , Any , Optional , List
13+ from functools import wraps
14+
15+ logger = logging .getLogger (__name__ )
16+
17+
18+ class SecurityConfig :
19+ """
20+ Security configuration and validation for MCP server.
21+ """
22+
23+ def __init__ (self ):
24+ self .rate_limit_enabled = os .getenv ("MCP_RATE_LIMIT_ENABLED" , "true" ).lower () == "true"
25+ self .max_requests_per_minute = int (os .getenv ("MCP_MAX_REQUESTS_PER_MINUTE" , "60" ))
26+ self .enable_input_validation = os .getenv ("MCP_INPUT_VALIDATION" , "true" ).lower () == "true"
27+ self .log_security_events = os .getenv ("MCP_LOG_SECURITY" , "true" ).lower () == "true"
28+
29+ def is_secure (self ) -> bool :
30+ """Check if current configuration meets security requirements."""
31+ return all ([
32+ self .rate_limit_enabled ,
33+ self .max_requests_per_minute <= 120 , # Reasonable limit
34+ self .enable_input_validation
35+ ])
36+
37+ def get_security_warnings (self ) -> List [str ]:
38+ """Get list of security configuration warnings."""
39+ warnings = []
40+
41+ if not self .rate_limit_enabled :
42+ warnings .append ("Rate limiting is disabled - this may expose the server to abuse" )
43+
44+ if self .max_requests_per_minute > 120 :
45+ warnings .append ("Rate limit is set too high - recommended maximum is 120 requests/minute" )
46+
47+ if not self .enable_input_validation :
48+ warnings .append ("Input validation is disabled - this is a security risk" )
49+
50+ return warnings
51+
52+
53+ def validate_api_credentials () -> bool :
54+ """
55+ Validate that API credentials are properly configured and secure.
56+
57+ Returns:
58+ bool: True if credentials are valid and secure
59+ """
60+ api_key = os .getenv ("BINANCE_API_KEY" )
61+ api_secret = os .getenv ("BINANCE_API_SECRET" )
62+
63+ if not api_key or not api_secret :
64+ logger .error ("Missing API credentials" )
65+ return False
66+
67+ # Validate API key format (basic check)
68+ if len (api_key ) < 32 :
69+ logger .warning ("API key appears to be too short" )
70+ return False
71+
72+ if len (api_secret ) < 32 :
73+ logger .warning ("API secret appears to be too short" )
74+ return False
75+
76+ # Check for common insecure values
77+ insecure_values = ["test" , "demo" , "example" , "your_key" , "your_secret" ]
78+ if api_key .lower () in insecure_values or api_secret .lower () in insecure_values :
79+ logger .error ("API credentials appear to be placeholder values" )
80+ return False
81+
82+ return True
83+
84+
85+ def secure_hash (data : str ) -> str :
86+ """
87+ Create a secure hash of sensitive data for logging/identification purposes.
88+
89+ Args:
90+ data: The data to hash
91+
92+ Returns:
93+ str: SHA-256 hash of the data
94+ """
95+ return hashlib .sha256 (data .encode ('utf-8' )).hexdigest ()[:16 ] # First 16 chars for brevity
96+
97+
98+ def generate_request_id () -> str :
99+ """
100+ Generate a secure random request ID for tracking.
101+
102+ Returns:
103+ str: Cryptographically secure random request ID
104+ """
105+ return secrets .token_hex (8 )
106+
107+
108+ class SecurityMiddleware :
109+ """
110+ Security middleware for request validation and monitoring.
111+ """
112+
113+ def __init__ (self , config : Optional [SecurityConfig ] = None ):
114+ self .config = config or SecurityConfig ()
115+ self .request_counts = {}
116+
117+ def validate_request (self , request_data : Dict [str , Any ]) -> Dict [str , Any ]:
118+ """
119+ Validate incoming request for security issues.
120+
121+ Args:
122+ request_data: The request data to validate
123+
124+ Returns:
125+ Dict containing validation result and any security warnings
126+ """
127+ warnings = []
128+ is_valid = True
129+
130+ # Check for common injection patterns
131+ if self ._contains_injection_patterns (request_data ):
132+ warnings .append ("Potential injection pattern detected" )
133+ is_valid = False
134+
135+ # Check request size
136+ if self ._request_too_large (request_data ):
137+ warnings .append ("Request size exceeds limits" )
138+ is_valid = False
139+
140+ return {
141+ "valid" : is_valid ,
142+ "warnings" : warnings ,
143+ "request_id" : generate_request_id ()
144+ }
145+
146+ def _contains_injection_patterns (self , data : Dict [str , Any ]) -> bool :
147+ """Check for common injection patterns in request data."""
148+ if not isinstance (data , dict ):
149+ return False
150+
151+ dangerous_patterns = [
152+ 'script' , 'javascript:' , 'onload=' , 'onerror=' ,
153+ 'eval(' , 'exec(' , 'system(' , 'shell_exec' ,
154+ '../' , './' , '..\\ ' , '.\\ ' ,
155+ 'SELECT' , 'INSERT' , 'UPDATE' , 'DELETE' , 'DROP'
156+ ]
157+
158+ data_str = str (data ).lower ()
159+ return any (pattern .lower () in data_str for pattern in dangerous_patterns )
160+
161+ def _request_too_large (self , data : Dict [str , Any ], max_size : int = 1024 * 1024 ) -> bool :
162+ """Check if request data is too large."""
163+ try :
164+ import json
165+ return len (json .dumps (data ).encode ('utf-8' )) > max_size
166+ except (TypeError , ValueError ):
167+ return True
168+
169+
170+ def security_audit_log (event_type : str , details : Dict [str , Any ], level : str = "INFO" ) -> None :
171+ """
172+ Log security-related events with proper formatting.
173+
174+ Args:
175+ event_type: Type of security event
176+ details: Event details (will be sanitized)
177+ level: Log level (INFO, WARNING, ERROR)
178+ """
179+ security_logger = logging .getLogger (f"{ __name__ } .security" )
180+
181+ # Sanitize details to prevent log injection
182+ sanitized_details = {k : _sanitize_log_value (v ) for k , v in details .items ()}
183+
184+ log_entry = {
185+ "event_type" : event_type ,
186+ "details" : sanitized_details ,
187+ "request_id" : generate_request_id ()
188+ }
189+
190+ if level .upper () == "ERROR" :
191+ security_logger .error (f"SECURITY_EVENT: { log_entry } " )
192+ elif level .upper () == "WARNING" :
193+ security_logger .warning (f"SECURITY_EVENT: { log_entry } " )
194+ else :
195+ security_logger .info (f"SECURITY_EVENT: { log_entry } " )
196+
197+
198+ def _sanitize_log_value (value : Any ) -> Any :
199+ """Sanitize values before logging to prevent log injection."""
200+ if isinstance (value , str ):
201+ # Remove control characters and potential log injection patterns
202+ return '' .join (c for c in value if c .isprintable ())[:500 ] # Limit length
203+ return str (value )[:500 ] if value is not None else None
204+
205+
206+ def secure_tool_wrapper (func ):
207+ """
208+ Decorator to add security validation to MCP tools.
209+ """
210+ @wraps (func )
211+ def wrapper (* args , ** kwargs ):
212+ request_id = generate_request_id ()
213+
214+ try :
215+ # Log tool invocation (without sensitive data)
216+ security_audit_log (
217+ "tool_invocation" ,
218+ {
219+ "tool_name" : func .__name__ ,
220+ "request_id" : request_id ,
221+ "args_count" : len (args ),
222+ "kwargs_count" : len (kwargs )
223+ }
224+ )
225+
226+ result = func (* args , ** kwargs )
227+
228+ # Log successful completion
229+ if isinstance (result , dict ) and result .get ("success" ):
230+ security_audit_log (
231+ "tool_success" ,
232+ {"tool_name" : func .__name__ , "request_id" : request_id }
233+ )
234+ else :
235+ security_audit_log (
236+ "tool_failure" ,
237+ {"tool_name" : func .__name__ , "request_id" : request_id },
238+ level = "WARNING"
239+ )
240+
241+ return result
242+
243+ except Exception as e :
244+ security_audit_log (
245+ "tool_exception" ,
246+ {
247+ "tool_name" : func .__name__ ,
248+ "request_id" : request_id ,
249+ "error_type" : type (e ).__name__
250+ },
251+ level = "ERROR"
252+ )
253+ raise
254+
255+ return wrapper
0 commit comments