1
0
mirror of https://github.com/moparisthebest/curl synced 2025-01-11 05:58:01 -05:00

tests: remove python_dependencies for smbserver from our tree

Users of the SMB tests will have to install impacket manually.

Reasoning: our in-tree version of impacket was quite outdated
and only compatible with Python 2 which is already end-of-life.
Upgrading to Python 3 and a compatible impacket version would
require to import additional Python-only and CPython-extension
dependencies. This would have hindered portability enormously.

Closes #5094
This commit is contained in:
Marc Hoersken 2020-03-15 10:01:38 +01:00
parent 67f3f6cff1
commit 4be2560e01
No known key found for this signature in database
GPG Key ID: 61E03CBED7BC859E
17 changed files with 14 additions and 18137 deletions

View File

@ -27,6 +27,7 @@ addons:
- stunnel4
- libidn2-0-dev
- gnutls-bin
- python-impacket
matrix:
include:

View File

@ -24,21 +24,15 @@ HTMLPAGES = testcurl.html runtests.html
PDFPAGES = testcurl.pdf runtests.pdf
MANDISTPAGES = runtests.1.dist testcurl.1.dist
# the path to the impacket python lib used for SMB tests
IMP = python_dependencies/impacket
SMBDEPS = $(IMP)/__init__.py $(IMP)/nmb.py $(IMP)/nt_errors.py \
$(IMP)/ntlm.py $(IMP)/smb.py $(IMP)/smb3.py $(IMP)/smb3structs.py \
$(IMP)/smbserver.py $(IMP)/spnego.py $(IMP)/structure.py \
$(IMP)/uuid.py $(IMP)/version.py smbserver.py curl_test_data.py
EXTRA_DIST = ftpserver.pl httpserver.pl secureserver.pl runtests.pl \
getpart.pm FILEFORMAT README stunnel.pem memanalyze.pl testcurl.pl \
valgrind.pm ftp.pm sshserver.pl sshhelp.pm pathhelp.pm testcurl.1 runtests.1 \
serverhelp.pm tftpserver.pl rtspserver.pl directories.pm symbol-scan.pl \
CMakeLists.txt mem-include-scan.pl valgrind.supp extern-scan.pl \
manpage-scan.pl nroff-scan.pl http2-server.pl dictserver.py \
negtelnetserver.py $(SMBDEPS) objnames-test08.sh objnames-test10.sh \
objnames.inc disable-scan.pl manpage-syntax.pl error-codes.pl badsymbols.pl \
negtelnetserver.py smbserver.py curl_test_data.py \
objnames-test08.sh objnames-test10.sh objnames.inc \
disable-scan.pl manpage-syntax.pl error-codes.pl badsymbols.pl \
azure.pm appveyor.pm
DISTCLEANFILES = configurehelp.pm

View File

@ -28,6 +28,9 @@ Basic SMB request
<command>
-u 'curltest:curltest' smb://%HOSTIP:%SMBPORT/TESTS/1451
</command>
<precheck>
python2 -c "__import__('pkgutil').find_loader('impacket') or (__import__('sys').stdout.write('Test only works if Python package impacket is installed\n'), __import__('sys').exit(1))"
</precheck>
</client>
#

View File

