SickRage/lib/hachoir_parser/network/tcpdump.py

506 lines
16 KiB
Python

"""
Tcpdump parser
Source:
* libpcap source code (file savefile.c)
* RFC 791 (IPv4)
* RFC 792 (ICMP)
* RFC 793 (TCP)
* RFC 1122 (Requirements for Internet Hosts)
Author: Victor Stinner
Creation: 23 march 2006
"""
from lib.hachoir_parser import Parser
from lib.hachoir_core.field import (FieldSet, ParserError,
Enum, Bytes, NullBytes, RawBytes,
UInt8, UInt16, UInt32, Int32, TimestampUnix32,
Bit, Bits, NullBits)
from lib.hachoir_core.endian import NETWORK_ENDIAN, LITTLE_ENDIAN
from lib.hachoir_core.tools import humanDuration
from lib.hachoir_core.text_handler import textHandler, hexadecimal
from lib.hachoir_core.tools import createDict
from lib.hachoir_parser.network.common import MAC48_Address, IPv4_Address, IPv6_Address
def diff(field):
return humanDuration(field.value*1000)
class Layer(FieldSet):
endian = NETWORK_ENDIAN
def parseNext(self, parent):
return None
class ARP(Layer):
opcode_name = {
1: "request",
2: "reply"
}
endian = NETWORK_ENDIAN
def createFields(self):
yield UInt16(self, "hw_type")
yield UInt16(self, "proto_type")
yield UInt8(self, "hw_size")
yield UInt8(self, "proto_size")
yield Enum(UInt16(self, "opcode"), ARP.opcode_name)
yield MAC48_Address(self, "src_mac")
yield IPv4_Address(self, "src_ip")
yield MAC48_Address(self, "dst_mac")
yield IPv4_Address(self, "dst_ip")
def createDescription(self):
desc = "ARP: %s" % self["opcode"].display
opcode = self["opcode"].value
src_ip = self["src_ip"].display
dst_ip = self["dst_ip"].display
if opcode == 1:
desc += ", %s ask %s" % (dst_ip, src_ip)
elif opcode == 2:
desc += " from %s" % src_ip
return desc
class TCP_Option(FieldSet):
NOP = 1
MAX_SEGMENT = 2
WINDOW_SCALE = 3
SACK = 4
TIMESTAMP = 8
code_name = {
NOP: "NOP",
MAX_SEGMENT: "Max segment size",
WINDOW_SCALE: "Window scale",
SACK: "SACK permitted",
TIMESTAMP: "Timestamp"
}
def __init__(self, *args):
FieldSet.__init__(self, *args)
if self["code"].value != self.NOP:
self._size = self["length"].value * 8
else:
self._size = 8
def createFields(self):
yield Enum(UInt8(self, "code", "Code"), self.code_name)
code = self["code"].value
if code == self.NOP:
return
yield UInt8(self, "length", "Option size in bytes")
if code == self.MAX_SEGMENT:
yield UInt16(self, "max_seg", "Maximum segment size")
elif code == self.WINDOW_SCALE:
yield UInt8(self, "win_scale", "Window scale")
elif code == self.TIMESTAMP:
yield UInt32(self, "ts_val", "Timestamp value")
yield UInt32(self, "ts_ecr", "Timestamp echo reply")
else:
size = (self.size - self.current_size) // 8
if size:
yield RawBytes(self, "data", size)
def createDescription(self):
return "TCP option: %s" % self["code"].display
class TCP(Layer):
port_name = {
13: "daytime",
20: "ftp data",
21: "ftp",
23: "telnet",
25: "smtp",
53: "dns",
63: "dhcp/bootp",
80: "HTTP",
110: "pop3",
119: "nntp",
123: "ntp",
139: "netbios session service",
1863: "MSNMS",
6667: "IRC"
}
def createFields(self):
yield Enum(UInt16(self, "src"), self.port_name)
yield Enum(UInt16(self, "dst"), self.port_name)
yield UInt32(self, "seq_num")
yield UInt32(self, "ack_num")
yield Bits(self, "hdrlen", 6, "Header lenght")
yield NullBits(self, "reserved", 2, "Reserved")
yield Bit(self, "cgst", "Congestion Window Reduced")
yield Bit(self, "ecn-echo", "ECN-echo")
yield Bit(self, "urg", "Urgent")
yield Bit(self, "ack", "Acknowledge")
yield Bit(self, "psh", "Push mmode")
yield Bit(self, "rst", "Reset connection")
yield Bit(self, "syn", "Synchronize")
yield Bit(self, "fin", "Stop the connection")
yield UInt16(self, "winsize", "Windows size")
yield textHandler(UInt16(self, "checksum"), hexadecimal)
yield UInt16(self, "urgent")
size = self["hdrlen"].value*8 - self.current_size
while 0 < size:
option = TCP_Option(self, "option[]")
yield option
size -= option.size
def parseNext(self, parent):
return None
def createDescription(self):
src = self["src"].value
dst = self["dst"].value
if src < 32768:
src = self["src"].display
else:
src = None
if dst < 32768:
dst = self["dst"].display
else:
dst = None
desc = "TCP"
if src != None and dst != None:
desc += " (%s->%s)" % (src, dst)
elif src != None:
desc += " (%s->)" % (src)
elif dst != None:
desc += " (->%s)" % (dst)
# Get flags
flags = []
if self["syn"].value:
flags.append("SYN")
if self["ack"].value:
flags.append("ACK")
if self["fin"].value:
flags.append("FIN")
if self["rst"].value:
flags.append("RST")
if flags:
desc += " [%s]" % (",".join(flags))
return desc
class UDP(Layer):
port_name = {
12: "daytime",
22: "ssh",
53: "DNS",
67: "dhcp/bootp",
80: "http",
110: "pop3",
123: "ntp",
137: "netbios name service",
138: "netbios datagram service"
}
def createFields(self):
yield Enum(UInt16(self, "src"), UDP.port_name)
yield Enum(UInt16(self, "dst"), UDP.port_name)
yield UInt16(self, "length")
yield textHandler(UInt16(self, "checksum"), hexadecimal)
def createDescription(self):
return "UDP (%s->%s)" % (self["src"].display, self["dst"].display)
class ICMP(Layer):
REJECT = 3
PONG = 0
PING = 8
type_desc = {
PONG: "Pong",
REJECT: "Reject",
PING: "Ping"
}
reject_reason = {
0: "net unreachable",
1: "host unreachable",
2: "protocol unreachable",
3: "port unreachable",
4: "fragmentation needed and DF set",
5: "source route failed",
6: "Destination network unknown error",
7: "Destination host unknown error",
8: "Source host isolated error",
9: "Destination network administratively prohibited",
10: "Destination host administratively prohibited",
11: "Unreachable network for Type Of Service",
12: "Unreachable host for Type Of Service.",
13: "Communication administratively prohibited",
14: "Host precedence violation",
15: "Precedence cutoff in effect"
}
def createFields(self):
# Type
yield Enum(UInt8(self, "type"), self.type_desc)
type = self["type"].value
# Code
field = UInt8(self, "code")
if type == 3:
field = Enum(field, self.reject_reason)
yield field
# Options
yield textHandler(UInt16(self, "checksum"), hexadecimal)
if type in (self.PING, self.PONG): # and self["code"].value == 0:
yield UInt16(self, "id")
yield UInt16(self, "seq_num")
# follow: ping data
elif type == self.REJECT:
yield NullBytes(self, "empty", 2)
yield UInt16(self, "hop_mtu", "Next-Hop MTU")
def createDescription(self):
type = self["type"].value
if type in (self.PING, self.PONG):
return "%s (num=%s)" % (self["type"].display, self["seq_num"].value)
else:
return "ICMP (%s)" % self["type"].display
def parseNext(self, parent):
if self["type"].value == self.REJECT:
return IPv4(parent, "rejected_ipv4")
else:
return None
class ICMPv6(Layer):
ECHO_REQUEST = 128
ECHO_REPLY = 129
TYPE_DESC = {
128: "Echo request",
129: "Echo reply",
}
def createFields(self):
yield Enum(UInt8(self, "type"), self.TYPE_DESC)
yield UInt8(self, "code")
yield textHandler(UInt16(self, "checksum"), hexadecimal)
if self['type'].value in (self.ECHO_REQUEST, self.ECHO_REPLY):
yield UInt16(self, "id")
yield UInt16(self, "sequence")
def createDescription(self):
if self['type'].value in (self.ECHO_REQUEST, self.ECHO_REPLY):
return "%s (num=%s)" % (self["type"].display, self["sequence"].value)
else:
return "ICMPv6 (%s)" % self["type"].display
class IP(Layer):
PROTOCOL_INFO = {
1: ("icmp", ICMP, "ICMP"),
6: ("tcp", TCP, "TCP"),
17: ("udp", UDP, "UDP"),
58: ("icmpv6", ICMPv6, "ICMPv6"),
60: ("ipv6_opts", None, "IPv6 destination option"),
}
PROTOCOL_NAME = createDict(PROTOCOL_INFO, 2)
def parseNext(self, parent):
proto = self["protocol"].value
if proto not in self.PROTOCOL_INFO:
return None
name, parser, desc = self.PROTOCOL_INFO[proto]
if not parser:
return None
return parser(parent, name)
class IPv4(IP):
precedence_name = {
7: "Network Control",
6: "Internetwork Control",
5: "CRITIC/ECP",
4: "Flash Override",
3: "Flash",
2: "Immediate",
1: "Priority",
0: "Routine",
}
def __init__(self, *args):
FieldSet.__init__(self, *args)
self._size = self["hdr_size"].value * 32
def createFields(self):
yield Bits(self, "version", 4, "Version")
yield Bits(self, "hdr_size", 4, "Header size divided by 5")
# Type of service
yield Enum(Bits(self, "precedence", 3, "Precedence"), self.precedence_name)
yield Bit(self, "low_delay", "If set, low delay, else normal delay")
yield Bit(self, "high_throu", "If set, high throughput, else normal throughput")
yield Bit(self, "high_rel", "If set, high relibility, else normal")
yield NullBits(self, "reserved[]", 2, "(reserved for future use)")
yield UInt16(self, "length")
yield UInt16(self, "id")
yield NullBits(self, "reserved[]", 1)
yield Bit(self, "df", "Don't fragment")
yield Bit(self, "more_frag", "There are more fragments? if not set, it's the last one")
yield Bits(self, "frag_ofst_lo", 5)
yield UInt8(self, "frag_ofst_hi")
yield UInt8(self, "ttl", "Type to live")
yield Enum(UInt8(self, "protocol"), self.PROTOCOL_NAME)
yield textHandler(UInt16(self, "checksum"), hexadecimal)
yield IPv4_Address(self, "src")
yield IPv4_Address(self, "dst")
size = (self.size - self.current_size) // 8
if size:
yield RawBytes(self, "options", size)
def createDescription(self):
return "IPv4 (%s>%s)" % (self["src"].display, self["dst"].display)
class IPv6(IP):
static_size = 40 * 8
endian = NETWORK_ENDIAN
def createFields(self):
yield Bits(self, "version", 4, "Version (6)")
yield Bits(self, "traffic", 8, "Traffic class")
yield Bits(self, "flow", 20, "Flow label")
yield Bits(self, "length", 16, "Payload length")
yield Enum(Bits(self, "protocol", 8, "Next header"), self.PROTOCOL_NAME)
yield Bits(self, "hop_limit", 8, "Hop limit")
yield IPv6_Address(self, "src")
yield IPv6_Address(self, "dst")
def createDescription(self):
return "IPv6 (%s>%s)" % (self["src"].display, self["dst"].display)
class Layer2(Layer):
PROTO_INFO = {
0x0800: ("ipv4", IPv4, "IPv4"),
0x0806: ("arp", ARP, "ARP"),
0x86dd: ("ipv6", IPv6, "IPv6"),
}
PROTO_DESC = createDict(PROTO_INFO, 2)
def parseNext(self, parent):
try:
name, parser, desc = self.PROTO_INFO[ self["protocol"].value ]
return parser(parent, name)
except KeyError:
return None
class Unicast(Layer2):
packet_type_name = {
0: "Unicast to us"
}
def createFields(self):
yield Enum(UInt16(self, "packet_type"), self.packet_type_name)
yield UInt16(self, "addr_type", "Link-layer address type")
yield UInt16(self, "addr_length", "Link-layer address length")
length = self["addr_length"].value
length = 8 # FIXME: Should we use addr_length or not?
if length:
yield RawBytes(self, "source", length)
yield Enum(UInt16(self, "protocol"), self.PROTO_DESC)
class Ethernet(Layer2):
static_size = 14*8
def createFields(self):
yield MAC48_Address(self, "dst")
yield MAC48_Address(self, "src")
yield Enum(UInt16(self, "protocol"), self.PROTO_DESC)
def createDescription(self):
return "Ethernet: %s>%s (%s)" % \
(self["src"].display, self["dst"].display, self["protocol"].display)
class Packet(FieldSet):
endian = LITTLE_ENDIAN
def __init__(self, parent, name, parser, first_name):
FieldSet.__init__(self, parent, name)
self._size = (16 + self["caplen"].value) * 8
self._first_parser = parser
self._first_name = first_name
def createFields(self):
yield TimestampUnix32(self, "ts_epoch", "Timestamp (Epoch)")
yield UInt32(self, "ts_nanosec", "Timestamp (nano second)")
yield UInt32(self, "caplen", "length of portion present")
yield UInt32(self, "len", "length this packet (off wire)")
# Read different layers
field = self._first_parser(self, self._first_name)
while field:
yield field
field = field.parseNext(self)
# Read data if any
size = (self.size - self.current_size) // 8
if size:
yield RawBytes(self, "data", size)
def getTimestamp(self):
nano_sec = float(self["ts_nanosec"].value) / 100
from datetime import timedelta
return self["ts_epoch"].value + timedelta(microseconds=nano_sec)
def createDescription(self):
t0 = self["/packet[0]"].getTimestamp()
# ts = max(self.getTimestamp() - t0, t0)
ts = self.getTimestamp() - t0
#text = ["%1.6f: " % ts]
text = ["%s: " % ts]
if "icmp" in self:
text.append(self["icmp"].description)
elif "tcp" in self:
text.append(self["tcp"].description)
elif "udp" in self:
text.append(self["udp"].description)
elif "arp" in self:
text.append(self["arp"].description)
else:
text.append("Packet")
return "".join(text)
class TcpdumpFile(Parser):
PARSER_TAGS = {
"id": "tcpdump",
"category": "misc",
"min_size": 24*8,
"description": "Tcpdump file (network)",
"magic": (("\xd4\xc3\xb2\xa1", 0),),
}
endian = LITTLE_ENDIAN
LINK_TYPE = {
1: ("ethernet", Ethernet),
113: ("unicast", Unicast),
}
LINK_TYPE_DESC = createDict(LINK_TYPE, 0)
def validate(self):
if self["id"].value != "\xd4\xc3\xb2\xa1":
return "Wrong file signature"
if self["link_type"].value not in self.LINK_TYPE:
return "Unknown link type"
return True
def createFields(self):
yield Bytes(self, "id", 4, "Tcpdump identifier")
yield UInt16(self, "maj_ver", "Major version")
yield UInt16(self, "min_ver", "Minor version")
yield Int32(self, "this_zone", "GMT to local time zone correction")
yield Int32(self, "sigfigs", "accuracy of timestamps")
yield UInt32(self, "snap_len", "max length saved portion of each pkt")
yield Enum(UInt32(self, "link_type", "data link type"), self.LINK_TYPE_DESC)
link = self["link_type"].value
if link not in self.LINK_TYPE:
raise ParserError("Unknown link type: %s" % link)
name, parser = self.LINK_TYPE[link]
while self.current_size < self.size:
yield Packet(self, "packet[]", parser, name)