Custom Python Fuzzers for Vulnerabilities
Fuzzing is a powerful technique used in software testing and security research to uncover vulnerabilities and bugs in applications. By generating and sending unexpected or malformed inputs to a program, fuzzers can reveal edge cases and potential security flaws that might otherwise go unnoticed. In this article, we’ll explore how to create custom Python fuzzers to enhance your vulnerability discovery process, focusing on modern libraries, parallel processing, and other performance-oriented techniques.
Understanding Fuzzing
Fuzzing involves:
- Generating random or semi-random input data
- Feeding that data into the target application
- Monitoring for unexpected behavior, crashes, or other issues
Python’s flexibility and rich ecosystem make it an excellent choice for creating custom fuzzers. Let’s dive into some practical examples that use modern libraries and parallel processing where possible.
Basic Random Fuzzer with NumPy
Let’s start with a simple fuzzer that generates random strings using NumPy for improved performance:
import numpy as np
import string

def generate_random_string(length):
    return ''.join(np.random.choice(list(string.printable), size=length))

def basic_fuzzer(target_function, num_tests=1000, max_length=100):
    for _ in range(num_tests):
        test_input = generate_random_string(np.random.randint(1, max_length + 1))
        try:
            target_function(test_input)
        except Exception as e:
            print(f"Error found with input: {test_input}")
            print(f"Exception: {e}")

# Example usage
def sample_function(input_string):
    if len(input_string) > 50 and 'x' in input_string:
        raise ValueError("Invalid input")
    return input_string.upper()

basic_fuzzer(sample_function)
This basic fuzzer uses NumPy’s random number generation, which can be faster than Python’s built-in random module, especially when generating a large number of test inputs.
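If you are on a recent NumPy release, the same generator can also be written with the newer Generator API (numpy.random.default_rng). Below is a minimal sketch of that variant; the helper and variable names are our own:

import string
import numpy as np

# Variant of generate_random_string using NumPy's Generator API (NumPy >= 1.17).
_printable_chars = np.array(list(string.printable))
_rng = np.random.default_rng()  # pass a seed, e.g. default_rng(0), for reproducible runs

def generate_random_string_rng(length):
    # Draw `length` characters from the printable pool in one vectorized call
    return ''.join(_rng.choice(_printable_chars, size=length))

Seeding the generator also makes a fuzzing run reproducible, which is handy when you need to replay an input that triggered a crash.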
Mutation-based Fuzzer with Parallel Processing
Here’s a mutation-based fuzzer that uses parallel processing to speed up the fuzzing process:
import numpy as np
import multiprocessing as mp

def mutate_string(input_string):
    mutations = [
        lambda s: s + chr(np.random.randint(32, 127)),   # append a random character
        lambda s: s[:np.random.randint(0, len(s) + 1)],  # truncate at a random point
        lambda s: s.replace(np.random.choice(list(s)), chr(np.random.randint(32, 127))),  # substitute a character
    ]
    return np.random.choice(mutations)(input_string)

def process_mutation(args):
    seed, target_function, num_mutations = args
    for _ in range(num_mutations):
        mutated_input = mutate_string(seed)
        try:
            target_function(mutated_input)
        except Exception as e:
            return f"Error found with input: {mutated_input}\nException: {e}"
    return None

def parallel_mutation_fuzzer(target_function, seed_inputs, num_mutations=1000, num_processes=mp.cpu_count()):
    with mp.Pool(processes=num_processes) as pool:
        args = [(seed, target_function, num_mutations // len(seed_inputs)) for seed in seed_inputs]
        results = pool.map(process_mutation, args)
    for result in results:
        if result:
            print(result)

# Example usage
def parse_date(date_string):
    import datetime
    return datetime.datetime.strptime(date_string, "%Y-%m-%d")

if __name__ == '__main__':
    seed_inputs = ["2023-01-01", "2023-12-31", "2023-06-15"]
    parallel_mutation_fuzzer(parse_date, seed_inputs)
This fuzzer leverages multiple CPU cores to perform mutations and tests in parallel, significantly speeding up the fuzzing process.
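In practice you will also want to keep the inputs that triggered errors so they can be replayed later. Here is a minimal sketch of such a helper, assuming a local crashes/ directory and hash-based filenames (both choices are arbitrary):

import hashlib
from pathlib import Path

def save_crashing_input(data, directory="crashes"):
    """Write a crashing input to disk, named by its SHA-256 hash."""
    out_dir = Path(directory)
    out_dir.mkdir(exist_ok=True)
    digest = hashlib.sha256(data.encode("utf-8", errors="replace")).hexdigest()
    path = out_dir / f"{digest}.txt"
    path.write_text(data, encoding="utf-8")
    return path

Calling save_crashing_input(mutated_input) from the except branch of process_mutation would leave you with a deduplicated corpus of crashing inputs to triage.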
Coverage-guided Fuzzer with PyPy
For improved performance, we can run our fuzzer under PyPy, an alternative Python implementation with a Just-In-Time (JIT) compiler. Here’s a coverage-guided fuzzer that works under both CPython and PyPy (keep in mind that coverage tracing adds overhead of its own):
import random
from coverage import Coverage

def mutate_string(input_string):
    mutations = [
        lambda s: s + chr(random.randint(32, 126)),  # append a random character
        lambda s: s[:random.randint(0, len(s))],     # truncate at a random point
    ]
    if input_string:  # character substitution only makes sense for non-empty strings
        mutations.append(
            lambda s: s.replace(random.choice(s), chr(random.randint(32, 126)))
        )
    return random.choice(mutations)(input_string)

def coverage_guided_fuzzer(target_function, seed_inputs, num_iterations=1000):
    cov = Coverage()
    best_coverage = 0
    best_input = None
    for _ in range(num_iterations):
        test_input = random.choice(seed_inputs)
        mutated_input = mutate_string(test_input)
        cov.start()
        try:
            target_function(mutated_input)
        except Exception as e:
            print(f"Error found with input: {mutated_input}")
            print(f"Exception: {e}")
        finally:
            cov.stop()
        # Use the number of files touched as a (coarse) coverage signal;
        # inputs that increase it are kept as new seeds.
        current_coverage = len(cov.get_data().measured_files())
        if current_coverage > best_coverage:
            best_coverage = current_coverage
            best_input = mutated_input
            seed_inputs.append(mutated_input)
    print(f"Best coverage achieved: {best_coverage}")
    print(f"Best input: {best_input}")

# Example usage
def complex_function(input_string):
    if len(input_string) > 10:
        if 'a' in input_string:
            if input_string.endswith('xyz'):
                raise ValueError("Specific condition met")
    return input_string.upper()

seed_inputs = ["hello", "world", "fuzzing"]
coverage_guided_fuzzer(complex_function, seed_inputs)
To run this fuzzer with PyPy for improved performance, save it as pypy_coverage_fuzzer.py, make sure the coverage package is installed in your PyPy environment, and run:
pypy3 pypy_coverage_fuzzer.py
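One caveat: counting measured files is a very coarse signal, and for a single-module target it will barely change between iterations. A finer-grained alternative is to count executed lines through coverage.py’s CoverageData API; here is a sketch (the helper name is ours):

from coverage import Coverage

def executed_line_count(cov):
    """Total number of distinct executed lines recorded so far."""
    data = cov.get_data()
    total = 0
    for filename in data.measured_files():
        lines = data.lines(filename)  # may be None if nothing was recorded for this file
        total += len(lines or [])
    return total

Swapping len(cov.get_data().measured_files()) for executed_line_count(cov) in coverage_guided_fuzzer gives the mutation loop a signal that actually grows as new branches are exercised.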
Real-life Example: Fuzzing a JSON Parser
Let’s create a more realistic example by fuzzing a JSON parser. We’ll use the orjson library, a fast JSON parser/serializer written in Rust with Python bindings.
import orjson
import random
import string
from concurrent.futures import ProcessPoolExecutor, as_completed

def generate_json_like_string(max_depth=5, max_length=100):
    if max_depth == 0 or random.random() < 0.1:
        return random.choice([
            ''.join(random.choices(string.printable, k=random.randint(1, max_length))),
            str(random.randint(-1000000, 1000000)),
            str(random.uniform(-1000000, 1000000)),
            random.choice(['true', 'false', 'null'])
        ])
    if random.random() < 0.5:
        return '{' + ','.join([
            f'"{generate_json_like_string(1, 10)}": {generate_json_like_string(max_depth-1, max_length)}'
            for _ in range(random.randint(1, 5))
        ]) + '}'
    else:
        return '[' + ','.join([
            generate_json_like_string(max_depth-1, max_length)
            for _ in range(random.randint(1, 5))
        ]) + ']'

def fuzz_json(num_tests):
    for _ in range(num_tests):
        test_input = generate_json_like_string()
        try:
            orjson.loads(test_input)
        except orjson.JSONDecodeError:
            # Expected for invalid JSON
            pass
        except Exception as e:
            print(f"Unexpected error with input: {test_input}")
            print(f"Exception: {e}")

def parallel_json_fuzzer(num_tests=10000, num_processes=8):
    tests_per_process = num_tests // num_processes
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        futures = [executor.submit(fuzz_json, tests_per_process) for _ in range(num_processes)]
        for future in as_completed(futures):
            future.result()

if __name__ == '__main__':
    parallel_json_fuzzer()
This fuzzer generates JSON-like strings and attempts to parse them using the orjson library. It uses multiprocessing to leverage multiple CPU cores for faster fuzzing. Here’s a detailed explanation of the code:
- We define a generate_json_like_string function that recursively generates JSON-like structures, including nested objects and arrays.
- The fuzz_json function generates test inputs and tries to parse them with orjson.loads().
- We catch JSONDecodeError separately, as it’s expected for invalid JSON. Any other exception is considered unexpected and logged.
- The parallel_json_fuzzer function sets up multiple processes to run the fuzzing in parallel.
To use this fuzzer, you’ll need to install the orjson library:
pip install orjson
Then run the fuzzer:
python json_fuzzer.py
This example demonstrates how to create a practical fuzzer for a real-world scenario, using modern libraries and parallel processing for improved performance.
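A natural extension of this setup is differential fuzzing: feed the same generated string to both orjson and the standard library’s json module and flag any disagreement. Here is a rough sketch that reuses generate_json_like_string from above; the reporting format is arbitrary:

import json
import orjson

def differential_fuzz_json(num_tests=1000):
    for _ in range(num_tests):
        test_input = generate_json_like_string()
        orjson_ok = std_ok = True
        orjson_result = std_result = None
        try:
            orjson_result = orjson.loads(test_input)
        except Exception:
            orjson_ok = False
        try:
            std_result = json.loads(test_input)
        except Exception:
            std_ok = False
        # A disagreement on acceptance or on the parsed value is worth a closer look
        if orjson_ok != std_ok:
            print(f"Acceptance mismatch on input: {test_input!r}")
        elif orjson_ok and orjson_result != std_result:
            print(f"Value mismatch on input: {test_input!r}")

Treat mismatches as leads to investigate rather than confirmed bugs, since the two parsers can legitimately differ on a few edge cases.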
Visualizing the JSON Fuzzing Process
Here’s a Mermaid diagram illustrating the JSON fuzzing process:
graph TD
    A[Generate JSON-like String] --> B[Attempt to Parse with orjson]
    B --> C{Parse Successful?}
    C -->|Yes| D[Continue to Next Test]
    C -->|No| E{JSONDecodeError?}
    E -->|Yes| F[Expected Error, Continue]
    E -->|No| G[Log Unexpected Error]
    D --> A
    F --> A
    G --> H[Analyze Results]
    H --> I[Improve Fuzzer]
    I --> A
This diagram shows the flow of the JSON fuzzing process, from generating test inputs to handling different types of parsing results.
Mathematical Representation of Fuzzing Coverage
We can represent the coverage of our JSON fuzzer mathematically using set theory:
Let $J$ be the set of all possible JSON-like strings, and $V \subset J$ be the set of valid JSON strings.
Define the parsing function $P: J \rightarrow \{0, 1, 2\}$ as:
$$ P(j) = \begin{cases} 0 & \text{if } j \text{ is parsed successfully} \\ 1 & \text{if } j \text{ raises JSONDecodeError} \\ 2 & \text{if } j \text{ raises any other exception} \end{cases} $$
The goal of our fuzzer is to explore the set $U = \{ j \in J : P(j) = 2 \}$, which represents unexpected parsing behavior.
The coverage of our fuzzer can be expressed as:
$$ C = \frac{|J_{\text{tested}} \cap U|}{|U|} $$
Where $J_{\text{tested}}$ is the set of JSON-like strings tested by our fuzzer.
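Since $J$ and $U$ are far too large to enumerate, these quantities can only be estimated empirically, for example by tallying the observed values of $P$ over a sample of generated inputs. A small sketch, reusing generate_json_like_string from the JSON fuzzer (the helper name is ours):

from collections import Counter

import orjson

def empirical_outcome_counts(num_tests=10000):
    """Tally P(j) over a sample of inputs: 0 = parsed, 1 = JSONDecodeError, 2 = other exception."""
    counts = Counter()
    for _ in range(num_tests):
        test_input = generate_json_like_string()
        try:
            orjson.loads(test_input)
            counts[0] += 1
        except orjson.JSONDecodeError:
            counts[1] += 1
        except Exception:
            counts[2] += 1
    return counts

The ratio counts[2] / num_tests then estimates how often a random input from our generator lands in $U$.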
Conclusion
Custom Python fuzzers are powerful tools for discovering vulnerabilities in software. By using modern approaches like parallel processing, JIT compilation with PyPy, and fast native libraries such as orjson, we can create more efficient and effective fuzzers.
Remember to always fuzz responsibly and ethically, and use these techniques to improve the security and reliability of your own software or systems you have permission to test.
As you develop your fuzzing skills, consider exploring more advanced techniques such as:
- Integrating with sanitizers such as AddressSanitizer (a rough sketch follows after this list)
- Distributed fuzzing across multiple machines
- Combining different fuzzing strategies for maximum effectiveness
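As a taste of the first item, here is a rough sketch of driving a natively compiled target built with AddressSanitizer (-fsanitize=address) from Python and scanning its stderr for sanitizer reports; the binary path and the assumption that the target reads stdin are placeholders:

import subprocess

def run_under_asan(target_binary, test_input, timeout=5):
    """Run an ASan-instrumented binary on one input and report sanitizer findings."""
    try:
        proc = subprocess.run(
            [target_binary],
            input=test_input.encode("utf-8", errors="replace"),
            capture_output=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        print(f"Hang (timeout) on input: {test_input!r}")
        return
    # AddressSanitizer writes its error reports to stderr
    if b"AddressSanitizer" in proc.stderr:
        print(f"Sanitizer report for input: {test_input!r}")
        print(proc.stderr.decode(errors="replace"))

Real harnesses often pass the input as a file path or command-line argument instead of stdin, but the pattern of checking stderr for sanitizer output stays the same.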
For more detailed research on fuzzing techniques and their applications, check out these resources:
- The Art, Science, and Engineering of Fuzzing: A Survey
- FuzzBench: An Open Fuzzer Benchmarking Platform and Service
- American Fuzzy Lop (AFL) - A security-oriented fuzzer
Happy fuzzing, and may your bug hunts be fruitful!