@ -1,84 +0,0 @@
Licencing
---------
We provide this software under a slightly modified version of the
Apache Software License. The only changes to the document were the
replacement of "Apache" with "Impacket" and "Apache Software Foundation"
with "SecureAuth Corporation". Feel free to compare the resulting
document to the official Apache license.
The `Apache Software License' is an Open Source Initiative Approved
License.
The Apache Software License, Version 1.1
Modifications by SecureAuth Corporation (see above)
Copyright (c) 2000 The Apache Software Foundation. All rights
reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the
distribution.
3. The end-user documentation included with the redistribution,
if any, must include the following acknowledgment:
"This product includes software developed by
SecureAuth Corporation (https://www.secureauth.com/)."
Alternately, this acknowledgment may appear in the software itself,
if and wherever such third-party acknowledgments normally appear.
4. The names "Impacket", "SecureAuth Corporation" and "CORE Security Technologies" must
not be used to endorse or promote products derived from this
software without prior written permission. For written
permission, please contact oss@coresecurity.com.
5. Products derived from this software may not be called "Impacket",
nor may "Impacket" appear in their name, without prior written
permission of SecureAuth Corporation.
THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
Smb.py and nmb.py are based on Pysmb by Michael Teo
(https://miketeo.net/projects/pysmb/), and are distributed under the
following license:
This software is provided 'as-is', without any express or implied
warranty. In no event will the author be held liable for any damages
arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must
not claim that you wrote the original software. If you use this
software in a product, an acknowledgment in the product
documentation would be appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must
not be misrepresented as being the original software.
3. This notice cannot be removed or altered from any source
distribution.

View File

@ -1,25 +0,0 @@
# Copyright (c) 2003-2016 CORE Security Technologies
#
# This software is provided under under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Author: Alberto Solino (@agsolino)
#
# Set default logging handler to avoid "No handler found" warnings.
import logging
try: # Python 2.7+
from logging import NullHandler
except ImportError:
class NullHandler(logging.Handler):
def emit(self, record):
pass
# All modules inside this library MUST use this logger (impacket)
# It is up to the library consumer to do whatever is wanted
# with the logger output. By default it is forwarded to the
# upstream logger
LOG = logging.getLogger(__name__)
LOG.addHandler(NullHandler())

View File

@ -1,982 +0,0 @@
from __future__ import print_function
from __future__ import absolute_import
# Copyright (c) 2003-2016 CORE Security Technologies
#
# This software is provided under under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# -*- mode: python; tab-width: 4 -*-
#
# Copyright (C) 2001 Michael Teo <michaelteo@bigfoot.com>
# nmb.py - NetBIOS library
#
# This software is provided 'as-is', without any express or implied warranty.
# In no event will the author be held liable for any damages arising from the
# use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
# appreciated but is not required.
#
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
#
# 3. This notice cannot be removed or altered from any source distribution.
#
# Altered source done by Alberto Solino (@agsolino)
import socket
import string
import re
import select
import errno
from random import randint
from struct import pack, unpack
import time
from .structure import Structure
CVS_REVISION = '$Revision: 526 $'
# Taken from socket module reference
INADDR_ANY = '0.0.0.0'
BROADCAST_ADDR = '<broadcast>'
# Default port for NetBIOS name service
NETBIOS_NS_PORT = 137
# Default port for NetBIOS session service
NETBIOS_SESSION_PORT = 139
# Default port for SMB session service
SMB_SESSION_PORT = 445
# Owner Node Type Constants
NODE_B = 0x0000
NODE_P = 0x2000
NODE_M = 0x4000
NODE_RESERVED = 0x6000
NODE_GROUP = 0x8000
NODE_UNIQUE = 0x0
# Name Type Constants
TYPE_UNKNOWN = 0x01
TYPE_WORKSTATION = 0x00
TYPE_CLIENT = 0x03
TYPE_SERVER = 0x20
TYPE_DOMAIN_MASTER = 0x1B
TYPE_DOMAIN_CONTROLLER = 0x1C
TYPE_MASTER_BROWSER = 0x1D
TYPE_BROWSER = 0x1E
TYPE_NETDDE = 0x1F
TYPE_STATUS = 0x21
# Opcodes values
OPCODE_QUERY = 0
OPCODE_REGISTRATION = 0x5
OPCODE_RELEASE = 0x6
OPCODE_WACK = 0x7
OPCODE_REFRESH = 0x8
OPCODE_REQUEST = 0
OPCODE_RESPONSE = 0x10
# NM_FLAGS
NM_FLAGS_BROADCAST = 0x1
NM_FLAGS_UNICAST = 0
NM_FLAGS_RA = 0x8
NM_FLAGS_RD = 0x10
NM_FLAGS_TC = 0x20
NM_FLAGS_AA = 0x40
# QUESTION_TYPE
QUESTION_TYPE_NB = 0x20 # NetBIOS general Name Service Resource Record
QUESTION_TYPE_NBSTAT = 0x21 # NetBIOS NODE STATUS Resource Record
# QUESTION_CLASS
QUESTION_CLASS_IN = 0x1 # Internet class
# RR_TYPE Resource Record Type code
RR_TYPE_A = 0x1 # IP address Resource Record
RR_TYPE_NS = 0x2 # Name Server Resource Record
RR_TYPE_NULL = 0xA # NULL Resource Record
RR_TYPE_NB = 0x20 # NetBIOS general Name Service Resource Record
RR_TYPE_NBSTAT = 0x21 # NetBIOS NODE STATUS Resource Record
# Resource Record Class
RR_CLASS_IN = 1 # Internet class
# RCODE values
RCODE_FMT_ERR = 0x1 # Format Error. Request was invalidly formatted.
RCODE_SRV_ERR = 0x2 # Server failure. Problem with NBNS, cannot process name.
RCODE_IMP_ERR = 0x4 # Unsupported request error. Allowable only for challenging NBNS when gets an Update type
# registration request.
RCODE_RFS_ERR = 0x5 # Refused error. For policy reasons server will not register this name from this host.
RCODE_ACT_ERR = 0x6 # Active error. Name is owned by another node.
RCODE_CFT_ERR = 0x7 # Name in conflict error. A UNIQUE name is owned by more than one node.
# NAME_FLAGS
NAME_FLAGS_PRM = 0x0200 # Permanent Name Flag. If one (1) then entry is for the permanent node name. Flag is zero
# (0) for all other names.
NAME_FLAGS_ACT = 0x0400 # Active Name Flag. All entries have this flag set to one (1).
NAME_FLAG_CNF = 0x0800 # Conflict Flag. If one (1) then name on this node is in conflict.
NAME_FLAG_DRG = 0x1000 # Deregister Flag. If one (1) then this name is in the process of being deleted.
NAME_TYPES = { TYPE_UNKNOWN: 'Unknown', TYPE_WORKSTATION: 'Workstation', TYPE_CLIENT: 'Client',
TYPE_SERVER: 'Server', TYPE_MASTER_BROWSER: 'Master Browser', TYPE_BROWSER: 'Browser Server',
TYPE_DOMAIN_MASTER: 'Domain Master' , TYPE_NETDDE: 'NetDDE Server'}
# NetBIOS Session Types
NETBIOS_SESSION_MESSAGE = 0x0
NETBIOS_SESSION_REQUEST = 0x81
NETBIOS_SESSION_POSITIVE_RESPONSE = 0x82
NETBIOS_SESSION_NEGATIVE_RESPONSE = 0x83
NETBIOS_SESSION_RETARGET_RESPONSE = 0x84
NETBIOS_SESSION_KEEP_ALIVE = 0x85
def strerror(errclass, errcode):
if errclass == ERRCLASS_OS:
return 'OS Error', str(errcode)
elif errclass == ERRCLASS_QUERY:
return 'Query Error', QUERY_ERRORS.get(errcode, 'Unknown error')
elif errclass == ERRCLASS_SESSION:
return 'Session Error', SESSION_ERRORS.get(errcode, 'Unknown error')
else:
return 'Unknown Error Class', 'Unknown Error'
class NetBIOSError(Exception): pass
class NetBIOSTimeout(Exception):
def __init__(self, message = 'The NETBIOS connection with the remote host timed out.'):
Exception.__init__(self, message)
class NBResourceRecord:
def __init__(self, data = 0):
self._data = data
try:
if self._data:
self.rr_name = (re.split('\x00',data))[0]
offset = len(self.rr_name)+1
self.rr_type = unpack('>H', self._data[offset:offset+2])[0]
self.rr_class = unpack('>H', self._data[offset+2: offset+4])[0]
self.ttl = unpack('>L',self._data[offset+4:offset+8])[0]
self.rdlength = unpack('>H', self._data[offset+8:offset+10])[0]
self.rdata = self._data[offset+10:offset+10+self.rdlength]
offset = self.rdlength - 2
self.unit_id = data[offset:offset+6]
else:
self.rr_name = ''
self.rr_type = 0
self.rr_class = 0
self.ttl = 0
self.rdlength = 0
self.rdata = ''
self.unit_id = ''
except Exception:
raise NetBIOSError( 'Wrong packet format ' )
def set_rr_name(self, name):
self.rr_name = name
def set_rr_type(self, name):
self.rr_type = name
def set_rr_class(self,cl):
self.rr_class = cl
def set_ttl(self,ttl):
self.ttl = ttl
def set_rdata(self,rdata):
self.rdata = rdata
self.rdlength = len(rdata)
def get_unit_id(self):
return self.unit_id
def get_rr_name(self):
return self.rr_name
def get_rr_class(self):
return self.rr_class
def get_ttl(self):
return self.ttl
def get_rdlength(self):
return self.rdlength
def get_rdata(self):
return self.rdata
def rawData(self):
return self.rr_name + pack('!HHLH',self.rr_type, self.rr_class, self.ttl, self.rdlength) + self.rdata
class NBNodeStatusResponse(NBResourceRecord):
def __init__(self, data = 0):
NBResourceRecord.__init__(self,data)
self.num_names = 0
self.node_names = [ ]
self.statstics = ''
self.mac = '00-00-00-00-00-00'
try:
if data:
self._data = self.get_rdata()
self.num_names = unpack('>B',self._data[:1])[0]
offset = 1
for i in range(0, self.num_names):
name = self._data[offset:offset + 15]
type,flags = unpack('>BH', self._data[offset + 15: offset + 18])
offset += 18
self.node_names.append(NBNodeEntry(name, type ,flags))
self.set_mac_in_hexa(self.get_unit_id())
except Exception:
raise NetBIOSError( 'Wrong packet format ' )
def set_mac_in_hexa(self, data):
data_aux = ''
for d in data:
if data_aux == '':
data_aux = '%02x' % ord(d)
else:
data_aux += '-%02x' % ord(d)
self.mac = string.upper(data_aux)
def get_num_names(self):
return self.num_names
def get_mac(self):
return self.mac
def set_num_names(self, num):
self.num_names = num
def get_node_names(self):
return self.node_names
def add_node_name(self,node_names):
self.node_names.append(node_names)
self.num_names += 1
def rawData(self):
res = pack('!B', self.num_names )
for i in range(0, self.num_names):
res += self.node_names[i].rawData()
class NBPositiveNameQueryResponse(NBResourceRecord):
def __init__(self, data = 0):
NBResourceRecord.__init__(self, data)
self.addr_entries = [ ]
if data:
self._data = self.get_rdata()
_qn_length, qn_name, qn_scope = decode_name(data)
self._netbios_name = string.rstrip(qn_name[:-1]) + qn_scope
self._name_type = ord(qn_name[-1])
self._nb_flags = unpack('!H', self._data[:2])
offset = 2
while offset<len(self._data):
self.addr_entries.append('%d.%d.%d.%d' % unpack('4B', (self._data[offset:offset+4])))
offset += 4
def get_netbios_name(self):
return self._netbios_name
def get_name_type(self):
return self._name_type
def get_addr_entries(self):
return self.addr_entries
class NetBIOSPacket:
""" This is a packet as defined in RFC 1002 """
def __init__(self, data = 0):
self.name_trn_id = 0x0 # Transaction ID for Name Service Transaction.
# Requestor places a unique value for each active
# transaction. Responder puts NAME_TRN_ID value
# from request packet in response packet.
self.opcode = 0 # Packet type code
self.nm_flags = 0 # Flags for operation
self.rcode = 0 # Result codes of request.
self.qdcount = 0 # Unsigned 16 bit integer specifying the number of entries in the question section of a Name
self.ancount = 0 # Unsigned 16 bit integer specifying the number of
# resource records in the answer section of a Name
# Service packet.
self.nscount = 0 # Unsigned 16 bit integer specifying the number of
# resource records in the authority section of a
# Name Service packet.
self.arcount = 0 # Unsigned 16 bit integer specifying the number of
# resource records in the additional records
# section of a Name Service packeT.
self.questions = ''
self.answers = ''
if data == 0:
self._data = ''
else:
try:
self._data = data
self.opcode = ord(data[2]) >> 3
self.nm_flags = ((ord(data[2]) & 0x3) << 4) | ((ord(data[3]) & 0xf0) >> 4)
self.name_trn_id = unpack('>H', self._data[:2])[0]
self.rcode = ord(data[3]) & 0x0f
self.qdcount = unpack('>H', self._data[4:6])[0]
self.ancount = unpack('>H', self._data[6:8])[0]
self.nscount = unpack('>H', self._data[8:10])[0]
self.arcount = unpack('>H', self._data[10:12])[0]
self.answers = self._data[12:]
except Exception:
raise NetBIOSError( 'Wrong packet format ' )
def set_opcode(self, opcode):
self.opcode = opcode
def set_trn_id(self, trn):
self.name_trn_id = trn
def set_nm_flags(self, nm_flags):
self.nm_flags = nm_flags
def set_rcode(self, rcode):
self.rcode = rcode
def addQuestion(self, question, qtype, qclass):
self.qdcount += 1
self.questions += question + pack('!HH',qtype,qclass)
def get_trn_id(self):
return self.name_trn_id
def get_rcode(self):
return self.rcode
def get_nm_flags(self):
return self.nm_flags
def get_opcode(self):
return self.opcode
def get_qdcount(self):
return self.qdcount
def get_ancount(self):
return self.ancount
def get_nscount(self):
return self.nscount
def get_arcount(self):
return self.arcount
def rawData(self):
secondWord = self.opcode << 11
secondWord |= self.nm_flags << 4
secondWord |= self.rcode
data = pack('!HHHHHH', self.name_trn_id, secondWord , self.qdcount, self.ancount, self.nscount, self.arcount) + self.questions + self.answers
return data
def get_answers(self):
return self.answers
class NBHostEntry:
def __init__(self, nbname, nametype, ip):
self.__nbname = nbname
self.__nametype = nametype
self.__ip = ip
def get_nbname(self):
return self.__nbname
def get_nametype(self):
return self.__nametype
def get_ip(self):
return self.__ip
def __repr__(self):
return '<NBHostEntry instance: NBname="' + self.__nbname + '", IP="' + self.__ip + '">'
class NBNodeEntry:
def __init__(self, nbname, nametype, flags):
self.__nbname = string.ljust(nbname,17)
self.__nametype = nametype
self.__flags = flags
self.__isgroup = flags & 0x8000
self.__nodetype = flags & 0x6000
self.__deleting = flags & 0x1000
self.__isconflict = flags & 0x0800
self.__isactive = flags & 0x0400
self.__ispermanent = flags & 0x0200
def get_nbname(self):
return self.__nbname
def get_nametype(self):
return self.__nametype
def is_group(self):
return self.__isgroup
def get_nodetype(self):
return self.__nodetype
def is_deleting(self):
return self.__deleting
def is_conflict(self):
return self.__isconflict
def is_active(self):
return self.__isactive
def is_permanent(self):
return self.__ispermanent
def set_nbname(self, name):
self.__nbname = string.ljust(name,17)
def set_nametype(self, type):
self.__nametype = type
def set_flags(self,flags):
self.__flags = flags
def __repr__(self):
s = '<NBNodeEntry instance: NBname="' + self.__nbname + '" NameType="' + NAME_TYPES[self.__nametype] + '"'
if self.__isactive:
s += ' ACTIVE'
if self.__isgroup:
s += ' GROUP'
if self.__isconflict:
s += ' CONFLICT'
if self.__deleting:
s += ' DELETING'
return s
def rawData(self):
return self.__nbname + pack('!BH',self.__nametype, self.__flags)
class NetBIOS:
# Creates a NetBIOS instance without specifying any default NetBIOS domain nameserver.
# All queries will be sent through the servport.
def __init__(self, servport = NETBIOS_NS_PORT):
self.__servport = NETBIOS_NS_PORT
self.__nameserver = None
self.__broadcastaddr = BROADCAST_ADDR
self.mac = '00-00-00-00-00-00'
def _setup_connection(self, dstaddr):
port = randint(10000, 60000)
af, socktype, proto, _canonname, _sa = socket.getaddrinfo(dstaddr, port, socket.AF_INET, socket.SOCK_DGRAM)[0]
s = socket.socket(af, socktype, proto)
has_bind = 1
for _i in range(0, 10):
# We try to bind to a port for 10 tries
try:
s.bind(( INADDR_ANY, randint(10000, 60000) ))
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
has_bind = 1
except socket.error:
pass
if not has_bind:
raise NetBIOSError( 'Cannot bind to a good UDP port', ERRCLASS_OS, errno.EAGAIN)
self.__sock = s
# Set the default NetBIOS domain nameserver.
def set_nameserver(self, nameserver):
self.__nameserver = nameserver
# Return the default NetBIOS domain nameserver, or None if none is specified.
def get_nameserver(self):
return self.__nameserver
# Set the broadcast address to be used for query.
def set_broadcastaddr(self, broadcastaddr):
self.__broadcastaddr = broadcastaddr
# Return the broadcast address to be used, or BROADCAST_ADDR if default broadcast address is used.
def get_broadcastaddr(self):
return self.__broadcastaddr
# Returns a NBPositiveNameQueryResponse instance containing the host information for nbname.
# If a NetBIOS domain nameserver has been specified, it will be used for the query.
# Otherwise, the query is broadcasted on the broadcast address.
def gethostbyname(self, nbname, qtype = TYPE_WORKSTATION, scope = None, timeout = 1):
return self.__queryname(nbname, self.__nameserver, qtype, scope, timeout)
# Returns a list of NBNodeEntry instances containing node status information for nbname.
# If destaddr contains an IP address, then this will become an unicast query on the destaddr.
# Raises NetBIOSTimeout if timeout (in secs) is reached.
# Raises NetBIOSError for other errors
def getnodestatus(self, nbname, destaddr = None, type = TYPE_WORKSTATION, scope = None, timeout = 1):
if destaddr:
return self.__querynodestatus(nbname, destaddr, type, scope, timeout)
else:
return self.__querynodestatus(nbname, self.__nameserver, type, scope, timeout)
def getnetbiosname(self, ip):
entries = self.getnodestatus('*',ip)
entries = filter(lambda x:x.get_nametype() == TYPE_SERVER, entries)
return entries[0].get_nbname().strip()
def getmacaddress(self):
return self.mac
def __queryname(self, nbname, destaddr, qtype, scope, timeout, retries = 0):
self._setup_connection(destaddr)
trn_id = randint(1, 32000)
p = NetBIOSPacket()
p.set_trn_id(trn_id)
netbios_name = nbname.upper()
qn_label = encode_name(netbios_name, qtype, scope)
p.addQuestion(qn_label, QUESTION_TYPE_NB, QUESTION_CLASS_IN)
p.set_nm_flags(NM_FLAGS_RD)
if not destaddr:
p.set_nm_flags(p.get_nm_flags() | NM_FLAGS_BROADCAST)
destaddr = self.__broadcastaddr
req = p.rawData()
tries = retries
while 1:
self.__sock.sendto(req, ( destaddr, self.__servport ))
try:
ready, _, _ = select.select([ self.__sock.fileno() ], [ ] , [ ], timeout)
if not ready:
if tries:
# Retry again until tries == 0
tries -= 1
else:
raise NetBIOSTimeout
else:
data, _ = self.__sock.recvfrom(65536, 0)
res = NetBIOSPacket(data)
if res.get_trn_id() == p.get_trn_id():
if res.get_rcode():
if res.get_rcode() == 0x03:
return None
else:
raise NetBIOSError( 'Negative name query response', ERRCLASS_QUERY, res.get_rcode())
if res.get_ancount() != 1:
raise NetBIOSError( 'Malformed response')
return NBPositiveNameQueryResponse(res.get_answers())
except select.error as ex:
if ex[0] != errno.EINTR and ex[0] != errno.EAGAIN:
raise NetBIOSError( 'Error occurs while waiting for response', ERRCLASS_OS, ex[0])
raise
def __querynodestatus(self, nbname, destaddr, type, scope, timeout):
self._setup_connection(destaddr)
trn_id = randint(1, 32000)
p = NetBIOSPacket()
p.set_trn_id(trn_id)
netbios_name = string.upper(nbname)
qn_label = encode_name(netbios_name, type, scope)
p.addQuestion(qn_label, QUESTION_TYPE_NBSTAT, QUESTION_CLASS_IN)
if not destaddr:
p.set_nm_flags(NM_FLAGS_BROADCAST)
destaddr = self.__broadcastaddr
req = p.rawData()
tries = 3
while 1:
try:
self.__sock.sendto(req, 0, ( destaddr, self.__servport ))
ready, _, _ = select.select([ self.__sock.fileno() ], [ ] , [ ], timeout)
if not ready:
if tries:
# Retry again until tries == 0
tries -= 1
else:
raise NetBIOSTimeout
else:
try:
data, _ = self.__sock.recvfrom(65536, 0)
except Exception as e:
raise NetBIOSError("recvfrom error: %s" % str(e))
self.__sock.close()
res = NetBIOSPacket(data)
if res.get_trn_id() == p.get_trn_id():
if res.get_rcode():
if res.get_rcode() == 0x03:
# I'm just guessing here
raise NetBIOSError("Cannot get data from server")
else:
raise NetBIOSError( 'Negative name query response', ERRCLASS_QUERY, res.get_rcode())
answ = NBNodeStatusResponse(res.get_answers())
self.mac = answ.get_mac()
return answ.get_node_names()
except select.error as ex:
if ex[0] != errno.EINTR and ex[0] != errno.EAGAIN:
raise NetBIOSError( 'Error occurs while waiting for response', ERRCLASS_OS, ex[0])
except socket.error as ex:
raise NetBIOSError('Connection error: %s' % str(ex))
# Perform first and second level encoding of name as specified in RFC 1001 (Section 4)
def encode_name(name, type, scope):
if name == '*':
name += '\0' * 15
elif len(name) > 15:
name = name[:15] + chr(type)
else:
name = string.ljust(name, 15) + chr(type)
encoded_name = chr(len(name) * 2) + re.sub('.', _do_first_level_encoding, name)
if scope:
encoded_scope = ''
for s in string.split(scope, '.'):
encoded_scope = encoded_scope + chr(len(s)) + s
return encoded_name + encoded_scope + '\0'
else:
return encoded_name + '\0'
# Internal method for use in encode_name()
def _do_first_level_encoding(m):
s = ord(m.group(0))
return string.uppercase[s >> 4] + string.uppercase[s & 0x0f]
def decode_name(name):
name_length = ord(name[0])
assert name_length == 32
decoded_name = re.sub('..', _do_first_level_decoding, name[1:33])
if name[33] == '\0':
return 34, decoded_name, ''
else:
decoded_domain = ''
offset = 34
while 1:
domain_length = ord(name[offset])
if domain_length == 0:
break
decoded_domain = '.' + name[offset:offset + domain_length]
offset += domain_length
return offset + 1, decoded_name, decoded_domain
def _do_first_level_decoding(m):
s = m.group(0)
return chr(((ord(s[0]) - ord('A')) << 4) | (ord(s[1]) - ord('A')))
class NetBIOSSessionPacket:
def __init__(self, data = 0):
self.type = 0x0
self.flags = 0x0
self.length = 0x0
if data == 0:
self._trailer = ''
else:
try:
self.type = ord(data[0])
if self.type == NETBIOS_SESSION_MESSAGE:
self.length = ord(data[1]) << 16 | (unpack('!H', data[2:4])[0])
else:
self.flags = ord(data[1])
self.length = unpack('!H', data[2:4])[0]
self._trailer = data[4:]
except:
raise NetBIOSError( 'Wrong packet format ' )
def set_type(self, type):
self.type = type
def get_type(self):
return self.type
def rawData(self):
if self.type == NETBIOS_SESSION_MESSAGE:
data = pack('!BBH',self.type,self.length >> 16,self.length & 0xFFFF) + self._trailer
else:
data = pack('!BBH',self.type,self.flags,self.length) + self._trailer
return data
def set_trailer(self,data):
self._trailer = data
self.length = len(data)
def get_length(self):
return self.length
def get_trailer(self):
return self._trailer
class NetBIOSSession:
def __init__(self, myname, remote_name, remote_host, remote_type = TYPE_SERVER, sess_port = NETBIOS_SESSION_PORT, timeout = None, local_type = TYPE_WORKSTATION, sock = None):
if len(myname) > 15:
self.__myname = string.upper(myname[:15])
else:
self.__myname = string.upper(myname)
self.__local_type = local_type
assert remote_name
# if destination port SMB_SESSION_PORT and remote name *SMBSERVER, we're changing it to its IP address
# helping solving the client mistake ;)
if remote_name == '*SMBSERVER' and sess_port == SMB_SESSION_PORT:
remote_name = remote_host
# If remote name is *SMBSERVER let's try to query its name.. if can't be guessed, continue and hope for the best
if remote_name == '*SMBSERVER':
nb = NetBIOS()
try:
res = nb.getnetbiosname(remote_host)
except:
res = None
pass
if res is not None:
remote_name = res
if len(remote_name) > 15:
self.__remote_name = string.upper(remote_name[:15])
else:
self.__remote_name = string.upper(remote_name)
self.__remote_type = remote_type
self.__remote_host = remote_host
if sock is not None:
# We are acting as a server
self._sock = sock
else:
self._sock = self._setup_connection((remote_host, sess_port))
if sess_port == NETBIOS_SESSION_PORT:
self._request_session(remote_type, local_type, timeout)
def get_myname(self):
return self.__myname
def get_mytype(self):
return self.__local_type
def get_remote_host(self):
return self.__remote_host
def get_remote_name(self):
return self.__remote_name
def get_remote_type(self):
return self.__remote_type
def close(self):
self._sock.close()
def get_socket(self):
return self._sock
class NetBIOSUDPSessionPacket(Structure):
TYPE_DIRECT_UNIQUE = 16
TYPE_DIRECT_GROUP = 17
FLAGS_MORE_FRAGMENTS = 1
FLAGS_FIRST_FRAGMENT = 2
FLAGS_B_NODE = 0
structure = (
('Type','B=16'), # Direct Unique Datagram
('Flags','B=2'), # FLAGS_FIRST_FRAGMENT
('ID','<H'),
('_SourceIP','>L'),
('SourceIP','"'),
('SourcePort','>H=138'),
('DataLegth','>H-Data'),
('Offset','>H=0'),
('SourceName','z'),
('DestinationName','z'),
('Data',':'),
)
def getData(self):
addr = self['SourceIP'].split('.')
addr = [int(x) for x in addr]
addr = (((addr[0] << 8) + addr[1] << 8) + addr[2] << 8) + addr[3]
self['_SourceIP'] = addr
return Structure.getData(self)
def get_trailer(self):
return self['Data']
class NetBIOSUDPSession(NetBIOSSession):
def _setup_connection(self, peer):
af, socktype, proto, canonname, sa = socket.getaddrinfo(peer[0], peer[1], 0, socket.SOCK_DGRAM)[0]
sock = socket.socket(af, socktype, proto)
sock.connect(sa)
sock = socket.socket(af, socktype, proto)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((INADDR_ANY, 138))
self.peer = peer
return sock
def _request_session(self, remote_type, local_type, timeout = None):
pass
def next_id(self):
if hasattr(self, '__dgram_id'):
answer = self.__dgram_id
else:
self.__dgram_id = randint(1,65535)
answer = self.__dgram_id
self.__dgram_id += 1
return answer
def send_packet(self, data):
# Yes... I know...
self._sock.connect(self.peer)
p = NetBIOSUDPSessionPacket()
p['ID'] = self.next_id()
p['SourceIP'] = self._sock.getsockname()[0]
p['SourceName'] = encode_name(self.get_myname(), self.get_mytype(), '')[:-1]
p['DestinationName'] = encode_name(self.get_remote_name(), self.get_remote_type(), '')[:-1]
p['Data'] = data
self._sock.sendto(str(p), self.peer)
self._sock.close()
self._sock = self._setup_connection(self.peer)
def recv_packet(self, timeout = None):
# The next loop is a workaround for a bigger problem:
# When data reaches higher layers, the lower headers are lost,
# and with them, for example, the source IP. Hence, SMB users
# can't know where packets are comming from... we need a better
# solution, right now, we will filter everything except packets
# coming from the remote_host specified in __init__()
while 1:
data, peer = self._sock.recvfrom(8192)
# print "peer: %r self.peer: %r" % (peer, self.peer)
if peer == self.peer: break
return NetBIOSUDPSessionPacket(data)
class NetBIOSTCPSession(NetBIOSSession):
def __init__(self, myname, remote_name, remote_host, remote_type = TYPE_SERVER, sess_port = NETBIOS_SESSION_PORT, timeout = None, local_type = TYPE_WORKSTATION, sock = None, select_poll = False):
self.__select_poll = select_poll
if self.__select_poll:
self.read_function = self.polling_read
else:
self.read_function = self.non_polling_read
NetBIOSSession.__init__(self, myname, remote_name, remote_host, remote_type = remote_type, sess_port = sess_port, timeout = timeout, local_type = local_type, sock=sock)
def _setup_connection(self, peer):
try:
af, socktype, proto, canonname, sa = socket.getaddrinfo(peer[0], peer[1], 0, socket.SOCK_STREAM)[0]
sock = socket.socket(af, socktype, proto)
sock.connect(sa)
except socket.error as e:
raise socket.error("Connection error (%s:%s)" % (peer[0], peer[1]), e)
return sock
def send_packet(self, data):
p = NetBIOSSessionPacket()
p.set_type(NETBIOS_SESSION_MESSAGE)
p.set_trailer(data)
self._sock.send(p.rawData())
def recv_packet(self, timeout = None):
data = self.__read(timeout)
return NetBIOSSessionPacket(data)
def _request_session(self, remote_type, local_type, timeout = None):
p = NetBIOSSessionPacket()
remote_name = encode_name(self.get_remote_name(), remote_type, '')
myname = encode_name(self.get_myname(), local_type, '')
p.set_type(NETBIOS_SESSION_REQUEST)
p.set_trailer(remote_name + myname)
self._sock.send(p.rawData())
while 1:
p = self.recv_packet(timeout)
if p.get_type() == NETBIOS_SESSION_NEGATIVE_RESPONSE:
raise NetBIOSError( 'Cannot request session', ERRCLASS_SESSION, ord(p.get_trailer()[0]))
elif p.get_type() == NETBIOS_SESSION_POSITIVE_RESPONSE:
break
else:
# Ignore all other messages, most probably keepalive messages
pass
def polling_read(self, read_length, timeout):
data = ''
if timeout is None:
timeout = 3600
time_left = timeout
CHUNK_TIME = 0.025
bytes_left = read_length
while bytes_left > 0:
try:
ready, _, _ = select.select([self._sock.fileno() ], [ ], [ ], 0)
if not ready:
if time_left <= 0:
raise NetBIOSTimeout
else:
time.sleep(CHUNK_TIME)
time_left -= CHUNK_TIME
continue
received = self._sock.recv(bytes_left)
if len(received) == 0:
raise NetBIOSError( 'Error while reading from remote', ERRCLASS_OS, None)
data = data + received
bytes_left = read_length - len(data)
except select.error as ex:
if ex[0] != errno.EINTR and ex[0] != errno.EAGAIN:
raise NetBIOSError( 'Error occurs while reading from remote', ERRCLASS_OS, ex[0])
return data
def non_polling_read(self, read_length, timeout):
data = ''
bytes_left = read_length
while bytes_left > 0:
try:
ready, _, _ = select.select([self._sock.fileno() ], [ ], [ ], timeout)
if not ready:
raise NetBIOSTimeout
received = self._sock.recv(bytes_left)
if len(received) == 0:
raise NetBIOSError( 'Error while reading from remote', ERRCLASS_OS, None)
data = data + received
bytes_left = read_length - len(data)
except select.error as ex:
if ex[0] != errno.EINTR and ex[0] != errno.EAGAIN:
raise NetBIOSError( 'Error occurs while reading from remote', ERRCLASS_OS, ex[0])
return data
def __read(self, timeout = None):
data = self.read_function(4, timeout)
type, flags, length = unpack('>ccH', data)
if ord(type) == NETBIOS_SESSION_MESSAGE:
length |= ord(flags) << 16
else:
if ord(flags) & 0x01:
length |= 0x10000
data2 = self.read_function(length, timeout)
return data + data2
ERRCLASS_QUERY = 0x00
ERRCLASS_SESSION = 0xf0
ERRCLASS_OS = 0xff
QUERY_ERRORS = { 0x01: 'Request format error. Please file a bug report.',
0x02: 'Internal server error',
0x03: 'Name does not exist',
0x04: 'Unsupported request',
0x05: 'Request refused'
}
SESSION_ERRORS = { 0x80: 'Not listening on called name',
0x81: 'Not listening for calling name',
0x82: 'Called name not present',
0x83: 'Sufficient resources',
0x8f: 'Unspecified error'
}
def main():
def get_netbios_host_by_name(name):
n = NetBIOS()
n.set_broadcastaddr('255.255.255.255') # To avoid use "<broadcast>" in socket
for qtype in (TYPE_WORKSTATION, TYPE_CLIENT, TYPE_SERVER, TYPE_DOMAIN_MASTER, TYPE_DOMAIN_CONTROLLER):
try:
addrs = n.gethostbyname(name, qtype = qtype).get_addr_entries()
except NetBIOSTimeout:
continue
else:
return addrs
raise Exception("Host not found")
n = get_netbios_host_by_name("some-host")
print(n)
if __name__ == '__main__':
main()

File diff suppressed because it is too large Load Diff

View File

@ -1,973 +0,0 @@
from __future__ import print_function
# Copyright (c) 2003-2016 CORE Security Technologies:
#
# This software is provided under under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
import base64
import struct
import calendar
import time
import hashlib
import random
import string
import binascii
from impacket.structure import Structure
from impacket import LOG
# This is important. NTLMv2 is not negotiated by the client or server.
# It is used if set locally on both sides. Change this item if you don't want to use
# NTLMv2 by default and fall back to NTLMv1 (with EXTENDED_SESSION_SECURITY or not)
# Check the following links:
# http://davenport.sourceforge.net/ntlm.html
# http://blogs.msdn.com/b/openspecification/archive/2010/04/20/ntlm-keys-and-sundry-stuff.aspx
# http://social.msdn.microsoft.com/Forums/en-US/os_interopscenarios/thread/c8f488ed-1b96-4e06-bd65-390aa41138d1/
# So I'm setting a global variable to control this, this can also be set programmatically
USE_NTLMv2 = True # if false will fall back to NTLMv1 (or NTLMv1 with ESS a.k.a NTLM2)
def computeResponse(flags, serverChallenge, clientChallenge, serverName, domain, user, password, lmhash='', nthash='',
use_ntlmv2=USE_NTLMv2):
if use_ntlmv2:
return computeResponseNTLMv2(flags, serverChallenge, clientChallenge, serverName, domain, user, password,
lmhash, nthash, use_ntlmv2=use_ntlmv2)
else:
return computeResponseNTLMv1(flags, serverChallenge, clientChallenge, serverName, domain, user, password,
lmhash, nthash, use_ntlmv2=use_ntlmv2)
try:
POW = None
from Crypto.Cipher import ARC4
from Crypto.Cipher import DES
from Crypto.Hash import MD4
except Exception:
try:
import POW
except Exception:
LOG.critical("Warning: You don't have any crypto installed. You need PyCrypto")
LOG.critical("See http://www.pycrypto.org/")
NTLM_AUTH_NONE = 1
NTLM_AUTH_CONNECT = 2
NTLM_AUTH_CALL = 3
NTLM_AUTH_PKT = 4
NTLM_AUTH_PKT_INTEGRITY = 5
NTLM_AUTH_PKT_PRIVACY = 6
# If set, requests 56-bit encryption. If the client sends NTLMSSP_NEGOTIATE_SEAL or NTLMSSP_NEGOTIATE_SIGN
# with NTLMSSP_NEGOTIATE_56 to the server in the NEGOTIATE_MESSAGE, the server MUST return NTLMSSP_NEGOTIATE_56 to
# the client in the CHALLENGE_MESSAGE. Otherwise it is ignored. If both NTLMSSP_NEGOTIATE_56 and NTLMSSP_NEGOTIATE_128
# are requested and supported by the client and server, NTLMSSP_NEGOTIATE_56 and NTLMSSP_NEGOTIATE_128 will both be
# returned to the client. Clients and servers that set NTLMSSP_NEGOTIATE_SEAL SHOULD set NTLMSSP_NEGOTIATE_56 if it is
# supported. An alternate name for this field is NTLMSSP_NEGOTIATE_56.
NTLMSSP_NEGOTIATE_56 = 0x80000000
# If set, requests an explicit key exchange. This capability SHOULD be used because it improves security for message
# integrity or confidentiality. See sections 3.2.5.1.2, 3.2.5.2.1, and 3.2.5.2.2 for details. An alternate name for
# this field is NTLMSSP_NEGOTIATE_KEY_EXCH.
NTLMSSP_NEGOTIATE_KEY_EXCH = 0x40000000
# If set, requests 128-bit session key negotiation. An alternate name for this field is NTLMSSP_NEGOTIATE_128.
# If the client sends NTLMSSP_NEGOTIATE_128 to the server in the NEGOTIATE_MESSAGE, the server MUST return
# NTLMSSP_NEGOTIATE_128 to the client in the CHALLENGE_MESSAGE only if the client sets NTLMSSP_NEGOTIATE_SEAL or
# NTLMSSP_NEGOTIATE_SIGN. Otherwise it is ignored. If both NTLMSSP_NEGOTIATE_56 and NTLMSSP_NEGOTIATE_128 are
# requested and supported by the client and server, NTLMSSP_NEGOTIATE_56 and NTLMSSP_NEGOTIATE_128 will both be
# returned to the client. Clients and servers that set NTLMSSP_NEGOTIATE_SEAL SHOULD set NTLMSSP_NEGOTIATE_128 if it
# is supported. An alternate name for this field is NTLMSSP_NEGOTIATE_128
NTLMSSP_NEGOTIATE_128 = 0x20000000
NTLMSSP_RESERVED_1 = 0x10000000
NTLMSSP_RESERVED_2 = 0x08000000
NTLMSSP_RESERVED_3 = 0x04000000
# If set, requests the protocol version number. The data corresponding to this flag is provided in the Version field
# of the NEGOTIATE_MESSAGE, the CHALLENGE_MESSAGE, and the AUTHENTICATE_MESSAGE.<22> An alternate name for this field
# is NTLMSSP_NEGOTIATE_VERSION
NTLMSSP_NEGOTIATE_VERSION = 0x02000000
NTLMSSP_RESERVED_4 = 0x01000000
# If set, indicates that the TargetInfo fields in the CHALLENGE_MESSAGE (section 2.2.1.2) are populated.
# An alternate name for this field is NTLMSSP_NEGOTIATE_TARGET_INFO.
NTLMSSP_NEGOTIATE_TARGET_INFO = 0x00800000
# If set, requests the usage of the LMOWF (section 3.3). An alternate name for this field is
# NTLMSSP_REQUEST_NON_NT_SESSION_KEY.
NTLMSSP_REQUEST_NON_NT_SESSION_KEY = 0x00400000
NTLMSSP_RESERVED_5 = 0x00200000
# If set, requests an identify level token. An alternate name for this field is NTLMSSP_NEGOTIATE_IDENTIFY
NTLMSSP_NEGOTIATE_IDENTIFY = 0x00100000
# If set, requests usage of the NTLM v2 session security. NTLM v2 session security is a misnomer because it is not
# NTLM v2. It is NTLM v1 using the extended session security that is also in NTLM v2. NTLMSSP_NEGOTIATE_LM_KEY and
# NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY are mutually exclusive. If both NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY
# and NTLMSSP_NEGOTIATE_LM_KEY are requested, NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY alone MUST be returned to the
# client. NTLM v2 authentication session key generation MUST be supported by both the client and the DC in order to be
# used, and extended session security signing and sealing requires support from the client and the server in order to
# be used.<23> An alternate name for this field is NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY
NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY = 0x00080000
NTLMSSP_NEGOTIATE_NTLM2 = 0x00080000
NTLMSSP_TARGET_TYPE_SHARE = 0x00040000
# If set, TargetName MUST be a server name. The data corresponding to this flag is provided by the server in the
# TargetName field of the CHALLENGE_MESSAGE. If this bit is set, then NTLMSSP_TARGET_TYPE_DOMAIN MUST NOT be set.
# This flag MUST be ignored in the NEGOTIATE_MESSAGE and the AUTHENTICATE_MESSAGE. An alternate name for this field
# is NTLMSSP_TARGET_TYPE_SERVER
NTLMSSP_TARGET_TYPE_SERVER = 0x00020000
# If set, TargetName MUST be a domain name. The data corresponding to this flag is provided by the server in the
# TargetName field of the CHALLENGE_MESSAGE. If set, then NTLMSSP_TARGET_TYPE_SERVER MUST NOT be set. This flag MUST
# be ignored in the NEGOTIATE_MESSAGE and the AUTHENTICATE_MESSAGE. An alternate name for this field is
# NTLMSSP_TARGET_TYPE_DOMAIN.
NTLMSSP_TARGET_TYPE_DOMAIN = 0x00010000
# If set, requests the presence of a signature block on all messages. NTLMSSP_NEGOTIATE_ALWAYS_SIGN MUST be set in the
# NEGOTIATE_MESSAGE to the server and the CHALLENGE_MESSAGE to the client. NTLMSSP_NEGOTIATE_ALWAYS_SIGN is overridden
# by NTLMSSP_NEGOTIATE_SIGN and NTLMSSP_NEGOTIATE_SEAL, if they are supported. An alternate name for this field is
# NTLMSSP_NEGOTIATE_ALWAYS_SIGN.
NTLMSSP_NEGOTIATE_ALWAYS_SIGN = 0x00008000 # forces the other end to sign packets
NTLMSSP_RESERVED_6 = 0x00004000
# This flag indicates whether the Workstation field is present. If this flag is not set, the Workstation field MUST be
# ignored. If this flag is set, the length field of the Workstation field specifies whether the workstation name is
# nonempty or not.<24> An alternate name for this field is NTLMSSP_NEGOTIATE_OEM_WORKSTATION_SUPPLIED.
NTLMSSP_NEGOTIATE_OEM_WORKSTATION_SUPPLIED = 0x00002000
# If set, the domain name is provided (section 2.2.1.1).<25> An alternate name for this field is
# NTLMSSP_NEGOTIATE_OEM_DOMAIN_SUPPLIED
NTLMSSP_NEGOTIATE_OEM_DOMAIN_SUPPLIED = 0x00001000
NTLMSSP_RESERVED_7 = 0x00000800
# If set, LM authentication is not allowed and only NT authentication is used.
NTLMSSP_NEGOTIATE_NT_ONLY = 0x00000400
# If set, requests usage of the NTLM v1 session security protocol. NTLMSSP_NEGOTIATE_NTLM MUST be set in the
# NEGOTIATE_MESSAGE to the server and the CHALLENGE_MESSAGE to the client. An alternate name for this field is
# NTLMSSP_NEGOTIATE_NTLM
NTLMSSP_NEGOTIATE_NTLM = 0x00000200
NTLMSSP_RESERVED_8 = 0x00000100
# If set, requests LAN Manager (LM) session key computation. NTLMSSP_NEGOTIATE_LM_KEY and
# NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY are mutually exclusive. If both NTLMSSP_NEGOTIATE_LM_KEY and
# NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY are requested, NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY alone MUST be
# returned to the client. NTLM v2 authentication session key generation MUST be supported by both the client and the
# DC in order to be used, and extended session security signing and sealing requires support from the client and the
# server to be used. An alternate name for this field is NTLMSSP_NEGOTIATE_LM_KEY.
NTLMSSP_NEGOTIATE_LM_KEY = 0x00000080
# If set, requests connectionless authentication. If NTLMSSP_NEGOTIATE_DATAGRAM is set, then NTLMSSP_NEGOTIATE_KEY_EXCH
# MUST always be set in the AUTHENTICATE_MESSAGE to the server and the CHALLENGE_MESSAGE to the client. An alternate
# name for this field is NTLMSSP_NEGOTIATE_DATAGRAM.
NTLMSSP_NEGOTIATE_DATAGRAM = 0x00000040
# If set, requests session key negotiation for message confidentiality. If the client sends NTLMSSP_NEGOTIATE_SEAL to
# the server in the NEGOTIATE_MESSAGE, the server MUST return NTLMSSP_NEGOTIATE_SEAL to the client in the
# CHALLENGE_MESSAGE. Clients and servers that set NTLMSSP_NEGOTIATE_SEAL SHOULD always set NTLMSSP_NEGOTIATE_56 and
# NTLMSSP_NEGOTIATE_128, if they are supported. An alternate name for this field is NTLMSSP_NEGOTIATE_SEAL.
NTLMSSP_NEGOTIATE_SEAL = 0x00000020
# If set, requests session key negotiation for message signatures. If the client sends NTLMSSP_NEGOTIATE_SIGN to the
# server in the NEGOTIATE_MESSAGE, the server MUST return NTLMSSP_NEGOTIATE_SIGN to the client in the CHALLENGE_MESSAGE.
# An alternate name for this field is NTLMSSP_NEGOTIATE_SIGN.
NTLMSSP_NEGOTIATE_SIGN = 0x00000010 # means packet is signed, if verifier is wrong it fails
NTLMSSP_RESERVED_9 = 0x00000008
# If set, a TargetName field of the CHALLENGE_MESSAGE (section 2.2.1.2) MUST be supplied. An alternate name for this
# field is NTLMSSP_REQUEST_TARGET.
NTLMSSP_REQUEST_TARGET = 0x00000004
# If set, requests OEM character set encoding. An alternate name for this field is NTLM_NEGOTIATE_OEM. See bit A for
# details.
NTLM_NEGOTIATE_OEM = 0x00000002
# If set, requests Unicode character set encoding. An alternate name for this field is NTLMSSP_NEGOTIATE_UNICODE.
NTLMSSP_NEGOTIATE_UNICODE = 0x00000001
# AV_PAIR constants
NTLMSSP_AV_EOL = 0x00
NTLMSSP_AV_HOSTNAME = 0x01
NTLMSSP_AV_DOMAINNAME = 0x02
NTLMSSP_AV_DNS_HOSTNAME = 0x03
NTLMSSP_AV_DNS_DOMAINNAME = 0x04
NTLMSSP_AV_DNS_TREENAME = 0x05
NTLMSSP_AV_FLAGS = 0x06
NTLMSSP_AV_TIME = 0x07
NTLMSSP_AV_RESTRICTIONS = 0x08
NTLMSSP_AV_TARGET_NAME = 0x09
NTLMSSP_AV_CHANNEL_BINDINGS = 0x0a
class AV_PAIRS():
def __init__(self, data = None):
self.fields = {}
if data is not None:
self.fromString(data)
def __setitem__(self,key,value):
self.fields[key] = (len(value),value)
def __getitem__(self, key):
if key in self.fields:
return self.fields[key]
return None
def __delitem__(self, key):
del self.fields[key]
def __len__(self):
return len(self.getData())
def __str__(self):
return len(self.getData())
def fromString(self, data):
tInfo = data
fType = 0xff
while fType is not NTLMSSP_AV_EOL:
fType = struct.unpack('<H',tInfo[:struct.calcsize('<H')])[0]
tInfo = tInfo[struct.calcsize('<H'):]
length = struct.unpack('<H',tInfo[:struct.calcsize('<H')])[0]
tInfo = tInfo[struct.calcsize('<H'):]
content = tInfo[:length]
self.fields[fType]=(length,content)
tInfo = tInfo[length:]
def dump(self):
for i in self.fields.keys():
print("%s: {%r}" % (i,self[i]))
def getData(self):
if NTLMSSP_AV_EOL in self.fields:
del self.fields[NTLMSSP_AV_EOL]
ans = ''
for i in self.fields.keys():
ans+= struct.pack('<HH', i, self[i][0])
ans+= self[i][1]
# end with a NTLMSSP_AV_EOL
ans += struct.pack('<HH', NTLMSSP_AV_EOL, 0)
return ans
class NTLMAuthMixin:
def get_os_version(self):
if self['os_version'] == '':
return None
else:
mayor_v = struct.unpack('B',self['os_version'][0])[0]
minor_v = struct.unpack('B',self['os_version'][1])[0]
build_v = struct.unpack('H',self['os_version'][2:4])
return (mayor_v,minor_v,build_v)
class NTLMAuthNegotiate(Structure, NTLMAuthMixin):
structure = (
('','"NTLMSSP\x00'),
('message_type','<L=1'),
('flags','<L'),
('domain_len','<H-domain_name'),
('domain_max_len','<H-domain_name'),
('domain_offset','<L=0'),
('host_len','<H-host_name'),
('host_maxlen','<H-host_name'),
('host_offset','<L=0'),
('os_version',':'),
('host_name',':'),
('domain_name',':'))
def __init__(self):
Structure.__init__(self)
self['flags']= (
NTLMSSP_NEGOTIATE_128 |
NTLMSSP_NEGOTIATE_KEY_EXCH|
# NTLMSSP_LM_KEY |
NTLMSSP_NEGOTIATE_NTLM |
NTLMSSP_NEGOTIATE_UNICODE |
# NTLMSSP_ALWAYS_SIGN |
NTLMSSP_NEGOTIATE_SIGN |
NTLMSSP_NEGOTIATE_SEAL |
# NTLMSSP_TARGET |
0)
self['host_name']=''
self['domain_name']=''
self['os_version']=''
def getData(self):
if len(self.fields['host_name']) > 0:
self['flags'] |= NTLMSSP_NEGOTIATE_OEM_WORKSTATION_SUPPLIED
if len(self.fields['domain_name']) > 0:
self['flags'] |= NTLMSSP_NEGOTIATE_OEM_DOMAIN_SUPPLIED
if len(self.fields['os_version']) > 0:
self['flags'] |= NTLMSSP_NEGOTIATE_VERSION
if (self['flags'] & NTLMSSP_NEGOTIATE_VERSION) == NTLMSSP_NEGOTIATE_VERSION:
version_len = 8
else:
version_len = 0
if (self['flags'] & NTLMSSP_NEGOTIATE_OEM_WORKSTATION_SUPPLIED) == NTLMSSP_NEGOTIATE_OEM_WORKSTATION_SUPPLIED:
self['host_offset']=32 + version_len
if (self['flags'] & NTLMSSP_NEGOTIATE_OEM_DOMAIN_SUPPLIED) == NTLMSSP_NEGOTIATE_OEM_DOMAIN_SUPPLIED:
self['domain_offset']=32+len(self['host_name']) + version_len
return Structure.getData(self)
def fromString(self,data):
Structure.fromString(self,data)
domain_offset = self['domain_offset']
domain_end = self['domain_len'] + domain_offset
self['domain_name'] = data[ domain_offset : domain_end ]
host_offset = self['host_offset']
host_end = self['host_len'] + host_offset
self['host_name'] = data[ host_offset : host_end ]
hasOsInfo = self['flags'] & NTLMSSP_NEGOTIATE_VERSION
if len(data) >= 36 and hasOsInfo:
self['os_version'] = data[32:40]
else:
self['os_version'] = ''
class NTLMAuthChallenge(Structure):
structure = (
('','"NTLMSSP\x00'),
('message_type','<L=2'),
('domain_len','<H-domain_name'),
('domain_max_len','<H-domain_name'),
('domain_offset','<L=40'),
('flags','<L=0'),
('challenge','8s'),
('reserved','8s=""'),
('TargetInfoFields_len','<H-TargetInfoFields'),
('TargetInfoFields_max_len','<H-TargetInfoFields'),
('TargetInfoFields_offset','<L'),
('VersionLen','_-Version','self.checkVersion(self["flags"])'),
('Version',':'),
('domain_name',':'),
('TargetInfoFields',':'))
def checkVersion(self, flags):
if flags is not None:
if flags & NTLMSSP_NEGOTIATE_VERSION == 0:
return 0
return 8
def getData(self):
if self['TargetInfoFields'] is not None and type(self['TargetInfoFields']) is not str:
raw_av_fields = self['TargetInfoFields'].getData()
self['TargetInfoFields'] = raw_av_fields
return Structure.getData(self)
def fromString(self,data):
Structure.fromString(self,data)
# Just in case there's more data after the TargetInfoFields
self['TargetInfoFields'] = self['TargetInfoFields'][:self['TargetInfoFields_len']]
# We gotta process the TargetInfoFields
#if self['TargetInfoFields_len'] > 0:
# av_pairs = AV_PAIRS(self['TargetInfoFields'][:self['TargetInfoFields_len']])
# self['TargetInfoFields'] = av_pairs
return self
class NTLMAuthChallengeResponse(Structure, NTLMAuthMixin):
structure = (
('','"NTLMSSP\x00'),
('message_type','<L=3'),
('lanman_len','<H-lanman'),
('lanman_max_len','<H-lanman'),
('lanman_offset','<L'),
('ntlm_len','<H-ntlm'),
('ntlm_max_len','<H-ntlm'),
('ntlm_offset','<L'),
('domain_len','<H-domain_name'),
('domain_max_len','<H-domain_name'),
('domain_offset','<L'),
('user_len','<H-user_name'),
('user_max_len','<H-user_name'),
('user_offset','<L'),
('host_len','<H-host_name'),
('host_max_len','<H-host_name'),
('host_offset','<L'),
('session_key_len','<H-session_key'),
('session_key_max_len','<H-session_key'),
('session_key_offset','<L'),
('flags','<L'),
('VersionLen','_-Version','self.checkVersion(self["flags"])'),
('Version',':=""'),
('MICLen','_-MIC','self.checkMIC(self["flags"])'),
('MIC',':=""'),
('domain_name',':'),
('user_name',':'),
('host_name',':'),
('lanman',':'),
('ntlm',':'),
('session_key',':'))
def __init__(self, username = '', password = '', challenge = '', lmhash = '', nthash = '', flags = 0):
Structure.__init__(self)
self['session_key']=''
self['user_name']=username.encode('utf-16le')
self['domain_name']='' #"CLON".encode('utf-16le')
self['host_name']='' #"BETS".encode('utf-16le')
self['flags'] = ( #authResp['flags']
# we think (beto & gera) that his flags force a memory conten leakage when a windows 2000 answers using uninitializaed verifiers
NTLMSSP_NEGOTIATE_128 |
NTLMSSP_NEGOTIATE_KEY_EXCH|
# NTLMSSP_LM_KEY |
NTLMSSP_NEGOTIATE_NTLM |
NTLMSSP_NEGOTIATE_UNICODE |
# NTLMSSP_ALWAYS_SIGN |
NTLMSSP_NEGOTIATE_SIGN |
NTLMSSP_NEGOTIATE_SEAL |
# NTLMSSP_TARGET |
0)
# Here we do the stuff
if username and ( lmhash != '' or nthash != ''):
self['lanman'] = get_ntlmv1_response(lmhash, challenge)
self['ntlm'] = get_ntlmv1_response(nthash, challenge)
elif (username and password):
lmhash = compute_lmhash(password)
nthash = compute_nthash(password)
self['lanman']=get_ntlmv1_response(lmhash, challenge)
self['ntlm']=get_ntlmv1_response(nthash, challenge) # This is not used for LM_KEY nor NTLM_KEY
else:
self['lanman'] = ''
self['ntlm'] = ''
if not self['host_name']:
self['host_name'] = 'NULL'.encode('utf-16le') # for NULL session there must be a hostname
def checkVersion(self, flags):
if flags is not None:
if flags & NTLMSSP_NEGOTIATE_VERSION == 0:
return 0
return 8
def checkMIC(self, flags):
# TODO: Find a proper way to check the MIC is in there
if flags is not None:
if flags & NTLMSSP_NEGOTIATE_VERSION == 0:
return 0
return 16
def getData(self):
self['domain_offset']=64+self.checkMIC(self["flags"])+self.checkVersion(self["flags"])
self['user_offset']=64+self.checkMIC(self["flags"])+self.checkVersion(self["flags"])+len(self['domain_name'])
self['host_offset']=self['user_offset']+len(self['user_name'])
self['lanman_offset']=self['host_offset']+len(self['host_name'])
self['ntlm_offset']=self['lanman_offset']+len(self['lanman'])
self['session_key_offset']=self['ntlm_offset']+len(self['ntlm'])
return Structure.getData(self)
def fromString(self,data):
Structure.fromString(self,data)
# [MS-NLMP] page 27
# Payload data can be present in any order within the Payload field,
# with variable-length padding before or after the data
domain_offset = self['domain_offset']
domain_end = self['domain_len'] + domain_offset
self['domain_name'] = data[ domain_offset : domain_end ]
host_offset = self['host_offset']
host_end = self['host_len'] + host_offset
self['host_name'] = data[ host_offset: host_end ]
user_offset = self['user_offset']
user_end = self['user_len'] + user_offset
self['user_name'] = data[ user_offset: user_end ]
ntlm_offset = self['ntlm_offset']
ntlm_end = self['ntlm_len'] + ntlm_offset
self['ntlm'] = data[ ntlm_offset : ntlm_end ]
lanman_offset = self['lanman_offset']
lanman_end = self['lanman_len'] + lanman_offset
self['lanman'] = data[ lanman_offset : lanman_end]
#if len(data) >= 36:
# self['os_version'] = data[32:36]
#else:
# self['os_version'] = ''
class ImpacketStructure(Structure):
def set_parent(self, other):
self.parent = other
def get_packet(self):
return str(self)
def get_size(self):
return len(self)
class ExtendedOrNotMessageSignature(Structure):
def __init__(self, flags = 0, **kargs):
if flags & NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY:
self.structure = self.extendedMessageSignature
else:
self.structure = self.MessageSignature
return Structure.__init__(self, **kargs)
class NTLMMessageSignature(ExtendedOrNotMessageSignature):
extendedMessageSignature = (
('Version','<L=1'),
('Checksum','<q'),
('SeqNum','<i'),
)
MessageSignature = (
('Version','<L=1'),
('RandomPad','<i=0'),
('Checksum','<i'),
('SeqNum','<i'),
)
KNOWN_DES_INPUT = "KGS!@#$%"
def __expand_DES_key( key):
# Expand the key from a 7-byte password key into a 8-byte DES key
key = key[:7]
key += '\x00'*(7-len(key))
s = chr(((ord(key[0]) >> 1) & 0x7f) << 1)
s = s + chr(((ord(key[0]) & 0x01) << 6 | ((ord(key[1]) >> 2) & 0x3f)) << 1)
s = s + chr(((ord(key[1]) & 0x03) << 5 | ((ord(key[2]) >> 3) & 0x1f)) << 1)
s = s + chr(((ord(key[2]) & 0x07) << 4 | ((ord(key[3]) >> 4) & 0x0f)) << 1)
s = s + chr(((ord(key[3]) & 0x0f) << 3 | ((ord(key[4]) >> 5) & 0x07)) << 1)
s = s + chr(((ord(key[4]) & 0x1f) << 2 | ((ord(key[5]) >> 6) & 0x03)) << 1)
s = s + chr(((ord(key[5]) & 0x3f) << 1 | ((ord(key[6]) >> 7) & 0x01)) << 1)
s = s + chr((ord(key[6]) & 0x7f) << 1)
return s
def __DES_block(key, msg):
if POW:
cipher = POW.Symmetric(POW.DES_ECB)
cipher.encryptInit(__expand_DES_key(key))
return cipher.update(msg)
else:
cipher = DES.new(__expand_DES_key(key),DES.MODE_ECB)
return cipher.encrypt(msg)
def ntlmssp_DES_encrypt(key, challenge):
answer = __DES_block(key[:7], challenge)
answer += __DES_block(key[7:14], challenge)
answer += __DES_block(key[14:], challenge)
return answer
# High level functions to use NTLMSSP
def getNTLMSSPType1(workstation='', domain='', signingRequired = False, use_ntlmv2 = USE_NTLMv2):
# Let's do some encoding checks before moving on. Kind of dirty, but found effective when dealing with
# international characters.
import sys
encoding = sys.getfilesystemencoding()
if encoding is not None:
try:
workstation.encode('utf-16le')
except:
workstation = workstation.decode(encoding)
try:
domain.encode('utf-16le')
except:
domain = domain.decode(encoding)
# Let's prepare a Type 1 NTLMSSP Message
auth = NTLMAuthNegotiate()
auth['flags']=0
if signingRequired:
auth['flags'] = NTLMSSP_NEGOTIATE_KEY_EXCH | NTLMSSP_NEGOTIATE_SIGN | NTLMSSP_NEGOTIATE_ALWAYS_SIGN | NTLMSSP_NEGOTIATE_SEAL
if use_ntlmv2:
auth['flags'] |= NTLMSSP_NEGOTIATE_TARGET_INFO
auth['flags'] |= NTLMSSP_NEGOTIATE_NTLM | NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY | NTLMSSP_NEGOTIATE_UNICODE | NTLMSSP_REQUEST_TARGET | NTLMSSP_NEGOTIATE_128 | NTLMSSP_NEGOTIATE_56
auth['domain_name'] = domain.encode('utf-16le')
return auth
def getNTLMSSPType3(type1, type2, user, password, domain, lmhash = '', nthash = '', use_ntlmv2 = USE_NTLMv2):
# Let's do some encoding checks before moving on. Kind of dirty, but found effective when dealing with
# international characters.
import sys
encoding = sys.getfilesystemencoding()
if encoding is not None:
try:
user.encode('utf-16le')
except:
user = user.decode(encoding)
try:
password.encode('utf-16le')
except:
password = password.decode(encoding)
try:
domain.encode('utf-16le')
except:
domain = user.decode(encoding)
ntlmChallenge = NTLMAuthChallenge(type2)
# Let's start with the original flags sent in the type1 message
responseFlags = type1['flags']
# Token received and parsed. Depending on the authentication
# method we will create a valid ChallengeResponse
ntlmChallengeResponse = NTLMAuthChallengeResponse(user, password, ntlmChallenge['challenge'])
clientChallenge = "".join([random.choice(string.digits+string.letters) for i in range(8)])
serverName = ntlmChallenge['TargetInfoFields']
ntResponse, lmResponse, sessionBaseKey = computeResponse(ntlmChallenge['flags'], ntlmChallenge['challenge'], clientChallenge, serverName, domain, user, password, lmhash, nthash, use_ntlmv2 )
# Let's check the return flags
if (ntlmChallenge['flags'] & NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY) == 0:
# No extended session security, taking it out
responseFlags &= 0xffffffff ^ NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY
if (ntlmChallenge['flags'] & NTLMSSP_NEGOTIATE_128 ) == 0:
# No support for 128 key len, taking it out
responseFlags &= 0xffffffff ^ NTLMSSP_NEGOTIATE_128
if (ntlmChallenge['flags'] & NTLMSSP_NEGOTIATE_KEY_EXCH) == 0:
# No key exchange supported, taking it out
responseFlags &= 0xffffffff ^ NTLMSSP_NEGOTIATE_KEY_EXCH
if (ntlmChallenge['flags'] & NTLMSSP_NEGOTIATE_SEAL) == 0:
# No sign available, taking it out
responseFlags &= 0xffffffff ^ NTLMSSP_NEGOTIATE_SEAL
if (ntlmChallenge['flags'] & NTLMSSP_NEGOTIATE_SIGN) == 0:
# No sign available, taking it out
responseFlags &= 0xffffffff ^ NTLMSSP_NEGOTIATE_SIGN
if (ntlmChallenge['flags'] & NTLMSSP_NEGOTIATE_ALWAYS_SIGN) == 0:
# No sign available, taking it out
responseFlags &= 0xffffffff ^ NTLMSSP_NEGOTIATE_ALWAYS_SIGN
keyExchangeKey = KXKEY(ntlmChallenge['flags'],sessionBaseKey, lmResponse, ntlmChallenge['challenge'], password, lmhash, nthash,use_ntlmv2)
# Special case for anonymous login
if user == '' and password == '' and lmhash == '' and nthash == '':
keyExchangeKey = '\x00'*16
# If we set up key exchange, let's fill the right variables
if ntlmChallenge['flags'] & NTLMSSP_NEGOTIATE_KEY_EXCH:
# not exactly what I call random tho :\
# exportedSessionKey = this is the key we should use to sign
exportedSessionKey = "".join([random.choice(string.digits+string.letters) for i in range(16)])
#exportedSessionKey = "A"*16
#print "keyExchangeKey %r" % keyExchangeKey
# Let's generate the right session key based on the challenge flags
#if responseFlags & NTLMSSP_NTLM2_KEY:
# Extended session security enabled
# if responseFlags & NTLMSSP_KEY_128:
# Full key
# exportedSessionKey = exportedSessionKey
# elif responseFlags & NTLMSSP_KEY_56:
# Only 56-bit key
# exportedSessionKey = exportedSessionKey[:7]
# else:
# exportedSessionKey = exportedSessionKey[:5]
#elif responseFlags & NTLMSSP_KEY_56:
# No extended session security, just 56 bits key
# exportedSessionKey = exportedSessionKey[:7] + '\xa0'
#else:
# exportedSessionKey = exportedSessionKey[:5] + '\xe5\x38\xb0'
encryptedRandomSessionKey = generateEncryptedSessionKey(keyExchangeKey, exportedSessionKey)
else:
encryptedRandomSessionKey = None
# [MS-NLMP] page 46
exportedSessionKey = keyExchangeKey
ntlmChallengeResponse['flags'] = responseFlags
ntlmChallengeResponse['domain_name'] = domain.encode('utf-16le')
ntlmChallengeResponse['lanman'] = lmResponse
ntlmChallengeResponse['ntlm'] = ntResponse
if encryptedRandomSessionKey is not None:
ntlmChallengeResponse['session_key'] = encryptedRandomSessionKey
return ntlmChallengeResponse, exportedSessionKey
# NTLMv1 Algorithm
def generateSessionKeyV1(password, lmhash, nthash):
if POW:
hash = POW.Digest(POW.MD4_DIGEST)
else:
hash = MD4.new()
hash.update(NTOWFv1(password, lmhash, nthash))
return hash.digest()
def computeResponseNTLMv1(flags, serverChallenge, clientChallenge, serverName, domain, user, password, lmhash='', nthash='', use_ntlmv2 = USE_NTLMv2):
if (user == '' and password == ''):
# Special case for anonymous authentication
lmResponse = ''
ntResponse = ''
else:
lmhash = LMOWFv1(password, lmhash, nthash)
nthash = NTOWFv1(password, lmhash, nthash)
if flags & NTLMSSP_NEGOTIATE_LM_KEY:
ntResponse = ''
lmResponse = get_ntlmv1_response(lmhash, serverChallenge)
elif flags & NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY:
md5 = hashlib.new('md5')
chall = (serverChallenge + clientChallenge)
md5.update(chall)
ntResponse = ntlmssp_DES_encrypt(nthash, md5.digest()[:8])
lmResponse = clientChallenge + '\x00'*16
else:
ntResponse = get_ntlmv1_response(nthash,serverChallenge)
lmResponse = get_ntlmv1_response(lmhash, serverChallenge)
sessionBaseKey = generateSessionKeyV1(password, lmhash, nthash)
return ntResponse, lmResponse, sessionBaseKey
def compute_lmhash(password):
# This is done according to Samba's encryption specification (docs/html/ENCRYPTION.html)
password = password.upper()
lmhash = __DES_block(password[:7], KNOWN_DES_INPUT)
lmhash += __DES_block(password[7:14], KNOWN_DES_INPUT)
return lmhash
def NTOWFv1(password, lmhash = '', nthash=''):
if nthash != '':
return nthash
return compute_nthash(password)
def LMOWFv1(password, lmhash = '', nthash=''):
if lmhash != '':
return lmhash
return compute_lmhash(password)
def compute_nthash(password):
# This is done according to Samba's encryption specification (docs/html/ENCRYPTION.html)
try:
password = unicode(password).encode('utf_16le')
except NameError: # unicode() was removed in Python 3
password = str(password).encode('utf_16le')
except UnicodeDecodeError:
import sys
password = password.decode(sys.getfilesystemencoding()).encode('utf_16le')
if POW:
hash = POW.Digest(POW.MD4_DIGEST)
else:
hash = MD4.new()
hash.update(password)
return hash.digest()
def get_ntlmv1_response(key, challenge):
return ntlmssp_DES_encrypt(key, challenge)
# NTLMv2 Algorithm - as described in MS-NLMP Section 3.3.2
# Crypto Stuff
def MAC(flags, handle, signingKey, seqNum, message):
# [MS-NLMP] Section 3.4.4
# Returns the right messageSignature depending on the flags
messageSignature = NTLMMessageSignature(flags)
if flags & NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY:
if flags & NTLMSSP_NEGOTIATE_KEY_EXCH:
messageSignature['Version'] = 1
messageSignature['Checksum'] = struct.unpack('<q',handle(hmac_md5(signingKey, struct.pack('<i',seqNum)+message)[:8]))[0]
messageSignature['SeqNum'] = seqNum
seqNum += 1
else:
messageSignature['Version'] = 1
messageSignature['Checksum'] = struct.unpack('<q',hmac_md5(signingKey, struct.pack('<i',seqNum)+message)[:8])[0]
messageSignature['SeqNum'] = seqNum
seqNum += 1
else:
messageSignature['Version'] = 1
messageSignature['Checksum'] = struct.pack('<i',binascii.crc32(message))
messageSignature['RandomPad'] = 0
messageSignature['RandomPad'] = handle(struct.pack('<i',messageSignature['RandomPad']))
messageSignature['Checksum'] = struct.unpack('<i',handle(messageSignature['Checksum']))[0]
messageSignature['SeqNum'] = handle('\x00\x00\x00\x00')
messageSignature['SeqNum'] = struct.unpack('<i',messageSignature['SeqNum'])[0] ^ seqNum
messageSignature['RandomPad'] = 0
return messageSignature
def SEAL(flags, signingKey, sealingKey, messageToSign, messageToEncrypt, seqNum, handle):
sealedMessage = handle(messageToEncrypt)
signature = MAC(flags, handle, signingKey, seqNum, messageToSign)
return sealedMessage, signature
def SIGN(flags, signingKey, message, seqNum, handle):
return MAC(flags, handle, signingKey, seqNum, message)
def SIGNKEY(flags, randomSessionKey, mode = 'Client'):
if flags & NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY:
if mode == 'Client':
md5 = hashlib.new('md5')
md5.update(randomSessionKey + "session key to client-to-server signing key magic constant\x00")
signKey = md5.digest()
else:
md5 = hashlib.new('md5')
md5.update(randomSessionKey + "session key to server-to-client signing key magic constant\x00")
signKey = md5.digest()
else:
signKey = None
return signKey
def SEALKEY(flags, randomSessionKey, mode = 'Client'):
if flags & NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY:
if flags & NTLMSSP_NEGOTIATE_128:
sealKey = randomSessionKey
elif flags & NTLMSSP_NEGOTIATE_56:
sealKey = randomSessionKey[:7]
else:
sealKey = randomSessionKey[:5]
if mode == 'Client':
md5 = hashlib.new('md5')
md5.update(sealKey + 'session key to client-to-server sealing key magic constant\x00')
sealKey = md5.digest()
else:
md5 = hashlib.new('md5')
md5.update(sealKey + 'session key to server-to-client sealing key magic constant\x00')
sealKey = md5.digest()
elif flags & NTLMSSP_NEGOTIATE_56:
sealKey = randomSessionKey[:7] + '\xa0'
else:
sealKey = randomSessionKey[:5] + '\xe5\x38\xb0'
return sealKey
def generateEncryptedSessionKey(keyExchangeKey, exportedSessionKey):
if POW:
cipher = POW.Symmetric(POW.RC4)
cipher.encryptInit(keyExchangeKey)
cipher_encrypt = cipher.update
else:
cipher = ARC4.new(keyExchangeKey)
cipher_encrypt = cipher.encrypt
sessionKey = cipher_encrypt(exportedSessionKey)
return sessionKey
def KXKEY(flags, sessionBaseKey, lmChallengeResponse, serverChallenge, password, lmhash, nthash, use_ntlmv2 = USE_NTLMv2):
if use_ntlmv2:
return sessionBaseKey
if flags & NTLMSSP_NEGOTIATE_EXTENDED_SESSIONSECURITY:
if flags & NTLMSSP_NEGOTIATE_NTLM:
keyExchangeKey = hmac_md5(sessionBaseKey, serverChallenge + lmChallengeResponse[:8])
else:
keyExchangeKey = sessionBaseKey
elif flags & NTLMSSP_NEGOTIATE_NTLM:
if flags & NTLMSSP_NEGOTIATE_LM_KEY:
keyExchangeKey = __DES_block(LMOWFv1(password,lmhash)[:7], lmChallengeResponse[:8]) + __DES_block(LMOWFv1(password,lmhash)[7] + '\xBD\xBD\xBD\xBD\xBD\xBD', lmChallengeResponse[:8])
elif flags & NTLMSSP_REQUEST_NON_NT_SESSION_KEY:
keyExchangeKey = LMOWFv1(password,lmhash)[:8] + '\x00'*8
else:
keyExchangeKey = sessionBaseKey
else:
raise "Can't create a valid KXKEY!"
return keyExchangeKey
def hmac_md5(key, data):
if POW:
h = POW.Hmac(POW.MD5_DIGEST, key)
h.update(data)
result = h.mac()
else:
import hmac
h = hmac.new(key)
h.update(data)
result = h.digest()
return result
def NTOWFv2( user, password, domain, hash = ''):
if hash != '':
theHash = hash
else:
theHash = compute_nthash(password)
return hmac_md5(theHash, user.upper().encode('utf-16le') + domain.encode('utf-16le'))
def LMOWFv2( user, password, domain, lmhash = ''):
return NTOWFv2( user, password, domain, lmhash)
def computeResponseNTLMv2(flags, serverChallenge, clientChallenge, serverName, domain, user, password, lmhash = '', nthash = '', use_ntlmv2 = USE_NTLMv2):
responseServerVersion = '\x01'
hiResponseServerVersion = '\x01'
responseKeyNT = NTOWFv2(user, password, domain, nthash)
responseKeyLM = LMOWFv2(user, password, domain, lmhash)
# If you're running test-ntlm, comment the following lines and uncoment the ones that are commented. Don't forget to turn it back after the tests!
######################
av_pairs = AV_PAIRS(serverName)
# In order to support SPN target name validation, we have to add this to the serverName av_pairs. Otherwise we will get access denied
# This is set at Local Security Policy -> Local Policies -> Security Options -> Server SPN target name validation level
av_pairs[NTLMSSP_AV_TARGET_NAME] = 'cifs/'.encode('utf-16le') + av_pairs[NTLMSSP_AV_HOSTNAME][1]
if av_pairs[NTLMSSP_AV_TIME] is not None:
aTime = av_pairs[NTLMSSP_AV_TIME][1]
else:
aTime = struct.pack('<q', (116444736000000000 + calendar.timegm(time.gmtime()) * 10000000) )
#aTime = '\x00'*8
av_pairs[NTLMSSP_AV_TIME] = aTime
serverName = av_pairs.getData()
######################
#aTime = '\x00'*8
######################
temp = responseServerVersion + hiResponseServerVersion + '\x00' * 6 + aTime + clientChallenge + '\x00' * 4 + serverName + '\x00' * 4
ntProofStr = hmac_md5(responseKeyNT, serverChallenge + temp)
ntChallengeResponse = ntProofStr + temp
lmChallengeResponse = hmac_md5(responseKeyNT, serverChallenge + clientChallenge) + clientChallenge
sessionBaseKey = hmac_md5(responseKeyNT, ntProofStr)
if (user == '' and password == ''):
# Special case for anonymous authentication
ntChallengeResponse = ''
lmChallengeResponse = ''
return ntChallengeResponse, lmChallengeResponse, sessionBaseKey
class NTLM_HTTP(object):
'''Parent class for NTLM HTTP classes.'''
MSG_TYPE = None
@classmethod
def get_instace(cls,msg_64):
msg = None
msg_type = 0
if msg_64 != '':
msg = base64.b64decode(msg_64[5:]) # Remove the 'NTLM '
msg_type = ord(msg[8])
for _cls in NTLM_HTTP.__subclasses__():
if msg_type == _cls.MSG_TYPE:
instance = _cls()
instance.fromString(msg)
return instance
class NTLM_HTTP_AuthRequired(NTLM_HTTP):
commonHdr = ()
# Message 0 means the first HTTP request e.g. 'GET /bla.png'
MSG_TYPE = 0
def fromString(self,data):
pass
class NTLM_HTTP_AuthNegotiate(NTLM_HTTP, NTLMAuthNegotiate):
commonHdr = ()
MSG_TYPE = 1
def __init__(self):
NTLMAuthNegotiate.__init__(self)
class NTLM_HTTP_AuthChallengeResponse(NTLM_HTTP, NTLMAuthChallengeResponse):
commonHdr = ()
MSG_TYPE = 3
def __init__(self):
NTLMAuthChallengeResponse.__init__(self)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,372 +0,0 @@
from __future__ import print_function
# Copyright (c) 2003-2016 CORE Security Technologies
#
# This software is provided under under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Author: Alberto Solino (beto@coresecurity.com)
#
# Description:
# SPNEGO functions used by SMB, SMB2/3 and DCERPC
#
from struct import pack, unpack, calcsize
############### GSS Stuff ################
GSS_API_SPNEGO_UUID = '\x2b\x06\x01\x05\x05\x02'
ASN1_SEQUENCE = 0x30
ASN1_AID = 0x60
ASN1_OID = 0x06
ASN1_OCTET_STRING = 0x04
ASN1_MECH_TYPE = 0xa0
ASN1_MECH_TOKEN = 0xa2
ASN1_SUPPORTED_MECH = 0xa1
ASN1_RESPONSE_TOKEN = 0xa2
ASN1_ENUMERATED = 0x0a
MechTypes = {
'+\x06\x01\x04\x01\x827\x02\x02\x1e': 'SNMPv2-SMI::enterprises.311.2.2.30',
'+\x06\x01\x04\x01\x827\x02\x02\n': 'NTLMSSP - Microsoft NTLM Security Support Provider',
'*\x86H\x82\xf7\x12\x01\x02\x02': 'MS KRB5 - Microsoft Kerberos 5',
'*\x86H\x86\xf7\x12\x01\x02\x02': 'KRB5 - Kerberos 5',
'*\x86H\x86\xf7\x12\x01\x02\x02\x03': 'KRB5 - Kerberos 5 - User to User'
}
TypesMech = dict((v,k) for k, v in MechTypes.items())
def asn1encode(data = ''):
#res = asn1.SEQUENCE(str).encode()
#import binascii
#print '\nalex asn1encode str: %s\n' % binascii.hexlify(str)
if 0 <= len(data) <= 0x7F:
res = pack('B', len(data)) + data
elif 0x80 <= len(data) <= 0xFF:
res = pack('BB', 0x81, len(data)) + data
elif 0x100 <= len(data) <= 0xFFFF:
res = pack('!BH', 0x82, len(data)) + data
elif 0x10000 <= len(data) <= 0xffffff:
res = pack('!BBH', 0x83, len(data) >> 16, len(data) & 0xFFFF) + data
elif 0x1000000 <= len(data) <= 0xffffffff:
res = pack('!BL', 0x84, len(data)) + data
else:
raise Exception('Error in asn1encode')
return str(res)
def asn1decode(data = ''):
len1 = unpack('B', data[:1])[0]
data = data[1:]
if len1 == 0x81:
pad = calcsize('B')
len2 = unpack('B',data[:pad])[0]
data = data[pad:]
ans = data[:len2]
elif len1 == 0x82:
pad = calcsize('H')
len2 = unpack('!H', data[:pad])[0]
data = data[pad:]
ans = data[:len2]
elif len1 == 0x83:
pad = calcsize('B') + calcsize('!H')
len2, len3 = unpack('!BH', data[:pad])
data = data[pad:]
ans = data[:len2 << 16 + len3]
elif len1 == 0x84:
pad = calcsize('!L')
len2 = unpack('!L', data[:pad])[0]
data = data[pad:]
ans = data[:len2]
# 1 byte length, string <= 0x7F
else:
pad = 0
ans = data[:len1]
return ans, len(ans)+pad+1
class GSSAPI:
# Generic GSSAPI Header Format
def __init__(self, data = None):
self.fields = {}
self['UUID'] = GSS_API_SPNEGO_UUID
if data:
self.fromString(data)
pass
def __setitem__(self,key,value):
self.fields[key] = value
def __getitem__(self, key):
return self.fields[key]
def __delitem__(self, key):
del self.fields[key]
def __len__(self):
return len(self.getData())
def __str__(self):
return len(self.getData())
def fromString(self, data = None):
# Manual parse of the GSSAPI Header Format
# It should be something like
# AID = 0x60 TAG, BER Length
# OID = 0x06 TAG
# GSSAPI OID
# UUID data (BER Encoded)
# Payload
next_byte = unpack('B',data[:1])[0]
if next_byte != ASN1_AID:
raise Exception('Unknown AID=%x' % next_byte)
data = data[1:]
decode_data, total_bytes = asn1decode(data)
# Now we should have a OID tag
next_byte = unpack('B',decode_data[:1])[0]
if next_byte != ASN1_OID:
raise Exception('OID tag not found %x' % next_byte)
decode_data = decode_data[1:]
# Now the OID contents, should be SPNEGO UUID
uuid, total_bytes = asn1decode(decode_data)
self['OID'] = uuid
# the rest should be the data
self['Payload'] = decode_data[total_bytes:]
#pass
def dump(self):
for i in self.fields.keys():
print("%s: {%r}" % (i,self[i]))
def getData(self):
ans = pack('B',ASN1_AID)
ans += asn1encode(
pack('B',ASN1_OID) +
asn1encode(self['UUID']) +
self['Payload'] )
return ans
class SPNEGO_NegTokenResp:
# http://tools.ietf.org/html/rfc4178#page-9
# NegTokenResp ::= SEQUENCE {
# negState [0] ENUMERATED {
# accept-completed (0),
# accept-incomplete (1),
# reject (2),
# request-mic (3)
# } OPTIONAL,
# -- REQUIRED in the first reply from the target
# supportedMech [1] MechType OPTIONAL,
# -- present only in the first reply from the target
# responseToken [2] OCTET STRING OPTIONAL,
# mechListMIC [3] OCTET STRING OPTIONAL,
# ...
# }
# This structure is not prepended by a GSS generic header!
SPNEGO_NEG_TOKEN_RESP = 0xa1
SPNEGO_NEG_TOKEN_TARG = 0xa0
def __init__(self, data = None):
self.fields = {}
if data:
self.fromString(data)
pass
def __setitem__(self,key,value):
self.fields[key] = value
def __getitem__(self, key):
return self.fields[key]
def __delitem__(self, key):
del self.fields[key]
def __len__(self):
return len(self.getData())
def __str__(self):
return len(self.getData())
def fromString(self, data = 0):
payload = data
next_byte = unpack('B', payload[:1])[0]
if next_byte != SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP:
raise Exception('NegTokenResp not found %x' % next_byte)
payload = payload[1:]
decode_data, total_bytes = asn1decode(payload)
next_byte = unpack('B', decode_data[:1])[0]
if next_byte != ASN1_SEQUENCE:
raise Exception('SEQUENCE tag not found %x' % next_byte)
decode_data = decode_data[1:]
decode_data, total_bytes = asn1decode(decode_data)
next_byte = unpack('B',decode_data[:1])[0]
if next_byte != ASN1_MECH_TYPE:
# MechType not found, could be an AUTH answer
if next_byte != ASN1_RESPONSE_TOKEN:
raise Exception('MechType/ResponseToken tag not found %x' % next_byte)
else:
decode_data2 = decode_data[1:]
decode_data2, total_bytes = asn1decode(decode_data2)
next_byte = unpack('B', decode_data2[:1])[0]
if next_byte != ASN1_ENUMERATED:
raise Exception('Enumerated tag not found %x' % next_byte)
item, total_bytes2 = asn1decode(decode_data)
self['NegResult'] = item
decode_data = decode_data[1:]
decode_data = decode_data[total_bytes:]
# Do we have more data?
if len(decode_data) == 0:
return
next_byte = unpack('B', decode_data[:1])[0]
if next_byte != ASN1_SUPPORTED_MECH:
if next_byte != ASN1_RESPONSE_TOKEN:
raise Exception('Supported Mech/ResponseToken tag not found %x' % next_byte)
else:
decode_data2 = decode_data[1:]
decode_data2, total_bytes = asn1decode(decode_data2)
next_byte = unpack('B', decode_data2[:1])[0]
if next_byte != ASN1_OID:
raise Exception('OID tag not found %x' % next_byte)
decode_data2 = decode_data2[1:]
item, total_bytes2 = asn1decode(decode_data2)
self['SupportedMech'] = item
decode_data = decode_data[1:]
decode_data = decode_data[total_bytes:]
next_byte = unpack('B', decode_data[:1])[0]
if next_byte != ASN1_RESPONSE_TOKEN:
raise Exception('Response token tag not found %x' % next_byte)
decode_data = decode_data[1:]
decode_data, total_bytes = asn1decode(decode_data)
next_byte = unpack('B', decode_data[:1])[0]
if next_byte != ASN1_OCTET_STRING:
raise Exception('Octet string token tag not found %x' % next_byte)
decode_data = decode_data[1:]
decode_data, total_bytes = asn1decode(decode_data)
self['ResponseToken'] = decode_data
def dump(self):
for i in self.fields.keys():
print("%s: {%r}" % (i,self[i]))
def getData(self):
ans = pack('B',SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP)
if 'NegResult' in self.fields and 'SupportedMech' in self.fields:
# Server resp
ans += asn1encode(
pack('B', ASN1_SEQUENCE) +
asn1encode(
pack('B',SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) +
asn1encode(
pack('B',ASN1_ENUMERATED) +
asn1encode( self['NegResult'] )) +
pack('B',ASN1_SUPPORTED_MECH) +
asn1encode(
pack('B',ASN1_OID) +
asn1encode(self['SupportedMech'])) +
pack('B',ASN1_RESPONSE_TOKEN ) +
asn1encode(
pack('B', ASN1_OCTET_STRING) + asn1encode(self['ResponseToken']))))
elif 'NegResult' in self.fields:
# Server resp
ans += asn1encode(
pack('B', ASN1_SEQUENCE) +
asn1encode(
pack('B', SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) +
asn1encode(
pack('B',ASN1_ENUMERATED) +
asn1encode( self['NegResult'] ))))
else:
# Client resp
ans += asn1encode(
pack('B', ASN1_SEQUENCE) +
asn1encode(
pack('B', ASN1_RESPONSE_TOKEN) +
asn1encode(
pack('B', ASN1_OCTET_STRING) + asn1encode(self['ResponseToken']))))
return ans
class SPNEGO_NegTokenInit(GSSAPI):
# http://tools.ietf.org/html/rfc4178#page-8
# NegTokeInit :: = SEQUENCE {
# mechTypes [0] MechTypeList,
# reqFlags [1] ContextFlags OPTIONAL,
# mechToken [2] OCTET STRING OPTIONAL,
# mechListMIC [3] OCTET STRING OPTIONAL,
# }
SPNEGO_NEG_TOKEN_INIT = 0xa0
def fromString(self, data = 0):
GSSAPI.fromString(self, data)
payload = self['Payload']
next_byte = unpack('B', payload[:1])[0]
if next_byte != SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT:
raise Exception('NegTokenInit not found %x' % next_byte)
payload = payload[1:]
decode_data, total_bytes = asn1decode(payload)
# Now we should have a SEQUENCE Tag
next_byte = unpack('B', decode_data[:1])[0]
if next_byte != ASN1_SEQUENCE:
raise Exception('SEQUENCE tag not found %x' % next_byte)
decode_data = decode_data[1:]
decode_data, total_bytes2 = asn1decode(decode_data)
next_byte = unpack('B',decode_data[:1])[0]
if next_byte != ASN1_MECH_TYPE:
raise Exception('MechType tag not found %x' % next_byte)
decode_data = decode_data[1:]
remaining_data = decode_data
decode_data, total_bytes3 = asn1decode(decode_data)
next_byte = unpack('B', decode_data[:1])[0]
if next_byte != ASN1_SEQUENCE:
raise Exception('SEQUENCE tag not found %x' % next_byte)
decode_data = decode_data[1:]
decode_data, total_bytes4 = asn1decode(decode_data)
# And finally we should have the MechTypes
self['MechTypes'] = []
while decode_data:
next_byte = unpack('B', decode_data[:1])[0]
if next_byte != ASN1_OID:
# Not a valid OID, there must be something else we won't unpack
break
decode_data = decode_data[1:]
item, total_bytes = asn1decode(decode_data)
self['MechTypes'].append(item)
decode_data = decode_data[total_bytes:]
# Do we have MechTokens as well?
decode_data = remaining_data[total_bytes3:]
if len(decode_data) > 0:
next_byte = unpack('B', decode_data[:1])[0]
if next_byte == ASN1_MECH_TOKEN:
# We have tokens in here!
decode_data = decode_data[1:]
decode_data, total_bytes = asn1decode(decode_data)
next_byte = unpack('B', decode_data[:1])[0]
if next_byte == ASN1_OCTET_STRING:
decode_data = decode_data[1:]
decode_data, total_bytes = asn1decode(decode_data)
self['MechToken'] = decode_data
def getData(self):
mechTypes = ''
for i in self['MechTypes']:
mechTypes += pack('B', ASN1_OID)
mechTypes += asn1encode(i)
mechToken = ''
# Do we have tokens to send?
if 'MechToken' in self.fields:
mechToken = pack('B', ASN1_MECH_TOKEN) + asn1encode(
pack('B', ASN1_OCTET_STRING) + asn1encode(
self['MechToken']))
ans = pack('B',SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT)
ans += asn1encode(
pack('B', ASN1_SEQUENCE) +
asn1encode(
pack('B', ASN1_MECH_TYPE) +
asn1encode(
pack('B', ASN1_SEQUENCE) +
asn1encode(mechTypes)) + mechToken ))
self['Payload'] = ans
return GSSAPI.getData(self)

View File

@ -1,744 +0,0 @@
from __future__ import print_function
# Copyright (c) 2003-2016 CORE Security Technologies
#
# This software is provided under under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
from struct import pack, unpack, calcsize
class Structure:
""" sublcasses can define commonHdr and/or structure.
each of them is an tuple of either two: (fieldName, format) or three: (fieldName, ':', class) fields.
[it can't be a dictionary, because order is important]
where format specifies how the data in the field will be converted to/from bytes (string)
class is the class to use when unpacking ':' fields.
each field can only contain one value (or an array of values for *)
i.e. struct.pack('Hl',1,2) is valid, but format specifier 'Hl' is not (you must use 2 dfferent fields)
format specifiers:
specifiers from module pack can be used with the same format
see struct.__doc__ (pack/unpack is finally called)
x [padding byte]
c [character]
b [signed byte]
B [unsigned byte]
h [signed short]
H [unsigned short]
l [signed long]
L [unsigned long]
i [signed integer]
I [unsigned integer]
q [signed long long (quad)]
Q [unsigned long long (quad)]
s [string (array of chars), must be preceded with length in format specifier, padded with zeros]
p [pascal string (includes byte count), must be preceded with length in format specifier, padded with zeros]
f [float]
d [double]
= [native byte ordering, size and alignment]
@ [native byte ordering, standard size and alignment]
! [network byte ordering]
< [little endian]
> [big endian]
usual printf like specifiers can be used (if started with %)
[not recommeneded, there is no why to unpack this]
%08x will output an 8 bytes hex
%s will output a string
%s\\x00 will output a NUL terminated string
%d%d will output 2 decimal digits (against the very same specification of Structure)
...
some additional format specifiers:
: just copy the bytes from the field into the output string (input may be string, other structure, or anything responding to __str__()) (for unpacking, all what's left is returned)
z same as :, but adds a NUL byte at the end (asciiz) (for unpacking the first NUL byte is used as terminator) [asciiz string]
u same as z, but adds two NUL bytes at the end (after padding to an even size with NULs). (same for unpacking) [unicode string]
w DCE-RPC/NDR string (it's a macro for [ '<L=(len(field)+1)/2','"\\x00\\x00\\x00\\x00','<L=(len(field)+1)/2',':' ]
?-field length of field named 'field', formated as specified with ? ('?' may be '!H' for example). The input value overrides the real length
?1*?2 array of elements. Each formated as '?2', the number of elements in the array is stored as specified by '?1' (?1 is optional, or can also be a constant (number), for unpacking)
'xxxx literal xxxx (field's value doesn't change the output. quotes must not be closed or escaped)
"xxxx literal xxxx (field's value doesn't change the output. quotes must not be closed or escaped)
_ will not pack the field. Accepts a third argument, which is an unpack code. See _Test_UnpackCode for an example
?=packcode will evaluate packcode in the context of the structure, and pack the result as specified by ?. Unpacking is made plain
?&fieldname "Address of field fieldname".
For packing it will simply pack the id() of fieldname. Or use 0 if fieldname doesn't exists.
For unpacking, it's used to know weather fieldname has to be unpacked or not, i.e. by adding a & field you turn another field (fieldname) in an optional field.
"""
commonHdr = ()
structure = ()
debug = 0
def __init__(self, data = None, alignment = 0):
if not hasattr(self, 'alignment'):
self.alignment = alignment
self.fields = {}
self.rawData = data
if data is not None:
self.fromString(data)
else:
self.data = None
@classmethod
def fromFile(self, file):
answer = self()
answer.fromString(file.read(len(answer)))
return answer
def setAlignment(self, alignment):
self.alignment = alignment
def setData(self, data):
self.data = data
def packField(self, fieldName, format = None):
if self.debug:
print("packField( %s | %s )" % (fieldName, format))
if format is None:
format = self.formatForField(fieldName)
if fieldName in self.fields:
ans = self.pack(format, self.fields[fieldName], field = fieldName)
else:
ans = self.pack(format, None, field = fieldName)
if self.debug:
print("\tanswer %r" % ans)
return ans
def getData(self):
if self.data is not None:
return self.data
data = ''
for field in self.commonHdr+self.structure:
try:
data += self.packField(field[0], field[1])
except Exception as e:
if field[0] in self.fields:
e.args += ("When packing field '%s | %s | %r' in %s" % (field[0], field[1], self[field[0]], self.__class__),)
else:
e.args += ("When packing field '%s | %s' in %s" % (field[0], field[1], self.__class__),)
raise
if self.alignment:
if len(data) % self.alignment:
data += ('\x00'*self.alignment)[:-(len(data) % self.alignment)]
#if len(data) % self.alignment: data += ('\x00'*self.alignment)[:-(len(data) % self.alignment)]
return data
def fromString(self, data):
self.rawData = data
for field in self.commonHdr+self.structure:
if self.debug:
print("fromString( %s | %s | %r )" % (field[0], field[1], data))
size = self.calcUnpackSize(field[1], data, field[0])
if self.debug:
print(" size = %d" % size)
dataClassOrCode = str
if len(field) > 2:
dataClassOrCode = field[2]
try:
self[field[0]] = self.unpack(field[1], data[:size], dataClassOrCode = dataClassOrCode, field = field[0])
except Exception as e:
e.args += ("When unpacking field '%s | %s | %r[:%d]'" % (field[0], field[1], data, size),)
raise
size = self.calcPackSize(field[1], self[field[0]], field[0])
if self.alignment and size % self.alignment:
size += self.alignment - (size % self.alignment)
data = data[size:]
return self
def __setitem__(self, key, value):
self.fields[key] = value
self.data = None # force recompute
def __getitem__(self, key):
return self.fields[key]
def __delitem__(self, key):
del self.fields[key]
def __str__(self):
return self.getData()
def __len__(self):
# XXX: improve
return len(self.getData())
def pack(self, format, data, field = None):
if self.debug:
print(" pack( %s | %r | %s)" % (format, data, field))
if field:
addressField = self.findAddressFieldFor(field)
if (addressField is not None) and (data is None):
return ''
# void specifier
if format[:1] == '_':
return ''
# quote specifier
if format[:1] == "'" or format[:1] == '"':
return format[1:]
# code specifier
two = format.split('=')
if len(two) >= 2:
try:
return self.pack(two[0], data)
except:
fields = {'self':self}
fields.update(self.fields)
return self.pack(two[0], eval(two[1], {}, fields))
# address specifier
two = format.split('&')
if len(two) == 2:
try:
return self.pack(two[0], data)
except:
if (two[1] in self.fields) and (self[two[1]] is not None):
return self.pack(two[0], id(self[two[1]]) & ((1<<(calcsize(two[0])*8))-1) )
else:
return self.pack(two[0], 0)
# length specifier
two = format.split('-')
if len(two) == 2:
try:
return self.pack(two[0],data)
except:
return self.pack(two[0], self.calcPackFieldSize(two[1]))
# array specifier
two = format.split('*')
if len(two) == 2:
answer = ''
for each in data:
answer += self.pack(two[1], each)
if two[0]:
if two[0].isdigit():
if int(two[0]) != len(data):
raise Exception("Array field has a constant size, and it doesn't match the actual value")
else:
return self.pack(two[0], len(data))+answer
return answer
# "printf" string specifier
if format[:1] == '%':
# format string like specifier
return format % data
# asciiz specifier
if format[:1] == 'z':
return str(data)+'\0'
# unicode specifier
if format[:1] == 'u':
return str(data)+'\0\0' + (len(data) & 1 and '\0' or '')
# DCE-RPC/NDR string specifier
if format[:1] == 'w':
if len(data) == 0:
data = '\0\0'
elif len(data) % 2:
data += '\0'
l = pack('<L', len(data)/2)
return '%s\0\0\0\0%s%s' % (l,l,data)
if data is None:
raise Exception("Trying to pack None")
# literal specifier
if format[:1] == ':':
return str(data)
# struct like specifier
return pack(format, data)
def unpack(self, format, data, dataClassOrCode = str, field = None):
if self.debug:
print(" unpack( %s | %r )" % (format, data))
if field:
addressField = self.findAddressFieldFor(field)
if addressField is not None:
if not self[addressField]:
return
# void specifier
if format[:1] == '_':
if dataClassOrCode != str:
fields = {'self':self, 'inputDataLeft':data}
fields.update(self.fields)
return eval(dataClassOrCode, {}, fields)
else:
return None
# quote specifier
if format[:1] == "'" or format[:1] == '"':
answer = format[1:]
if answer != data:
raise Exception("Unpacked data doesn't match constant value '%r' should be '%r'" % (data, answer))
return answer
# address specifier
two = format.split('&')
if len(two) == 2:
return self.unpack(two[0],data)
# code specifier
two = format.split('=')
if len(two) >= 2:
return self.unpack(two[0],data)
# length specifier
two = format.split('-')
if len(two) == 2:
return self.unpack(two[0],data)
# array specifier
two = format.split('*')
if len(two) == 2:
answer = []
sofar = 0
if two[0].isdigit():
number = int(two[0])
elif two[0]:
sofar += self.calcUnpackSize(two[0], data)
number = self.unpack(two[0], data[:sofar])
else:
number = -1
while number and sofar < len(data):
nsofar = sofar + self.calcUnpackSize(two[1],data[sofar:])
answer.append(self.unpack(two[1], data[sofar:nsofar], dataClassOrCode))
number -= 1
sofar = nsofar
return answer
# "printf" string specifier
if format[:1] == '%':
# format string like specifier
return format % data
# asciiz specifier
if format == 'z':
if data[-1] != '\x00':
raise Exception("%s 'z' field is not NUL terminated: %r" % (field, data))
return data[:-1] # remove trailing NUL
# unicode specifier
if format == 'u':
if data[-2:] != '\x00\x00':
raise Exception("%s 'u' field is not NUL-NUL terminated: %r" % (field, data))
return data[:-2] # remove trailing NUL
# DCE-RPC/NDR string specifier
if format == 'w':
l = unpack('<L', data[:4])[0]
return data[12:12+l*2]
# literal specifier
if format == ':':
return dataClassOrCode(data)
# struct like specifier
return unpack(format, data)[0]
def calcPackSize(self, format, data, field = None):
# # print " calcPackSize %s:%r" % (format, data)
if field:
addressField = self.findAddressFieldFor(field)
if addressField is not None:
if not self[addressField]:
return 0
# void specifier
if format[:1] == '_':
return 0
# quote specifier
if format[:1] == "'" or format[:1] == '"':
return len(format)-1
# address specifier
two = format.split('&')
if len(two) == 2:
return self.calcPackSize(two[0], data)
# code specifier
two = format.split('=')
if len(two) >= 2:
return self.calcPackSize(two[0], data)
# length specifier
two = format.split('-')
if len(two) == 2:
return self.calcPackSize(two[0], data)
# array specifier
two = format.split('*')
if len(two) == 2:
answer = 0
if two[0].isdigit():
if int(two[0]) != len(data):
raise Exception("Array field has a constant size, and it doesn't match the actual value")
elif two[0]:
answer += self.calcPackSize(two[0], len(data))
for each in data:
answer += self.calcPackSize(two[1], each)
return answer
# "printf" string specifier
if format[:1] == '%':
# format string like specifier
return len(format % data)
# asciiz specifier
if format[:1] == 'z':
return len(data)+1
# asciiz specifier
if format[:1] == 'u':
l = len(data)
return l + (l & 1 and 3 or 2)
# DCE-RPC/NDR string specifier
if format[:1] == 'w':
l = len(data)
return 12+l+l % 2
# literal specifier
if format[:1] == ':':
return len(data)
# struct like specifier
return calcsize(format)
def calcUnpackSize(self, format, data, field = None):
if self.debug:
print(" calcUnpackSize( %s | %s | %r)" % (field, format, data))
# void specifier
if format[:1] == '_':
return 0
addressField = self.findAddressFieldFor(field)
if addressField is not None:
if not self[addressField]:
return 0
try:
lengthField = self.findLengthFieldFor(field)
return self[lengthField]
except:
pass
# XXX: Try to match to actual values, raise if no match
# quote specifier
if format[:1] == "'" or format[:1] == '"':
return len(format)-1
# address specifier
two = format.split('&')
if len(two) == 2:
return self.calcUnpackSize(two[0], data)
# code specifier
two = format.split('=')
if len(two) >= 2:
return self.calcUnpackSize(two[0], data)
# length specifier
two = format.split('-')
if len(two) == 2:
return self.calcUnpackSize(two[0], data)
# array specifier
two = format.split('*')
if len(two) == 2:
answer = 0
if two[0]:
if two[0].isdigit():
number = int(two[0])
else:
answer += self.calcUnpackSize(two[0], data)
number = self.unpack(two[0], data[:answer])
while number:
number -= 1
answer += self.calcUnpackSize(two[1], data[answer:])
else:
while answer < len(data):
answer += self.calcUnpackSize(two[1], data[answer:])
return answer
# "printf" string specifier
if format[:1] == '%':
raise Exception("Can't guess the size of a printf like specifier for unpacking")
# asciiz specifier
if format[:1] == 'z':
return data.index('\x00')+1
# asciiz specifier
if format[:1] == 'u':
l = data.index('\x00\x00')
return l + (l & 1 and 3 or 2)
# DCE-RPC/NDR string specifier
if format[:1] == 'w':
l = unpack('<L', data[:4])[0]
return 12+l*2
# literal specifier
if format[:1] == ':':
return len(data)
# struct like specifier
return calcsize(format)
def calcPackFieldSize(self, fieldName, format = None):
if format is None:
format = self.formatForField(fieldName)
return self.calcPackSize(format, self[fieldName])
def formatForField(self, fieldName):
for field in self.commonHdr+self.structure:
if field[0] == fieldName:
return field[1]
raise Exception("Field %s not found" % fieldName)
def findAddressFieldFor(self, fieldName):
descriptor = '&%s' % fieldName
l = len(descriptor)
for field in self.commonHdr+self.structure:
if field[1][-l:] == descriptor:
return field[0]
return None
def findLengthFieldFor(self, fieldName):
descriptor = '-%s' % fieldName
l = len(descriptor)
for field in self.commonHdr+self.structure:
if field[1][-l:] == descriptor:
return field[0]
return None
def zeroValue(self, format):
two = format.split('*')
if len(two) == 2:
if two[0].isdigit():
return (self.zeroValue(two[1]),)*int(two[0])
if not format.find('*') == -1: return ()
if 's' in format: return ''
if format in ['z',':','u']: return ''
if format == 'w': return '\x00\x00'
return 0
def clear(self):
for field in self.commonHdr + self.structure:
self[field[0]] = self.zeroValue(field[1])
def dump(self, msg = None, indent = 0):
if msg is None: msg = self.__class__.__name__
ind = ' '*indent
print("\n%s" % msg)
fixedFields = []
for field in self.commonHdr+self.structure:
i = field[0]
if i in self.fields:
fixedFields.append(i)
if isinstance(self[i], Structure):
self[i].dump('%s%s:{' % (ind,i), indent = indent + 4)
print("%s}" % ind)
else:
print("%s%s: {%r}" % (ind,i,self[i]))
# Do we have remaining fields not defined in the structures? let's
# print them
remainingFields = list(set(self.fields) - set(fixedFields))
for i in remainingFields:
if isinstance(self[i], Structure):
self[i].dump('%s%s:{' % (ind,i), indent = indent + 4)
print("%s}" % ind)
else:
print("%s%s: {%r}" % (ind,i,self[i]))
class _StructureTest:
alignment = 0
def create(self,data = None):
if data is not None:
return self.theClass(data, alignment = self.alignment)
else:
return self.theClass(alignment = self.alignment)
def run(self):
print()
print("-"*70)
testName = self.__class__.__name__
print("starting test: %s....." % testName)
a = self.create()
self.populate(a)
a.dump("packing.....")
a_str = str(a)
print("packed: %r" % a_str)
print("unpacking.....")
b = self.create(a_str)
b.dump("unpacked.....")
print("repacking.....")
b_str = str(b)
if b_str != a_str:
print("ERROR: original packed and repacked don't match")
print("packed: %r" % b_str)
class _Test_simple(_StructureTest):
class theClass(Structure):
commonHdr = ()
structure = (
('int1', '!L'),
('len1','!L-z1'),
('arr1','B*<L'),
('z1', 'z'),
('u1','u'),
('', '"COCA'),
('len2','!H-:1'),
('', '"COCA'),
(':1', ':'),
('int3','>L'),
('code1','>L=len(arr1)*2+0x1000'),
)
def populate(self, a):
a['default'] = 'hola'
a['int1'] = 0x3131
a['int3'] = 0x45444342
a['z1'] = 'hola'
a['u1'] = 'hola'.encode('utf_16_le')
a[':1'] = ':1234:'
a['arr1'] = (0x12341234,0x88990077,0x41414141)
# a['len1'] = 0x42424242
class _Test_fixedLength(_Test_simple):
def populate(self, a):
_Test_simple.populate(self, a)
a['len1'] = 0x42424242
class _Test_simple_aligned4(_Test_simple):
alignment = 4
class _Test_nested(_StructureTest):
class theClass(Structure):
class _Inner(Structure):
structure = (('data', 'z'),)
structure = (
('nest1', ':', _Inner),
('nest2', ':', _Inner),
('int', '<L'),
)
def populate(self, a):
a['nest1'] = _Test_nested.theClass._Inner()
a['nest2'] = _Test_nested.theClass._Inner()
a['nest1']['data'] = 'hola manola'
a['nest2']['data'] = 'chau loco'
a['int'] = 0x12345678
class _Test_Optional(_StructureTest):
class theClass(Structure):
structure = (
('pName','<L&Name'),
('pList','<L&List'),
('Name','w'),
('List','<H*<L'),
)
def populate(self, a):
a['Name'] = 'Optional test'
a['List'] = (1,2,3,4)
class _Test_Optional_sparse(_Test_Optional):
def populate(self, a):
_Test_Optional.populate(self, a)
del a['Name']
class _Test_AsciiZArray(_StructureTest):
class theClass(Structure):
structure = (
('head','<L'),
('array','B*z'),
('tail','<L'),
)
def populate(self, a):
a['head'] = 0x1234
a['tail'] = 0xabcd
a['array'] = ('hola','manola','te traje')
class _Test_UnpackCode(_StructureTest):
class theClass(Structure):
structure = (
('leni','<L=len(uno)*2'),
('cuchi','_-uno','leni/2'),
('uno',':'),
('dos',':'),
)
def populate(self, a):
a['uno'] = 'soy un loco!'
a['dos'] = 'que haces fiera'
class _Test_AAA(_StructureTest):
class theClass(Structure):
commonHdr = ()
structure = (
('iv', '!L=((init_vector & 0xFFFFFF) << 8) | ((pad & 0x3f) << 2) | (keyid & 3)'),
('init_vector', '_','(iv >> 8)'),
('pad', '_','((iv >>2) & 0x3F)'),
('keyid', '_','( iv & 0x03 )'),
('dataLen', '_-data', 'len(inputDataLeft)-4'),
('data',':'),
('icv','>L'),
)
def populate(self, a):
a['init_vector']=0x01020304
#a['pad']=int('01010101',2)
a['pad']=int('010101',2)
a['keyid']=0x07
a['data']="\xA0\xA1\xA2\xA3\xA4\xA5\xA6\xA7\xA8\xA9"
a['icv'] = 0x05060708
#a['iv'] = 0x01020304
if __name__ == '__main__':
_Test_simple().run()
try:
_Test_fixedLength().run()
except:
print("cannot repack because length is bogus")
_Test_simple_aligned4().run()
_Test_nested().run()
_Test_Optional().run()
_Test_Optional_sparse().run()
_Test_AsciiZArray().run()
_Test_UnpackCode().run()
_Test_AAA().run()

View File

@ -1,73 +0,0 @@
# Copyright (c) 2003-2016 CORE Security Technologies
#
# This software is provided under under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# Description:
# Generate UUID compliant with http://www.webdav.org/specs/draft-leach-uuids-guids-01.txt.
# A different, much simpler (not necessarily better) algorithm is used.
#
# Author:
# Javier Kohen (jkohen)
#
import re
from random import randrange
from struct import pack, unpack
try:
long # Python 2
except NameError:
long = int # Python 3
def generate():
# UHm... crappy Python has an maximum integer of 2**31-1.
top = (1<<31)-1
return pack("IIII", randrange(top), randrange(top), randrange(top), randrange(top))
def bin_to_string(uuid):
uuid1, uuid2, uuid3 = unpack('<LHH', uuid[:8])
uuid4, uuid5, uuid6 = unpack('>HHL', uuid[8:16])
return '%08X-%04X-%04X-%04X-%04X%08X' % (uuid1, uuid2, uuid3, uuid4, uuid5, uuid6)
def string_to_bin(uuid):
matches = re.match('([\dA-Fa-f]{8})-([\dA-Fa-f]{4})-([\dA-Fa-f]{4})-([\dA-Fa-f]{4})-([\dA-Fa-f]{4})([\dA-Fa-f]{8})', uuid)
(uuid1, uuid2, uuid3, uuid4, uuid5, uuid6) = map(lambda x: long(x, 16), matches.groups())
uuid = pack('<LHH', uuid1, uuid2, uuid3)
uuid += pack('>HHL', uuid4, uuid5, uuid6)
return uuid
def stringver_to_bin(s):
(maj,min) = s.split('.')
return pack('<H',int(maj)) + pack('<H',int(min))
def uuidtup_to_bin(tup):
if len(tup) != 2: return
return string_to_bin(tup[0]) + stringver_to_bin(tup[1])
def bin_to_uuidtup(bin):
assert len(bin) == 20
uuidstr = bin_to_string(bin[:16])
maj, min = unpack("<HH", bin[16:])
return uuidstr, "%d.%d" % (maj, min)
#input: string
#output: tuple (uuid,version)
#if version is not found in the input string "1.0" is returned
#example:
# "00000000-0000-0000-0000-000000000000 3.0" returns ('00000000-0000-0000-0000-000000000000','3.0')
# "10000000-2000-3000-4000-500000000000 version 3.0" returns ('00000000-0000-0000-0000-000000000000','3.0')
# "10000000-2000-3000-4000-500000000000 v 3.0" returns ('00000000-0000-0000-0000-000000000000','3.0')
# "10000000-2000-3000-4000-500000000000" returns ('00000000-0000-0000-0000-000000000000','1.0')
def string_to_uuidtup(s):
g = re.search("([A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}).*?([0-9]{1,5}\.[0-9]{1,5})",s+" 1.0")
if g:
(u,v) = g.groups()
return (u,v)
return
def uuidtup_to_string(tup):
uuid, (maj, min) = tup
return "%s v%d.%d" % (uuid, maj, min)

View File

@ -1,12 +0,0 @@
# Copyright (c) 2003-2016 CORE Security Technologies
#
# This software is provided under under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
VER_MAJOR = "0"
VER_MINOR = "9.15"
BANNER = "Impacket v%s.%s - Copyright 2002-2016 Core Security Technologies\n" % (VER_MAJOR,VER_MINOR)

View File

@ -36,9 +36,13 @@ else:
# Import our curl test data helper
import curl_test_data
# This saves us having to set up the PYTHONPATH explicitly
deps_dir = os.path.join(os.path.dirname(__file__), "python_dependencies")
sys.path.append(deps_dir)
# impacket needs to be installed in the Python environment
try:
import impacket
except ImportError:
sys.stderr.write('Python package impacket needs to be installed!\n')
sys.stderr.write('Use pip or your package manager to install it.\n')
sys.exit(1)
from impacket import smbserver as imp_smbserver
from impacket import smb as imp_smb
from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_SUCCESS,