Automated Port Scanning With Python
Port scanning is a crucial technique in network security and administration. It allows you to discover open ports on a target system, which can be useful for network mapping, vulnerability assessment, and security auditing. In this article, we’ll explore how to perform automated port scanning using Python, covering various methods and providing practical code examples.
Basic Port Scanning
Let’s start with a simple port scanner that checks if a specific port is open on a given host:
import socket

def is_port_open(host, port):
    # connect_ex returns 0 on success and an error code otherwise
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
        return sock.connect_ex((host, port)) == 0

host = "example.com"
port = 80
if is_port_open(host, port):
    print(f"Port {port} is open on {host}")
else:
    print(f"Port {port} is closed or filtered on {host}")
This script uses Python’s socket module to attempt a TCP connection to the specified host and port. If the connection succeeds, the port is considered open; a refused or timed-out attempt is reported as not open.
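The return value of connect_ex carries more information than open versus not open. The sketch below uses a hypothetical helper named classify_port, which is not part of the original script, to map the result to three states. It assumes that a refused connection means the port is closed and treats every other failure, including a timeout, as filtered or unreachable, which is a simplification.

```python
import errno
import socket

def classify_port(host, port, timeout=1.0):
    """Hypothetical helper: classify a TCP port as open, closed, or filtered."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        code = sock.connect_ex((host, port))
    if code == 0:
        return "open"
    if code == errno.ECONNREFUSED:
        # The host actively refused the connection: nothing is listening
        return "closed"
    # Assumption: any other error (timeout, unreachable host) is treated
    # as a filtered port, although the real cause may differ
    return "filtered"
```

A result of "filtered" is only a guess here; a firewall, packet loss, or a slow host can all produce the same outcome.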
Scanning Multiple Ports
To scan a range of ports, we can modify our script to iterate through a list of port numbers:
import socket
from concurrent.futures import ThreadPoolExecutor

def scan_port(host, port):
    # Return the port number if it accepts a TCP connection, else None
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
        result = sock.connect_ex((host, port))
        if result == 0:
            return port
        return None

def scan_ports(host, start_port, end_port):
    with ThreadPoolExecutor(max_workers=100) as executor:
        futures = [executor.submit(scan_port, host, port) for port in range(start_port, end_port + 1)]
        # Collect each result once, then drop the ports that did not answer
        results = [future.result() for future in futures]
    return [port for port in results if port is not None]

host = "example.com"
start_port = 1
end_port = 1024
open_ports = scan_ports(host, start_port, end_port)
print(f"Open ports on {host}: {open_ports}")
This script uses ThreadPoolExecutor to scan ports concurrently, so up to 100 connection attempts wait on the network at the same time instead of one after another.
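The scanner above takes a contiguous range, but the ports of interest are often scattered. As a minimal sketch, the hypothetical helper parse_ports below (the name and the "22,80,8000-8002" spec format are assumptions, not part of the original script) expands such a spec into a list that could be fed to scan_port one port at a time.

```python
def parse_ports(spec):
    """Hypothetical helper: expand "22,80,8000-8002" into a sorted port list."""
    ports = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            # A range such as "8000-8002" includes both endpoints
            start, end = (int(x) for x in part.split("-", 1))
        else:
            start = end = int(part)
        if not (1 <= start <= end <= 65535):
            raise ValueError(f"invalid port range: {part!r}")
        ports.update(range(start, end + 1))
    return sorted(ports)

print(parse_ports("22,80,8000-8002"))  # [22, 80, 8000, 8001, 8002]
```

Using a set removes duplicates, so a port listed twice is scanned only once.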
Advanced Port Scanning with Service Detection
Let’s create a more advanced scanner that not only detects open ports but also attempts to identify the services running on them:
import socket
import concurrent.futures

def get_service_banner(host, port):
    # Send a minimal HTTP request and return whatever the service answers
    try:
        with socket.create_connection((host, port), timeout=1) as sock:
            sock.sendall(b"GET / HTTP/1.1\r\nHost: " + host.encode() + b"\r\n\r\n")
            return sock.recv(1024).decode('utf-8', errors='ignore').strip()
    except OSError:
        return None

def scan_port(host, port):
    # A successful connection means the port is open; then try to read a banner
    try:
        with socket.create_connection((host, port), timeout=1):
            banner = get_service_banner(host, port)
            return port, banner
    except OSError:
        return None

def scan_ports(host, ports):
    open_ports = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        results = executor.map(lambda p: scan_port(host, p), ports)
        for result in results:
            if result:
                open_ports.append(result)
    return open_ports

def main():
    host = "example.com"
    ports = range(1, 1025)
    print(f"Scanning {host}...")
    open_ports = scan_ports(host, ports)
    print(f"\nOpen ports on {host}:")
    for port, banner in open_ports:
        # Compute the first banner line outside the f-string: Python versions
        # before 3.12 reject backslashes inside f-string expressions
        first_line = banner.splitlines()[0] if banner else "Unknown"
        print(f"Port {port}: {first_line}")

if __name__ == "__main__":
    main()
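This script not only detects open ports but also attempts to retrieve service banners, providing more information about the services running on each port. Because it sends an HTTP request to every open port, non-HTTP services may answer with their own greeting, an error message, or nothing at all.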
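The first line of a banner often hints at the protocol. The sketch below uses a hypothetical helper, guess_service (not part of the original script), that checks a few well-known greeting prefixes. It is a rough heuristic under the assumption that the service sends a standard greeting; it is not a substitute for real service fingerprinting.

```python
def guess_service(banner):
    """Hypothetical helper: make a rough protocol guess from a banner."""
    if not banner:
        return "unknown"
    first_line = banner.splitlines()[0]
    if first_line.startswith("SSH-"):
        # SSH servers identify themselves with "SSH-<protocol>-<software>"
        return "ssh"
    if first_line.startswith("HTTP/"):
        return "http"
    if first_line.startswith("220"):
        # 220 is the greeting code used by both SMTP and FTP servers
        return "smtp or ftp"
    if first_line.startswith("+OK"):
        return "pop3"
    return "unknown"

print(guess_service("SSH-2.0-OpenSSH_9.6"))          # ssh
print(guess_service("HTTP/1.1 400 Bad Request"))     # http
```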
Real-life Example: Network Service Auditing
Let’s create a practical example of how port scanning can be used for network service auditing. This script will scan a range of IP addresses in a local network and report on the services found:
import ipaddress
import socket
import concurrent.futures

def get_hostname(ip):
    # Reverse DNS lookup; many hosts have no PTR record
    try:
        return socket.gethostbyaddr(ip)[0]
    except (socket.herror, socket.gaierror):
        return None

def scan_port(ip, port):
    try:
        with socket.create_connection((ip, port), timeout=1):
            pass
    except OSError:
        return None
    # getservbyport raises OSError for ports missing from the local services
    # database, so look the name up separately to avoid hiding an open port
    try:
        service = socket.getservbyport(port)
    except OSError:
        service = "unknown"
    return port, service

def scan_host(ip):
    hostname = get_hostname(ip)
    open_ports = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
        common_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 465, 587, 993, 995, 3306, 3389, 5432, 8080]
        results = executor.map(lambda p: scan_port(ip, p), common_ports)
        for result in results:
            if result:
                open_ports.append(result)
    return ip, hostname, open_ports

def main():
    network = "192.168.1.0/24"  # Replace with your network
    ip_network = ipaddress.ip_network(network)
    print(f"Scanning network: {network}")
    results = {}  # ip -> (hostname, open_ports)
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(scan_host, str(ip)) for ip in ip_network.hosts()]
        for future in concurrent.futures.as_completed(futures):
            ip, hostname, open_ports = future.result()
            if open_ports:
                results[ip] = (hostname, open_ports)
    print("\nScan Results:")
    for ip, (hostname, ports) in results.items():
        print(f"\nIP: {ip}")
        print(f"Hostname: {hostname or 'Unknown'}")
        print("Open Ports:")
        for port, service in ports:
            print(f"  {port}/tcp: {service}")
    print(f"\nTotal hosts with open ports: {len(results)}")

if __name__ == "__main__":
    main()
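This script scans a local network (192.168.1.0/24) for common services on well-known ports. It reports the open ports for each host that has at least one of the listed ports open. The service name is the one registered for that port number in the local services database; it is not verified against what is actually running.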
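The target list in the script comes from the standard ipaddress module. As a small sketch, the hypothetical helper expand_targets below (not part of the original script) shows what hosts() yields for a /24 and how strict=False lets the input carry host bits.

```python
import ipaddress

def expand_targets(cidr):
    """Hypothetical helper: list the usable host addresses in a CIDR block."""
    # strict=False accepts input such as "192.168.1.10/24", where host bits
    # are set; the default strict=True would raise ValueError for it
    network = ipaddress.ip_network(cidr, strict=False)
    # hosts() skips the network and broadcast addresses of a /24
    return [str(ip) for ip in network.hosts()]

targets = expand_targets("192.168.1.10/24")
print(len(targets), targets[0], targets[-1])  # 254 192.168.1.1 192.168.1.254
```

With 254 hosts and 17 ports each, the audit above makes up to 4,318 connection attempts per run.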
Visualizing Network Scan Results
We can use Mermaid diagrams to visualize the results of our network scan:
import ipaddress
import socket
import concurrent.futures
from collections import defaultdict

# ... [Previous scan_port and scan_host functions] ...

def generate_mermaid(results):
    mermaid = "```mermaid\ngraph TD\n"
    mermaid += "A[Network] --> B{Scanner}\n"
    for i, (ip, (hostname, ports)) in enumerate(results.items()):
        node_id = f"C{i}"
        mermaid += f"B --> {node_id}[{ip}]\n"
        for j, (port, service) in enumerate(ports):
            mermaid += f"{node_id} --> D{i}_{j}[{service} on {port}]\n"
    mermaid += "```"
    return mermaid

def main():
    network = "192.168.1.0/24"  # Replace with your network
    ip_network = ipaddress.ip_network(network)
    print(f"Scanning network: {network}")
    results = defaultdict(list)
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(scan_host, str(ip)) for ip in ip_network.hosts()]
        for future in concurrent.futures.as_completed(futures):
            ip, hostname, open_ports = future.result()
            if open_ports:
                results[ip] = (hostname, open_ports)
    mermaid_diagram = generate_mermaid(results)
    print("\nNetwork Scan Visualization:")
    print(mermaid_diagram)

if __name__ == "__main__":
    main()
This script generates a Mermaid diagram that visualizes the network structure and open ports discovered during the scan. Here’s an example of how the output might look:
graph TD
A[Network] --> B{Scanner}
B --> C0[192.168.1.1]
C0 --> D0_0[HTTP on 80]
C0 --> D0_1[HTTPS on 443]
B --> C1[192.168.1.10]
C1 --> D1_0[SSH on 22]
C1 --> D1_1[HTTP on 80]
Port Scanning Performance Optimization
To further optimize our port scanning process, we can use the asyncio library for asynchronous I/O operations. The example also uses aiodns, a third-party package, for non-blocking DNS resolution:
import asyncio
import aiodns
import socket

async def async_dns_lookup(host):
    # Resolve the hostname once so every port check reuses the same address
    resolver = aiodns.DNSResolver()
    try:
        result = await resolver.gethostbyname(host, socket.AF_INET)
        return result.addresses[0]
    except aiodns.error.DNSError:
        return None

async def scan_port(ip, port):
    try:
        _, writer = await asyncio.wait_for(asyncio.open_connection(ip, port), timeout=1)
        writer.close()
        await writer.wait_closed()
        return port
    except (asyncio.TimeoutError, OSError):
        # OSError covers refused connections as well as unreachable hosts,
        # so one failed port does not abort the whole gather() call
        return None

async def scan_host(host, ports):
    ip = await async_dns_lookup(host)
    if not ip:
        print(f"Could not resolve hostname: {host}")
        return
    print(f"Scanning {host} ({ip})...")
    tasks = [scan_port(ip, port) for port in ports]
    results = await asyncio.gather(*tasks)
    open_ports = [port for port in results if port is not None]
    print(f"Open ports on {host} ({ip}): {open_ports}")

async def main():
    host = "example.com"
    ports = range(1, 1025)
    await scan_host(host, ports)

if __name__ == "__main__":
    asyncio.run(main())
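This asynchronous approach starts all 1,024 connection attempts on a single event loop and waits for them together, so the overall scan time is close to one timeout period rather than the sum of all attempts, without the overhead of one thread per connection.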
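Starting every connection at once can run into the operating system's limit on open sockets, or overwhelm the target. One possible refinement, sketched below with the hypothetical names check_port and scan_ports_bounded (neither is in the original script), caps the number of in-flight connections with asyncio.Semaphore. The default limit of 100 is an assumption to tune for your environment.

```python
import asyncio

async def check_port(ip, port, semaphore, timeout):
    # The semaphore admits at most `limit` coroutines into this block at once
    async with semaphore:
        try:
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(ip, port), timeout=timeout
            )
        except (asyncio.TimeoutError, OSError):
            return None
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass  # The peer may reset the connection while we close it
        return port

async def scan_ports_bounded(ip, ports, limit=100, timeout=1.0):
    """Hypothetical variant: scan with at most `limit` connections in flight."""
    semaphore = asyncio.Semaphore(limit)
    results = await asyncio.gather(
        *(check_port(ip, port, semaphore, timeout) for port in ports)
    )
    return [port for port in results if port is not None]
```

It can be called as asyncio.run(scan_ports_bounded(ip, range(1, 1025))) with an already resolved IP address. The semaphore is created inside the coroutine so that it belongs to the running event loop.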
Mathematical Analysis of Scanning Efficiency
A simple model describes the efficiency of our scanning process:
Let $n$ be the number of ports to scan, $t$ be the timeout for each connection attempt, and $c$ be the number of concurrent connections. In the worst case, where every attempt runs until the timeout, the total scan time $T$ can be estimated as:
$$T \approx \frac{n \times t}{c}$$
For example, if we scan 1000 ports with a 1-second timeout and 100 concurrent connections:
$$T \approx \frac{1000 \times 1\,\text{s}}{100} = 10\,\text{s}$$
This formula demonstrates the importance of concurrency in reducing scan times. However, it’s important to note that increasing concurrency beyond a certain point may not yield proportional improvements due to system and network limitations.
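The estimate is easy to turn into a helper for planning a scan. The sketch below uses a hypothetical function, estimate_scan_time, and assumes the worst case in which every attempt times out and ports are handled in full waves of $c$ connections, hence the rounding up. Real scans usually finish sooner because open and refused ports answer quickly.

```python
import math

def estimate_scan_time(n_ports, timeout_s, concurrency):
    """Worst-case estimate in seconds: every attempt runs until the timeout."""
    # Ports are handled in ceil(n / c) waves, each lasting at most one timeout
    waves = math.ceil(n_ports / concurrency)
    return waves * timeout_s

print(estimate_scan_time(1000, 1, 100))  # 10
print(estimate_scan_time(1024, 1, 100))  # 11
print(estimate_scan_time(1024, 1, 1))    # 1024
```

The last line shows the sequential case: without concurrency the same scan could take over 17 minutes.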
Conclusion
Automated port scanning with Python offers powerful ways to gather information about network services and potential vulnerabilities. By leveraging modern techniques such as multithreading, asynchronous I/O, and efficient libraries, we can create fast and effective port scanners for various use cases.
Remember to always use these tools responsibly and within the bounds of your authorization. Stay updated with the latest security practices and legal regulations regarding network scanning.
For more detailed information on port scanning techniques and their implications, refer to the following resources:
- Nmap Network Scanning
- RFC 6335 - Internet Assigned Numbers Authority (IANA) Procedures for the Management of the Service Name and Transport Protocol Port Number Registry
- OWASP Information Gathering
These resources provide in-depth knowledge about port scanning techniques, protocol specifications, and security implications.