Writing Custom Packet Sniffers

⚠️
This article is for educational purposes only. Packet sniffing without permission may be illegal. Always obtain proper authorization before conducting any network analysis. Misuse of these techniques may violate privacy laws and network policies.

Introduction

Packet sniffing is a powerful technique used in network analysis and security. It involves capturing and inspecting network traffic to understand communication patterns, diagnose issues, or detect potential security threats. In this article, we’ll explore how to create custom packet sniffers using Python and the Scapy library, with a focus on practical examples and real-world applications.

What is Scapy?

Scapy is a powerful Python library for packet manipulation. It allows you to create, send, capture, and analyze network packets. With Scapy, you can easily build complex network tools and perform various network-related tasks.

Setting Up the Environment

Before we dive into coding, make sure you have Python and Scapy installed. You can install Scapy using pip:

install_scapy.sh
pip install scapy

Basic Packet Sniffer

Let’s start with a simple packet sniffer that captures and prints basic information about network packets:

basic_sniffer.py
from scapy.all import IP, TCP, sniff

def packet_callback(packet):
    if IP in packet:
        print(f"Source IP: {packet[IP].src}, Destination IP: {packet[IP].dst}")
        if TCP in packet:
            print(f"Source Port: {packet[TCP].sport}, Destination Port: {packet[TCP].dport}")
    print("----")

# Start sniffing
sniff(prn=packet_callback, store=0)

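This script defines a callback function that prints the source and destination IP addresses of each captured IP packet. If the packet contains a TCP layer, it also prints the source and destination ports. Because sniff is called without a count or timeout, it runs until you interrupt it with Ctrl+C, and capturing usually requires root or administrator privileges.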
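Capturing every packet quickly becomes noisy, so it often helps to narrow the capture with a BPF filter. The following is a minimal sketch rather than a definitive implementation: build_filter and capture are hypothetical helper names introduced here for illustration, while filter, count, prn, and store are parameters of Scapy's sniff. Scapy is imported inside capture so that the filter string can be built and checked without capture privileges.

```python
def build_filter(protocol, port=None, host=None):
    """Build a BPF filter string such as 'tcp and port 443' (hypothetical helper)."""
    parts = [protocol]
    if port is not None:
        parts.append(f"port {port}")
    if host is not None:
        parts.append(f"host {host}")
    return " and ".join(parts)


def capture(filter_expr, count=10):
    """Sniff `count` packets matching the filter and print a one-line summary of each."""
    # Imported lazily so build_filter can be used without Scapy installed.
    from scapy.all import sniff

    sniff(
        filter=filter_expr,   # BPF expression applied at capture time
        count=count,          # stop after this many matching packets
        store=False,          # do not keep packets in memory
        prn=lambda packet: print(packet.summary()),
    )
```

Calling capture(build_filter("tcp", port=443)) would then print summaries of the next ten TCP packets to or from port 443, assuming the script has permission to capture on the interface.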

Advanced Packet Analysis

Let’s create a more sophisticated packet analyzer that provides detailed information about various protocols:

advanced_analyzer.py
from scapy.all import ICMP, IP, TCP, UDP, Raw, sniff
from collections import defaultdict
import time

class PacketAnalyzer:
    def __init__(self):
        self.packet_count = defaultdict(int)
        self.start_time = time.time()

    def packet_callback(self, packet):
        if IP in packet:
            self.packet_count[packet[IP].proto] += 1

            print(f"Time: {time.time() - self.start_time:.2f}s")
            print(f"Source: {packet[IP].src} -> Destination: {packet[IP].dst}")
            print(f"Protocol: {packet[IP].proto}")

            if TCP in packet:
                print(f"TCP - Sport: {packet[TCP].sport}, Dport: {packet[TCP].dport}")
                if packet[TCP].flags:
                    print(f"Flags: {packet[TCP].flags}")
            elif UDP in packet:
                print(f"UDP - Sport: {packet[UDP].sport}, Dport: {packet[UDP].dport}")
            elif ICMP in packet:
                print(f"ICMP - Type: {packet[ICMP].type}, Code: {packet[ICMP].code}")

            if Raw in packet:
                print(f"Payload: {packet[Raw].load[:50]}...")

            print("----")

    def run(self, duration=60):
        print(f"Starting packet analysis for {duration} seconds...")
        sniff(prn=self.packet_callback, store=0, timeout=duration)
        self.print_summary()

    def print_summary(self):
        print("\nPacket Analysis Summary:")
        total = sum(self.packet_count.values())
        for proto, count in sorted(self.packet_count.items(), key=lambda x: x[1], reverse=True):
            print(f"Protocol {proto}: {count} packets ({count/total*100:.2f}%)")

        # Generate Mermaid pie chart
        mermaid_code = "```mermaid\npie title Protocol Distribution\n"
        for proto, count in self.packet_count.items():
            mermaid_code += f'    "{proto}" : {count}\n'
        mermaid_code += "```"
        print("\nProtocol Distribution Chart:")
        print(mermaid_code)

if __name__ == "__main__":
    analyzer = PacketAnalyzer()
    analyzer.run()

This advanced analyzer provides a more comprehensive view of network traffic, including:

  1. Detailed packet information for various protocols (TCP, UDP, ICMP)
  2. Payload preview for packets containing raw data
  3. Real-time packet counting and protocol distribution
  4. A summary report with percentage breakdown of protocols
  5. A Mermaid pie chart visualization of the protocol distribution
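One rough edge in the summary is that packet[IP].proto is a number, so the report and the chart show labels such as 6 and 17 rather than protocol names. A small sketch of one way to make the output readable follows. The names PROTOCOL_NAMES, protocol_label, and summarize are hypothetical and not part of Scapy; the mapping covers only the three protocols the analyzer inspects.

```python
# IANA-assigned IP protocol numbers for the protocols the analyzer inspects.
PROTOCOL_NAMES = {1: "ICMP", 6: "TCP", 17: "UDP"}


def protocol_label(number):
    """Return a readable label, falling back to the raw number for other protocols."""
    return PROTOCOL_NAMES.get(number, f"Protocol {number}")


def summarize(packet_count):
    """Return (label, count, percentage) rows, most frequent protocol first."""
    total = sum(packet_count.values())
    rows = []
    for number, count in sorted(packet_count.items(), key=lambda item: item[1], reverse=True):
        rows.append((protocol_label(number), count, round(count / total * 100, 2)))
    return rows


# Example with made-up counts standing in for a finished capture.
for label, count, share in summarize({6: 30, 17: 15, 1: 5}):
    print(f"{label}: {count} packets ({share:.2f}%)")
```

The same protocol_label function could be used when building the Mermaid pie chart, so that the chart slices carry names instead of numbers.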

Real-Life Example: Network Intrusion Detection

Let’s create a simple network intrusion detection system (IDS) that monitors for potential security threats:

network_ids.py
from scapy.all import IP, TCP, sniff
from collections import defaultdict
import time

class NetworkIDS:
    def __init__(self):
        self.connection_count = defaultdict(int)
        self.packet_count = defaultdict(int)
        self.ports_seen = defaultdict(set)  # destination ports probed by each source
        self.start_time = time.time()
        self.alerts = []

    def add_alert(self, message):
        # Record each distinct alert once instead of once per packet
        if message not in self.alerts:
            self.alerts.append(message)

    def packet_callback(self, packet):
        if IP in packet:
            src_ip = packet[IP].src
            self.packet_count[src_ip] += 1

            if TCP in packet:
                if packet[TCP].flags & 0x02:  # SYN flag
                    self.connection_count[src_ip] += 1
                    self.ports_seen[src_ip].add(packet[TCP].dport)

                # Check for potential port scanning: many SYNs spread over several ports
                if self.connection_count[src_ip] > 10 and len(self.ports_seen[src_ip]) > 5:
                    self.add_alert(f"Possible port scan detected from {src_ip}")

            # Check for potential DoS attack
            if self.packet_count[src_ip] > 1000 and (time.time() - self.start_time) < 60:
                self.add_alert(f"Possible DoS attack detected from {src_ip}")

    def run(self, duration=300):
        print(f"Starting Network IDS for {duration} seconds...")
        sniff(prn=self.packet_callback, store=0, timeout=duration)
        self.print_summary()

    def print_summary(self):
        print("\nNetwork IDS Summary:")
        if self.alerts:
            print("Alerts detected:")
            for alert in self.alerts:
                print(f"- {alert}")
        else:
            print("No alerts detected.")

        # Generate Mermaid graph for top connections
        mermaid_code = "```mermaid\ngraph TD\n"
        for src_ip, count in sorted(self.connection_count.items(), key=lambda x: x[1], reverse=True)[:5]:
            mermaid_code += f'    {src_ip.replace(".", "_")}["{src_ip}"] --> Connections{count}["{count} connections"]\n'
        mermaid_code += "```"
        print("\nTop Connections:")
        print(mermaid_code)

if __name__ == "__main__":
    ids = NetworkIDS()
    ids.run()

This Network IDS example demonstrates:

  1. Monitoring for potential port scanning activities
  2. Detecting possible DoS (Denial of Service) attacks
  3. Generating alerts for suspicious activities
  4. Providing a summary of detected threats
  5. Visualizing top connections using a Mermaid graph
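The DoS check above compares a running total against the time since the IDS started, so it can only fire during the first 60 seconds. One possible refinement is a sliding window that counts only recent packets per source. The sketch below is an illustration under stated assumptions: RateMonitor and its parameters are hypothetical names, and timestamps are passed in explicitly (for example time.time() inside the packet callback) so the logic can be exercised without capturing traffic.

```python
from collections import defaultdict, deque


class RateMonitor:
    """Counts packets per source inside a sliding time window (hypothetical helper)."""

    def __init__(self, window_seconds=60, threshold=1000):
        self.window_seconds = window_seconds
        self.threshold = threshold
        self.timestamps = defaultdict(deque)  # source IP -> recent arrival times

    def record(self, src_ip, now):
        """Register one packet; return True if the source exceeds the threshold."""
        recent = self.timestamps[src_ip]
        recent.append(now)
        # Discard arrivals that have fallen out of the window.
        while recent and now - recent[0] > self.window_seconds:
            recent.popleft()
        return len(recent) > self.threshold


monitor = RateMonitor(window_seconds=10, threshold=3)
# Four packets within ten seconds trip the threshold; a later, isolated one does not.
results = [monitor.record("192.0.2.1", t) for t in (0.0, 1.0, 2.0, 3.0, 30.0)]
print(results)
```

The thresholds here are deliberately tiny for demonstration. Suitable values depend on the network being monitored.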

Conclusion

Custom packet sniffers and network analysis tools are powerful assets for network security and troubleshooting. With Python and Scapy, you can create sophisticated solutions tailored to your specific needs. Remember to use these tools responsibly and always obtain proper authorization before analyzing network traffic.

Further Reading

Mathematical Background

For those interested in the mathematical aspects of network analysis, here’s a brief introduction to the concept of entropy in network traffic:

The entropy $H$ of a discrete random variable $X$ with possible values $\{x_1, \ldots, x_n\}$ is defined as:

$$ H(X) = -\sum_{i=1}^n p(x_i) \log_2 p(x_i) $$

where $p(x_i)$ is the probability of $X$ taking the value $x_i$. In network analysis, entropy can be used to measure the randomness or unpredictability of network traffic, which can be useful for detecting anomalies or potential security threats.

For example, we can calculate the entropy of packet sizes or inter-arrival times to detect unusual patterns that might indicate malicious activity. A sudden decrease in entropy could suggest a DoS attack with uniform packet sizes, while an increase might indicate a diverse range of traffic potentially masking malicious activities.
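As a rough illustration of the formula, the sketch below estimates the entropy of a list of packet sizes from their observed frequencies. The function name shannon_entropy and the sample sizes are made up for this example; in practice the values could come from len(packet) collected in a sniffer callback.

```python
import math
from collections import Counter


def shannon_entropy(values):
    """Return the Shannon entropy, in bits, of the empirical distribution of `values`."""
    total = len(values)
    if total == 0:
        return 0.0
    counts = Counter(values)
    # p(x_i) is estimated as the relative frequency of each distinct value.
    return sum(-(c / total) * math.log2(c / total) for c in counts.values())


uniform_sizes = [64] * 8               # every packet has the same size
mixed_sizes = [64, 128, 256, 512] * 2  # four sizes, each equally frequent

print(shannon_entropy(uniform_sizes))  # identical sizes carry no uncertainty
print(shannon_entropy(mixed_sizes))    # four equally likely sizes give log2(4) bits
```

Comparing the entropy of successive capture windows, rather than looking at a single value, is what makes a sudden drop or rise visible.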